authentik.core.expression.evaluator

Property Mapping Evaluator

 1"""Property Mapping Evaluator"""
 2
 3from types import CodeType
 4from typing import Any
 5
 6from django.db.models import Model
 7from django.http import HttpRequest
 8from prometheus_client import Histogram
 9
10from authentik.core.expression.exceptions import SkipObjectException
11from authentik.core.models import User
12from authentik.events.models import Event, EventAction
13from authentik.lib.expression.evaluator import BaseEvaluator
14from authentik.policies.types import PolicyRequest
15
# Histogram of property mapping evaluation durations, with one label
# (``mapping_name``) so that individual mappings can be told apart.
PROPERTY_MAPPING_TIME = Histogram(
    name="authentik_property_mapping_execution_time",
    documentation="Evaluation time of property mappings",
    labelnames=["mapping_name"],
)
21
22
class PropertyMappingEvaluator(BaseEvaluator):
    """Evaluator for the expression attached to a property mapping model.

    On top of ``BaseEvaluator`` it places the acting user, the HTTP request and
    a ``PolicyRequest`` wrapping both into the expression context, exposes
    ``SkipObject`` as a global, times every evaluation, and records an event
    from ``handle_error`` unless the evaluator runs in dry-run mode.
    """

    # When true, ``handle_error`` returns without creating an event.
    dry_run: bool
    # Object whose expression is evaluated; also stored as ``request.obj``.
    model: Model
    # Result of the first ``compile`` call, reused by every later call.
    _compiled: CodeType | None = None

    def __init__(
        self,
        model: Model,
        user: User | None = None,
        request: HttpRequest | None = None,
        dry_run: bool | None = False,
        **kwargs,
    ):
        """Bind the evaluator to ``model`` and build the initial context.

        The filename handed to the parent class is ``model.name`` when the
        model has a ``name`` attribute and ``str(model)`` otherwise. ``user``,
        ``request`` and all extra keyword arguments go to ``set_context``.
        """
        self.model = model
        source_name = model.name if hasattr(model, "name") else str(model)
        super().__init__(filename=source_name)
        self.dry_run = dry_run
        self.set_context(user, request, **kwargs)

    def set_context(
        self,
        user: User | None = None,
        request: HttpRequest | None = None,
        **kwargs,
    ):
        """Fill the expression context for the given user and HTTP request.

        A new ``PolicyRequest`` is created with an empty ``User`` as
        placeholder; a truthy ``user`` replaces that placeholder. Extra
        keyword arguments are merged into both the policy request's context
        and the evaluator's own context.
        """
        policy_request = PolicyRequest(user=User())
        policy_request.obj = self.model
        if user:
            self._context["user"] = user
            policy_request.user = user
        if request:
            self._context["http_request"] = request
            policy_request.http_request = request
        policy_request.context.update(**kwargs)
        self._context["request"] = policy_request
        # Merged last, so keyword arguments win over the keys assigned above.
        self._context.update(**kwargs)
        self._globals["SkipObject"] = SkipObjectException

    def handle_error(self, exc: Exception, expression_source: str):
        """Record a PROPERTY_MAPPING_EXCEPTION event for a failed evaluation.

        Nothing is recorded for dry runs. When the policy request in the
        context carries an HTTP request, the event is handed to
        ``Event.from_http``; otherwise the user (if any) is attached and the
        event is saved here.
        """
        # Dry-run requests never store exceptions.
        if self.dry_run:
            return
        failure_event = Event.new(
            EventAction.PROPERTY_MAPPING_EXCEPTION,
            expression=expression_source,
            message="Failed to execute property mapping",
        )
        failure_event = failure_event.with_exception(exc)
        if "request" not in self._context:
            failure_event.save()
            return
        policy_request: PolicyRequest = self._context["request"]
        if policy_request.http_request:
            # No explicit save() on this path -- presumably from_http persists
            # the event itself; confirm against Event.from_http.
            failure_event.from_http(policy_request.http_request, policy_request.user)
            return
        if policy_request.user:
            failure_event.set_user(policy_request.user)
        failure_event.save()

    def evaluate(self, *args, **kwargs) -> Any:
        """Run the parent's ``evaluate`` and time it.

        The duration is observed on ``PROPERTY_MAPPING_TIME`` under the label
        ``mapping_name=self._filename``. Arguments and return value are passed
        through unchanged.
        """
        timer = PROPERTY_MAPPING_TIME.labels(mapping_name=self._filename).time()
        with timer:
            return super().evaluate(*args, **kwargs)

    def compile(self, expression: str | None = None) -> Any:
        """Return the compiled expression, compiling only on the first call.

        A falsy ``expression`` falls back to ``self.model.expression``. Once a
        truthy result is cached it is returned for every later call, whatever
        ``expression`` is passed.
        """
        if self._compiled:
            return self._compiled
        self._compiled = super().compile(expression or self.model.expression)
        return self._compiled
PROPERTY_MAPPING_TIME = prometheus_client.metrics.Histogram(authentik_property_mapping_execution_time)
class PropertyMappingEvaluator(authentik.lib.expression.evaluator.BaseEvaluator):
class PropertyMappingEvaluator(BaseEvaluator):
    """Evaluator for the expression attached to a property mapping model.

    On top of ``BaseEvaluator`` it places the acting user, the HTTP request and
    a ``PolicyRequest`` wrapping both into the expression context, exposes
    ``SkipObject`` as a global, times every evaluation, and records an event
    from ``handle_error`` unless the evaluator runs in dry-run mode.
    """

    # When true, ``handle_error`` returns without creating an event.
    dry_run: bool
    # Object whose expression is evaluated; also stored as ``request.obj``.
    model: Model
    # Result of the first ``compile`` call, reused by every later call.
    _compiled: CodeType | None = None

    def __init__(
        self,
        model: Model,
        user: User | None = None,
        request: HttpRequest | None = None,
        dry_run: bool | None = False,
        **kwargs,
    ):
        """Bind the evaluator to ``model`` and build the initial context.

        The filename handed to the parent class is ``model.name`` when the
        model has a ``name`` attribute and ``str(model)`` otherwise. ``user``,
        ``request`` and all extra keyword arguments go to ``set_context``.
        """
        self.model = model
        source_name = model.name if hasattr(model, "name") else str(model)
        super().__init__(filename=source_name)
        self.dry_run = dry_run
        self.set_context(user, request, **kwargs)

    def set_context(
        self,
        user: User | None = None,
        request: HttpRequest | None = None,
        **kwargs,
    ):
        """Fill the expression context for the given user and HTTP request.

        A new ``PolicyRequest`` is created with an empty ``User`` as
        placeholder; a truthy ``user`` replaces that placeholder. Extra
        keyword arguments are merged into both the policy request's context
        and the evaluator's own context.
        """
        policy_request = PolicyRequest(user=User())
        policy_request.obj = self.model
        if user:
            self._context["user"] = user
            policy_request.user = user
        if request:
            self._context["http_request"] = request
            policy_request.http_request = request
        policy_request.context.update(**kwargs)
        self._context["request"] = policy_request
        # Merged last, so keyword arguments win over the keys assigned above.
        self._context.update(**kwargs)
        self._globals["SkipObject"] = SkipObjectException

    def handle_error(self, exc: Exception, expression_source: str):
        """Record a PROPERTY_MAPPING_EXCEPTION event for a failed evaluation.

        Nothing is recorded for dry runs. When the policy request in the
        context carries an HTTP request, the event is handed to
        ``Event.from_http``; otherwise the user (if any) is attached and the
        event is saved here.
        """
        # Dry-run requests never store exceptions.
        if self.dry_run:
            return
        failure_event = Event.new(
            EventAction.PROPERTY_MAPPING_EXCEPTION,
            expression=expression_source,
            message="Failed to execute property mapping",
        )
        failure_event = failure_event.with_exception(exc)
        if "request" not in self._context:
            failure_event.save()
            return
        policy_request: PolicyRequest = self._context["request"]
        if policy_request.http_request:
            # No explicit save() on this path -- presumably from_http persists
            # the event itself; confirm against Event.from_http.
            failure_event.from_http(policy_request.http_request, policy_request.user)
            return
        if policy_request.user:
            failure_event.set_user(policy_request.user)
        failure_event.save()

    def evaluate(self, *args, **kwargs) -> Any:
        """Run the parent's ``evaluate`` and time it.

        The duration is observed on ``PROPERTY_MAPPING_TIME`` under the label
        ``mapping_name=self._filename``. Arguments and return value are passed
        through unchanged.
        """
        timer = PROPERTY_MAPPING_TIME.labels(mapping_name=self._filename).time()
        with timer:
            return super().evaluate(*args, **kwargs)

    def compile(self, expression: str | None = None) -> Any:
        """Return the compiled expression, compiling only on the first call.

        A falsy ``expression`` falls back to ``self.model.expression``. Once a
        truthy result is cached it is returned for every later call, whatever
        ``expression`` is passed.
        """
        if self._compiled:
            return self._compiled
        self._compiled = super().compile(expression or self.model.expression)
        return self._compiled

Custom Evaluator that adds some different context variables.

PropertyMappingEvaluator( model: django.db.models.base.Model, user: authentik.core.models.User | None = None, request: django.http.request.HttpRequest | None = None, dry_run: bool | None = False, **kwargs)
31    def __init__(
32        self,
33        model: Model,
34        user: User | None = None,
35        request: HttpRequest | None = None,
36        dry_run: bool | None = False,
37        **kwargs,
38    ):
39        self.model = model
40        if hasattr(model, "name"):
41            _filename = model.name
42        else:
43            _filename = str(model)
44        super().__init__(filename=_filename)
45        self.dry_run = dry_run
46        self.set_context(user, request, **kwargs)
dry_run: bool
model: django.db.models.base.Model
def set_context( self, user: authentik.core.models.User | None = None, request: django.http.request.HttpRequest | None = None, **kwargs):
48    def set_context(
49        self,
50        user: User | None = None,
51        request: HttpRequest | None = None,
52        **kwargs,
53    ):
54        req = PolicyRequest(user=User())
55        req.obj = self.model
56        if user:
57            req.user = user
58            self._context["user"] = user
59        if request:
60            req.http_request = request
61            self._context["http_request"] = request
62        req.context.update(**kwargs)
63        self._context["request"] = req
64        self._context.update(**kwargs)
65        self._globals["SkipObject"] = SkipObjectException
def handle_error(self, exc: Exception, expression_source: str):
67    def handle_error(self, exc: Exception, expression_source: str):
68        """Exception Handler"""
69        # For dry-run requests we don't save exceptions
70        if self.dry_run:
71            return
72        event = Event.new(
73            EventAction.PROPERTY_MAPPING_EXCEPTION,
74            expression=expression_source,
75            message="Failed to execute property mapping",
76        ).with_exception(exc)
77        if "request" in self._context:
78            req: PolicyRequest = self._context["request"]
79            if req.http_request:
80                event.from_http(req.http_request, req.user)
81                return
82            elif req.user:
83                event.set_user(req.user)
84        event.save()

Exception Handler

def evaluate(self, *args, **kwargs) -> Any:
86    def evaluate(self, *args, **kwargs) -> Any:
87        with PROPERTY_MAPPING_TIME.labels(mapping_name=self._filename).time():
88            return super().evaluate(*args, **kwargs)

Parse and evaluate expression. If the syntax is incorrect, a SyntaxError is raised. If any exception is raised during execution, it is raised. The result is returned without any type-checking.

def compile(self, expression: str | None = None) -> Any:
90    def compile(self, expression: str | None = None) -> Any:
91        if not self._compiled:
92            compiled = super().compile(expression or self.model.expression)
93            self._compiled = compiled
94        return self._compiled

Parse expression. Raises SyntaxError or ValueError if the syntax is incorrect.