authentik.lib.tests.test_evaluator
Test Evaluator base functions
1"""Test Evaluator base functions""" 2 3from pathlib import Path 4from unittest.mock import patch 5 6from django.test import RequestFactory, TestCase 7from django.urls import reverse 8from jwt import decode 9 10from authentik.blueprints.tests import apply_blueprint 11from authentik.core.tests.utils import create_test_admin_user, create_test_flow, create_test_user 12from authentik.events.models import Event 13from authentik.lib.expression.evaluator import BaseEvaluator 14from authentik.lib.generators import generate_id 15from authentik.providers.oauth2.models import OAuth2Provider, ScopeMapping 16 17 18class TestEvaluator(TestCase): 19 """Test Evaluator base functions""" 20 21 def test_expr_regex_match(self): 22 """Test expr_regex_match""" 23 self.assertFalse(BaseEvaluator.expr_regex_match("foo", "bar")) 24 self.assertTrue(BaseEvaluator.expr_regex_match("foo", "foo")) 25 26 def test_expr_regex_replace(self): 27 """Test expr_regex_replace""" 28 self.assertEqual(BaseEvaluator.expr_regex_replace("foo", "o", "a"), "faa") 29 30 def test_expr_user_by(self): 31 """Test expr_user_by""" 32 user = create_test_admin_user() 33 self.assertIsNotNone(BaseEvaluator.expr_user_by(username=user.username)) 34 self.assertIsNone(BaseEvaluator.expr_user_by(username="bar")) 35 self.assertIsNone(BaseEvaluator.expr_user_by(foo="bar")) 36 37 def test_expr_is_group_member(self): 38 """Test expr_is_group_member""" 39 self.assertFalse(BaseEvaluator.expr_is_group_member(create_test_admin_user(), name="test")) 40 41 def test_expr_event_create(self): 42 """Test expr_event_create""" 43 evaluator = BaseEvaluator(generate_id()) 44 evaluator._context = { 45 "foo": "bar", 46 } 47 evaluator.evaluate("ak_create_event('foo', bar='baz')") 48 event = Event.objects.filter(action="custom_foo").first() 49 self.assertIsNotNone(event) 50 self.assertEqual(event.context, {"bar": "baz", "foo": "bar"}) 51 52 @apply_blueprint("system/providers-oauth2.yaml") 53 def test_expr_create_jwt(self): 54 """Test expr_create_jwt""" 55 rf = RequestFactory() 56 user = create_test_user() 57 provider = OAuth2Provider.objects.create( 58 name=generate_id(), 59 authorization_flow=create_test_flow(), 60 ) 61 provider.property_mappings.set( 62 ScopeMapping.objects.filter( 63 managed__in=[ 64 "goauthentik.io/providers/oauth2/scope-openid", 65 "goauthentik.io/providers/oauth2/scope-email", 66 "goauthentik.io/providers/oauth2/scope-profile", 67 ] 68 ) 69 ) 70 evaluator = BaseEvaluator(generate_id()) 71 evaluator._context = { 72 "http_request": rf.get(reverse("authentik_core:root-redirect")), 73 "user": user, 74 "provider": provider.name, 75 } 76 jwt = evaluator.evaluate( 77 "return ak_create_jwt(user, provider, ['openid', 'email', 'profile'])" 78 ) 79 decoded = decode( 80 jwt, provider.client_secret, algorithms=["HS256"], audience=provider.client_id 81 ) 82 self.assertEqual(decoded["preferred_username"], user.username) 83 84 def test_expr_create_jwt_raw(self): 85 """Test expr_create_jwt_raw""" 86 rf = RequestFactory() 87 user = create_test_user() 88 provider = OAuth2Provider.objects.create( 89 name=generate_id(), 90 authorization_flow=create_test_flow(), 91 ) 92 evaluator = BaseEvaluator(generate_id()) 93 evaluator._context = { 94 "http_request": rf.get(reverse("authentik_core:root-redirect")), 95 "user": user, 96 "provider": provider.name, 97 } 98 jwt = evaluator.evaluate("return ak_create_jwt_raw(provider, foo='bar')") 99 decoded = decode( 100 jwt, provider.client_secret, algorithms=["HS256"], audience=provider.client_id 101 ) 102 self.assertEqual(decoded["foo"], "bar") 103 104 @patch("authentik.stages.email.tasks.send_mails") 105 def test_expr_send_email_with_body(self, mock_send_mails): 106 """Test ak_send_email with body parameter""" 107 user = create_test_user() 108 evaluator = BaseEvaluator(generate_id()) 109 evaluator._context = {"user": user} 110 111 # Test sending email with body 112 result = evaluator.evaluate( 113 "return ak_send_email('test@example.com', 'Test Subject', body='Test Body')" 114 ) 115 116 self.assertTrue(result) 117 mock_send_mails.assert_called_once() 118 119 # Verify the call arguments - send_mails is called with (stage, message) 120 args, kwargs = mock_send_mails.call_args 121 stage, message = args 122 123 # Check that global settings are used (stage is None) 124 self.assertIsNone(stage) 125 126 # Check message properties 127 self.assertEqual(message.subject, "Test Subject") 128 self.assertEqual(message.to, ["test@example.com"]) 129 self.assertEqual(message.body, "Test Body") 130 131 @patch("authentik.stages.email.tasks.send_mails") 132 def test_expr_send_email_with_template(self, mock_send_mails): 133 """Test ak_send_email with template parameter""" 134 user = create_test_user() 135 evaluator = BaseEvaluator(generate_id()) 136 evaluator._context = {"user": user} 137 138 # Test sending email with template 139 result = evaluator.evaluate( 140 "return ak_send_email('test@example.com', 'Test Subject', " 141 "template='email/password_reset.html')" 142 ) 143 144 self.assertTrue(result) 145 mock_send_mails.assert_called_once() 146 147 def test_expr_send_email_validation_errors(self): 148 """Test ak_send_email validation errors""" 149 evaluator = BaseEvaluator(generate_id()) 150 151 # Test error when both body and template are provided 152 with self.assertRaises(ValueError) as cm: 153 evaluator.evaluate( 154 "return ak_send_email('test@example.com', 'Test', " 155 "body='Body', template='template.html')" 156 ) 157 self.assertIn("mutually exclusive", str(cm.exception)) 158 159 # Test error when neither body nor template are provided 160 with self.assertRaises(ValueError) as cm: 161 evaluator.evaluate("return ak_send_email('test@example.com', 'Test')") 162 self.assertIn("Either body or template parameter must be provided", str(cm.exception)) 163 164 @patch("authentik.stages.email.tasks.send_mails") 165 def test_expr_send_email_with_custom_stage(self, mock_send_mails): 166 """Test ak_send_email with custom EmailStage""" 167 from authentik.stages.email.models import EmailStage 168 169 user = create_test_user() 170 custom_stage = EmailStage( 171 name="custom-stage", use_global_settings=False, from_address="custom@example.com" 172 ) 173 174 evaluator = BaseEvaluator(generate_id()) 175 evaluator._context = {"user": user, "custom_stage": custom_stage} 176 177 # Test sending email with custom stage 178 result = evaluator.evaluate( 179 "return ak_send_email('test@example.com', 'Test Subject', " 180 "body='Test Body', stage=custom_stage)" 181 ) 182 183 self.assertTrue(result) 184 mock_send_mails.assert_called_once() 185 186 # Verify the custom stage was used 187 args, kwargs = mock_send_mails.call_args 188 stage, message = args 189 190 self.assertEqual(stage, custom_stage) 191 self.assertFalse(stage.use_global_settings) 192 193 @patch("authentik.stages.email.tasks.send_mails") 194 def test_expr_send_email_with_context(self, mock_send_mails): 195 """Test ak_send_email with custom context parameter""" 196 user = create_test_user() 197 evaluator = BaseEvaluator(generate_id()) 198 evaluator._context = {"user": user, "request_id": "123"} 199 200 # Test sending email with template and custom context 201 result = evaluator.evaluate( 202 "return ak_send_email('test@example.com', 'Test Subject', " 203 "template='email/password_reset.html', " 204 "context={'url': 'http://localhost', 'expires': '2026-01-01'})" 205 ) 206 207 self.assertTrue(result) 208 mock_send_mails.assert_called_once() 209 210 # Verify the call arguments - send_mails is called with (stage, message) 211 args, kwargs = mock_send_mails.call_args 212 stage, message = args 213 214 # Check that global settings are used (stage is None) 215 self.assertIsNone(stage) 216 217 self.assertEqual(message.subject, "Test Subject") 218 self.assertEqual(message.to, ["test@example.com"]) 219 self.assertIn("2026-01-01", message.body) 220 self.assertIn("http://localhost", message.body) 221 222 @patch("authentik.stages.email.tasks.send_mails") 223 def test_expr_send_email_multiple_addresses(self, mock_send_mails): 224 """Test ak_send_email with multiple email addresses""" 225 user = create_test_user() 226 evaluator = BaseEvaluator(generate_id()) 227 evaluator._context = {"user": user} 228 229 # Test sending email to multiple addresses 230 result = evaluator.evaluate( 231 "return ak_send_email(['user1@example.com', 'user2@example.com'], " 232 "'Test Subject', body='Test Body')" 233 ) 234 235 self.assertTrue(result) 236 mock_send_mails.assert_called_once() 237 238 # Verify the call arguments - send_mails is called with (stage, message) 239 args, kwargs = mock_send_mails.call_args 240 stage, message = args 241 242 # Check that global settings are used (stage is None) 243 self.assertIsNone(stage) 244 245 # Check message properties - should have multiple recipients 246 self.assertEqual(message.subject, "Test Subject") 247 self.assertEqual(message.to, ["user1@example.com", "user2@example.com"]) 248 self.assertEqual(message.body, "Test Body") 249 250 def test_expr_send_email_multiple_addresses_validation(self): 251 """Test ak_send_email validation with multiple addresses""" 252 evaluator = BaseEvaluator(generate_id()) 253 254 # Test error when empty list is provided 255 with self.assertRaises(ValueError) as cm: 256 evaluator.evaluate("return ak_send_email([], 'Test', body='Body')") 257 self.assertIn("Address list cannot be empty", str(cm.exception)) 258 259 # Test error when invalid type is provided 260 with self.assertRaises(ValueError) as cm: 261 evaluator.evaluate("return ak_send_email(123, 'Test', body='Body')") 262 self.assertIn("Address must be a string or list of strings", str(cm.exception)) 263 264 @patch("authentik.stages.email.tasks.send_mails") 265 def test_expr_send_email_with_cc(self, mock_send_mails): 266 """Test ak_send_email with CC parameter""" 267 user = create_test_user() 268 evaluator = BaseEvaluator(generate_id()) 269 evaluator._context = {"user": user} 270 271 # Test sending email with single CC address 272 result = evaluator.evaluate( 273 "return ak_send_email('to@example.com', 'Test Subject', " 274 "body='Test Body', cc='cc@example.com')" 275 ) 276 277 self.assertTrue(result) 278 mock_send_mails.assert_called_once() 279 280 args, kwargs = mock_send_mails.call_args 281 stage, message = args 282 283 self.assertEqual(message.to, ["to@example.com"]) 284 self.assertEqual(message.cc, ["cc@example.com"]) 285 self.assertEqual(message.bcc, []) 286 287 @patch("authentik.stages.email.tasks.send_mails") 288 def test_expr_send_email_with_bcc(self, mock_send_mails): 289 """Test ak_send_email with BCC parameter""" 290 user = create_test_user() 291 evaluator = BaseEvaluator(generate_id()) 292 evaluator._context = {"user": user} 293 294 # Test sending email with single BCC address 295 result = evaluator.evaluate( 296 "return ak_send_email('to@example.com', 'Test Subject', " 297 "body='Test Body', bcc='bcc@example.com')" 298 ) 299 300 self.assertTrue(result) 301 mock_send_mails.assert_called_once() 302 303 args, kwargs = mock_send_mails.call_args 304 stage, message = args 305 306 self.assertEqual(message.to, ["to@example.com"]) 307 self.assertEqual(message.cc, []) 308 self.assertEqual(message.bcc, ["bcc@example.com"]) 309 310 @patch("authentik.stages.email.tasks.send_mails") 311 def test_expr_send_email_with_cc_and_bcc(self, mock_send_mails): 312 """Test ak_send_email with both CC and BCC parameters""" 313 user = create_test_user() 314 evaluator = BaseEvaluator(generate_id()) 315 evaluator._context = {"user": user} 316 317 # Test sending email with both CC and BCC 318 result = evaluator.evaluate( 319 "return ak_send_email('to@example.com', 'Test Subject', " 320 "body='Test Body', cc='cc@example.com', bcc='bcc@example.com')" 321 ) 322 323 self.assertTrue(result) 324 mock_send_mails.assert_called_once() 325 326 args, kwargs = mock_send_mails.call_args 327 stage, message = args 328 329 self.assertEqual(message.to, ["to@example.com"]) 330 self.assertEqual(message.cc, ["cc@example.com"]) 331 self.assertEqual(message.bcc, ["bcc@example.com"]) 332 333 @patch("authentik.stages.email.tasks.send_mails") 334 def test_expr_send_email_with_multiple_cc_bcc(self, mock_send_mails): 335 """Test ak_send_email with multiple CC and BCC addresses""" 336 user = create_test_user() 337 evaluator = BaseEvaluator(generate_id()) 338 evaluator._context = {"user": user} 339 340 # Test sending email with multiple CC and BCC addresses 341 result = evaluator.evaluate( 342 "return ak_send_email('to@example.com', 'Test Subject', " 343 "body='Test Body', " 344 "cc=['cc1@example.com', 'cc2@example.com'], " 345 "bcc=['bcc1@example.com', 'bcc2@example.com'])" 346 ) 347 348 self.assertTrue(result) 349 mock_send_mails.assert_called_once() 350 351 args, kwargs = mock_send_mails.call_args 352 stage, message = args 353 354 self.assertEqual(message.to, ["to@example.com"]) 355 self.assertEqual(message.cc, ["cc1@example.com", "cc2@example.com"]) 356 self.assertEqual(message.bcc, ["bcc1@example.com", "bcc2@example.com"]) 357 358 def test_expr_arg_escape(self): 359 """Test escaping of arguments""" 360 eval = BaseEvaluator() 361 eval._context = { 362 'z=getattr(getattr(__import__("os"), "popen")("id > /tmp/test"), "read")()': "bar", 363 "@@": "baz", 364 "{{": "baz", 365 "aa@@": "baz", 366 } 367 res = eval.evaluate("return locals()") 368 self.assertEqual( 369 res, {"zgetattrgetattr__import__os_popenid_tmptest_read": "bar", "aa": "baz"} 370 ) 371 self.assertFalse(Path("/tmp/test").exists())
class
TestEvaluator(django.test.testcases.TestCase):
19class TestEvaluator(TestCase): 20 """Test Evaluator base functions""" 21 22 def test_expr_regex_match(self): 23 """Test expr_regex_match""" 24 self.assertFalse(BaseEvaluator.expr_regex_match("foo", "bar")) 25 self.assertTrue(BaseEvaluator.expr_regex_match("foo", "foo")) 26 27 def test_expr_regex_replace(self): 28 """Test expr_regex_replace""" 29 self.assertEqual(BaseEvaluator.expr_regex_replace("foo", "o", "a"), "faa") 30 31 def test_expr_user_by(self): 32 """Test expr_user_by""" 33 user = create_test_admin_user() 34 self.assertIsNotNone(BaseEvaluator.expr_user_by(username=user.username)) 35 self.assertIsNone(BaseEvaluator.expr_user_by(username="bar")) 36 self.assertIsNone(BaseEvaluator.expr_user_by(foo="bar")) 37 38 def test_expr_is_group_member(self): 39 """Test expr_is_group_member""" 40 self.assertFalse(BaseEvaluator.expr_is_group_member(create_test_admin_user(), name="test")) 41 42 def test_expr_event_create(self): 43 """Test expr_event_create""" 44 evaluator = BaseEvaluator(generate_id()) 45 evaluator._context = { 46 "foo": "bar", 47 } 48 evaluator.evaluate("ak_create_event('foo', bar='baz')") 49 event = Event.objects.filter(action="custom_foo").first() 50 self.assertIsNotNone(event) 51 self.assertEqual(event.context, {"bar": "baz", "foo": "bar"}) 52 53 @apply_blueprint("system/providers-oauth2.yaml") 54 def test_expr_create_jwt(self): 55 """Test expr_create_jwt""" 56 rf = RequestFactory() 57 user = create_test_user() 58 provider = OAuth2Provider.objects.create( 59 name=generate_id(), 60 authorization_flow=create_test_flow(), 61 ) 62 provider.property_mappings.set( 63 ScopeMapping.objects.filter( 64 managed__in=[ 65 "goauthentik.io/providers/oauth2/scope-openid", 66 "goauthentik.io/providers/oauth2/scope-email", 67 "goauthentik.io/providers/oauth2/scope-profile", 68 ] 69 ) 70 ) 71 evaluator = BaseEvaluator(generate_id()) 72 evaluator._context = { 73 "http_request": rf.get(reverse("authentik_core:root-redirect")), 74 "user": user, 75 "provider": provider.name, 76 } 77 jwt = evaluator.evaluate( 78 "return ak_create_jwt(user, provider, ['openid', 'email', 'profile'])" 79 ) 80 decoded = decode( 81 jwt, provider.client_secret, algorithms=["HS256"], audience=provider.client_id 82 ) 83 self.assertEqual(decoded["preferred_username"], user.username) 84 85 def test_expr_create_jwt_raw(self): 86 """Test expr_create_jwt_raw""" 87 rf = RequestFactory() 88 user = create_test_user() 89 provider = OAuth2Provider.objects.create( 90 name=generate_id(), 91 authorization_flow=create_test_flow(), 92 ) 93 evaluator = BaseEvaluator(generate_id()) 94 evaluator._context = { 95 "http_request": rf.get(reverse("authentik_core:root-redirect")), 96 "user": user, 97 "provider": provider.name, 98 } 99 jwt = evaluator.evaluate("return ak_create_jwt_raw(provider, foo='bar')") 100 decoded = decode( 101 jwt, provider.client_secret, algorithms=["HS256"], audience=provider.client_id 102 ) 103 self.assertEqual(decoded["foo"], "bar") 104 105 @patch("authentik.stages.email.tasks.send_mails") 106 def test_expr_send_email_with_body(self, mock_send_mails): 107 """Test ak_send_email with body parameter""" 108 user = create_test_user() 109 evaluator = BaseEvaluator(generate_id()) 110 evaluator._context = {"user": user} 111 112 # Test sending email with body 113 result = evaluator.evaluate( 114 "return ak_send_email('test@example.com', 'Test Subject', body='Test Body')" 115 ) 116 117 self.assertTrue(result) 118 mock_send_mails.assert_called_once() 119 120 # Verify the call arguments - send_mails is called with (stage, message) 121 args, kwargs = mock_send_mails.call_args 122 stage, message = args 123 124 # Check that global settings are used (stage is None) 125 self.assertIsNone(stage) 126 127 # Check message properties 128 self.assertEqual(message.subject, "Test Subject") 129 self.assertEqual(message.to, ["test@example.com"]) 130 self.assertEqual(message.body, "Test Body") 131 132 @patch("authentik.stages.email.tasks.send_mails") 133 def test_expr_send_email_with_template(self, mock_send_mails): 134 """Test ak_send_email with template parameter""" 135 user = create_test_user() 136 evaluator = BaseEvaluator(generate_id()) 137 evaluator._context = {"user": user} 138 139 # Test sending email with template 140 result = evaluator.evaluate( 141 "return ak_send_email('test@example.com', 'Test Subject', " 142 "template='email/password_reset.html')" 143 ) 144 145 self.assertTrue(result) 146 mock_send_mails.assert_called_once() 147 148 def test_expr_send_email_validation_errors(self): 149 """Test ak_send_email validation errors""" 150 evaluator = BaseEvaluator(generate_id()) 151 152 # Test error when both body and template are provided 153 with self.assertRaises(ValueError) as cm: 154 evaluator.evaluate( 155 "return ak_send_email('test@example.com', 'Test', " 156 "body='Body', template='template.html')" 157 ) 158 self.assertIn("mutually exclusive", str(cm.exception)) 159 160 # Test error when neither body nor template are provided 161 with self.assertRaises(ValueError) as cm: 162 evaluator.evaluate("return ak_send_email('test@example.com', 'Test')") 163 self.assertIn("Either body or template parameter must be provided", str(cm.exception)) 164 165 @patch("authentik.stages.email.tasks.send_mails") 166 def test_expr_send_email_with_custom_stage(self, mock_send_mails): 167 """Test ak_send_email with custom EmailStage""" 168 from authentik.stages.email.models import EmailStage 169 170 user = create_test_user() 171 custom_stage = EmailStage( 172 name="custom-stage", use_global_settings=False, from_address="custom@example.com" 173 ) 174 175 evaluator = BaseEvaluator(generate_id()) 176 evaluator._context = {"user": user, "custom_stage": custom_stage} 177 178 # Test sending email with custom stage 179 result = evaluator.evaluate( 180 "return ak_send_email('test@example.com', 'Test Subject', " 181 "body='Test Body', stage=custom_stage)" 182 ) 183 184 self.assertTrue(result) 185 mock_send_mails.assert_called_once() 186 187 # Verify the custom stage was used 188 args, kwargs = mock_send_mails.call_args 189 stage, message = args 190 191 self.assertEqual(stage, custom_stage) 192 self.assertFalse(stage.use_global_settings) 193 194 @patch("authentik.stages.email.tasks.send_mails") 195 def test_expr_send_email_with_context(self, mock_send_mails): 196 """Test ak_send_email with custom context parameter""" 197 user = create_test_user() 198 evaluator = BaseEvaluator(generate_id()) 199 evaluator._context = {"user": user, "request_id": "123"} 200 201 # Test sending email with template and custom context 202 result = evaluator.evaluate( 203 "return ak_send_email('test@example.com', 'Test Subject', " 204 "template='email/password_reset.html', " 205 "context={'url': 'http://localhost', 'expires': '2026-01-01'})" 206 ) 207 208 self.assertTrue(result) 209 mock_send_mails.assert_called_once() 210 211 # Verify the call arguments - send_mails is called with (stage, message) 212 args, kwargs = mock_send_mails.call_args 213 stage, message = args 214 215 # Check that global settings are used (stage is None) 216 self.assertIsNone(stage) 217 218 self.assertEqual(message.subject, "Test Subject") 219 self.assertEqual(message.to, ["test@example.com"]) 220 self.assertIn("2026-01-01", message.body) 221 self.assertIn("http://localhost", message.body) 222 223 @patch("authentik.stages.email.tasks.send_mails") 224 def test_expr_send_email_multiple_addresses(self, mock_send_mails): 225 """Test ak_send_email with multiple email addresses""" 226 user = create_test_user() 227 evaluator = BaseEvaluator(generate_id()) 228 evaluator._context = {"user": user} 229 230 # Test sending email to multiple addresses 231 result = evaluator.evaluate( 232 "return ak_send_email(['user1@example.com', 'user2@example.com'], " 233 "'Test Subject', body='Test Body')" 234 ) 235 236 self.assertTrue(result) 237 mock_send_mails.assert_called_once() 238 239 # Verify the call arguments - send_mails is called with (stage, message) 240 args, kwargs = mock_send_mails.call_args 241 stage, message = args 242 243 # Check that global settings are used (stage is None) 244 self.assertIsNone(stage) 245 246 # Check message properties - should have multiple recipients 247 self.assertEqual(message.subject, "Test Subject") 248 self.assertEqual(message.to, ["user1@example.com", "user2@example.com"]) 249 self.assertEqual(message.body, "Test Body") 250 251 def test_expr_send_email_multiple_addresses_validation(self): 252 """Test ak_send_email validation with multiple addresses""" 253 evaluator = BaseEvaluator(generate_id()) 254 255 # Test error when empty list is provided 256 with self.assertRaises(ValueError) as cm: 257 evaluator.evaluate("return ak_send_email([], 'Test', body='Body')") 258 self.assertIn("Address list cannot be empty", str(cm.exception)) 259 260 # Test error when invalid type is provided 261 with self.assertRaises(ValueError) as cm: 262 evaluator.evaluate("return ak_send_email(123, 'Test', body='Body')") 263 self.assertIn("Address must be a string or list of strings", str(cm.exception)) 264 265 @patch("authentik.stages.email.tasks.send_mails") 266 def test_expr_send_email_with_cc(self, mock_send_mails): 267 """Test ak_send_email with CC parameter""" 268 user = create_test_user() 269 evaluator = BaseEvaluator(generate_id()) 270 evaluator._context = {"user": user} 271 272 # Test sending email with single CC address 273 result = evaluator.evaluate( 274 "return ak_send_email('to@example.com', 'Test Subject', " 275 "body='Test Body', cc='cc@example.com')" 276 ) 277 278 self.assertTrue(result) 279 mock_send_mails.assert_called_once() 280 281 args, kwargs = mock_send_mails.call_args 282 stage, message = args 283 284 self.assertEqual(message.to, ["to@example.com"]) 285 self.assertEqual(message.cc, ["cc@example.com"]) 286 self.assertEqual(message.bcc, []) 287 288 @patch("authentik.stages.email.tasks.send_mails") 289 def test_expr_send_email_with_bcc(self, mock_send_mails): 290 """Test ak_send_email with BCC parameter""" 291 user = create_test_user() 292 evaluator = BaseEvaluator(generate_id()) 293 evaluator._context = {"user": user} 294 295 # Test sending email with single BCC address 296 result = evaluator.evaluate( 297 "return ak_send_email('to@example.com', 'Test Subject', " 298 "body='Test Body', bcc='bcc@example.com')" 299 ) 300 301 self.assertTrue(result) 302 mock_send_mails.assert_called_once() 303 304 args, kwargs = mock_send_mails.call_args 305 stage, message = args 306 307 self.assertEqual(message.to, ["to@example.com"]) 308 self.assertEqual(message.cc, []) 309 self.assertEqual(message.bcc, ["bcc@example.com"]) 310 311 @patch("authentik.stages.email.tasks.send_mails") 312 def test_expr_send_email_with_cc_and_bcc(self, mock_send_mails): 313 """Test ak_send_email with both CC and BCC parameters""" 314 user = create_test_user() 315 evaluator = BaseEvaluator(generate_id()) 316 evaluator._context = {"user": user} 317 318 # Test sending email with both CC and BCC 319 result = evaluator.evaluate( 320 "return ak_send_email('to@example.com', 'Test Subject', " 321 "body='Test Body', cc='cc@example.com', bcc='bcc@example.com')" 322 ) 323 324 self.assertTrue(result) 325 mock_send_mails.assert_called_once() 326 327 args, kwargs = mock_send_mails.call_args 328 stage, message = args 329 330 self.assertEqual(message.to, ["to@example.com"]) 331 self.assertEqual(message.cc, ["cc@example.com"]) 332 self.assertEqual(message.bcc, ["bcc@example.com"]) 333 334 @patch("authentik.stages.email.tasks.send_mails") 335 def test_expr_send_email_with_multiple_cc_bcc(self, mock_send_mails): 336 """Test ak_send_email with multiple CC and BCC addresses""" 337 user = create_test_user() 338 evaluator = BaseEvaluator(generate_id()) 339 evaluator._context = {"user": user} 340 341 # Test sending email with multiple CC and BCC addresses 342 result = evaluator.evaluate( 343 "return ak_send_email('to@example.com', 'Test Subject', " 344 "body='Test Body', " 345 "cc=['cc1@example.com', 'cc2@example.com'], " 346 "bcc=['bcc1@example.com', 'bcc2@example.com'])" 347 ) 348 349 self.assertTrue(result) 350 mock_send_mails.assert_called_once() 351 352 args, kwargs = mock_send_mails.call_args 353 stage, message = args 354 355 self.assertEqual(message.to, ["to@example.com"]) 356 self.assertEqual(message.cc, ["cc1@example.com", "cc2@example.com"]) 357 self.assertEqual(message.bcc, ["bcc1@example.com", "bcc2@example.com"]) 358 359 def test_expr_arg_escape(self): 360 """Test escaping of arguments""" 361 eval = BaseEvaluator() 362 eval._context = { 363 'z=getattr(getattr(__import__("os"), "popen")("id > /tmp/test"), "read")()': "bar", 364 "@@": "baz", 365 "{{": "baz", 366 "aa@@": "baz", 367 } 368 res = eval.evaluate("return locals()") 369 self.assertEqual( 370 res, {"zgetattrgetattr__import__os_popenid_tmptest_read": "bar", "aa": "baz"} 371 ) 372 self.assertFalse(Path("/tmp/test").exists())
Test Evaluator base functions
def
test_expr_regex_match(self):
22 def test_expr_regex_match(self): 23 """Test expr_regex_match""" 24 self.assertFalse(BaseEvaluator.expr_regex_match("foo", "bar")) 25 self.assertTrue(BaseEvaluator.expr_regex_match("foo", "foo"))
Test expr_regex_match
def
test_expr_regex_replace(self):
27 def test_expr_regex_replace(self): 28 """Test expr_regex_replace""" 29 self.assertEqual(BaseEvaluator.expr_regex_replace("foo", "o", "a"), "faa")
Test expr_regex_replace
def
test_expr_user_by(self):
31 def test_expr_user_by(self): 32 """Test expr_user_by""" 33 user = create_test_admin_user() 34 self.assertIsNotNone(BaseEvaluator.expr_user_by(username=user.username)) 35 self.assertIsNone(BaseEvaluator.expr_user_by(username="bar")) 36 self.assertIsNone(BaseEvaluator.expr_user_by(foo="bar"))
Test expr_user_by
def
test_expr_is_group_member(self):
38 def test_expr_is_group_member(self): 39 """Test expr_is_group_member""" 40 self.assertFalse(BaseEvaluator.expr_is_group_member(create_test_admin_user(), name="test"))
Test expr_is_group_member
def
test_expr_event_create(self):
42 def test_expr_event_create(self): 43 """Test expr_event_create""" 44 evaluator = BaseEvaluator(generate_id()) 45 evaluator._context = { 46 "foo": "bar", 47 } 48 evaluator.evaluate("ak_create_event('foo', bar='baz')") 49 event = Event.objects.filter(action="custom_foo").first() 50 self.assertIsNotNone(event) 51 self.assertEqual(event.context, {"bar": "baz", "foo": "bar"})
Test expr_event_create
@apply_blueprint('system/providers-oauth2.yaml')
def
test_expr_create_jwt(self):
53 @apply_blueprint("system/providers-oauth2.yaml") 54 def test_expr_create_jwt(self): 55 """Test expr_create_jwt""" 56 rf = RequestFactory() 57 user = create_test_user() 58 provider = OAuth2Provider.objects.create( 59 name=generate_id(), 60 authorization_flow=create_test_flow(), 61 ) 62 provider.property_mappings.set( 63 ScopeMapping.objects.filter( 64 managed__in=[ 65 "goauthentik.io/providers/oauth2/scope-openid", 66 "goauthentik.io/providers/oauth2/scope-email", 67 "goauthentik.io/providers/oauth2/scope-profile", 68 ] 69 ) 70 ) 71 evaluator = BaseEvaluator(generate_id()) 72 evaluator._context = { 73 "http_request": rf.get(reverse("authentik_core:root-redirect")), 74 "user": user, 75 "provider": provider.name, 76 } 77 jwt = evaluator.evaluate( 78 "return ak_create_jwt(user, provider, ['openid', 'email', 'profile'])" 79 ) 80 decoded = decode( 81 jwt, provider.client_secret, algorithms=["HS256"], audience=provider.client_id 82 ) 83 self.assertEqual(decoded["preferred_username"], user.username)
Test expr_create_jwt
def
test_expr_create_jwt_raw(self):
85 def test_expr_create_jwt_raw(self): 86 """Test expr_create_jwt_raw""" 87 rf = RequestFactory() 88 user = create_test_user() 89 provider = OAuth2Provider.objects.create( 90 name=generate_id(), 91 authorization_flow=create_test_flow(), 92 ) 93 evaluator = BaseEvaluator(generate_id()) 94 evaluator._context = { 95 "http_request": rf.get(reverse("authentik_core:root-redirect")), 96 "user": user, 97 "provider": provider.name, 98 } 99 jwt = evaluator.evaluate("return ak_create_jwt_raw(provider, foo='bar')") 100 decoded = decode( 101 jwt, provider.client_secret, algorithms=["HS256"], audience=provider.client_id 102 ) 103 self.assertEqual(decoded["foo"], "bar")
Test expr_create_jwt_raw
@patch('authentik.stages.email.tasks.send_mails')
def
test_expr_send_email_with_body(self, mock_send_mails):
105 @patch("authentik.stages.email.tasks.send_mails") 106 def test_expr_send_email_with_body(self, mock_send_mails): 107 """Test ak_send_email with body parameter""" 108 user = create_test_user() 109 evaluator = BaseEvaluator(generate_id()) 110 evaluator._context = {"user": user} 111 112 # Test sending email with body 113 result = evaluator.evaluate( 114 "return ak_send_email('test@example.com', 'Test Subject', body='Test Body')" 115 ) 116 117 self.assertTrue(result) 118 mock_send_mails.assert_called_once() 119 120 # Verify the call arguments - send_mails is called with (stage, message) 121 args, kwargs = mock_send_mails.call_args 122 stage, message = args 123 124 # Check that global settings are used (stage is None) 125 self.assertIsNone(stage) 126 127 # Check message properties 128 self.assertEqual(message.subject, "Test Subject") 129 self.assertEqual(message.to, ["test@example.com"]) 130 self.assertEqual(message.body, "Test Body")
Test ak_send_email with body parameter
@patch('authentik.stages.email.tasks.send_mails')
def
test_expr_send_email_with_template(self, mock_send_mails):
132 @patch("authentik.stages.email.tasks.send_mails") 133 def test_expr_send_email_with_template(self, mock_send_mails): 134 """Test ak_send_email with template parameter""" 135 user = create_test_user() 136 evaluator = BaseEvaluator(generate_id()) 137 evaluator._context = {"user": user} 138 139 # Test sending email with template 140 result = evaluator.evaluate( 141 "return ak_send_email('test@example.com', 'Test Subject', " 142 "template='email/password_reset.html')" 143 ) 144 145 self.assertTrue(result) 146 mock_send_mails.assert_called_once()
Test ak_send_email with template parameter
def
test_expr_send_email_validation_errors(self):
148 def test_expr_send_email_validation_errors(self): 149 """Test ak_send_email validation errors""" 150 evaluator = BaseEvaluator(generate_id()) 151 152 # Test error when both body and template are provided 153 with self.assertRaises(ValueError) as cm: 154 evaluator.evaluate( 155 "return ak_send_email('test@example.com', 'Test', " 156 "body='Body', template='template.html')" 157 ) 158 self.assertIn("mutually exclusive", str(cm.exception)) 159 160 # Test error when neither body nor template are provided 161 with self.assertRaises(ValueError) as cm: 162 evaluator.evaluate("return ak_send_email('test@example.com', 'Test')") 163 self.assertIn("Either body or template parameter must be provided", str(cm.exception))
Test ak_send_email validation errors
@patch('authentik.stages.email.tasks.send_mails')
def
test_expr_send_email_with_custom_stage(self, mock_send_mails):
165 @patch("authentik.stages.email.tasks.send_mails") 166 def test_expr_send_email_with_custom_stage(self, mock_send_mails): 167 """Test ak_send_email with custom EmailStage""" 168 from authentik.stages.email.models import EmailStage 169 170 user = create_test_user() 171 custom_stage = EmailStage( 172 name="custom-stage", use_global_settings=False, from_address="custom@example.com" 173 ) 174 175 evaluator = BaseEvaluator(generate_id()) 176 evaluator._context = {"user": user, "custom_stage": custom_stage} 177 178 # Test sending email with custom stage 179 result = evaluator.evaluate( 180 "return ak_send_email('test@example.com', 'Test Subject', " 181 "body='Test Body', stage=custom_stage)" 182 ) 183 184 self.assertTrue(result) 185 mock_send_mails.assert_called_once() 186 187 # Verify the custom stage was used 188 args, kwargs = mock_send_mails.call_args 189 stage, message = args 190 191 self.assertEqual(stage, custom_stage) 192 self.assertFalse(stage.use_global_settings)
Test ak_send_email with custom EmailStage
@patch('authentik.stages.email.tasks.send_mails')
def
test_expr_send_email_with_context(self, mock_send_mails):
194 @patch("authentik.stages.email.tasks.send_mails") 195 def test_expr_send_email_with_context(self, mock_send_mails): 196 """Test ak_send_email with custom context parameter""" 197 user = create_test_user() 198 evaluator = BaseEvaluator(generate_id()) 199 evaluator._context = {"user": user, "request_id": "123"} 200 201 # Test sending email with template and custom context 202 result = evaluator.evaluate( 203 "return ak_send_email('test@example.com', 'Test Subject', " 204 "template='email/password_reset.html', " 205 "context={'url': 'http://localhost', 'expires': '2026-01-01'})" 206 ) 207 208 self.assertTrue(result) 209 mock_send_mails.assert_called_once() 210 211 # Verify the call arguments - send_mails is called with (stage, message) 212 args, kwargs = mock_send_mails.call_args 213 stage, message = args 214 215 # Check that global settings are used (stage is None) 216 self.assertIsNone(stage) 217 218 self.assertEqual(message.subject, "Test Subject") 219 self.assertEqual(message.to, ["test@example.com"]) 220 self.assertIn("2026-01-01", message.body) 221 self.assertIn("http://localhost", message.body)
Test ak_send_email with custom context parameter
@patch('authentik.stages.email.tasks.send_mails')
def
test_expr_send_email_multiple_addresses(self, mock_send_mails):
223 @patch("authentik.stages.email.tasks.send_mails") 224 def test_expr_send_email_multiple_addresses(self, mock_send_mails): 225 """Test ak_send_email with multiple email addresses""" 226 user = create_test_user() 227 evaluator = BaseEvaluator(generate_id()) 228 evaluator._context = {"user": user} 229 230 # Test sending email to multiple addresses 231 result = evaluator.evaluate( 232 "return ak_send_email(['user1@example.com', 'user2@example.com'], " 233 "'Test Subject', body='Test Body')" 234 ) 235 236 self.assertTrue(result) 237 mock_send_mails.assert_called_once() 238 239 # Verify the call arguments - send_mails is called with (stage, message) 240 args, kwargs = mock_send_mails.call_args 241 stage, message = args 242 243 # Check that global settings are used (stage is None) 244 self.assertIsNone(stage) 245 246 # Check message properties - should have multiple recipients 247 self.assertEqual(message.subject, "Test Subject") 248 self.assertEqual(message.to, ["user1@example.com", "user2@example.com"]) 249 self.assertEqual(message.body, "Test Body")
Test ak_send_email with multiple email addresses
def
test_expr_send_email_multiple_addresses_validation(self):
251 def test_expr_send_email_multiple_addresses_validation(self): 252 """Test ak_send_email validation with multiple addresses""" 253 evaluator = BaseEvaluator(generate_id()) 254 255 # Test error when empty list is provided 256 with self.assertRaises(ValueError) as cm: 257 evaluator.evaluate("return ak_send_email([], 'Test', body='Body')") 258 self.assertIn("Address list cannot be empty", str(cm.exception)) 259 260 # Test error when invalid type is provided 261 with self.assertRaises(ValueError) as cm: 262 evaluator.evaluate("return ak_send_email(123, 'Test', body='Body')") 263 self.assertIn("Address must be a string or list of strings", str(cm.exception))
Test ak_send_email validation with multiple addresses
@patch('authentik.stages.email.tasks.send_mails')
def
test_expr_send_email_with_cc(self, mock_send_mails):
265 @patch("authentik.stages.email.tasks.send_mails") 266 def test_expr_send_email_with_cc(self, mock_send_mails): 267 """Test ak_send_email with CC parameter""" 268 user = create_test_user() 269 evaluator = BaseEvaluator(generate_id()) 270 evaluator._context = {"user": user} 271 272 # Test sending email with single CC address 273 result = evaluator.evaluate( 274 "return ak_send_email('to@example.com', 'Test Subject', " 275 "body='Test Body', cc='cc@example.com')" 276 ) 277 278 self.assertTrue(result) 279 mock_send_mails.assert_called_once() 280 281 args, kwargs = mock_send_mails.call_args 282 stage, message = args 283 284 self.assertEqual(message.to, ["to@example.com"]) 285 self.assertEqual(message.cc, ["cc@example.com"]) 286 self.assertEqual(message.bcc, [])
Test ak_send_email with CC parameter
@patch('authentik.stages.email.tasks.send_mails')
def
test_expr_send_email_with_bcc(self, mock_send_mails):
288 @patch("authentik.stages.email.tasks.send_mails") 289 def test_expr_send_email_with_bcc(self, mock_send_mails): 290 """Test ak_send_email with BCC parameter""" 291 user = create_test_user() 292 evaluator = BaseEvaluator(generate_id()) 293 evaluator._context = {"user": user} 294 295 # Test sending email with single BCC address 296 result = evaluator.evaluate( 297 "return ak_send_email('to@example.com', 'Test Subject', " 298 "body='Test Body', bcc='bcc@example.com')" 299 ) 300 301 self.assertTrue(result) 302 mock_send_mails.assert_called_once() 303 304 args, kwargs = mock_send_mails.call_args 305 stage, message = args 306 307 self.assertEqual(message.to, ["to@example.com"]) 308 self.assertEqual(message.cc, []) 309 self.assertEqual(message.bcc, ["bcc@example.com"])
Test ak_send_email with BCC parameter
@patch('authentik.stages.email.tasks.send_mails')
def
test_expr_send_email_with_cc_and_bcc(self, mock_send_mails):
311 @patch("authentik.stages.email.tasks.send_mails") 312 def test_expr_send_email_with_cc_and_bcc(self, mock_send_mails): 313 """Test ak_send_email with both CC and BCC parameters""" 314 user = create_test_user() 315 evaluator = BaseEvaluator(generate_id()) 316 evaluator._context = {"user": user} 317 318 # Test sending email with both CC and BCC 319 result = evaluator.evaluate( 320 "return ak_send_email('to@example.com', 'Test Subject', " 321 "body='Test Body', cc='cc@example.com', bcc='bcc@example.com')" 322 ) 323 324 self.assertTrue(result) 325 mock_send_mails.assert_called_once() 326 327 args, kwargs = mock_send_mails.call_args 328 stage, message = args 329 330 self.assertEqual(message.to, ["to@example.com"]) 331 self.assertEqual(message.cc, ["cc@example.com"]) 332 self.assertEqual(message.bcc, ["bcc@example.com"])
Test ak_send_email with both CC and BCC parameters
@patch('authentik.stages.email.tasks.send_mails')
def
test_expr_send_email_with_multiple_cc_bcc(self, mock_send_mails):
334 @patch("authentik.stages.email.tasks.send_mails") 335 def test_expr_send_email_with_multiple_cc_bcc(self, mock_send_mails): 336 """Test ak_send_email with multiple CC and BCC addresses""" 337 user = create_test_user() 338 evaluator = BaseEvaluator(generate_id()) 339 evaluator._context = {"user": user} 340 341 # Test sending email with multiple CC and BCC addresses 342 result = evaluator.evaluate( 343 "return ak_send_email('to@example.com', 'Test Subject', " 344 "body='Test Body', " 345 "cc=['cc1@example.com', 'cc2@example.com'], " 346 "bcc=['bcc1@example.com', 'bcc2@example.com'])" 347 ) 348 349 self.assertTrue(result) 350 mock_send_mails.assert_called_once() 351 352 args, kwargs = mock_send_mails.call_args 353 stage, message = args 354 355 self.assertEqual(message.to, ["to@example.com"]) 356 self.assertEqual(message.cc, ["cc1@example.com", "cc2@example.com"]) 357 self.assertEqual(message.bcc, ["bcc1@example.com", "bcc2@example.com"])
Test ak_send_email with multiple CC and BCC addresses
def
test_expr_arg_escape(self):
359 def test_expr_arg_escape(self): 360 """Test escaping of arguments""" 361 eval = BaseEvaluator() 362 eval._context = { 363 'z=getattr(getattr(__import__("os"), "popen")("id > /tmp/test"), "read")()': "bar", 364 "@@": "baz", 365 "{{": "baz", 366 "aa@@": "baz", 367 } 368 res = eval.evaluate("return locals()") 369 self.assertEqual( 370 res, {"zgetattrgetattr__import__os_popenid_tmptest_read": "bar", "aa": "baz"} 371 ) 372 self.assertFalse(Path("/tmp/test").exists())
Test escaping of arguments