authentik.lib.expression.evaluator

authentik expression policy evaluator

  1"""authentik expression policy evaluator"""
  2
  3import re
  4import socket
  5from ipaddress import ip_address, ip_network
  6from smtplib import SMTPException
  7from textwrap import indent
  8from types import CodeType
  9from typing import TYPE_CHECKING, Any
 10
 11from cachetools import TLRUCache, cached
 12from django.core.exceptions import FieldError
 13from django.http import HttpRequest
 14from django.utils.text import slugify
 15from django.utils.timezone import now
 16from guardian.shortcuts import get_anonymous_user
 17from rest_framework.serializers import ValidationError
 18from sentry_sdk import start_span
 19from sentry_sdk.tracing import Span
 20from structlog.stdlib import get_logger
 21
 22from authentik.core.models import User
 23from authentik.events.models import Event
 24from authentik.lib.expression.exceptions import ControlFlowException
 25from authentik.lib.utils.email import normalize_addresses
 26from authentik.lib.utils.http import get_http_session
 27from authentik.lib.utils.time import timedelta_from_string
 28from authentik.policies.models import Policy, PolicyBinding
 29from authentik.policies.process import PolicyProcess
 30from authentik.policies.types import PolicyRequest, PolicyResult
 31from authentik.providers.oauth2.id_token import IDToken
 32from authentik.providers.oauth2.models import AccessToken, OAuth2Provider
 33from authentik.stages.authenticator import devices_for_user
 34from authentik.stages.email.utils import TemplateEmailMessage
 35
 36if TYPE_CHECKING:
 37    from authentik.stages.email.models import EmailStage
 38
 39LOGGER = get_logger()
 40
 41ARG_SANITIZE = re.compile(r"[:.-]")
 42
 43
 44def sanitize_arg(arg_name: str) -> str:
 45    return re.sub(ARG_SANITIZE, "_", slugify(arg_name))
 46
 47
 48class BaseEvaluator:
 49    """Validate and evaluate python-based expressions"""
 50
 51    # Globals that can be used by function
 52    _globals: dict[str, Any]
 53    # Context passed as locals to exec()
 54    _context: dict[str, Any]
 55
 56    # Filename used for exec
 57    _filename: str
 58
 59    def __init__(self, filename: str | None = None):
 60        self._filename = filename if filename else "BaseEvaluator"
 61        # update website/docs/expressions/_objects.md
 62        # update website/docs/expressions/_functions.md
 63        self._globals = {
 64            "ak_call_policy": self.expr_func_call_policy,
 65            "ak_create_event": self.expr_event_create,
 66            "ak_create_jwt": self.expr_create_jwt,
 67            "ak_create_jwt_raw": self.expr_create_jwt_raw,
 68            "ak_is_group_member": BaseEvaluator.expr_is_group_member,
 69            "ak_logger": get_logger(self._filename).bind(),
 70            "ak_send_email": self.expr_send_email,
 71            "ak_user_by": BaseEvaluator.expr_user_by,
 72            "ak_user_has_authenticator": BaseEvaluator.expr_func_user_has_authenticator,
 73            "ip_address": ip_address,
 74            "ip_network": ip_network,
 75            "list_flatten": BaseEvaluator.expr_flatten,
 76            "regex_match": BaseEvaluator.expr_regex_match,
 77            "regex_replace": BaseEvaluator.expr_regex_replace,
 78            "requests": get_http_session(),
 79            "resolve_dns": BaseEvaluator.expr_resolve_dns,
 80            "reverse_dns": BaseEvaluator.expr_reverse_dns,
 81            "slugify": slugify,
 82        }
 83        self._context = {}
 84
 85    @cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
 86    @staticmethod
 87    def expr_resolve_dns(host: str, ip_version: int | None = None) -> list[str]:
 88        """Resolve host to a list of IPv4 and/or IPv6 addresses."""
 89        # Although it seems to be fine (raising OSError), docs warn
 90        # against passing `None` for both the host and the port
 91        # https://docs.python.org/3/library/socket.html#socket.getaddrinfo
 92        host = host or ""
 93
 94        ip_list = []
 95
 96        family = 0
 97        if ip_version == 4:  # noqa: PLR2004
 98            family = socket.AF_INET
 99        if ip_version == 6:  # noqa: PLR2004
100            family = socket.AF_INET6
101
102        try:
103            for ip_addr in socket.getaddrinfo(host, None, family=family):
104                ip_list.append(str(ip_addr[4][0]))
105        except OSError:
106            pass
107        return list(set(ip_list))
108
109    @cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
110    @staticmethod
111    def expr_reverse_dns(ip_addr: str) -> str:
112        """Perform a reverse DNS lookup."""
113        try:
114            return socket.getfqdn(ip_addr)
115        except OSError:
116            return ip_addr
117
118    @staticmethod
119    def expr_flatten(value: list[Any] | Any) -> Any | None:
120        """Flatten `value` if its a list"""
121        if isinstance(value, list):
122            if len(value) < 1:
123                return None
124            return value[0]
125        return value
126
127    @staticmethod
128    def expr_regex_match(value: Any, regex: str) -> bool:
129        """Expression Filter to run re.search"""
130        return re.search(regex, value) is not None
131
132    @staticmethod
133    def expr_regex_replace(value: Any, regex: str, repl: str) -> str:
134        """Expression Filter to run re.sub"""
135        return re.sub(regex, repl, value)
136
137    @staticmethod
138    def expr_is_group_member(user: User, **group_filters) -> bool:
139        """Check if `user` is member of group with name `group_name`"""
140        return user.all_groups().filter(**group_filters).exists()
141
142    @staticmethod
143    def expr_user_by(**filters) -> User | None:
144        """Get user by filters"""
145        try:
146            users = User.objects.filter(**filters)
147            if users:
148                return users.first()
149            return None
150        except FieldError:
151            return None
152
153    @staticmethod
154    def expr_func_user_has_authenticator(user: User, device_type: str | None = None) -> bool:
155        """Check if a user has any authenticator devices, optionally matching *device_type*"""
156        user_devices = devices_for_user(user)
157        if device_type:
158            for device in user_devices:
159                device_class = device.__class__.__name__.lower().replace("device", "")
160                if device_class == device_type:
161                    return True
162            return False
163        return len(list(user_devices)) > 0
164
165    def expr_event_create(self, action: str, **kwargs):
166        """Create event with supplied data and try to extract as much relevant data
167        from the context"""
168        context = self._context.copy()
169        # If the result was a complex variable, we don't want to reuse it
170        context.pop("result", None)
171        context.pop("handler", None)
172        event_kwargs = context
173        event_kwargs.update(kwargs)
174        event = Event.new(
175            action,
176            app=self._filename,
177            **event_kwargs,
178        )
179        if "request" in context and isinstance(context["request"], PolicyRequest):
180            policy_request: PolicyRequest = context["request"]
181            if policy_request.http_request:
182                event.from_http(policy_request.http_request)
183                return
184        event.save()
185
186    def expr_func_call_policy(self, name: str, **kwargs) -> PolicyResult:
187        """Call policy by name, with current request"""
188        policy = Policy.objects.filter(name=name).select_subclasses().first()
189        if not policy:
190            raise ValueError(f"Policy '{name}' not found.")
191        user = self._context.get("user", get_anonymous_user())
192        req = PolicyRequest(user)
193        if "request" in self._context:
194            req = self._context["request"]
195        req.context.update(kwargs)
196        proc = PolicyProcess(PolicyBinding(policy=policy), request=req, connection=None)
197        return proc.profiling_wrapper()
198
199    def expr_create_jwt(
200        self,
201        user: User,
202        provider: OAuth2Provider | str,
203        scopes: list[str],
204        validity: str = "seconds=60",
205    ) -> str | None:
206        """Issue a JWT for a given provider"""
207        request: HttpRequest | None = self._context.get("http_request")
208        if not request:
209            return None
210        if not isinstance(provider, OAuth2Provider):
211            provider = OAuth2Provider.objects.get(name=provider)
212        session = None
213        if hasattr(request, "session") and request.session.session_key:
214            session = request.session["authenticatedsession"]
215        access_token = AccessToken(
216            provider=provider,
217            user=user,
218            expires=now() + timedelta_from_string(validity),
219            scope=scopes,
220            auth_time=now(),
221            session=session,
222        )
223        access_token.id_token = IDToken.new(provider, access_token, request)
224        access_token.save()
225        return access_token.token
226
227    def expr_create_jwt_raw(
228        self, provider: OAuth2Provider | str, validity: str = "seconds=60", **kwargs
229    ) -> str:
230        """Issue a JWT for a given provider with completely customized data"""
231        if not isinstance(provider, OAuth2Provider):
232            provider = OAuth2Provider.objects.get(name=provider)
233        kwargs["exp"] = int((now() + timedelta_from_string(validity)).timestamp())
234        kwargs["aud"] = provider.client_id
235        return provider.encode(kwargs)
236
237    def expr_send_email(  # noqa: PLR0913
238        self,
239        address: str | list[str],
240        subject: str,
241        body: str | None = None,
242        stage: EmailStage | None = None,
243        template: str | None = None,
244        context: dict | None = None,
245        cc: str | list[str] | None = None,
246        bcc: str | list[str] | None = None,
247    ) -> bool:
248        """Send an email using authentik's email system
249
250        Args:
251            address: Email address(es) to send to. Can be:
252                - Single email: "user@example.com"
253                - List of emails: ["user1@example.com", "user2@example.com"]
254            subject: Email subject
255            body: Email body (plain text/HTML). Mutually exclusive with template.
256            stage: EmailStage instance to use for settings. If None, uses global settings.
257            template: Template name to render. Mutually exclusive with body.
258            context: Additional context variables for template rendering.
259            cc: Email address(es) to CC. Same format as address.
260            bcc: Email address(es) to BCC. Same format as address.
261
262        Returns:
263            bool: True if email was queued successfully, False otherwise
264        """
265        # Deferred imports to avoid circular import issues
266        from authentik.stages.email.tasks import send_mails
267
268        if body and template:
269            raise ValueError("body and template parameters are mutually exclusive")
270
271        if not body and not template:
272            raise ValueError("Either body or template parameter must be provided")
273
274        to_addresses = normalize_addresses(address)
275        cc_addresses = normalize_addresses(cc)
276        bcc_addresses = normalize_addresses(bcc)
277
278        try:
279            if template is not None:
280                # Use all available context from the evaluator for template rendering
281                template_context = self._context.copy()
282                # Add any custom context passed to the function
283                if context:
284                    template_context.update(context)
285
286                # Use template rendering
287                message = TemplateEmailMessage(
288                    subject=subject,
289                    to=to_addresses,
290                    cc=cc_addresses,
291                    bcc=bcc_addresses,
292                    template_name=template,
293                    template_context=template_context,
294                )
295            else:
296                # Use plain body
297                message = TemplateEmailMessage(
298                    subject=subject,
299                    to=to_addresses,
300                    cc=cc_addresses,
301                    bcc=bcc_addresses,
302                    body=body,
303                )
304
305            send_mails(stage, message)
306            return True
307
308        except (SMTPException, ConnectionError, ValidationError, ValueError) as exc:
309            LOGGER.warning("Failed to send email", exc=exc, addresses=to_addresses, subject=subject)
310            return False
311
312    def wrap_expression(self, expression: str) -> str:
313        """Wrap expression in a function, call it, and save the result as `result`"""
314        handler_signature = ",".join(
315            [x for x in [sanitize_arg(x) for x in self._context.keys()] if x]
316        )
317        full_expression = ""
318        full_expression += f"def handler({handler_signature}):\n"
319        full_expression += indent(expression, "    ")
320        full_expression += f"\nresult = handler({handler_signature})"
321        return full_expression
322
323    def compile(self, expression: str) -> CodeType:
324        """Parse expression. Raises SyntaxError or ValueError if the syntax is incorrect."""
325        expression = self.wrap_expression(expression)
326        return compile(expression, self._filename, "exec")
327
328    def evaluate(self, expression_source: str) -> Any:
329        """Parse and evaluate expression. If the syntax is incorrect, a SyntaxError is raised.
330        If any exception is raised during execution, it is raised.
331        The result is returned without any type-checking."""
332        with start_span(op="authentik.lib.evaluator.evaluate") as span:
333            span: Span
334            span.description = self._filename
335            span.set_data("expression", expression_source)
336            try:
337                ast_obj = self.compile(expression_source)
338            except (SyntaxError, ValueError) as exc:
339                self.handle_error(exc, expression_source)
340                raise exc
341            try:
342                _locals = {sanitize_arg(x): y for x, y in self._context.items()}
343                # Yes this is an exec, yes it is potentially bad. Since we limit what variables are
344                # available here, and these policies can only be edited by admins, this is a risk
345                # we're willing to take.
346
347                exec(ast_obj, self._globals, _locals)  # nosec # noqa
348                result = _locals["result"]
349            except Exception as exc:
350                # So, this is a bit questionable. Essentially, we are edit the stacktrace
351                # so the user only sees information relevant to them
352                # and none of our surrounding error handling
353                if exc.__traceback__ is not None:
354                    exc.__traceback__ = exc.__traceback__.tb_next
355                if not isinstance(exc, ControlFlowException):
356                    self.handle_error(exc, expression_source)
357                raise exc
358            return result
359
360    def handle_error(self, exc: Exception, expression_source: str):  # pragma: no cover
361        """Exception Handler"""
362        LOGGER.warning("Expression error", exc=exc)
363
364    def validate(self, expression: str) -> bool:
365        """Validate expression's syntax, raise ValidationError if Syntax is invalid"""
366        try:
367            self.compile(expression)
368            return True
369        except (ValueError, SyntaxError) as exc:
370            raise ValidationError(f"Expression Syntax Error: {str(exc)}") from exc
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
ARG_SANITIZE = re.compile('[:.-]')
def sanitize_arg(arg_name: str) -> str:
45def sanitize_arg(arg_name: str) -> str:
46    return re.sub(ARG_SANITIZE, "_", slugify(arg_name))
class BaseEvaluator:
 49class BaseEvaluator:
 50    """Validate and evaluate python-based expressions"""
 51
 52    # Globals that can be used by function
 53    _globals: dict[str, Any]
 54    # Context passed as locals to exec()
 55    _context: dict[str, Any]
 56
 57    # Filename used for exec
 58    _filename: str
 59
 60    def __init__(self, filename: str | None = None):
 61        self._filename = filename if filename else "BaseEvaluator"
 62        # update website/docs/expressions/_objects.md
 63        # update website/docs/expressions/_functions.md
 64        self._globals = {
 65            "ak_call_policy": self.expr_func_call_policy,
 66            "ak_create_event": self.expr_event_create,
 67            "ak_create_jwt": self.expr_create_jwt,
 68            "ak_create_jwt_raw": self.expr_create_jwt_raw,
 69            "ak_is_group_member": BaseEvaluator.expr_is_group_member,
 70            "ak_logger": get_logger(self._filename).bind(),
 71            "ak_send_email": self.expr_send_email,
 72            "ak_user_by": BaseEvaluator.expr_user_by,
 73            "ak_user_has_authenticator": BaseEvaluator.expr_func_user_has_authenticator,
 74            "ip_address": ip_address,
 75            "ip_network": ip_network,
 76            "list_flatten": BaseEvaluator.expr_flatten,
 77            "regex_match": BaseEvaluator.expr_regex_match,
 78            "regex_replace": BaseEvaluator.expr_regex_replace,
 79            "requests": get_http_session(),
 80            "resolve_dns": BaseEvaluator.expr_resolve_dns,
 81            "reverse_dns": BaseEvaluator.expr_reverse_dns,
 82            "slugify": slugify,
 83        }
 84        self._context = {}
 85
 86    @cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
 87    @staticmethod
 88    def expr_resolve_dns(host: str, ip_version: int | None = None) -> list[str]:
 89        """Resolve host to a list of IPv4 and/or IPv6 addresses."""
 90        # Although it seems to be fine (raising OSError), docs warn
 91        # against passing `None` for both the host and the port
 92        # https://docs.python.org/3/library/socket.html#socket.getaddrinfo
 93        host = host or ""
 94
 95        ip_list = []
 96
 97        family = 0
 98        if ip_version == 4:  # noqa: PLR2004
 99            family = socket.AF_INET
100        if ip_version == 6:  # noqa: PLR2004
101            family = socket.AF_INET6
102
103        try:
104            for ip_addr in socket.getaddrinfo(host, None, family=family):
105                ip_list.append(str(ip_addr[4][0]))
106        except OSError:
107            pass
108        return list(set(ip_list))
109
110    @cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
111    @staticmethod
112    def expr_reverse_dns(ip_addr: str) -> str:
113        """Perform a reverse DNS lookup."""
114        try:
115            return socket.getfqdn(ip_addr)
116        except OSError:
117            return ip_addr
118
119    @staticmethod
120    def expr_flatten(value: list[Any] | Any) -> Any | None:
121        """Flatten `value` if its a list"""
122        if isinstance(value, list):
123            if len(value) < 1:
124                return None
125            return value[0]
126        return value
127
128    @staticmethod
129    def expr_regex_match(value: Any, regex: str) -> bool:
130        """Expression Filter to run re.search"""
131        return re.search(regex, value) is not None
132
133    @staticmethod
134    def expr_regex_replace(value: Any, regex: str, repl: str) -> str:
135        """Expression Filter to run re.sub"""
136        return re.sub(regex, repl, value)
137
138    @staticmethod
139    def expr_is_group_member(user: User, **group_filters) -> bool:
140        """Check if `user` is member of group with name `group_name`"""
141        return user.all_groups().filter(**group_filters).exists()
142
143    @staticmethod
144    def expr_user_by(**filters) -> User | None:
145        """Get user by filters"""
146        try:
147            users = User.objects.filter(**filters)
148            if users:
149                return users.first()
150            return None
151        except FieldError:
152            return None
153
154    @staticmethod
155    def expr_func_user_has_authenticator(user: User, device_type: str | None = None) -> bool:
156        """Check if a user has any authenticator devices, optionally matching *device_type*"""
157        user_devices = devices_for_user(user)
158        if device_type:
159            for device in user_devices:
160                device_class = device.__class__.__name__.lower().replace("device", "")
161                if device_class == device_type:
162                    return True
163            return False
164        return len(list(user_devices)) > 0
165
166    def expr_event_create(self, action: str, **kwargs):
167        """Create event with supplied data and try to extract as much relevant data
168        from the context"""
169        context = self._context.copy()
170        # If the result was a complex variable, we don't want to reuse it
171        context.pop("result", None)
172        context.pop("handler", None)
173        event_kwargs = context
174        event_kwargs.update(kwargs)
175        event = Event.new(
176            action,
177            app=self._filename,
178            **event_kwargs,
179        )
180        if "request" in context and isinstance(context["request"], PolicyRequest):
181            policy_request: PolicyRequest = context["request"]
182            if policy_request.http_request:
183                event.from_http(policy_request.http_request)
184                return
185        event.save()
186
187    def expr_func_call_policy(self, name: str, **kwargs) -> PolicyResult:
188        """Call policy by name, with current request"""
189        policy = Policy.objects.filter(name=name).select_subclasses().first()
190        if not policy:
191            raise ValueError(f"Policy '{name}' not found.")
192        user = self._context.get("user", get_anonymous_user())
193        req = PolicyRequest(user)
194        if "request" in self._context:
195            req = self._context["request"]
196        req.context.update(kwargs)
197        proc = PolicyProcess(PolicyBinding(policy=policy), request=req, connection=None)
198        return proc.profiling_wrapper()
199
200    def expr_create_jwt(
201        self,
202        user: User,
203        provider: OAuth2Provider | str,
204        scopes: list[str],
205        validity: str = "seconds=60",
206    ) -> str | None:
207        """Issue a JWT for a given provider"""
208        request: HttpRequest | None = self._context.get("http_request")
209        if not request:
210            return None
211        if not isinstance(provider, OAuth2Provider):
212            provider = OAuth2Provider.objects.get(name=provider)
213        session = None
214        if hasattr(request, "session") and request.session.session_key:
215            session = request.session["authenticatedsession"]
216        access_token = AccessToken(
217            provider=provider,
218            user=user,
219            expires=now() + timedelta_from_string(validity),
220            scope=scopes,
221            auth_time=now(),
222            session=session,
223        )
224        access_token.id_token = IDToken.new(provider, access_token, request)
225        access_token.save()
226        return access_token.token
227
228    def expr_create_jwt_raw(
229        self, provider: OAuth2Provider | str, validity: str = "seconds=60", **kwargs
230    ) -> str:
231        """Issue a JWT for a given provider with completely customized data"""
232        if not isinstance(provider, OAuth2Provider):
233            provider = OAuth2Provider.objects.get(name=provider)
234        kwargs["exp"] = int((now() + timedelta_from_string(validity)).timestamp())
235        kwargs["aud"] = provider.client_id
236        return provider.encode(kwargs)
237
238    def expr_send_email(  # noqa: PLR0913
239        self,
240        address: str | list[str],
241        subject: str,
242        body: str | None = None,
243        stage: EmailStage | None = None,
244        template: str | None = None,
245        context: dict | None = None,
246        cc: str | list[str] | None = None,
247        bcc: str | list[str] | None = None,
248    ) -> bool:
249        """Send an email using authentik's email system
250
251        Args:
252            address: Email address(es) to send to. Can be:
253                - Single email: "user@example.com"
254                - List of emails: ["user1@example.com", "user2@example.com"]
255            subject: Email subject
256            body: Email body (plain text/HTML). Mutually exclusive with template.
257            stage: EmailStage instance to use for settings. If None, uses global settings.
258            template: Template name to render. Mutually exclusive with body.
259            context: Additional context variables for template rendering.
260            cc: Email address(es) to CC. Same format as address.
261            bcc: Email address(es) to BCC. Same format as address.
262
263        Returns:
264            bool: True if email was queued successfully, False otherwise
265        """
266        # Deferred imports to avoid circular import issues
267        from authentik.stages.email.tasks import send_mails
268
269        if body and template:
270            raise ValueError("body and template parameters are mutually exclusive")
271
272        if not body and not template:
273            raise ValueError("Either body or template parameter must be provided")
274
275        to_addresses = normalize_addresses(address)
276        cc_addresses = normalize_addresses(cc)
277        bcc_addresses = normalize_addresses(bcc)
278
279        try:
280            if template is not None:
281                # Use all available context from the evaluator for template rendering
282                template_context = self._context.copy()
283                # Add any custom context passed to the function
284                if context:
285                    template_context.update(context)
286
287                # Use template rendering
288                message = TemplateEmailMessage(
289                    subject=subject,
290                    to=to_addresses,
291                    cc=cc_addresses,
292                    bcc=bcc_addresses,
293                    template_name=template,
294                    template_context=template_context,
295                )
296            else:
297                # Use plain body
298                message = TemplateEmailMessage(
299                    subject=subject,
300                    to=to_addresses,
301                    cc=cc_addresses,
302                    bcc=bcc_addresses,
303                    body=body,
304                )
305
306            send_mails(stage, message)
307            return True
308
309        except (SMTPException, ConnectionError, ValidationError, ValueError) as exc:
310            LOGGER.warning("Failed to send email", exc=exc, addresses=to_addresses, subject=subject)
311            return False
312
313    def wrap_expression(self, expression: str) -> str:
314        """Wrap expression in a function, call it, and save the result as `result`"""
315        handler_signature = ",".join(
316            [x for x in [sanitize_arg(x) for x in self._context.keys()] if x]
317        )
318        full_expression = ""
319        full_expression += f"def handler({handler_signature}):\n"
320        full_expression += indent(expression, "    ")
321        full_expression += f"\nresult = handler({handler_signature})"
322        return full_expression
323
324    def compile(self, expression: str) -> CodeType:
325        """Parse expression. Raises SyntaxError or ValueError if the syntax is incorrect."""
326        expression = self.wrap_expression(expression)
327        return compile(expression, self._filename, "exec")
328
329    def evaluate(self, expression_source: str) -> Any:
330        """Parse and evaluate expression. If the syntax is incorrect, a SyntaxError is raised.
331        If any exception is raised during execution, it is raised.
332        The result is returned without any type-checking."""
333        with start_span(op="authentik.lib.evaluator.evaluate") as span:
334            span: Span
335            span.description = self._filename
336            span.set_data("expression", expression_source)
337            try:
338                ast_obj = self.compile(expression_source)
339            except (SyntaxError, ValueError) as exc:
340                self.handle_error(exc, expression_source)
341                raise exc
342            try:
343                _locals = {sanitize_arg(x): y for x, y in self._context.items()}
344                # Yes this is an exec, yes it is potentially bad. Since we limit what variables are
345                # available here, and these policies can only be edited by admins, this is a risk
346                # we're willing to take.
347
348                exec(ast_obj, self._globals, _locals)  # nosec # noqa
349                result = _locals["result"]
350            except Exception as exc:
351                # So, this is a bit questionable. Essentially, we are edit the stacktrace
352                # so the user only sees information relevant to them
353                # and none of our surrounding error handling
354                if exc.__traceback__ is not None:
355                    exc.__traceback__ = exc.__traceback__.tb_next
356                if not isinstance(exc, ControlFlowException):
357                    self.handle_error(exc, expression_source)
358                raise exc
359            return result
360
361    def handle_error(self, exc: Exception, expression_source: str):  # pragma: no cover
362        """Exception Handler"""
363        LOGGER.warning("Expression error", exc=exc)
364
365    def validate(self, expression: str) -> bool:
366        """Validate expression's syntax, raise ValidationError if Syntax is invalid"""
367        try:
368            self.compile(expression)
369            return True
370        except (ValueError, SyntaxError) as exc:
371            raise ValidationError(f"Expression Syntax Error: {str(exc)}") from exc

Validate and evaluate python-based expressions

BaseEvaluator(filename: str | None = None)
60    def __init__(self, filename: str | None = None):
61        self._filename = filename if filename else "BaseEvaluator"
62        # update website/docs/expressions/_objects.md
63        # update website/docs/expressions/_functions.md
64        self._globals = {
65            "ak_call_policy": self.expr_func_call_policy,
66            "ak_create_event": self.expr_event_create,
67            "ak_create_jwt": self.expr_create_jwt,
68            "ak_create_jwt_raw": self.expr_create_jwt_raw,
69            "ak_is_group_member": BaseEvaluator.expr_is_group_member,
70            "ak_logger": get_logger(self._filename).bind(),
71            "ak_send_email": self.expr_send_email,
72            "ak_user_by": BaseEvaluator.expr_user_by,
73            "ak_user_has_authenticator": BaseEvaluator.expr_func_user_has_authenticator,
74            "ip_address": ip_address,
75            "ip_network": ip_network,
76            "list_flatten": BaseEvaluator.expr_flatten,
77            "regex_match": BaseEvaluator.expr_regex_match,
78            "regex_replace": BaseEvaluator.expr_regex_replace,
79            "requests": get_http_session(),
80            "resolve_dns": BaseEvaluator.expr_resolve_dns,
81            "reverse_dns": BaseEvaluator.expr_reverse_dns,
82            "slugify": slugify,
83        }
84        self._context = {}
@cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
@staticmethod
def expr_resolve_dns(host: str, ip_version: int | None = None) -> list[str]:
 86    @cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
 87    @staticmethod
 88    def expr_resolve_dns(host: str, ip_version: int | None = None) -> list[str]:
 89        """Resolve host to a list of IPv4 and/or IPv6 addresses."""
 90        # Although it seems to be fine (raising OSError), docs warn
 91        # against passing `None` for both the host and the port
 92        # https://docs.python.org/3/library/socket.html#socket.getaddrinfo
 93        host = host or ""
 94
 95        ip_list = []
 96
 97        family = 0
 98        if ip_version == 4:  # noqa: PLR2004
 99            family = socket.AF_INET
100        if ip_version == 6:  # noqa: PLR2004
101            family = socket.AF_INET6
102
103        try:
104            for ip_addr in socket.getaddrinfo(host, None, family=family):
105                ip_list.append(str(ip_addr[4][0]))
106        except OSError:
107            pass
108        return list(set(ip_list))

Resolve host to a list of IPv4 and/or IPv6 addresses.

@cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
@staticmethod
def expr_reverse_dns(ip_addr: str) -> str:
110    @cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
111    @staticmethod
112    def expr_reverse_dns(ip_addr: str) -> str:
113        """Perform a reverse DNS lookup."""
114        try:
115            return socket.getfqdn(ip_addr)
116        except OSError:
117            return ip_addr

Perform a reverse DNS lookup.

@staticmethod
def expr_flatten(value: list[Any] | Any) -> Any | None:
119    @staticmethod
120    def expr_flatten(value: list[Any] | Any) -> Any | None:
121        """Flatten `value` if its a list"""
122        if isinstance(value, list):
123            if len(value) < 1:
124                return None
125            return value[0]
126        return value

Flatten value if its a list

@staticmethod
def expr_regex_match(value: Any, regex: str) -> bool:
128    @staticmethod
129    def expr_regex_match(value: Any, regex: str) -> bool:
130        """Expression Filter to run re.search"""
131        return re.search(regex, value) is not None

Expression Filter to run re.search

@staticmethod
def expr_regex_replace(value: Any, regex: str, repl: str) -> str:
133    @staticmethod
134    def expr_regex_replace(value: Any, regex: str, repl: str) -> str:
135        """Expression Filter to run re.sub"""
136        return re.sub(regex, repl, value)

Expression Filter to run re.sub

@staticmethod
def expr_is_group_member(user: authentik.core.models.User, **group_filters) -> bool:
138    @staticmethod
139    def expr_is_group_member(user: User, **group_filters) -> bool:
140        """Check if `user` is member of group with name `group_name`"""
141        return user.all_groups().filter(**group_filters).exists()

Check if user is member of group with name group_name

@staticmethod
def expr_user_by(**filters) -> authentik.core.models.User | None:
143    @staticmethod
144    def expr_user_by(**filters) -> User | None:
145        """Get user by filters"""
146        try:
147            users = User.objects.filter(**filters)
148            if users:
149                return users.first()
150            return None
151        except FieldError:
152            return None

Get user by filters

@staticmethod
def expr_func_user_has_authenticator(user: authentik.core.models.User, device_type: str | None = None) -> bool:
154    @staticmethod
155    def expr_func_user_has_authenticator(user: User, device_type: str | None = None) -> bool:
156        """Check if a user has any authenticator devices, optionally matching *device_type*"""
157        user_devices = devices_for_user(user)
158        if device_type:
159            for device in user_devices:
160                device_class = device.__class__.__name__.lower().replace("device", "")
161                if device_class == device_type:
162                    return True
163            return False
164        return len(list(user_devices)) > 0

Check if a user has any authenticator devices, optionally matching device_type

def expr_event_create(self, action: str, **kwargs):
166    def expr_event_create(self, action: str, **kwargs):
167        """Create event with supplied data and try to extract as much relevant data
168        from the context"""
169        context = self._context.copy()
170        # If the result was a complex variable, we don't want to reuse it
171        context.pop("result", None)
172        context.pop("handler", None)
173        event_kwargs = context
174        event_kwargs.update(kwargs)
175        event = Event.new(
176            action,
177            app=self._filename,
178            **event_kwargs,
179        )
180        if "request" in context and isinstance(context["request"], PolicyRequest):
181            policy_request: PolicyRequest = context["request"]
182            if policy_request.http_request:
183                event.from_http(policy_request.http_request)
184                return
185        event.save()

Create event with supplied data and try to extract as much relevant data from the context

def expr_func_call_policy(self, name: str, **kwargs) -> authentik.policies.types.PolicyResult:
187    def expr_func_call_policy(self, name: str, **kwargs) -> PolicyResult:
188        """Call policy by name, with current request"""
189        policy = Policy.objects.filter(name=name).select_subclasses().first()
190        if not policy:
191            raise ValueError(f"Policy '{name}' not found.")
192        user = self._context.get("user", get_anonymous_user())
193        req = PolicyRequest(user)
194        if "request" in self._context:
195            req = self._context["request"]
196        req.context.update(kwargs)
197        proc = PolicyProcess(PolicyBinding(policy=policy), request=req, connection=None)
198        return proc.profiling_wrapper()

Call policy by name, with current request

def expr_create_jwt( self, user: authentik.core.models.User, provider: authentik.providers.oauth2.models.OAuth2Provider | str, scopes: list[str], validity: str = 'seconds=60') -> str | None:
200    def expr_create_jwt(
201        self,
202        user: User,
203        provider: OAuth2Provider | str,
204        scopes: list[str],
205        validity: str = "seconds=60",
206    ) -> str | None:
207        """Issue a JWT for a given provider"""
208        request: HttpRequest | None = self._context.get("http_request")
209        if not request:
210            return None
211        if not isinstance(provider, OAuth2Provider):
212            provider = OAuth2Provider.objects.get(name=provider)
213        session = None
214        if hasattr(request, "session") and request.session.session_key:
215            session = request.session["authenticatedsession"]
216        access_token = AccessToken(
217            provider=provider,
218            user=user,
219            expires=now() + timedelta_from_string(validity),
220            scope=scopes,
221            auth_time=now(),
222            session=session,
223        )
224        access_token.id_token = IDToken.new(provider, access_token, request)
225        access_token.save()
226        return access_token.token

Issue a JWT for a given provider

def expr_create_jwt_raw( self, provider: authentik.providers.oauth2.models.OAuth2Provider | str, validity: str = 'seconds=60', **kwargs) -> str:
228    def expr_create_jwt_raw(
229        self, provider: OAuth2Provider | str, validity: str = "seconds=60", **kwargs
230    ) -> str:
231        """Issue a JWT for a given provider with completely customized data"""
232        if not isinstance(provider, OAuth2Provider):
233            provider = OAuth2Provider.objects.get(name=provider)
234        kwargs["exp"] = int((now() + timedelta_from_string(validity)).timestamp())
235        kwargs["aud"] = provider.client_id
236        return provider.encode(kwargs)

Issue a JWT for a given provider with completely customized data

def expr_send_email(unknown):
238    def expr_send_email(  # noqa: PLR0913
239        self,
240        address: str | list[str],
241        subject: str,
242        body: str | None = None,
243        stage: EmailStage | None = None,
244        template: str | None = None,
245        context: dict | None = None,
246        cc: str | list[str] | None = None,
247        bcc: str | list[str] | None = None,
248    ) -> bool:
249        """Send an email using authentik's email system
250
251        Args:
252            address: Email address(es) to send to. Can be:
253                - Single email: "user@example.com"
254                - List of emails: ["user1@example.com", "user2@example.com"]
255            subject: Email subject
256            body: Email body (plain text/HTML). Mutually exclusive with template.
257            stage: EmailStage instance to use for settings. If None, uses global settings.
258            template: Template name to render. Mutually exclusive with body.
259            context: Additional context variables for template rendering.
260            cc: Email address(es) to CC. Same format as address.
261            bcc: Email address(es) to BCC. Same format as address.
262
263        Returns:
264            bool: True if email was queued successfully, False otherwise
265        """
266        # Deferred imports to avoid circular import issues
267        from authentik.stages.email.tasks import send_mails
268
269        if body and template:
270            raise ValueError("body and template parameters are mutually exclusive")
271
272        if not body and not template:
273            raise ValueError("Either body or template parameter must be provided")
274
275        to_addresses = normalize_addresses(address)
276        cc_addresses = normalize_addresses(cc)
277        bcc_addresses = normalize_addresses(bcc)
278
279        try:
280            if template is not None:
281                # Use all available context from the evaluator for template rendering
282                template_context = self._context.copy()
283                # Add any custom context passed to the function
284                if context:
285                    template_context.update(context)
286
287                # Use template rendering
288                message = TemplateEmailMessage(
289                    subject=subject,
290                    to=to_addresses,
291                    cc=cc_addresses,
292                    bcc=bcc_addresses,
293                    template_name=template,
294                    template_context=template_context,
295                )
296            else:
297                # Use plain body
298                message = TemplateEmailMessage(
299                    subject=subject,
300                    to=to_addresses,
301                    cc=cc_addresses,
302                    bcc=bcc_addresses,
303                    body=body,
304                )
305
306            send_mails(stage, message)
307            return True
308
309        except (SMTPException, ConnectionError, ValidationError, ValueError) as exc:
310            LOGGER.warning("Failed to send email", exc=exc, addresses=to_addresses, subject=subject)
311            return False

Send an email using authentik's email system

Args: address: Email address(es) to send to. Can be: - Single email: "user@example.com" - List of emails: ["user1@example.com", "user2@example.com"] subject: Email subject body: Email body (plain text/HTML). Mutually exclusive with template. stage: EmailStage instance to use for settings. If None, uses global settings. template: Template name to render. Mutually exclusive with body. context: Additional context variables for template rendering. cc: Email address(es) to CC. Same format as address. bcc: Email address(es) to BCC. Same format as address.

Returns: bool: True if email was queued successfully, False otherwise

def wrap_expression(self, expression: str) -> str:
313    def wrap_expression(self, expression: str) -> str:
314        """Wrap expression in a function, call it, and save the result as `result`"""
315        handler_signature = ",".join(
316            [x for x in [sanitize_arg(x) for x in self._context.keys()] if x]
317        )
318        full_expression = ""
319        full_expression += f"def handler({handler_signature}):\n"
320        full_expression += indent(expression, "    ")
321        full_expression += f"\nresult = handler({handler_signature})"
322        return full_expression

Wrap expression in a function, call it, and save the result as result

def compile(self, expression: str) -> code:
324    def compile(self, expression: str) -> CodeType:
325        """Parse expression. Raises SyntaxError or ValueError if the syntax is incorrect."""
326        expression = self.wrap_expression(expression)
327        return compile(expression, self._filename, "exec")

Parse expression. Raises SyntaxError or ValueError if the syntax is incorrect.

def evaluate(self, expression_source: str) -> Any:
329    def evaluate(self, expression_source: str) -> Any:
330        """Parse and evaluate expression. If the syntax is incorrect, a SyntaxError is raised.
331        If any exception is raised during execution, it is raised.
332        The result is returned without any type-checking."""
333        with start_span(op="authentik.lib.evaluator.evaluate") as span:
334            span: Span
335            span.description = self._filename
336            span.set_data("expression", expression_source)
337            try:
338                ast_obj = self.compile(expression_source)
339            except (SyntaxError, ValueError) as exc:
340                self.handle_error(exc, expression_source)
341                raise exc
342            try:
343                _locals = {sanitize_arg(x): y for x, y in self._context.items()}
344                # Yes this is an exec, yes it is potentially bad. Since we limit what variables are
345                # available here, and these policies can only be edited by admins, this is a risk
346                # we're willing to take.
347
348                exec(ast_obj, self._globals, _locals)  # nosec # noqa
349                result = _locals["result"]
350            except Exception as exc:
351                # So, this is a bit questionable. Essentially, we are edit the stacktrace
352                # so the user only sees information relevant to them
353                # and none of our surrounding error handling
354                if exc.__traceback__ is not None:
355                    exc.__traceback__ = exc.__traceback__.tb_next
356                if not isinstance(exc, ControlFlowException):
357                    self.handle_error(exc, expression_source)
358                raise exc
359            return result

Parse and evaluate expression. If the syntax is incorrect, a SyntaxError is raised. If any exception is raised during execution, it is raised. The result is returned without any type-checking.

def handle_error(self, exc: Exception, expression_source: str):
361    def handle_error(self, exc: Exception, expression_source: str):  # pragma: no cover
362        """Exception Handler"""
363        LOGGER.warning("Expression error", exc=exc)

Exception Handler

def validate(self, expression: str) -> bool:
365    def validate(self, expression: str) -> bool:
366        """Validate expression's syntax, raise ValidationError if Syntax is invalid"""
367        try:
368            self.compile(expression)
369            return True
370        except (ValueError, SyntaxError) as exc:
371            raise ValidationError(f"Expression Syntax Error: {str(exc)}") from exc

Validate expression's syntax, raise ValidationError if Syntax is invalid