authentik.lib.expression.evaluator
authentik expression policy evaluator
"""authentik expression policy evaluator"""

import re
import socket
from ipaddress import ip_address, ip_network
from smtplib import SMTPException
from textwrap import indent
from types import CodeType
from typing import TYPE_CHECKING, Any

from cachetools import TLRUCache, cached
from django.core.exceptions import FieldError
from django.http import HttpRequest
from django.utils.text import slugify
from django.utils.timezone import now
from guardian.shortcuts import get_anonymous_user
from rest_framework.serializers import ValidationError
from sentry_sdk import start_span
from sentry_sdk.tracing import Span
from structlog.stdlib import get_logger

from authentik.core.models import User
from authentik.events.models import Event
from authentik.lib.expression.exceptions import ControlFlowException
from authentik.lib.utils.email import normalize_addresses
from authentik.lib.utils.http import get_http_session
from authentik.lib.utils.time import timedelta_from_string
from authentik.policies.models import Policy, PolicyBinding
from authentik.policies.process import PolicyProcess
from authentik.policies.types import PolicyRequest, PolicyResult
from authentik.providers.oauth2.id_token import IDToken
from authentik.providers.oauth2.models import AccessToken, OAuth2Provider
from authentik.stages.authenticator import devices_for_user
from authentik.stages.email.utils import TemplateEmailMessage

if TYPE_CHECKING:
    from authentik.stages.email.models import EmailStage

LOGGER = get_logger()

ARG_SANITIZE = re.compile(r"[:.-]")


def sanitize_arg(arg_name: str) -> str:
    """Slugify `arg_name` and replace `:`, `.` and `-` with `_`.

    Used to turn context keys into names for the generated handler's arguments.
    """
    return ARG_SANITIZE.sub("_", slugify(arg_name))


class BaseEvaluator:
    """Validate and evaluate python-based expressions"""

    # Globals that can be used by function
    _globals: dict[str, Any]
    # Context passed as locals to exec()
    _context: dict[str, Any]

    # Filename used for exec
    _filename: str

    def __init__(self, filename: str | None = None):
        """Set up the globals exposed to expressions and an empty context.

        Args:
            filename: Name used as the filename for compile(), as the `app` of
                created events and as the name of the `ak_logger` logger.
                Falls back to "BaseEvaluator" when empty or None.
        """
        self._filename = filename or "BaseEvaluator"
        # update website/docs/expressions/_objects.md
        # update website/docs/expressions/_functions.md
        self._globals = {
            "ak_call_policy": self.expr_func_call_policy,
            "ak_create_event": self.expr_event_create,
            "ak_create_jwt": self.expr_create_jwt,
            "ak_create_jwt_raw": self.expr_create_jwt_raw,
            "ak_is_group_member": BaseEvaluator.expr_is_group_member,
            "ak_logger": get_logger(self._filename).bind(),
            "ak_send_email": self.expr_send_email,
            "ak_user_by": BaseEvaluator.expr_user_by,
            "ak_user_has_authenticator": BaseEvaluator.expr_func_user_has_authenticator,
            "ip_address": ip_address,
            "ip_network": ip_network,
            "list_flatten": BaseEvaluator.expr_flatten,
            "regex_match": BaseEvaluator.expr_regex_match,
            "regex_replace": BaseEvaluator.expr_regex_replace,
            "requests": get_http_session(),
            "resolve_dns": BaseEvaluator.expr_resolve_dns,
            "reverse_dns": BaseEvaluator.expr_reverse_dns,
            "slugify": slugify,
        }
        self._context = {}

    @cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
    @staticmethod
    def expr_resolve_dns(host: str, ip_version: int | None = None) -> list[str]:
        """Resolve host to a list of IPv4 and/or IPv6 addresses.

        Args:
            host: Hostname to resolve; a falsy value is treated as "".
            ip_version: 4 or 6 to restrict the address family; any other value
                leaves the family unrestricted.

        Returns:
            The distinct addresses in the order the resolver returned them, or
            an empty list when the lookup raises OSError. Results (including
            empty ones) are cached by the surrounding `cached` decorator.
        """
        # Although it seems to be fine (raising OSError), docs warn
        # against passing `None` for both the host and the port
        # https://docs.python.org/3/library/socket.html#socket.getaddrinfo
        host = host or ""

        family = 0
        if ip_version == 4:  # noqa: PLR2004
            family = socket.AF_INET
        elif ip_version == 6:  # noqa: PLR2004
            family = socket.AF_INET6

        try:
            results = socket.getaddrinfo(host, None, family=family)
        except OSError:
            return []
        # dict.fromkeys de-duplicates while keeping the resolver's ordering
        return list(dict.fromkeys(str(info[4][0]) for info in results))

    @cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
    @staticmethod
    def expr_reverse_dns(ip_addr: str) -> str:
        """Perform a reverse DNS lookup.

        Returns the result of `socket.getfqdn(ip_addr)`, or `ip_addr` itself
        when the lookup raises OSError.
        """
        try:
            return socket.getfqdn(ip_addr)
        except OSError:
            return ip_addr

    @staticmethod
    def expr_flatten(value: list[Any] | Any) -> Any | None:
        """Return the first element of `value` if it's a list, None for an empty list.

        Anything that is not a list is returned unchanged.
        """
        if not isinstance(value, list):
            return value
        return value[0] if value else None

    @staticmethod
    def expr_regex_match(value: Any, regex: str) -> bool:
        """Expression Filter to run re.search; True if `regex` matches anywhere in `value`"""
        return re.search(regex, value) is not None

    @staticmethod
    def expr_regex_replace(value: Any, regex: str, repl: str) -> str:
        """Expression Filter to run re.sub; replace matches of `regex` in `value` with `repl`"""
        return re.sub(regex, repl, value)

    @staticmethod
    def expr_is_group_member(user: User, **group_filters) -> bool:
        """Check if `user` is member of any group matching `group_filters`

        `group_filters` are passed as keyword arguments to `.filter()` on the
        result of `user.all_groups()`.
        """
        return user.all_groups().filter(**group_filters).exists()

    @staticmethod
    def expr_user_by(**filters) -> User | None:
        """Get the first user matching `filters`

        Returns None when no user matches or when the filters raise FieldError.
        """
        try:
            # .first() already yields None for an empty result, so the
            # queryset does not need to be evaluated separately beforehand
            return User.objects.filter(**filters).first()
        except FieldError:
            return None

    @staticmethod
    def expr_func_user_has_authenticator(user: User, device_type: str | None = None) -> bool:
        """Check if a user has any authenticator devices, optionally matching *device_type*

        When *device_type* is given it is compared against each device's class
        name, lower-cased and with "device" removed.
        """
        user_devices = devices_for_user(user)
        if device_type:
            return any(
                device.__class__.__name__.lower().replace("device", "") == device_type
                for device in user_devices
            )
        # Stop at the first device instead of materialising all of them
        return any(True for _ in user_devices)

    def expr_event_create(self, action: str, **kwargs):
        """Create event with supplied data and try to extract as much relevant data
        from the context

        The event is built from a copy of the evaluator context (without
        `result` and `handler`) updated with `kwargs`.
        """
        event_kwargs = self._context.copy()
        # If the result was a complex variable, we don't want to reuse it
        event_kwargs.pop("result", None)
        event_kwargs.pop("handler", None)
        event_kwargs.update(kwargs)
        event = Event.new(
            action,
            app=self._filename,
            **event_kwargs,
        )
        policy_request = event_kwargs.get("request")
        if isinstance(policy_request, PolicyRequest) and policy_request.http_request:
            # NOTE(review): no explicit save() on this path; presumably
            # from_http() persists the event - confirm
            event.from_http(policy_request.http_request)
            return
        event.save()

    def expr_func_call_policy(self, name: str, **kwargs) -> PolicyResult:
        """Call policy by name, with current request

        Uses the `request` from the evaluator context when present, otherwise a
        new PolicyRequest for the context's `user` (or the anonymous user).
        `kwargs` are merged into the request's context.

        Raises:
            ValueError: If no policy with the given name exists.
        """
        policy = Policy.objects.filter(name=name).select_subclasses().first()
        if not policy:
            raise ValueError(f"Policy '{name}' not found.")
        if "request" in self._context:
            req = self._context["request"]
        else:
            # Only look up the anonymous user when no user is in the context
            if "user" in self._context:
                user = self._context["user"]
            else:
                user = get_anonymous_user()
            req = PolicyRequest(user)
        req.context.update(kwargs)
        proc = PolicyProcess(PolicyBinding(policy=policy), request=req, connection=None)
        return proc.profiling_wrapper()

    def expr_create_jwt(
        self,
        user: User,
        provider: OAuth2Provider | str,
        scopes: list[str],
        validity: str = "seconds=60",
    ) -> str | None:
        """Issue a JWT for a given provider

        Args:
            user: User the access token is issued for.
            provider: OAuth2Provider instance, or the name of one to look up.
            scopes: Scopes stored on the access token.
            validity: Duration string parsed by `timedelta_from_string`.

        Returns:
            The token of the saved AccessToken, or None when the evaluator
            context has no `http_request`.
        """
        request: HttpRequest | None = self._context.get("http_request")
        if not request:
            return None
        if not isinstance(provider, OAuth2Provider):
            provider = OAuth2Provider.objects.get(name=provider)
        session = None
        if hasattr(request, "session") and request.session.session_key:
            session = request.session["authenticatedsession"]
        access_token = AccessToken(
            provider=provider,
            user=user,
            expires=now() + timedelta_from_string(validity),
            scope=scopes,
            auth_time=now(),
            session=session,
        )
        access_token.id_token = IDToken.new(provider, access_token, request)
        access_token.save()
        return access_token.token

    def expr_create_jwt_raw(
        self, provider: OAuth2Provider | str, validity: str = "seconds=60", **kwargs
    ) -> str:
        """Issue a JWT for a given provider with completely customized data

        `kwargs` form the payload; `exp` (now + `validity`) and `aud` (the
        provider's client_id) are always set and override supplied values.
        """
        if not isinstance(provider, OAuth2Provider):
            provider = OAuth2Provider.objects.get(name=provider)
        kwargs["exp"] = int((now() + timedelta_from_string(validity)).timestamp())
        kwargs["aud"] = provider.client_id
        return provider.encode(kwargs)

    def expr_send_email(  # noqa: PLR0913
        self,
        address: str | list[str],
        subject: str,
        body: str | None = None,
        stage: "EmailStage | None" = None,
        template: str | None = None,
        context: dict | None = None,
        cc: str | list[str] | None = None,
        bcc: str | list[str] | None = None,
    ) -> bool:
        """Send an email using authentik's email system

        Args:
            address: Email address(es) to send to. Can be:
                - Single email: "user@example.com"
                - List of emails: ["user1@example.com", "user2@example.com"]
            subject: Email subject
            body: Email body (plain text/HTML). Mutually exclusive with template.
            stage: EmailStage instance to use for settings. If None, uses global settings.
            template: Template name to render. Mutually exclusive with body.
            context: Additional context variables for template rendering.
            cc: Email address(es) to CC. Same format as address.
            bcc: Email address(es) to BCC. Same format as address.

        Returns:
            bool: True if email was queued successfully, False otherwise

        Raises:
            ValueError: If both or neither of body and template are given.
        """
        if body and template:
            raise ValueError("body and template parameters are mutually exclusive")

        if not body and not template:
            raise ValueError("Either body or template parameter must be provided")

        # Deferred imports to avoid circular import issues
        from authentik.stages.email.tasks import send_mails

        to_addresses = normalize_addresses(address)
        cc_addresses = normalize_addresses(cc)
        bcc_addresses = normalize_addresses(bcc)

        try:
            # Truthiness, matching the validation above: an empty template
            # string together with a body must send the body
            if template:
                # Use all available context from the evaluator for template rendering
                template_context = self._context.copy()
                # Add any custom context passed to the function
                if context:
                    template_context.update(context)

                # Use template rendering
                message = TemplateEmailMessage(
                    subject=subject,
                    to=to_addresses,
                    cc=cc_addresses,
                    bcc=bcc_addresses,
                    template_name=template,
                    template_context=template_context,
                )
            else:
                # Use plain body
                message = TemplateEmailMessage(
                    subject=subject,
                    to=to_addresses,
                    cc=cc_addresses,
                    bcc=bcc_addresses,
                    body=body,
                )

            send_mails(stage, message)
            return True

        except (SMTPException, ConnectionError, ValidationError, ValueError) as exc:
            LOGGER.warning("Failed to send email", exc=exc, addresses=to_addresses, subject=subject)
            return False

    def wrap_expression(self, expression: str) -> str:
        """Wrap expression in a function, call it, and save the result as `result`

        The sanitized context keys become the handler's parameters and the
        names passed in the call; keys that sanitize to "" are skipped.
        """
        arg_names = (sanitize_arg(key) for key in self._context)
        handler_signature = ",".join(name for name in arg_names if name)
        # NOTE(review): the indent prefix is a single space as listed; whitespace
        # may have been collapsed when this listing was extracted - confirm
        return (
            f"def handler({handler_signature}):\n"
            + indent(expression, " ")
            + f"\nresult = handler({handler_signature})"
        )

    def compile(self, expression: str) -> CodeType:
        """Parse expression. Raises SyntaxError or ValueError if the syntax is incorrect."""
        expression = self.wrap_expression(expression)
        return compile(expression, self._filename, "exec")

    def evaluate(self, expression_source: str) -> Any:
        """Parse and evaluate expression. If the syntax is incorrect, a SyntaxError is raised.
        If any exception is raised during execution, it is raised.
        The result is returned without any type-checking."""
        with start_span(op="authentik.lib.evaluator.evaluate") as span:
            span: Span
            span.description = self._filename
            span.set_data("expression", expression_source)
            try:
                ast_obj = self.compile(expression_source)
            except (SyntaxError, ValueError) as exc:
                self.handle_error(exc, expression_source)
                raise exc
            try:
                _locals = {sanitize_arg(x): y for x, y in self._context.items()}
                # Yes this is an exec, yes it is potentially bad. Since we limit what variables are
                # available here, and these policies can only be edited by admins, this is a risk
                # we're willing to take.

                exec(ast_obj, self._globals, _locals)  # nosec # noqa
                result = _locals["result"]
            except Exception as exc:
                # So, this is a bit questionable. Essentially, we are editing the stacktrace
                # so the user only sees information relevant to them
                # and none of our surrounding error handling
                if exc.__traceback__ is not None:
                    exc.__traceback__ = exc.__traceback__.tb_next
                # ControlFlowException is re-raised without being reported
                if not isinstance(exc, ControlFlowException):
                    self.handle_error(exc, expression_source)
                raise exc
            return result

    def handle_error(self, exc: Exception, expression_source: str):  # pragma: no cover
        """Exception Handler; logs a warning with the exception"""
        LOGGER.warning("Expression error", exc=exc)

    def validate(self, expression: str) -> bool:
        """Validate expression's syntax, raise ValidationError if Syntax is invalid"""
        try:
            self.compile(expression)
            return True
        except (ValueError, SyntaxError) as exc:
            raise ValidationError(f"Expression Syntax Error: {str(exc)}") from exc
class BaseEvaluator:
    """Validate and evaluate python-based expressions"""

    # Globals that can be used by function
    _globals: dict[str, Any]
    # Context passed as locals to exec()
    _context: dict[str, Any]

    # Filename used for exec
    _filename: str

    def __init__(self, filename: str | None = None):
        """Set up the globals exposed to expressions and an empty context.

        Args:
            filename: Name used as the filename for compile(), as the `app` of
                created events and as the name of the `ak_logger` logger.
                Falls back to "BaseEvaluator" when empty or None.
        """
        self._filename = filename or "BaseEvaluator"
        # update website/docs/expressions/_objects.md
        # update website/docs/expressions/_functions.md
        self._globals = {
            "ak_call_policy": self.expr_func_call_policy,
            "ak_create_event": self.expr_event_create,
            "ak_create_jwt": self.expr_create_jwt,
            "ak_create_jwt_raw": self.expr_create_jwt_raw,
            "ak_is_group_member": BaseEvaluator.expr_is_group_member,
            "ak_logger": get_logger(self._filename).bind(),
            "ak_send_email": self.expr_send_email,
            "ak_user_by": BaseEvaluator.expr_user_by,
            "ak_user_has_authenticator": BaseEvaluator.expr_func_user_has_authenticator,
            "ip_address": ip_address,
            "ip_network": ip_network,
            "list_flatten": BaseEvaluator.expr_flatten,
            "regex_match": BaseEvaluator.expr_regex_match,
            "regex_replace": BaseEvaluator.expr_regex_replace,
            "requests": get_http_session(),
            "resolve_dns": BaseEvaluator.expr_resolve_dns,
            "reverse_dns": BaseEvaluator.expr_reverse_dns,
            "slugify": slugify,
        }
        self._context = {}

    @cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
    @staticmethod
    def expr_resolve_dns(host: str, ip_version: int | None = None) -> list[str]:
        """Resolve host to a list of IPv4 and/or IPv6 addresses.

        Args:
            host: Hostname to resolve; a falsy value is treated as "".
            ip_version: 4 or 6 to restrict the address family; any other value
                leaves the family unrestricted.

        Returns:
            The distinct addresses in the order the resolver returned them, or
            an empty list when the lookup raises OSError. Results (including
            empty ones) are cached by the surrounding `cached` decorator.
        """
        # Although it seems to be fine (raising OSError), docs warn
        # against passing `None` for both the host and the port
        # https://docs.python.org/3/library/socket.html#socket.getaddrinfo
        host = host or ""

        family = 0
        if ip_version == 4:  # noqa: PLR2004
            family = socket.AF_INET
        elif ip_version == 6:  # noqa: PLR2004
            family = socket.AF_INET6

        try:
            results = socket.getaddrinfo(host, None, family=family)
        except OSError:
            return []
        # dict.fromkeys de-duplicates while keeping the resolver's ordering
        return list(dict.fromkeys(str(info[4][0]) for info in results))

    @cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
    @staticmethod
    def expr_reverse_dns(ip_addr: str) -> str:
        """Perform a reverse DNS lookup.

        Returns the result of `socket.getfqdn(ip_addr)`, or `ip_addr` itself
        when the lookup raises OSError.
        """
        try:
            return socket.getfqdn(ip_addr)
        except OSError:
            return ip_addr

    @staticmethod
    def expr_flatten(value: list[Any] | Any) -> Any | None:
        """Return the first element of `value` if it's a list, None for an empty list.

        Anything that is not a list is returned unchanged.
        """
        if not isinstance(value, list):
            return value
        return value[0] if value else None

    @staticmethod
    def expr_regex_match(value: Any, regex: str) -> bool:
        """Expression Filter to run re.search; True if `regex` matches anywhere in `value`"""
        return re.search(regex, value) is not None

    @staticmethod
    def expr_regex_replace(value: Any, regex: str, repl: str) -> str:
        """Expression Filter to run re.sub; replace matches of `regex` in `value` with `repl`"""
        return re.sub(regex, repl, value)

    @staticmethod
    def expr_is_group_member(user: User, **group_filters) -> bool:
        """Check if `user` is member of any group matching `group_filters`

        `group_filters` are passed as keyword arguments to `.filter()` on the
        result of `user.all_groups()`.
        """
        return user.all_groups().filter(**group_filters).exists()

    @staticmethod
    def expr_user_by(**filters) -> User | None:
        """Get the first user matching `filters`

        Returns None when no user matches or when the filters raise FieldError.
        """
        try:
            # .first() already yields None for an empty result, so the
            # queryset does not need to be evaluated separately beforehand
            return User.objects.filter(**filters).first()
        except FieldError:
            return None

    @staticmethod
    def expr_func_user_has_authenticator(user: User, device_type: str | None = None) -> bool:
        """Check if a user has any authenticator devices, optionally matching *device_type*

        When *device_type* is given it is compared against each device's class
        name, lower-cased and with "device" removed.
        """
        user_devices = devices_for_user(user)
        if device_type:
            return any(
                device.__class__.__name__.lower().replace("device", "") == device_type
                for device in user_devices
            )
        # Stop at the first device instead of materialising all of them
        return any(True for _ in user_devices)

    def expr_event_create(self, action: str, **kwargs):
        """Create event with supplied data and try to extract as much relevant data
        from the context

        The event is built from a copy of the evaluator context (without
        `result` and `handler`) updated with `kwargs`.
        """
        event_kwargs = self._context.copy()
        # If the result was a complex variable, we don't want to reuse it
        event_kwargs.pop("result", None)
        event_kwargs.pop("handler", None)
        event_kwargs.update(kwargs)
        event = Event.new(
            action,
            app=self._filename,
            **event_kwargs,
        )
        policy_request = event_kwargs.get("request")
        if isinstance(policy_request, PolicyRequest) and policy_request.http_request:
            # NOTE(review): no explicit save() on this path; presumably
            # from_http() persists the event - confirm
            event.from_http(policy_request.http_request)
            return
        event.save()

    def expr_func_call_policy(self, name: str, **kwargs) -> PolicyResult:
        """Call policy by name, with current request

        Uses the `request` from the evaluator context when present, otherwise a
        new PolicyRequest for the context's `user` (or the anonymous user).
        `kwargs` are merged into the request's context.

        Raises:
            ValueError: If no policy with the given name exists.
        """
        policy = Policy.objects.filter(name=name).select_subclasses().first()
        if not policy:
            raise ValueError(f"Policy '{name}' not found.")
        if "request" in self._context:
            req = self._context["request"]
        else:
            # Only look up the anonymous user when no user is in the context
            if "user" in self._context:
                user = self._context["user"]
            else:
                user = get_anonymous_user()
            req = PolicyRequest(user)
        req.context.update(kwargs)
        proc = PolicyProcess(PolicyBinding(policy=policy), request=req, connection=None)
        return proc.profiling_wrapper()

    def expr_create_jwt(
        self,
        user: User,
        provider: OAuth2Provider | str,
        scopes: list[str],
        validity: str = "seconds=60",
    ) -> str | None:
        """Issue a JWT for a given provider

        Args:
            user: User the access token is issued for.
            provider: OAuth2Provider instance, or the name of one to look up.
            scopes: Scopes stored on the access token.
            validity: Duration string parsed by `timedelta_from_string`.

        Returns:
            The token of the saved AccessToken, or None when the evaluator
            context has no `http_request`.
        """
        request: HttpRequest | None = self._context.get("http_request")
        if not request:
            return None
        if not isinstance(provider, OAuth2Provider):
            provider = OAuth2Provider.objects.get(name=provider)
        session = None
        if hasattr(request, "session") and request.session.session_key:
            session = request.session["authenticatedsession"]
        access_token = AccessToken(
            provider=provider,
            user=user,
            expires=now() + timedelta_from_string(validity),
            scope=scopes,
            auth_time=now(),
            session=session,
        )
        access_token.id_token = IDToken.new(provider, access_token, request)
        access_token.save()
        return access_token.token

    def expr_create_jwt_raw(
        self, provider: OAuth2Provider | str, validity: str = "seconds=60", **kwargs
    ) -> str:
        """Issue a JWT for a given provider with completely customized data

        `kwargs` form the payload; `exp` (now + `validity`) and `aud` (the
        provider's client_id) are always set and override supplied values.
        """
        if not isinstance(provider, OAuth2Provider):
            provider = OAuth2Provider.objects.get(name=provider)
        kwargs["exp"] = int((now() + timedelta_from_string(validity)).timestamp())
        kwargs["aud"] = provider.client_id
        return provider.encode(kwargs)

    def expr_send_email(  # noqa: PLR0913
        self,
        address: str | list[str],
        subject: str,
        body: str | None = None,
        stage: "EmailStage | None" = None,
        template: str | None = None,
        context: dict | None = None,
        cc: str | list[str] | None = None,
        bcc: str | list[str] | None = None,
    ) -> bool:
        """Send an email using authentik's email system

        Args:
            address: Email address(es) to send to. Can be:
                - Single email: "user@example.com"
                - List of emails: ["user1@example.com", "user2@example.com"]
            subject: Email subject
            body: Email body (plain text/HTML). Mutually exclusive with template.
            stage: EmailStage instance to use for settings. If None, uses global settings.
            template: Template name to render. Mutually exclusive with body.
            context: Additional context variables for template rendering.
            cc: Email address(es) to CC. Same format as address.
            bcc: Email address(es) to BCC. Same format as address.

        Returns:
            bool: True if email was queued successfully, False otherwise

        Raises:
            ValueError: If both or neither of body and template are given.
        """
        if body and template:
            raise ValueError("body and template parameters are mutually exclusive")

        if not body and not template:
            raise ValueError("Either body or template parameter must be provided")

        # Deferred imports to avoid circular import issues
        from authentik.stages.email.tasks import send_mails

        to_addresses = normalize_addresses(address)
        cc_addresses = normalize_addresses(cc)
        bcc_addresses = normalize_addresses(bcc)

        try:
            # Truthiness, matching the validation above: an empty template
            # string together with a body must send the body
            if template:
                # Use all available context from the evaluator for template rendering
                template_context = self._context.copy()
                # Add any custom context passed to the function
                if context:
                    template_context.update(context)

                # Use template rendering
                message = TemplateEmailMessage(
                    subject=subject,
                    to=to_addresses,
                    cc=cc_addresses,
                    bcc=bcc_addresses,
                    template_name=template,
                    template_context=template_context,
                )
            else:
                # Use plain body
                message = TemplateEmailMessage(
                    subject=subject,
                    to=to_addresses,
                    cc=cc_addresses,
                    bcc=bcc_addresses,
                    body=body,
                )

            send_mails(stage, message)
            return True

        except (SMTPException, ConnectionError, ValidationError, ValueError) as exc:
            LOGGER.warning("Failed to send email", exc=exc, addresses=to_addresses, subject=subject)
            return False

    def wrap_expression(self, expression: str) -> str:
        """Wrap expression in a function, call it, and save the result as `result`

        The sanitized context keys become the handler's parameters and the
        names passed in the call; keys that sanitize to "" are skipped.
        """
        arg_names = (sanitize_arg(key) for key in self._context)
        handler_signature = ",".join(name for name in arg_names if name)
        # NOTE(review): the indent prefix is a single space as listed; whitespace
        # may have been collapsed when this listing was extracted - confirm
        return (
            f"def handler({handler_signature}):\n"
            + indent(expression, " ")
            + f"\nresult = handler({handler_signature})"
        )

    def compile(self, expression: str) -> CodeType:
        """Parse expression. Raises SyntaxError or ValueError if the syntax is incorrect."""
        expression = self.wrap_expression(expression)
        return compile(expression, self._filename, "exec")

    def evaluate(self, expression_source: str) -> Any:
        """Parse and evaluate expression. If the syntax is incorrect, a SyntaxError is raised.
        If any exception is raised during execution, it is raised.
        The result is returned without any type-checking."""
        with start_span(op="authentik.lib.evaluator.evaluate") as span:
            span: Span
            span.description = self._filename
            span.set_data("expression", expression_source)
            try:
                ast_obj = self.compile(expression_source)
            except (SyntaxError, ValueError) as exc:
                self.handle_error(exc, expression_source)
                raise exc
            try:
                _locals = {sanitize_arg(x): y for x, y in self._context.items()}
                # Yes this is an exec, yes it is potentially bad. Since we limit what variables are
                # available here, and these policies can only be edited by admins, this is a risk
                # we're willing to take.

                exec(ast_obj, self._globals, _locals)  # nosec # noqa
                result = _locals["result"]
            except Exception as exc:
                # So, this is a bit questionable. Essentially, we are editing the stacktrace
                # so the user only sees information relevant to them
                # and none of our surrounding error handling
                if exc.__traceback__ is not None:
                    exc.__traceback__ = exc.__traceback__.tb_next
                # ControlFlowException is re-raised without being reported
                if not isinstance(exc, ControlFlowException):
                    self.handle_error(exc, expression_source)
                raise exc
            return result

    def handle_error(self, exc: Exception, expression_source: str):  # pragma: no cover
        """Exception Handler; logs a warning with the exception"""
        LOGGER.warning("Expression error", exc=exc)

    def validate(self, expression: str) -> bool:
        """Validate expression's syntax, raise ValidationError if Syntax is invalid"""
        try:
            self.compile(expression)
            return True
        except (ValueError, SyntaxError) as exc:
            raise ValidationError(f"Expression Syntax Error: {str(exc)}") from exc
Validate and evaluate python-based expressions
def __init__(self, filename: str | None = None):
    """Set up the globals exposed to expressions and an empty context.

    Args:
        filename: Stored as `_filename` and used as the name of the
            `ak_logger` logger. Falls back to "BaseEvaluator" when empty
            or None.
    """
    self._filename = filename or "BaseEvaluator"
    # update website/docs/expressions/_objects.md
    # update website/docs/expressions/_functions.md
    self._globals = {
        "ak_call_policy": self.expr_func_call_policy,
        "ak_create_event": self.expr_event_create,
        "ak_create_jwt": self.expr_create_jwt,
        "ak_create_jwt_raw": self.expr_create_jwt_raw,
        "ak_is_group_member": BaseEvaluator.expr_is_group_member,
        "ak_logger": get_logger(self._filename).bind(),
        "ak_send_email": self.expr_send_email,
        "ak_user_by": BaseEvaluator.expr_user_by,
        "ak_user_has_authenticator": BaseEvaluator.expr_func_user_has_authenticator,
        "ip_address": ip_address,
        "ip_network": ip_network,
        "list_flatten": BaseEvaluator.expr_flatten,
        "regex_match": BaseEvaluator.expr_regex_match,
        "regex_replace": BaseEvaluator.expr_regex_replace,
        "requests": get_http_session(),
        "resolve_dns": BaseEvaluator.expr_resolve_dns,
        "reverse_dns": BaseEvaluator.expr_reverse_dns,
        "slugify": slugify,
    }
    self._context = {}
@cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
@staticmethod
def expr_resolve_dns(host: str, ip_version: int | None = None) -> list[str]:
    """Resolve host to a list of IPv4 and/or IPv6 addresses.

    Args:
        host: Hostname to resolve; a falsy value is treated as "".
        ip_version: 4 or 6 to restrict the address family; any other value
            leaves the family unrestricted.

    Returns:
        The distinct addresses in the order the resolver returned them, or
        an empty list when the lookup raises OSError. Results (including
        empty ones) are cached by the surrounding `cached` decorator.
    """
    # Although it seems to be fine (raising OSError), docs warn
    # against passing `None` for both the host and the port
    # https://docs.python.org/3/library/socket.html#socket.getaddrinfo
    host = host or ""

    family = 0
    if ip_version == 4:  # noqa: PLR2004
        family = socket.AF_INET
    elif ip_version == 6:  # noqa: PLR2004
        family = socket.AF_INET6

    try:
        results = socket.getaddrinfo(host, None, family=family)
    except OSError:
        return []
    # dict.fromkeys de-duplicates while keeping the resolver's ordering
    return list(dict.fromkeys(str(info[4][0]) for info in results))
Resolve host to a list of IPv4 and/or IPv6 addresses.
@cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
@staticmethod
def expr_reverse_dns(ip_addr: str) -> str:
    """Perform a reverse DNS lookup.

    Returns the result of `socket.getfqdn(ip_addr)`, or `ip_addr` itself
    when the lookup raises OSError. Results are cached by the surrounding
    `cached` decorator.
    """
    try:
        return socket.getfqdn(ip_addr)
    except OSError:
        return ip_addr
Perform a reverse DNS lookup.
119 @staticmethod 120 def expr_flatten(value: list[Any] | Any) -> Any | None: 121 """Flatten `value` if its a list""" 122 if isinstance(value, list): 123 if len(value) < 1: 124 return None 125 return value[0] 126 return value
Flatten value if it's a list: return its first element (or None if the list is empty); any other value is returned unchanged
128 @staticmethod 129 def expr_regex_match(value: Any, regex: str) -> bool: 130 """Expression Filter to run re.search""" 131 return re.search(regex, value) is not None
Expression Filter to run re.search
133 @staticmethod 134 def expr_regex_replace(value: Any, regex: str, repl: str) -> str: 135 """Expression Filter to run re.sub""" 136 return re.sub(regex, repl, value)
Expression Filter to run re.sub
@staticmethod
def expr_is_group_member(user: User, **group_filters) -> bool:
    """Check if `user` is member of any group matching `group_filters`

    `group_filters` are passed as keyword arguments to `.filter()` on the
    result of `user.all_groups()`.
    """
    return user.all_groups().filter(**group_filters).exists()
Check if user is a member of any group matching the given group filters
@staticmethod
def expr_user_by(**filters) -> User | None:
    """Get the first user matching `filters`

    Returns None when no user matches or when the filters raise FieldError.
    """
    try:
        # .first() already yields None for an empty result, so the queryset
        # does not need to be evaluated separately beforehand
        return User.objects.filter(**filters).first()
    except FieldError:
        return None
Get user by filters
@staticmethod
def expr_func_user_has_authenticator(user: User, device_type: str | None = None) -> bool:
    """Check if a user has any authenticator devices, optionally matching *device_type*

    When *device_type* is given it is compared against each device's class
    name, lower-cased and with "device" removed.
    """
    user_devices = devices_for_user(user)
    if device_type:
        return any(
            device.__class__.__name__.lower().replace("device", "") == device_type
            for device in user_devices
        )
    # Stop at the first device instead of materialising all of them
    return any(True for _ in user_devices)
Check if a user has any authenticator devices, optionally matching device_type
def expr_event_create(self, action: str, **kwargs):
    """Create event with supplied data and try to extract as much relevant data
    from the context

    The event is built from a copy of the evaluator context (without
    `result` and `handler`) updated with `kwargs`.
    """
    event_kwargs = self._context.copy()
    # If the result was a complex variable, we don't want to reuse it
    event_kwargs.pop("result", None)
    event_kwargs.pop("handler", None)
    event_kwargs.update(kwargs)
    event = Event.new(
        action,
        app=self._filename,
        **event_kwargs,
    )
    policy_request = event_kwargs.get("request")
    if isinstance(policy_request, PolicyRequest) and policy_request.http_request:
        # NOTE(review): no explicit save() on this path; presumably
        # from_http() persists the event - confirm
        event.from_http(policy_request.http_request)
        return
    event.save()
Create event with supplied data and try to extract as much relevant data from the context
def expr_func_call_policy(self, name: str, **kwargs) -> PolicyResult:
    """Call policy by name, with current request

    Uses the `request` from the evaluator context when present, otherwise a
    new PolicyRequest for the context's `user` (or the anonymous user).
    `kwargs` are merged into the request's context.

    Raises:
        ValueError: If no policy with the given name exists.
    """
    policy = Policy.objects.filter(name=name).select_subclasses().first()
    if not policy:
        raise ValueError(f"Policy '{name}' not found.")
    if "request" in self._context:
        req = self._context["request"]
    else:
        # Only look up the anonymous user when no user is in the context
        if "user" in self._context:
            user = self._context["user"]
        else:
            user = get_anonymous_user()
        req = PolicyRequest(user)
    req.context.update(kwargs)
    proc = PolicyProcess(PolicyBinding(policy=policy), request=req, connection=None)
    return proc.profiling_wrapper()
Call policy by name, with current request
def expr_create_jwt(
    self,
    user: User,
    provider: OAuth2Provider | str,
    scopes: list[str],
    validity: str = "seconds=60",
) -> str | None:
    """Issue a JWT for a given provider

    Args:
        user: User the access token is issued for.
        provider: OAuth2Provider instance, or the name of one to look up.
        scopes: Scopes stored on the access token.
        validity: Duration string parsed by `timedelta_from_string`.

    Returns:
        The token of the saved AccessToken, or None when the evaluator
        context has no `http_request`.
    """
    request: HttpRequest | None = self._context.get("http_request")
    if not request:
        return None
    if not isinstance(provider, OAuth2Provider):
        provider = OAuth2Provider.objects.get(name=provider)
    session = None
    if hasattr(request, "session") and request.session.session_key:
        session = request.session["authenticatedsession"]
    access_token = AccessToken(
        provider=provider,
        user=user,
        expires=now() + timedelta_from_string(validity),
        scope=scopes,
        auth_time=now(),
        session=session,
    )
    access_token.id_token = IDToken.new(provider, access_token, request)
    access_token.save()
    return access_token.token
Issue a JWT for a given provider
def expr_create_jwt_raw(
    self, provider: OAuth2Provider | str, validity: str = "seconds=60", **kwargs
) -> str:
    """Issue a JWT for a given provider with completely customized data

    `kwargs` form the payload; `exp` (now + `validity`) and `aud` (the
    provider's client_id) are always set and override supplied values.
    """
    if not isinstance(provider, OAuth2Provider):
        provider = OAuth2Provider.objects.get(name=provider)
    kwargs["exp"] = int((now() + timedelta_from_string(validity)).timestamp())
    kwargs["aud"] = provider.client_id
    return provider.encode(kwargs)
Issue a JWT for a given provider with completely customized data
def expr_send_email(  # noqa: PLR0913
    self,
    address: str | list[str],
    subject: str,
    body: str | None = None,
    stage: EmailStage | None = None,
    template: str | None = None,
    context: dict | None = None,
    cc: str | list[str] | None = None,
    bcc: str | list[str] | None = None,
) -> bool:
    """Send an email using authentik's email system

    Args:
        address: Email address(es) to send to. Can be:
            - Single email: "user@example.com"
            - List of emails: ["user1@example.com", "user2@example.com"]
        subject: Email subject
        body: Email body (plain text/HTML). Mutually exclusive with template.
        stage: EmailStage instance to use for settings. If None, uses global settings.
        template: Template name to render. Mutually exclusive with body.
        context: Additional context variables for template rendering. Merged on
            top of a copy of the evaluator context, so these values win on conflict.
        cc: Email address(es) to CC. Same format as address.
        bcc: Email address(es) to BCC. Same format as address.

    Returns:
        bool: True if email was queued successfully, False otherwise

    Raises:
        ValueError: If both body and template are given, or neither is given.
    """
    # Deferred imports to avoid circular import issues
    from authentik.stages.email.tasks import send_mails

    if body and template:
        raise ValueError("body and template parameters are mutually exclusive")

    if not body and not template:
        raise ValueError("Either body or template parameter must be provided")

    to_addresses = normalize_addresses(address)
    cc_addresses = normalize_addresses(cc)
    bcc_addresses = normalize_addresses(bcc)

    try:
        # Select the branch with the same truthiness test used by the validation
        # above. Checking `template is not None` would send an empty-string
        # template (passed alongside a real body) down the template path.
        if template:
            # Use all available context from the evaluator for template rendering
            template_context = self._context.copy()
            # Add any custom context passed to the function
            if context:
                template_context.update(context)

            # Use template rendering
            message = TemplateEmailMessage(
                subject=subject,
                to=to_addresses,
                cc=cc_addresses,
                bcc=bcc_addresses,
                template_name=template,
                template_context=template_context,
            )
        else:
            # Use plain body
            message = TemplateEmailMessage(
                subject=subject,
                to=to_addresses,
                cc=cc_addresses,
                bcc=bcc_addresses,
                body=body,
            )

        send_mails(stage, message)
        return True

    except (SMTPException, ConnectionError, ValidationError, ValueError) as exc:
        # Best effort: failures while building or sending are logged and reported
        # through the return value rather than raised into the expression.
        LOGGER.warning("Failed to send email", exc=exc, addresses=to_addresses, subject=subject)
        return False
Send an email using authentik's email system
Args: address: Email address(es) to send to. Can be: - Single email: "user@example.com" - List of emails: ["user1@example.com", "user2@example.com"] subject: Email subject body: Email body (plain text/HTML). Mutually exclusive with template. stage: EmailStage instance to use for settings. If None, uses global settings. template: Template name to render. Mutually exclusive with body. context: Additional context variables for template rendering. cc: Email address(es) to CC. Same format as address. bcc: Email address(es) to BCC. Same format as address.
Returns: bool: True if email was queued successfully, False otherwise
def wrap_expression(self, expression: str) -> str:
    """Wrap expression in a function, call it, and save the result as `result`

    The generated source defines `handler(...)` with one parameter per sanitized
    context key, uses the indented expression as its body, and then assigns
    `result = handler(...)` using the same names as arguments.

    Args:
        expression: Source of the expression; becomes the body of `handler`.

    Returns:
        The complete source text, ready to be compiled in "exec" mode.
    """
    # Different context keys can sanitize to the same identifier (for example
    # "a-b" and "a_b"). A repeated parameter name is a SyntaxError, so keep each
    # name once. dict.fromkeys preserves first-seen order.
    unique_names = dict.fromkeys(sanitize_arg(key) for key in self._context)
    # Keys that sanitize to an empty string cannot be parameters; skip them.
    handler_signature = ",".join(name for name in unique_names if name)
    full_expression = f"def handler({handler_signature}):\n"
    full_expression += indent(expression, " ")
    full_expression += f"\nresult = handler({handler_signature})"
    return full_expression
Wrap expression in a function, call it, and save the result as `result`
def compile(self, expression: str) -> CodeType:
    """Wrap the expression and compile it to a code object.

    Raises SyntaxError or ValueError if the syntax is incorrect.
    """
    wrapped_source = self.wrap_expression(expression)
    # "exec" mode: the wrapped source is a sequence of statements, not a single expression
    return compile(wrapped_source, self._filename, "exec")
Parse expression. Raises SyntaxError or ValueError if the syntax is incorrect.
def evaluate(self, expression_source: str) -> Any:
    """Parse and evaluate expression. If the syntax is incorrect, a SyntaxError is raised.
    If any exception is raised during execution, it is raised.
    The result is returned without any type-checking.

    Compilation errors (SyntaxError, ValueError) and execution errors are passed to
    `handle_error` before being re-raised; ControlFlowException is re-raised
    without calling `handle_error`.
    """
    with start_span(op="authentik.lib.evaluator.evaluate") as span:
        span: Span
        # Record which expression is being evaluated on the tracing span
        span.description = self._filename
        span.set_data("expression", expression_source)
        try:
            ast_obj = self.compile(expression_source)
        except (SyntaxError, ValueError) as exc:
            self.handle_error(exc, expression_source)
            raise exc
        try:
            # Context keys are sanitized the same way as the handler's parameter
            # names, so the generated `handler(...)` call can resolve them.
            _locals = {sanitize_arg(x): y for x, y in self._context.items()}
            # Yes this is an exec, yes it is potentially bad. Since we limit what variables are
            # available here, and these policies can only be edited by admins, this is a risk
            # we're willing to take.

            exec(ast_obj, self._globals, _locals)  # nosec # noqa
            # The wrapped source assigns the handler's return value to `result`
            result = _locals["result"]
        except Exception as exc:
            # So, this is a bit questionable. Essentially, we are editing the stacktrace
            # so the user only sees information relevant to them
            # and none of our surrounding error handling
            if exc.__traceback__ is not None:
                # Drop the first traceback entry by advancing to tb_next
                exc.__traceback__ = exc.__traceback__.tb_next
            # Control-flow exceptions are re-raised without being reported as errors
            if not isinstance(exc, ControlFlowException):
                self.handle_error(exc, expression_source)
            raise exc
        return result
Parse and evaluate expression. If the syntax is incorrect, a SyntaxError is raised. If any exception is raised during execution, it is raised. The result is returned without any type-checking.
def handle_error(self, exc: Exception, expression_source: str):  # pragma: no cover
    """Exception Handler

    Called by `evaluate` before an exception is re-raised. This implementation
    only logs a warning; `expression_source` is accepted but not used here.
    """
    LOGGER.warning("Expression error", exc=exc)
Exception Handler
def validate(self, expression: str) -> bool:
    """Check that the expression compiles.

    Returns True when compilation succeeds; raises ValidationError (chained to
    the original error) when it fails with a ValueError or SyntaxError.
    """
    try:
        self.compile(expression)
    except (ValueError, SyntaxError) as exc:
        raise ValidationError(f"Expression Syntax Error: {str(exc)}") from exc
    else:
        return True
Validate expression's syntax, raise ValidationError if Syntax is invalid