authentik.lib.tests.test_evaluator

Test Evaluator base functions

  1"""Test Evaluator base functions"""
  2
  3from pathlib import Path
  4from unittest.mock import patch
  5
  6from django.test import RequestFactory, TestCase
  7from django.urls import reverse
  8from jwt import decode
  9
 10from authentik.blueprints.tests import apply_blueprint
 11from authentik.core.tests.utils import create_test_admin_user, create_test_flow, create_test_user
 12from authentik.events.models import Event
 13from authentik.lib.expression.evaluator import BaseEvaluator
 14from authentik.lib.generators import generate_id
 15from authentik.providers.oauth2.models import OAuth2Provider, ScopeMapping
 16
 17
 18class TestEvaluator(TestCase):
 19    """Test Evaluator base functions"""
 20
 21    def test_expr_regex_match(self):
 22        """Test expr_regex_match"""
 23        self.assertFalse(BaseEvaluator.expr_regex_match("foo", "bar"))
 24        self.assertTrue(BaseEvaluator.expr_regex_match("foo", "foo"))
 25
 26    def test_expr_regex_replace(self):
 27        """Test expr_regex_replace"""
 28        self.assertEqual(BaseEvaluator.expr_regex_replace("foo", "o", "a"), "faa")
 29
 30    def test_expr_user_by(self):
 31        """Test expr_user_by"""
 32        user = create_test_admin_user()
 33        self.assertIsNotNone(BaseEvaluator.expr_user_by(username=user.username))
 34        self.assertIsNone(BaseEvaluator.expr_user_by(username="bar"))
 35        self.assertIsNone(BaseEvaluator.expr_user_by(foo="bar"))
 36
 37    def test_expr_is_group_member(self):
 38        """Test expr_is_group_member"""
 39        self.assertFalse(BaseEvaluator.expr_is_group_member(create_test_admin_user(), name="test"))
 40
 41    def test_expr_event_create(self):
 42        """Test expr_event_create"""
 43        evaluator = BaseEvaluator(generate_id())
 44        evaluator._context = {
 45            "foo": "bar",
 46        }
 47        evaluator.evaluate("ak_create_event('foo', bar='baz')")
 48        event = Event.objects.filter(action="custom_foo").first()
 49        self.assertIsNotNone(event)
 50        self.assertEqual(event.context, {"bar": "baz", "foo": "bar"})
 51
 52    @apply_blueprint("system/providers-oauth2.yaml")
 53    def test_expr_create_jwt(self):
 54        """Test expr_create_jwt"""
 55        rf = RequestFactory()
 56        user = create_test_user()
 57        provider = OAuth2Provider.objects.create(
 58            name=generate_id(),
 59            authorization_flow=create_test_flow(),
 60        )
 61        provider.property_mappings.set(
 62            ScopeMapping.objects.filter(
 63                managed__in=[
 64                    "goauthentik.io/providers/oauth2/scope-openid",
 65                    "goauthentik.io/providers/oauth2/scope-email",
 66                    "goauthentik.io/providers/oauth2/scope-profile",
 67                ]
 68            )
 69        )
 70        evaluator = BaseEvaluator(generate_id())
 71        evaluator._context = {
 72            "http_request": rf.get(reverse("authentik_core:root-redirect")),
 73            "user": user,
 74            "provider": provider.name,
 75        }
 76        jwt = evaluator.evaluate(
 77            "return ak_create_jwt(user, provider, ['openid', 'email', 'profile'])"
 78        )
 79        decoded = decode(
 80            jwt, provider.client_secret, algorithms=["HS256"], audience=provider.client_id
 81        )
 82        self.assertEqual(decoded["preferred_username"], user.username)
 83
 84    def test_expr_create_jwt_raw(self):
 85        """Test expr_create_jwt_raw"""
 86        rf = RequestFactory()
 87        user = create_test_user()
 88        provider = OAuth2Provider.objects.create(
 89            name=generate_id(),
 90            authorization_flow=create_test_flow(),
 91        )
 92        evaluator = BaseEvaluator(generate_id())
 93        evaluator._context = {
 94            "http_request": rf.get(reverse("authentik_core:root-redirect")),
 95            "user": user,
 96            "provider": provider.name,
 97        }
 98        jwt = evaluator.evaluate("return ak_create_jwt_raw(provider, foo='bar')")
 99        decoded = decode(
100            jwt, provider.client_secret, algorithms=["HS256"], audience=provider.client_id
101        )
102        self.assertEqual(decoded["foo"], "bar")
103
104    @patch("authentik.stages.email.tasks.send_mails")
105    def test_expr_send_email_with_body(self, mock_send_mails):
106        """Test ak_send_email with body parameter"""
107        user = create_test_user()
108        evaluator = BaseEvaluator(generate_id())
109        evaluator._context = {"user": user}
110
111        # Test sending email with body
112        result = evaluator.evaluate(
113            "return ak_send_email('test@example.com', 'Test Subject', body='Test Body')"
114        )
115
116        self.assertTrue(result)
117        mock_send_mails.assert_called_once()
118
119        # Verify the call arguments - send_mails is called with (stage, message)
120        args, kwargs = mock_send_mails.call_args
121        stage, message = args
122
123        # Check that global settings are used (stage is None)
124        self.assertIsNone(stage)
125
126        # Check message properties
127        self.assertEqual(message.subject, "Test Subject")
128        self.assertEqual(message.to, ["test@example.com"])
129        self.assertEqual(message.body, "Test Body")
130
131    @patch("authentik.stages.email.tasks.send_mails")
132    def test_expr_send_email_with_template(self, mock_send_mails):
133        """Test ak_send_email with template parameter"""
134        user = create_test_user()
135        evaluator = BaseEvaluator(generate_id())
136        evaluator._context = {"user": user}
137
138        # Test sending email with template
139        result = evaluator.evaluate(
140            "return ak_send_email('test@example.com', 'Test Subject', "
141            "template='email/password_reset.html')"
142        )
143
144        self.assertTrue(result)
145        mock_send_mails.assert_called_once()
146
147    def test_expr_send_email_validation_errors(self):
148        """Test ak_send_email validation errors"""
149        evaluator = BaseEvaluator(generate_id())
150
151        # Test error when both body and template are provided
152        with self.assertRaises(ValueError) as cm:
153            evaluator.evaluate(
154                "return ak_send_email('test@example.com', 'Test', "
155                "body='Body', template='template.html')"
156            )
157        self.assertIn("mutually exclusive", str(cm.exception))
158
159        # Test error when neither body nor template are provided
160        with self.assertRaises(ValueError) as cm:
161            evaluator.evaluate("return ak_send_email('test@example.com', 'Test')")
162        self.assertIn("Either body or template parameter must be provided", str(cm.exception))
163
164    @patch("authentik.stages.email.tasks.send_mails")
165    def test_expr_send_email_with_custom_stage(self, mock_send_mails):
166        """Test ak_send_email with custom EmailStage"""
167        from authentik.stages.email.models import EmailStage
168
169        user = create_test_user()
170        custom_stage = EmailStage(
171            name="custom-stage", use_global_settings=False, from_address="custom@example.com"
172        )
173
174        evaluator = BaseEvaluator(generate_id())
175        evaluator._context = {"user": user, "custom_stage": custom_stage}
176
177        # Test sending email with custom stage
178        result = evaluator.evaluate(
179            "return ak_send_email('test@example.com', 'Test Subject', "
180            "body='Test Body', stage=custom_stage)"
181        )
182
183        self.assertTrue(result)
184        mock_send_mails.assert_called_once()
185
186        # Verify the custom stage was used
187        args, kwargs = mock_send_mails.call_args
188        stage, message = args
189
190        self.assertEqual(stage, custom_stage)
191        self.assertFalse(stage.use_global_settings)
192
193    @patch("authentik.stages.email.tasks.send_mails")
194    def test_expr_send_email_with_context(self, mock_send_mails):
195        """Test ak_send_email with custom context parameter"""
196        user = create_test_user()
197        evaluator = BaseEvaluator(generate_id())
198        evaluator._context = {"user": user, "request_id": "123"}
199
200        # Test sending email with template and custom context
201        result = evaluator.evaluate(
202            "return ak_send_email('test@example.com', 'Test Subject', "
203            "template='email/password_reset.html', "
204            "context={'url': 'http://localhost', 'expires': '2026-01-01'})"
205        )
206
207        self.assertTrue(result)
208        mock_send_mails.assert_called_once()
209
210        # Verify the call arguments - send_mails is called with (stage, message)
211        args, kwargs = mock_send_mails.call_args
212        stage, message = args
213
214        # Check that global settings are used (stage is None)
215        self.assertIsNone(stage)
216
217        self.assertEqual(message.subject, "Test Subject")
218        self.assertEqual(message.to, ["test@example.com"])
219        self.assertIn("2026-01-01", message.body)
220        self.assertIn("http://localhost", message.body)
221
222    @patch("authentik.stages.email.tasks.send_mails")
223    def test_expr_send_email_multiple_addresses(self, mock_send_mails):
224        """Test ak_send_email with multiple email addresses"""
225        user = create_test_user()
226        evaluator = BaseEvaluator(generate_id())
227        evaluator._context = {"user": user}
228
229        # Test sending email to multiple addresses
230        result = evaluator.evaluate(
231            "return ak_send_email(['user1@example.com', 'user2@example.com'], "
232            "'Test Subject', body='Test Body')"
233        )
234
235        self.assertTrue(result)
236        mock_send_mails.assert_called_once()
237
238        # Verify the call arguments - send_mails is called with (stage, message)
239        args, kwargs = mock_send_mails.call_args
240        stage, message = args
241
242        # Check that global settings are used (stage is None)
243        self.assertIsNone(stage)
244
245        # Check message properties - should have multiple recipients
246        self.assertEqual(message.subject, "Test Subject")
247        self.assertEqual(message.to, ["user1@example.com", "user2@example.com"])
248        self.assertEqual(message.body, "Test Body")
249
250    def test_expr_send_email_multiple_addresses_validation(self):
251        """Test ak_send_email validation with multiple addresses"""
252        evaluator = BaseEvaluator(generate_id())
253
254        # Test error when empty list is provided
255        with self.assertRaises(ValueError) as cm:
256            evaluator.evaluate("return ak_send_email([], 'Test', body='Body')")
257        self.assertIn("Address list cannot be empty", str(cm.exception))
258
259        # Test error when invalid type is provided
260        with self.assertRaises(ValueError) as cm:
261            evaluator.evaluate("return ak_send_email(123, 'Test', body='Body')")
262        self.assertIn("Address must be a string or list of strings", str(cm.exception))
263
264    @patch("authentik.stages.email.tasks.send_mails")
265    def test_expr_send_email_with_cc(self, mock_send_mails):
266        """Test ak_send_email with CC parameter"""
267        user = create_test_user()
268        evaluator = BaseEvaluator(generate_id())
269        evaluator._context = {"user": user}
270
271        # Test sending email with single CC address
272        result = evaluator.evaluate(
273            "return ak_send_email('to@example.com', 'Test Subject', "
274            "body='Test Body', cc='cc@example.com')"
275        )
276
277        self.assertTrue(result)
278        mock_send_mails.assert_called_once()
279
280        args, kwargs = mock_send_mails.call_args
281        stage, message = args
282
283        self.assertEqual(message.to, ["to@example.com"])
284        self.assertEqual(message.cc, ["cc@example.com"])
285        self.assertEqual(message.bcc, [])
286
287    @patch("authentik.stages.email.tasks.send_mails")
288    def test_expr_send_email_with_bcc(self, mock_send_mails):
289        """Test ak_send_email with BCC parameter"""
290        user = create_test_user()
291        evaluator = BaseEvaluator(generate_id())
292        evaluator._context = {"user": user}
293
294        # Test sending email with single BCC address
295        result = evaluator.evaluate(
296            "return ak_send_email('to@example.com', 'Test Subject', "
297            "body='Test Body', bcc='bcc@example.com')"
298        )
299
300        self.assertTrue(result)
301        mock_send_mails.assert_called_once()
302
303        args, kwargs = mock_send_mails.call_args
304        stage, message = args
305
306        self.assertEqual(message.to, ["to@example.com"])
307        self.assertEqual(message.cc, [])
308        self.assertEqual(message.bcc, ["bcc@example.com"])
309
310    @patch("authentik.stages.email.tasks.send_mails")
311    def test_expr_send_email_with_cc_and_bcc(self, mock_send_mails):
312        """Test ak_send_email with both CC and BCC parameters"""
313        user = create_test_user()
314        evaluator = BaseEvaluator(generate_id())
315        evaluator._context = {"user": user}
316
317        # Test sending email with both CC and BCC
318        result = evaluator.evaluate(
319            "return ak_send_email('to@example.com', 'Test Subject', "
320            "body='Test Body', cc='cc@example.com', bcc='bcc@example.com')"
321        )
322
323        self.assertTrue(result)
324        mock_send_mails.assert_called_once()
325
326        args, kwargs = mock_send_mails.call_args
327        stage, message = args
328
329        self.assertEqual(message.to, ["to@example.com"])
330        self.assertEqual(message.cc, ["cc@example.com"])
331        self.assertEqual(message.bcc, ["bcc@example.com"])
332
333    @patch("authentik.stages.email.tasks.send_mails")
334    def test_expr_send_email_with_multiple_cc_bcc(self, mock_send_mails):
335        """Test ak_send_email with multiple CC and BCC addresses"""
336        user = create_test_user()
337        evaluator = BaseEvaluator(generate_id())
338        evaluator._context = {"user": user}
339
340        # Test sending email with multiple CC and BCC addresses
341        result = evaluator.evaluate(
342            "return ak_send_email('to@example.com', 'Test Subject', "
343            "body='Test Body', "
344            "cc=['cc1@example.com', 'cc2@example.com'], "
345            "bcc=['bcc1@example.com', 'bcc2@example.com'])"
346        )
347
348        self.assertTrue(result)
349        mock_send_mails.assert_called_once()
350
351        args, kwargs = mock_send_mails.call_args
352        stage, message = args
353
354        self.assertEqual(message.to, ["to@example.com"])
355        self.assertEqual(message.cc, ["cc1@example.com", "cc2@example.com"])
356        self.assertEqual(message.bcc, ["bcc1@example.com", "bcc2@example.com"])
357
358    def test_expr_arg_escape(self):
359        """Test escaping of arguments"""
360        eval = BaseEvaluator()
361        eval._context = {
362            'z=getattr(getattr(__import__("os"), "popen")("id > /tmp/test"), "read")()': "bar",
363            "@@": "baz",
364            "{{": "baz",
365            "aa@@": "baz",
366        }
367        res = eval.evaluate("return locals()")
368        self.assertEqual(
369            res, {"zgetattrgetattr__import__os_popenid_tmptest_read": "bar", "aa": "baz"}
370        )
371        self.assertFalse(Path("/tmp/test").exists())
class TestEvaluator(django.test.testcases.TestCase):
 19class TestEvaluator(TestCase):
 20    """Test Evaluator base functions"""
 21
 22    def test_expr_regex_match(self):
 23        """Test expr_regex_match"""
 24        self.assertFalse(BaseEvaluator.expr_regex_match("foo", "bar"))
 25        self.assertTrue(BaseEvaluator.expr_regex_match("foo", "foo"))
 26
 27    def test_expr_regex_replace(self):
 28        """Test expr_regex_replace"""
 29        self.assertEqual(BaseEvaluator.expr_regex_replace("foo", "o", "a"), "faa")
 30
 31    def test_expr_user_by(self):
 32        """Test expr_user_by"""
 33        user = create_test_admin_user()
 34        self.assertIsNotNone(BaseEvaluator.expr_user_by(username=user.username))
 35        self.assertIsNone(BaseEvaluator.expr_user_by(username="bar"))
 36        self.assertIsNone(BaseEvaluator.expr_user_by(foo="bar"))
 37
 38    def test_expr_is_group_member(self):
 39        """Test expr_is_group_member"""
 40        self.assertFalse(BaseEvaluator.expr_is_group_member(create_test_admin_user(), name="test"))
 41
 42    def test_expr_event_create(self):
 43        """Test expr_event_create"""
 44        evaluator = BaseEvaluator(generate_id())
 45        evaluator._context = {
 46            "foo": "bar",
 47        }
 48        evaluator.evaluate("ak_create_event('foo', bar='baz')")
 49        event = Event.objects.filter(action="custom_foo").first()
 50        self.assertIsNotNone(event)
 51        self.assertEqual(event.context, {"bar": "baz", "foo": "bar"})
 52
 53    @apply_blueprint("system/providers-oauth2.yaml")
 54    def test_expr_create_jwt(self):
 55        """Test expr_create_jwt"""
 56        rf = RequestFactory()
 57        user = create_test_user()
 58        provider = OAuth2Provider.objects.create(
 59            name=generate_id(),
 60            authorization_flow=create_test_flow(),
 61        )
 62        provider.property_mappings.set(
 63            ScopeMapping.objects.filter(
 64                managed__in=[
 65                    "goauthentik.io/providers/oauth2/scope-openid",
 66                    "goauthentik.io/providers/oauth2/scope-email",
 67                    "goauthentik.io/providers/oauth2/scope-profile",
 68                ]
 69            )
 70        )
 71        evaluator = BaseEvaluator(generate_id())
 72        evaluator._context = {
 73            "http_request": rf.get(reverse("authentik_core:root-redirect")),
 74            "user": user,
 75            "provider": provider.name,
 76        }
 77        jwt = evaluator.evaluate(
 78            "return ak_create_jwt(user, provider, ['openid', 'email', 'profile'])"
 79        )
 80        decoded = decode(
 81            jwt, provider.client_secret, algorithms=["HS256"], audience=provider.client_id
 82        )
 83        self.assertEqual(decoded["preferred_username"], user.username)
 84
 85    def test_expr_create_jwt_raw(self):
 86        """Test expr_create_jwt_raw"""
 87        rf = RequestFactory()
 88        user = create_test_user()
 89        provider = OAuth2Provider.objects.create(
 90            name=generate_id(),
 91            authorization_flow=create_test_flow(),
 92        )
 93        evaluator = BaseEvaluator(generate_id())
 94        evaluator._context = {
 95            "http_request": rf.get(reverse("authentik_core:root-redirect")),
 96            "user": user,
 97            "provider": provider.name,
 98        }
 99        jwt = evaluator.evaluate("return ak_create_jwt_raw(provider, foo='bar')")
100        decoded = decode(
101            jwt, provider.client_secret, algorithms=["HS256"], audience=provider.client_id
102        )
103        self.assertEqual(decoded["foo"], "bar")
104
105    @patch("authentik.stages.email.tasks.send_mails")
106    def test_expr_send_email_with_body(self, mock_send_mails):
107        """Test ak_send_email with body parameter"""
108        user = create_test_user()
109        evaluator = BaseEvaluator(generate_id())
110        evaluator._context = {"user": user}
111
112        # Test sending email with body
113        result = evaluator.evaluate(
114            "return ak_send_email('test@example.com', 'Test Subject', body='Test Body')"
115        )
116
117        self.assertTrue(result)
118        mock_send_mails.assert_called_once()
119
120        # Verify the call arguments - send_mails is called with (stage, message)
121        args, kwargs = mock_send_mails.call_args
122        stage, message = args
123
124        # Check that global settings are used (stage is None)
125        self.assertIsNone(stage)
126
127        # Check message properties
128        self.assertEqual(message.subject, "Test Subject")
129        self.assertEqual(message.to, ["test@example.com"])
130        self.assertEqual(message.body, "Test Body")
131
132    @patch("authentik.stages.email.tasks.send_mails")
133    def test_expr_send_email_with_template(self, mock_send_mails):
134        """Test ak_send_email with template parameter"""
135        user = create_test_user()
136        evaluator = BaseEvaluator(generate_id())
137        evaluator._context = {"user": user}
138
139        # Test sending email with template
140        result = evaluator.evaluate(
141            "return ak_send_email('test@example.com', 'Test Subject', "
142            "template='email/password_reset.html')"
143        )
144
145        self.assertTrue(result)
146        mock_send_mails.assert_called_once()
147
148    def test_expr_send_email_validation_errors(self):
149        """Test ak_send_email validation errors"""
150        evaluator = BaseEvaluator(generate_id())
151
152        # Test error when both body and template are provided
153        with self.assertRaises(ValueError) as cm:
154            evaluator.evaluate(
155                "return ak_send_email('test@example.com', 'Test', "
156                "body='Body', template='template.html')"
157            )
158        self.assertIn("mutually exclusive", str(cm.exception))
159
160        # Test error when neither body nor template are provided
161        with self.assertRaises(ValueError) as cm:
162            evaluator.evaluate("return ak_send_email('test@example.com', 'Test')")
163        self.assertIn("Either body or template parameter must be provided", str(cm.exception))
164
165    @patch("authentik.stages.email.tasks.send_mails")
166    def test_expr_send_email_with_custom_stage(self, mock_send_mails):
167        """Test ak_send_email with custom EmailStage"""
168        from authentik.stages.email.models import EmailStage
169
170        user = create_test_user()
171        custom_stage = EmailStage(
172            name="custom-stage", use_global_settings=False, from_address="custom@example.com"
173        )
174
175        evaluator = BaseEvaluator(generate_id())
176        evaluator._context = {"user": user, "custom_stage": custom_stage}
177
178        # Test sending email with custom stage
179        result = evaluator.evaluate(
180            "return ak_send_email('test@example.com', 'Test Subject', "
181            "body='Test Body', stage=custom_stage)"
182        )
183
184        self.assertTrue(result)
185        mock_send_mails.assert_called_once()
186
187        # Verify the custom stage was used
188        args, kwargs = mock_send_mails.call_args
189        stage, message = args
190
191        self.assertEqual(stage, custom_stage)
192        self.assertFalse(stage.use_global_settings)
193
194    @patch("authentik.stages.email.tasks.send_mails")
195    def test_expr_send_email_with_context(self, mock_send_mails):
196        """Test ak_send_email with custom context parameter"""
197        user = create_test_user()
198        evaluator = BaseEvaluator(generate_id())
199        evaluator._context = {"user": user, "request_id": "123"}
200
201        # Test sending email with template and custom context
202        result = evaluator.evaluate(
203            "return ak_send_email('test@example.com', 'Test Subject', "
204            "template='email/password_reset.html', "
205            "context={'url': 'http://localhost', 'expires': '2026-01-01'})"
206        )
207
208        self.assertTrue(result)
209        mock_send_mails.assert_called_once()
210
211        # Verify the call arguments - send_mails is called with (stage, message)
212        args, kwargs = mock_send_mails.call_args
213        stage, message = args
214
215        # Check that global settings are used (stage is None)
216        self.assertIsNone(stage)
217
218        self.assertEqual(message.subject, "Test Subject")
219        self.assertEqual(message.to, ["test@example.com"])
220        self.assertIn("2026-01-01", message.body)
221        self.assertIn("http://localhost", message.body)
222
223    @patch("authentik.stages.email.tasks.send_mails")
224    def test_expr_send_email_multiple_addresses(self, mock_send_mails):
225        """Test ak_send_email with multiple email addresses"""
226        user = create_test_user()
227        evaluator = BaseEvaluator(generate_id())
228        evaluator._context = {"user": user}
229
230        # Test sending email to multiple addresses
231        result = evaluator.evaluate(
232            "return ak_send_email(['user1@example.com', 'user2@example.com'], "
233            "'Test Subject', body='Test Body')"
234        )
235
236        self.assertTrue(result)
237        mock_send_mails.assert_called_once()
238
239        # Verify the call arguments - send_mails is called with (stage, message)
240        args, kwargs = mock_send_mails.call_args
241        stage, message = args
242
243        # Check that global settings are used (stage is None)
244        self.assertIsNone(stage)
245
246        # Check message properties - should have multiple recipients
247        self.assertEqual(message.subject, "Test Subject")
248        self.assertEqual(message.to, ["user1@example.com", "user2@example.com"])
249        self.assertEqual(message.body, "Test Body")
250
251    def test_expr_send_email_multiple_addresses_validation(self):
252        """Test ak_send_email validation with multiple addresses"""
253        evaluator = BaseEvaluator(generate_id())
254
255        # Test error when empty list is provided
256        with self.assertRaises(ValueError) as cm:
257            evaluator.evaluate("return ak_send_email([], 'Test', body='Body')")
258        self.assertIn("Address list cannot be empty", str(cm.exception))
259
260        # Test error when invalid type is provided
261        with self.assertRaises(ValueError) as cm:
262            evaluator.evaluate("return ak_send_email(123, 'Test', body='Body')")
263        self.assertIn("Address must be a string or list of strings", str(cm.exception))
264
265    @patch("authentik.stages.email.tasks.send_mails")
266    def test_expr_send_email_with_cc(self, mock_send_mails):
267        """Test ak_send_email with CC parameter"""
268        user = create_test_user()
269        evaluator = BaseEvaluator(generate_id())
270        evaluator._context = {"user": user}
271
272        # Test sending email with single CC address
273        result = evaluator.evaluate(
274            "return ak_send_email('to@example.com', 'Test Subject', "
275            "body='Test Body', cc='cc@example.com')"
276        )
277
278        self.assertTrue(result)
279        mock_send_mails.assert_called_once()
280
281        args, kwargs = mock_send_mails.call_args
282        stage, message = args
283
284        self.assertEqual(message.to, ["to@example.com"])
285        self.assertEqual(message.cc, ["cc@example.com"])
286        self.assertEqual(message.bcc, [])
287
288    @patch("authentik.stages.email.tasks.send_mails")
289    def test_expr_send_email_with_bcc(self, mock_send_mails):
290        """Test ak_send_email with BCC parameter"""
291        user = create_test_user()
292        evaluator = BaseEvaluator(generate_id())
293        evaluator._context = {"user": user}
294
295        # Test sending email with single BCC address
296        result = evaluator.evaluate(
297            "return ak_send_email('to@example.com', 'Test Subject', "
298            "body='Test Body', bcc='bcc@example.com')"
299        )
300
301        self.assertTrue(result)
302        mock_send_mails.assert_called_once()
303
304        args, kwargs = mock_send_mails.call_args
305        stage, message = args
306
307        self.assertEqual(message.to, ["to@example.com"])
308        self.assertEqual(message.cc, [])
309        self.assertEqual(message.bcc, ["bcc@example.com"])
310
311    @patch("authentik.stages.email.tasks.send_mails")
312    def test_expr_send_email_with_cc_and_bcc(self, mock_send_mails):
313        """Test ak_send_email with both CC and BCC parameters"""
314        user = create_test_user()
315        evaluator = BaseEvaluator(generate_id())
316        evaluator._context = {"user": user}
317
318        # Test sending email with both CC and BCC
319        result = evaluator.evaluate(
320            "return ak_send_email('to@example.com', 'Test Subject', "
321            "body='Test Body', cc='cc@example.com', bcc='bcc@example.com')"
322        )
323
324        self.assertTrue(result)
325        mock_send_mails.assert_called_once()
326
327        args, kwargs = mock_send_mails.call_args
328        stage, message = args
329
330        self.assertEqual(message.to, ["to@example.com"])
331        self.assertEqual(message.cc, ["cc@example.com"])
332        self.assertEqual(message.bcc, ["bcc@example.com"])
333
334    @patch("authentik.stages.email.tasks.send_mails")
335    def test_expr_send_email_with_multiple_cc_bcc(self, mock_send_mails):
336        """Test ak_send_email with multiple CC and BCC addresses"""
337        user = create_test_user()
338        evaluator = BaseEvaluator(generate_id())
339        evaluator._context = {"user": user}
340
341        # Test sending email with multiple CC and BCC addresses
342        result = evaluator.evaluate(
343            "return ak_send_email('to@example.com', 'Test Subject', "
344            "body='Test Body', "
345            "cc=['cc1@example.com', 'cc2@example.com'], "
346            "bcc=['bcc1@example.com', 'bcc2@example.com'])"
347        )
348
349        self.assertTrue(result)
350        mock_send_mails.assert_called_once()
351
352        args, kwargs = mock_send_mails.call_args
353        stage, message = args
354
355        self.assertEqual(message.to, ["to@example.com"])
356        self.assertEqual(message.cc, ["cc1@example.com", "cc2@example.com"])
357        self.assertEqual(message.bcc, ["bcc1@example.com", "bcc2@example.com"])
358
359    def test_expr_arg_escape(self):
360        """Test escaping of arguments"""
361        eval = BaseEvaluator()
362        eval._context = {
363            'z=getattr(getattr(__import__("os"), "popen")("id > /tmp/test"), "read")()': "bar",
364            "@@": "baz",
365            "{{": "baz",
366            "aa@@": "baz",
367        }
368        res = eval.evaluate("return locals()")
369        self.assertEqual(
370            res, {"zgetattrgetattr__import__os_popenid_tmptest_read": "bar", "aa": "baz"}
371        )
372        self.assertFalse(Path("/tmp/test").exists())

Test Evaluator base functions

def test_expr_regex_match(self):
22    def test_expr_regex_match(self):
23        """Test expr_regex_match"""
24        self.assertFalse(BaseEvaluator.expr_regex_match("foo", "bar"))
25        self.assertTrue(BaseEvaluator.expr_regex_match("foo", "foo"))

Test expr_regex_match

def test_expr_regex_replace(self):
27    def test_expr_regex_replace(self):
28        """Test expr_regex_replace"""
29        self.assertEqual(BaseEvaluator.expr_regex_replace("foo", "o", "a"), "faa")

Test expr_regex_replace

def test_expr_user_by(self):
31    def test_expr_user_by(self):
32        """Test expr_user_by"""
33        user = create_test_admin_user()
34        self.assertIsNotNone(BaseEvaluator.expr_user_by(username=user.username))
35        self.assertIsNone(BaseEvaluator.expr_user_by(username="bar"))
36        self.assertIsNone(BaseEvaluator.expr_user_by(foo="bar"))

Test expr_user_by

def test_expr_is_group_member(self):
38    def test_expr_is_group_member(self):
39        """Test expr_is_group_member"""
40        self.assertFalse(BaseEvaluator.expr_is_group_member(create_test_admin_user(), name="test"))

Test expr_is_group_member

def test_expr_event_create(self):
42    def test_expr_event_create(self):
43        """Test expr_event_create"""
44        evaluator = BaseEvaluator(generate_id())
45        evaluator._context = {
46            "foo": "bar",
47        }
48        evaluator.evaluate("ak_create_event('foo', bar='baz')")
49        event = Event.objects.filter(action="custom_foo").first()
50        self.assertIsNotNone(event)
51        self.assertEqual(event.context, {"bar": "baz", "foo": "bar"})

Test expr_event_create

@apply_blueprint('system/providers-oauth2.yaml')
def test_expr_create_jwt(self):
53    @apply_blueprint("system/providers-oauth2.yaml")
54    def test_expr_create_jwt(self):
55        """Test expr_create_jwt"""
56        rf = RequestFactory()
57        user = create_test_user()
58        provider = OAuth2Provider.objects.create(
59            name=generate_id(),
60            authorization_flow=create_test_flow(),
61        )
62        provider.property_mappings.set(
63            ScopeMapping.objects.filter(
64                managed__in=[
65                    "goauthentik.io/providers/oauth2/scope-openid",
66                    "goauthentik.io/providers/oauth2/scope-email",
67                    "goauthentik.io/providers/oauth2/scope-profile",
68                ]
69            )
70        )
71        evaluator = BaseEvaluator(generate_id())
72        evaluator._context = {
73            "http_request": rf.get(reverse("authentik_core:root-redirect")),
74            "user": user,
75            "provider": provider.name,
76        }
77        jwt = evaluator.evaluate(
78            "return ak_create_jwt(user, provider, ['openid', 'email', 'profile'])"
79        )
80        decoded = decode(
81            jwt, provider.client_secret, algorithms=["HS256"], audience=provider.client_id
82        )
83        self.assertEqual(decoded["preferred_username"], user.username)

Test expr_create_jwt

def test_expr_create_jwt_raw(self):
 85    def test_expr_create_jwt_raw(self):
 86        """Test expr_create_jwt_raw"""
 87        rf = RequestFactory()
 88        user = create_test_user()
 89        provider = OAuth2Provider.objects.create(
 90            name=generate_id(),
 91            authorization_flow=create_test_flow(),
 92        )
 93        evaluator = BaseEvaluator(generate_id())
 94        evaluator._context = {
 95            "http_request": rf.get(reverse("authentik_core:root-redirect")),
 96            "user": user,
 97            "provider": provider.name,
 98        }
 99        jwt = evaluator.evaluate("return ak_create_jwt_raw(provider, foo='bar')")
100        decoded = decode(
101            jwt, provider.client_secret, algorithms=["HS256"], audience=provider.client_id
102        )
103        self.assertEqual(decoded["foo"], "bar")

Test expr_create_jwt_raw

@patch('authentik.stages.email.tasks.send_mails')
def test_expr_send_email_with_body(self, mock_send_mails):
105    @patch("authentik.stages.email.tasks.send_mails")
106    def test_expr_send_email_with_body(self, mock_send_mails):
107        """Test ak_send_email with body parameter"""
108        user = create_test_user()
109        evaluator = BaseEvaluator(generate_id())
110        evaluator._context = {"user": user}
111
112        # Test sending email with body
113        result = evaluator.evaluate(
114            "return ak_send_email('test@example.com', 'Test Subject', body='Test Body')"
115        )
116
117        self.assertTrue(result)
118        mock_send_mails.assert_called_once()
119
120        # Verify the call arguments - send_mails is called with (stage, message)
121        args, kwargs = mock_send_mails.call_args
122        stage, message = args
123
124        # Check that global settings are used (stage is None)
125        self.assertIsNone(stage)
126
127        # Check message properties
128        self.assertEqual(message.subject, "Test Subject")
129        self.assertEqual(message.to, ["test@example.com"])
130        self.assertEqual(message.body, "Test Body")

Test ak_send_email with body parameter

@patch('authentik.stages.email.tasks.send_mails')
def test_expr_send_email_with_template(self, mock_send_mails):
132    @patch("authentik.stages.email.tasks.send_mails")
133    def test_expr_send_email_with_template(self, mock_send_mails):
134        """Test ak_send_email with template parameter"""
135        user = create_test_user()
136        evaluator = BaseEvaluator(generate_id())
137        evaluator._context = {"user": user}
138
139        # Test sending email with template
140        result = evaluator.evaluate(
141            "return ak_send_email('test@example.com', 'Test Subject', "
142            "template='email/password_reset.html')"
143        )
144
145        self.assertTrue(result)
146        mock_send_mails.assert_called_once()

Test ak_send_email with template parameter

def test_expr_send_email_validation_errors(self):
148    def test_expr_send_email_validation_errors(self):
149        """Test ak_send_email validation errors"""
150        evaluator = BaseEvaluator(generate_id())
151
152        # Test error when both body and template are provided
153        with self.assertRaises(ValueError) as cm:
154            evaluator.evaluate(
155                "return ak_send_email('test@example.com', 'Test', "
156                "body='Body', template='template.html')"
157            )
158        self.assertIn("mutually exclusive", str(cm.exception))
159
160        # Test error when neither body nor template are provided
161        with self.assertRaises(ValueError) as cm:
162            evaluator.evaluate("return ak_send_email('test@example.com', 'Test')")
163        self.assertIn("Either body or template parameter must be provided", str(cm.exception))

Test ak_send_email validation errors

@patch('authentik.stages.email.tasks.send_mails')
def test_expr_send_email_with_custom_stage(self, mock_send_mails):
165    @patch("authentik.stages.email.tasks.send_mails")
166    def test_expr_send_email_with_custom_stage(self, mock_send_mails):
167        """Test ak_send_email with custom EmailStage"""
168        from authentik.stages.email.models import EmailStage
169
170        user = create_test_user()
171        custom_stage = EmailStage(
172            name="custom-stage", use_global_settings=False, from_address="custom@example.com"
173        )
174
175        evaluator = BaseEvaluator(generate_id())
176        evaluator._context = {"user": user, "custom_stage": custom_stage}
177
178        # Test sending email with custom stage
179        result = evaluator.evaluate(
180            "return ak_send_email('test@example.com', 'Test Subject', "
181            "body='Test Body', stage=custom_stage)"
182        )
183
184        self.assertTrue(result)
185        mock_send_mails.assert_called_once()
186
187        # Verify the custom stage was used
188        args, kwargs = mock_send_mails.call_args
189        stage, message = args
190
191        self.assertEqual(stage, custom_stage)
192        self.assertFalse(stage.use_global_settings)

Test ak_send_email with custom EmailStage

@patch('authentik.stages.email.tasks.send_mails')
def test_expr_send_email_with_context(self, mock_send_mails):
194    @patch("authentik.stages.email.tasks.send_mails")
195    def test_expr_send_email_with_context(self, mock_send_mails):
196        """Test ak_send_email with custom context parameter"""
197        user = create_test_user()
198        evaluator = BaseEvaluator(generate_id())
199        evaluator._context = {"user": user, "request_id": "123"}
200
201        # Test sending email with template and custom context
202        result = evaluator.evaluate(
203            "return ak_send_email('test@example.com', 'Test Subject', "
204            "template='email/password_reset.html', "
205            "context={'url': 'http://localhost', 'expires': '2026-01-01'})"
206        )
207
208        self.assertTrue(result)
209        mock_send_mails.assert_called_once()
210
211        # Verify the call arguments - send_mails is called with (stage, message)
212        args, kwargs = mock_send_mails.call_args
213        stage, message = args
214
215        # Check that global settings are used (stage is None)
216        self.assertIsNone(stage)
217
218        self.assertEqual(message.subject, "Test Subject")
219        self.assertEqual(message.to, ["test@example.com"])
220        self.assertIn("2026-01-01", message.body)
221        self.assertIn("http://localhost", message.body)

Test ak_send_email with custom context parameter

@patch('authentik.stages.email.tasks.send_mails')
def test_expr_send_email_multiple_addresses(self, mock_send_mails):
223    @patch("authentik.stages.email.tasks.send_mails")
224    def test_expr_send_email_multiple_addresses(self, mock_send_mails):
225        """Test ak_send_email with multiple email addresses"""
226        user = create_test_user()
227        evaluator = BaseEvaluator(generate_id())
228        evaluator._context = {"user": user}
229
230        # Test sending email to multiple addresses
231        result = evaluator.evaluate(
232            "return ak_send_email(['user1@example.com', 'user2@example.com'], "
233            "'Test Subject', body='Test Body')"
234        )
235
236        self.assertTrue(result)
237        mock_send_mails.assert_called_once()
238
239        # Verify the call arguments - send_mails is called with (stage, message)
240        args, kwargs = mock_send_mails.call_args
241        stage, message = args
242
243        # Check that global settings are used (stage is None)
244        self.assertIsNone(stage)
245
246        # Check message properties - should have multiple recipients
247        self.assertEqual(message.subject, "Test Subject")
248        self.assertEqual(message.to, ["user1@example.com", "user2@example.com"])
249        self.assertEqual(message.body, "Test Body")

Test ak_send_email with multiple email addresses

def test_expr_send_email_multiple_addresses_validation(self):
251    def test_expr_send_email_multiple_addresses_validation(self):
252        """Test ak_send_email validation with multiple addresses"""
253        evaluator = BaseEvaluator(generate_id())
254
255        # Test error when empty list is provided
256        with self.assertRaises(ValueError) as cm:
257            evaluator.evaluate("return ak_send_email([], 'Test', body='Body')")
258        self.assertIn("Address list cannot be empty", str(cm.exception))
259
260        # Test error when invalid type is provided
261        with self.assertRaises(ValueError) as cm:
262            evaluator.evaluate("return ak_send_email(123, 'Test', body='Body')")
263        self.assertIn("Address must be a string or list of strings", str(cm.exception))

Test ak_send_email validation with multiple addresses

@patch('authentik.stages.email.tasks.send_mails')
def test_expr_send_email_with_cc(self, mock_send_mails):
265    @patch("authentik.stages.email.tasks.send_mails")
266    def test_expr_send_email_with_cc(self, mock_send_mails):
267        """Test ak_send_email with CC parameter"""
268        user = create_test_user()
269        evaluator = BaseEvaluator(generate_id())
270        evaluator._context = {"user": user}
271
272        # Test sending email with single CC address
273        result = evaluator.evaluate(
274            "return ak_send_email('to@example.com', 'Test Subject', "
275            "body='Test Body', cc='cc@example.com')"
276        )
277
278        self.assertTrue(result)
279        mock_send_mails.assert_called_once()
280
281        args, kwargs = mock_send_mails.call_args
282        stage, message = args
283
284        self.assertEqual(message.to, ["to@example.com"])
285        self.assertEqual(message.cc, ["cc@example.com"])
286        self.assertEqual(message.bcc, [])

Test ak_send_email with CC parameter

@patch('authentik.stages.email.tasks.send_mails')
def test_expr_send_email_with_bcc(self, mock_send_mails):
288    @patch("authentik.stages.email.tasks.send_mails")
289    def test_expr_send_email_with_bcc(self, mock_send_mails):
290        """Test ak_send_email with BCC parameter"""
291        user = create_test_user()
292        evaluator = BaseEvaluator(generate_id())
293        evaluator._context = {"user": user}
294
295        # Test sending email with single BCC address
296        result = evaluator.evaluate(
297            "return ak_send_email('to@example.com', 'Test Subject', "
298            "body='Test Body', bcc='bcc@example.com')"
299        )
300
301        self.assertTrue(result)
302        mock_send_mails.assert_called_once()
303
304        args, kwargs = mock_send_mails.call_args
305        stage, message = args
306
307        self.assertEqual(message.to, ["to@example.com"])
308        self.assertEqual(message.cc, [])
309        self.assertEqual(message.bcc, ["bcc@example.com"])

Test ak_send_email with BCC parameter

@patch('authentik.stages.email.tasks.send_mails')
def test_expr_send_email_with_cc_and_bcc(self, mock_send_mails):
311    @patch("authentik.stages.email.tasks.send_mails")
312    def test_expr_send_email_with_cc_and_bcc(self, mock_send_mails):
313        """Test ak_send_email with both CC and BCC parameters"""
314        user = create_test_user()
315        evaluator = BaseEvaluator(generate_id())
316        evaluator._context = {"user": user}
317
318        # Test sending email with both CC and BCC
319        result = evaluator.evaluate(
320            "return ak_send_email('to@example.com', 'Test Subject', "
321            "body='Test Body', cc='cc@example.com', bcc='bcc@example.com')"
322        )
323
324        self.assertTrue(result)
325        mock_send_mails.assert_called_once()
326
327        args, kwargs = mock_send_mails.call_args
328        stage, message = args
329
330        self.assertEqual(message.to, ["to@example.com"])
331        self.assertEqual(message.cc, ["cc@example.com"])
332        self.assertEqual(message.bcc, ["bcc@example.com"])

Test ak_send_email with both CC and BCC parameters

@patch('authentik.stages.email.tasks.send_mails')
def test_expr_send_email_with_multiple_cc_bcc(self, mock_send_mails):
334    @patch("authentik.stages.email.tasks.send_mails")
335    def test_expr_send_email_with_multiple_cc_bcc(self, mock_send_mails):
336        """Test ak_send_email with multiple CC and BCC addresses"""
337        user = create_test_user()
338        evaluator = BaseEvaluator(generate_id())
339        evaluator._context = {"user": user}
340
341        # Test sending email with multiple CC and BCC addresses
342        result = evaluator.evaluate(
343            "return ak_send_email('to@example.com', 'Test Subject', "
344            "body='Test Body', "
345            "cc=['cc1@example.com', 'cc2@example.com'], "
346            "bcc=['bcc1@example.com', 'bcc2@example.com'])"
347        )
348
349        self.assertTrue(result)
350        mock_send_mails.assert_called_once()
351
352        args, kwargs = mock_send_mails.call_args
353        stage, message = args
354
355        self.assertEqual(message.to, ["to@example.com"])
356        self.assertEqual(message.cc, ["cc1@example.com", "cc2@example.com"])
357        self.assertEqual(message.bcc, ["bcc1@example.com", "bcc2@example.com"])

Test ak_send_email with multiple CC and BCC addresses

def test_expr_arg_escape(self):
359    def test_expr_arg_escape(self):
360        """Test escaping of arguments"""
361        eval = BaseEvaluator()
362        eval._context = {
363            'z=getattr(getattr(__import__("os"), "popen")("id > /tmp/test"), "read")()': "bar",
364            "@@": "baz",
365            "{{": "baz",
366            "aa@@": "baz",
367        }
368        res = eval.evaluate("return locals()")
369        self.assertEqual(
370            res, {"zgetattrgetattr__import__os_popenid_tmptest_read": "bar", "aa": "baz"}
371        )
372        self.assertFalse(Path("/tmp/test").exists())

Test escaping of arguments