authentik.policies.tests.test_process

policy process tests

  1"""policy process tests"""
  2
  3from django.contrib.auth.models import AnonymousUser
  4from django.core.cache import cache
  5from django.test import RequestFactory, TestCase
  6from django.urls import resolve, reverse
  7from django.views.debug import SafeExceptionReporterFilter
  8from guardian.shortcuts import get_anonymous_user
  9
 10from authentik.core.models import Application, Group, User
 11from authentik.events.models import Event, EventAction
 12from authentik.lib.generators import generate_id
 13from authentik.policies.dummy.models import DummyPolicy
 14from authentik.policies.expression.models import ExpressionPolicy
 15from authentik.policies.models import Policy, PolicyBinding
 16from authentik.policies.process import PolicyProcess
 17from authentik.policies.types import CACHE_PREFIX, PolicyRequest
 18
 19
 20def clear_policy_cache():
 21    """Ensure no policy-related keys are still cached"""
 22    keys = cache.keys(f"{CACHE_PREFIX}*")
 23    cache.delete(keys)
 24
 25
 26class TestPolicyProcess(TestCase):
 27    """Policy Process tests"""
 28
 29    def setUp(self):
 30        clear_policy_cache()
 31        self.factory = RequestFactory()
 32        self.user = User.objects.create_user(username=generate_id())
 33
 34    def test_group_passing(self):
 35        """Test binding to group"""
 36        group = Group.objects.create(name=generate_id())
 37        group.users.add(self.user)
 38        binding = PolicyBinding(group=group)
 39
 40        request = PolicyRequest(self.user)
 41        response = PolicyProcess(binding, request, None).execute()
 42        self.assertEqual(response.passing, True)
 43
 44    def test_group_negative(self):
 45        """Test binding to group"""
 46        group = Group.objects.create(name=generate_id())
 47        binding = PolicyBinding(group=group)
 48
 49        request = PolicyRequest(self.user)
 50        response = PolicyProcess(binding, request, None).execute()
 51        self.assertEqual(response.passing, False)
 52
 53    def test_user_passing(self):
 54        """Test binding to user"""
 55        binding = PolicyBinding(user=self.user)
 56
 57        request = PolicyRequest(self.user)
 58        response = PolicyProcess(binding, request, None).execute()
 59        self.assertEqual(response.passing, True)
 60
 61    def test_user_negative(self):
 62        """Test binding to user"""
 63        binding = PolicyBinding(user=get_anonymous_user())
 64
 65        request = PolicyRequest(self.user)
 66        response = PolicyProcess(binding, request, None).execute()
 67        self.assertEqual(response.passing, False)
 68
 69    def test_empty(self):
 70        """Test binding to user"""
 71        binding = PolicyBinding()
 72
 73        request = PolicyRequest(self.user)
 74        response = PolicyProcess(binding, request, None).execute()
 75        self.assertEqual(response.passing, False)
 76
 77    def test_invalid(self):
 78        """Test Process with invalid arguments"""
 79        policy = DummyPolicy.objects.create(result=True, wait_min=0, wait_max=1)
 80        binding = PolicyBinding(policy=policy)
 81        with self.assertRaises(ValueError):
 82            PolicyProcess(binding, None, None)  # type: ignore
 83
 84    def test_true(self):
 85        """Test policy execution"""
 86        policy = DummyPolicy.objects.create(result=True, wait_min=0, wait_max=1)
 87        binding = PolicyBinding(policy=policy)
 88
 89        request = PolicyRequest(self.user)
 90        response = PolicyProcess(binding, request, None).execute()
 91        self.assertEqual(response.passing, True)
 92        self.assertEqual(response.messages, ("dummy",))
 93
 94    def test_false(self):
 95        """Test policy execution"""
 96        policy = DummyPolicy.objects.create(result=False, wait_min=0, wait_max=1)
 97        binding = PolicyBinding(policy=policy)
 98
 99        request = PolicyRequest(self.user)
100        response = PolicyProcess(binding, request, None).execute()
101        self.assertEqual(response.passing, False)
102        self.assertEqual(response.messages, ("dummy",))
103
104    def test_negate(self):
105        """Test policy execution"""
106        policy = DummyPolicy.objects.create(result=False, wait_min=0, wait_max=1)
107        binding = PolicyBinding(policy=policy, negate=True)
108
109        request = PolicyRequest(self.user)
110        response = PolicyProcess(binding, request, None).execute()
111        self.assertEqual(response.passing, True)
112        self.assertEqual(response.messages, ("dummy",))
113
114    def test_exception(self):
115        """Test policy execution"""
116        policy = Policy.objects.create(name=generate_id())
117        binding = PolicyBinding(
118            policy=policy, target=Application.objects.create(name=generate_id())
119        )
120
121        request = PolicyRequest(self.user)
122        response = PolicyProcess(binding, request, None).execute()
123        self.assertEqual(response.passing, False)
124
125    def test_execution_logging(self):
126        """Test policy execution creates event"""
127        policy = DummyPolicy.objects.create(
128            name=generate_id(),
129            result=False,
130            wait_min=0,
131            wait_max=1,
132            execution_logging=True,
133        )
134        binding = PolicyBinding(
135            policy=policy, target=Application.objects.create(name=generate_id())
136        )
137
138        http_request = self.factory.get(reverse("authentik_api:user-impersonate-end"))
139        http_request.user = self.user
140        http_request.resolver_match = resolve(reverse("authentik_api:user-impersonate-end"))
141
142        password = generate_id()
143        request = PolicyRequest(self.user)
144        request.set_http_request(http_request)
145        request.context = {
146            "complex": {
147                "dict": {"foo": "bar"},
148                "list": ["foo", "bar"],
149                "tuple": ("foo", "bar"),
150                "set": {"foo", "bar"},
151                "password": password,
152            }
153        }
154        response = PolicyProcess(binding, request, None).execute()
155        self.assertEqual(response.passing, False)
156        self.assertEqual(response.messages, ("dummy",))
157
158        events = Event.objects.filter(
159            action=EventAction.POLICY_EXECUTION,
160            context__policy_uuid=policy.policy_uuid.hex,
161        )
162        self.assertTrue(events.exists())
163        self.assertEqual(len(events), 1)
164        event = events.first()
165        self.assertEqual(event.user["username"], self.user.username)
166        self.assertEqual(event.context["result"]["passing"], False)
167        self.assertEqual(event.context["result"]["messages"], ["dummy"])
168        self.assertEqual(event.client_ip, "127.0.0.1")
169        # Python sets don't preserve order when converted to list,
170        # so ensure we sort the converted set
171        event.context["request"]["context"]["complex"]["set"].sort()
172        self.assertEqual(
173            event.context["request"]["context"],
174            {
175                "complex": {
176                    "set": [
177                        "bar",
178                        "foo",
179                    ],
180                    "dict": {"foo": "bar"},
181                    "list": ["foo", "bar"],
182                    "tuple": ["foo", "bar"],
183                    "password": SafeExceptionReporterFilter.cleansed_substitute,
184                }
185            },
186        )
187
188    def test_execution_logging_anonymous(self):
189        """Test policy execution creates event with anonymous user"""
190        policy = DummyPolicy.objects.create(
191            name=generate_id(),
192            result=False,
193            wait_min=0,
194            wait_max=1,
195            execution_logging=True,
196        )
197        binding = PolicyBinding(
198            policy=policy, target=Application.objects.create(name=generate_id())
199        )
200
201        user = AnonymousUser()
202
203        http_request = self.factory.get("/")
204        http_request.user = user
205
206        request = PolicyRequest(user)
207        request.set_http_request(http_request)
208        response = PolicyProcess(binding, request, None).execute()
209        self.assertEqual(response.passing, False)
210        self.assertEqual(response.messages, ("dummy",))
211
212        events = Event.objects.filter(
213            action=EventAction.POLICY_EXECUTION,
214            context__policy_uuid=policy.policy_uuid.hex,
215        )
216        self.assertTrue(events.exists())
217        self.assertEqual(len(events), 1)
218        event = events.first()
219        self.assertEqual(event.user["username"], "AnonymousUser")
220        self.assertEqual(event.context["result"]["passing"], False)
221        self.assertEqual(event.context["result"]["messages"], ["dummy"])
222        self.assertEqual(event.client_ip, "127.0.0.1")
223
224    def test_raises(self):
225        """Test policy that raises error"""
226        policy_raises = ExpressionPolicy.objects.create(name=generate_id(), expression="{{ 0/0 }}")
227        binding = PolicyBinding(
228            policy=policy_raises, target=Application.objects.create(name=generate_id())
229        )
230
231        request = PolicyRequest(self.user)
232        response = PolicyProcess(binding, request, None).execute()
233        self.assertEqual(response.passing, False)
234        self.assertEqual(response.messages, ("division by zero",))
235
236        events = Event.objects.filter(
237            action=EventAction.POLICY_EXCEPTION,
238            context__policy_uuid=policy_raises.policy_uuid.hex,
239        )
240        self.assertTrue(events.exists())
241        self.assertEqual(len(events), 1)
242        event = events.first()
243        self.assertEqual(event.user["username"], self.user.username)
244        self.assertIn("Policy failed to execute", event.context["message"])
def clear_policy_cache():
def clear_policy_cache():
    """Ensure no policy-related keys are still cached

    Collects every cache key matching the ``CACHE_PREFIX`` pattern and removes
    all of them in a single batch.
    """
    keys = cache.keys(f"{CACHE_PREFIX}*")
    # cache.delete() expects one key; a collection of keys must go through
    # delete_many(), otherwise the matched entries are never actually removed.
    if keys:
        cache.delete_many(keys)

Ensure no policy-related keys are still cached

class TestPolicyProcess(django.test.testcases.TestCase):
 27class TestPolicyProcess(TestCase):
 28    """Policy Process tests"""
 29
 30    def setUp(self):
 31        clear_policy_cache()
 32        self.factory = RequestFactory()
 33        self.user = User.objects.create_user(username=generate_id())
 34
 35    def test_group_passing(self):
 36        """Test binding to group"""
 37        group = Group.objects.create(name=generate_id())
 38        group.users.add(self.user)
 39        binding = PolicyBinding(group=group)
 40
 41        request = PolicyRequest(self.user)
 42        response = PolicyProcess(binding, request, None).execute()
 43        self.assertEqual(response.passing, True)
 44
 45    def test_group_negative(self):
 46        """Test binding to group"""
 47        group = Group.objects.create(name=generate_id())
 48        binding = PolicyBinding(group=group)
 49
 50        request = PolicyRequest(self.user)
 51        response = PolicyProcess(binding, request, None).execute()
 52        self.assertEqual(response.passing, False)
 53
 54    def test_user_passing(self):
 55        """Test binding to user"""
 56        binding = PolicyBinding(user=self.user)
 57
 58        request = PolicyRequest(self.user)
 59        response = PolicyProcess(binding, request, None).execute()
 60        self.assertEqual(response.passing, True)
 61
 62    def test_user_negative(self):
 63        """Test binding to user"""
 64        binding = PolicyBinding(user=get_anonymous_user())
 65
 66        request = PolicyRequest(self.user)
 67        response = PolicyProcess(binding, request, None).execute()
 68        self.assertEqual(response.passing, False)
 69
 70    def test_empty(self):
 71        """Test binding to user"""
 72        binding = PolicyBinding()
 73
 74        request = PolicyRequest(self.user)
 75        response = PolicyProcess(binding, request, None).execute()
 76        self.assertEqual(response.passing, False)
 77
 78    def test_invalid(self):
 79        """Test Process with invalid arguments"""
 80        policy = DummyPolicy.objects.create(result=True, wait_min=0, wait_max=1)
 81        binding = PolicyBinding(policy=policy)
 82        with self.assertRaises(ValueError):
 83            PolicyProcess(binding, None, None)  # type: ignore
 84
 85    def test_true(self):
 86        """Test policy execution"""
 87        policy = DummyPolicy.objects.create(result=True, wait_min=0, wait_max=1)
 88        binding = PolicyBinding(policy=policy)
 89
 90        request = PolicyRequest(self.user)
 91        response = PolicyProcess(binding, request, None).execute()
 92        self.assertEqual(response.passing, True)
 93        self.assertEqual(response.messages, ("dummy",))
 94
 95    def test_false(self):
 96        """Test policy execution"""
 97        policy = DummyPolicy.objects.create(result=False, wait_min=0, wait_max=1)
 98        binding = PolicyBinding(policy=policy)
 99
100        request = PolicyRequest(self.user)
101        response = PolicyProcess(binding, request, None).execute()
102        self.assertEqual(response.passing, False)
103        self.assertEqual(response.messages, ("dummy",))
104
105    def test_negate(self):
106        """Test policy execution"""
107        policy = DummyPolicy.objects.create(result=False, wait_min=0, wait_max=1)
108        binding = PolicyBinding(policy=policy, negate=True)
109
110        request = PolicyRequest(self.user)
111        response = PolicyProcess(binding, request, None).execute()
112        self.assertEqual(response.passing, True)
113        self.assertEqual(response.messages, ("dummy",))
114
115    def test_exception(self):
116        """Test policy execution"""
117        policy = Policy.objects.create(name=generate_id())
118        binding = PolicyBinding(
119            policy=policy, target=Application.objects.create(name=generate_id())
120        )
121
122        request = PolicyRequest(self.user)
123        response = PolicyProcess(binding, request, None).execute()
124        self.assertEqual(response.passing, False)
125
126    def test_execution_logging(self):
127        """Test policy execution creates event"""
128        policy = DummyPolicy.objects.create(
129            name=generate_id(),
130            result=False,
131            wait_min=0,
132            wait_max=1,
133            execution_logging=True,
134        )
135        binding = PolicyBinding(
136            policy=policy, target=Application.objects.create(name=generate_id())
137        )
138
139        http_request = self.factory.get(reverse("authentik_api:user-impersonate-end"))
140        http_request.user = self.user
141        http_request.resolver_match = resolve(reverse("authentik_api:user-impersonate-end"))
142
143        password = generate_id()
144        request = PolicyRequest(self.user)
145        request.set_http_request(http_request)
146        request.context = {
147            "complex": {
148                "dict": {"foo": "bar"},
149                "list": ["foo", "bar"],
150                "tuple": ("foo", "bar"),
151                "set": {"foo", "bar"},
152                "password": password,
153            }
154        }
155        response = PolicyProcess(binding, request, None).execute()
156        self.assertEqual(response.passing, False)
157        self.assertEqual(response.messages, ("dummy",))
158
159        events = Event.objects.filter(
160            action=EventAction.POLICY_EXECUTION,
161            context__policy_uuid=policy.policy_uuid.hex,
162        )
163        self.assertTrue(events.exists())
164        self.assertEqual(len(events), 1)
165        event = events.first()
166        self.assertEqual(event.user["username"], self.user.username)
167        self.assertEqual(event.context["result"]["passing"], False)
168        self.assertEqual(event.context["result"]["messages"], ["dummy"])
169        self.assertEqual(event.client_ip, "127.0.0.1")
170        # Python sets don't preserve order when converted to list,
171        # so ensure we sort the converted set
172        event.context["request"]["context"]["complex"]["set"].sort()
173        self.assertEqual(
174            event.context["request"]["context"],
175            {
176                "complex": {
177                    "set": [
178                        "bar",
179                        "foo",
180                    ],
181                    "dict": {"foo": "bar"},
182                    "list": ["foo", "bar"],
183                    "tuple": ["foo", "bar"],
184                    "password": SafeExceptionReporterFilter.cleansed_substitute,
185                }
186            },
187        )
188
189    def test_execution_logging_anonymous(self):
190        """Test policy execution creates event with anonymous user"""
191        policy = DummyPolicy.objects.create(
192            name=generate_id(),
193            result=False,
194            wait_min=0,
195            wait_max=1,
196            execution_logging=True,
197        )
198        binding = PolicyBinding(
199            policy=policy, target=Application.objects.create(name=generate_id())
200        )
201
202        user = AnonymousUser()
203
204        http_request = self.factory.get("/")
205        http_request.user = user
206
207        request = PolicyRequest(user)
208        request.set_http_request(http_request)
209        response = PolicyProcess(binding, request, None).execute()
210        self.assertEqual(response.passing, False)
211        self.assertEqual(response.messages, ("dummy",))
212
213        events = Event.objects.filter(
214            action=EventAction.POLICY_EXECUTION,
215            context__policy_uuid=policy.policy_uuid.hex,
216        )
217        self.assertTrue(events.exists())
218        self.assertEqual(len(events), 1)
219        event = events.first()
220        self.assertEqual(event.user["username"], "AnonymousUser")
221        self.assertEqual(event.context["result"]["passing"], False)
222        self.assertEqual(event.context["result"]["messages"], ["dummy"])
223        self.assertEqual(event.client_ip, "127.0.0.1")
224
225    def test_raises(self):
226        """Test policy that raises error"""
227        policy_raises = ExpressionPolicy.objects.create(name=generate_id(), expression="{{ 0/0 }}")
228        binding = PolicyBinding(
229            policy=policy_raises, target=Application.objects.create(name=generate_id())
230        )
231
232        request = PolicyRequest(self.user)
233        response = PolicyProcess(binding, request, None).execute()
234        self.assertEqual(response.passing, False)
235        self.assertEqual(response.messages, ("division by zero",))
236
237        events = Event.objects.filter(
238            action=EventAction.POLICY_EXCEPTION,
239            context__policy_uuid=policy_raises.policy_uuid.hex,
240        )
241        self.assertTrue(events.exists())
242        self.assertEqual(len(events), 1)
243        event = events.first()
244        self.assertEqual(event.user["username"], self.user.username)
245        self.assertIn("Policy failed to execute", event.context["message"])

Policy Process tests

def setUp(self):
30    def setUp(self):
31        clear_policy_cache()
32        self.factory = RequestFactory()
33        self.user = User.objects.create_user(username=generate_id())

Hook method for setting up the test fixture before exercising it.

def test_group_passing(self):
35    def test_group_passing(self):
36        """Test binding to group"""
37        group = Group.objects.create(name=generate_id())
38        group.users.add(self.user)
39        binding = PolicyBinding(group=group)
40
41        request = PolicyRequest(self.user)
42        response = PolicyProcess(binding, request, None).execute()
43        self.assertEqual(response.passing, True)

Test binding to group

def test_group_negative(self):
45    def test_group_negative(self):
46        """Test binding to group"""
47        group = Group.objects.create(name=generate_id())
48        binding = PolicyBinding(group=group)
49
50        request = PolicyRequest(self.user)
51        response = PolicyProcess(binding, request, None).execute()
52        self.assertEqual(response.passing, False)

Test binding to a group the user is not a member of

def test_user_passing(self):
54    def test_user_passing(self):
55        """Test binding to user"""
56        binding = PolicyBinding(user=self.user)
57
58        request = PolicyRequest(self.user)
59        response = PolicyProcess(binding, request, None).execute()
60        self.assertEqual(response.passing, True)

Test binding to user

def test_user_negative(self):
62    def test_user_negative(self):
63        """Test binding to user"""
64        binding = PolicyBinding(user=get_anonymous_user())
65
66        request = PolicyRequest(self.user)
67        response = PolicyProcess(binding, request, None).execute()
68        self.assertEqual(response.passing, False)

Test binding to a user other than the requesting user

def test_empty(self):
70    def test_empty(self):
71        """Test binding to user"""
72        binding = PolicyBinding()
73
74        request = PolicyRequest(self.user)
75        response = PolicyProcess(binding, request, None).execute()
76        self.assertEqual(response.passing, False)

Test an empty binding (no policy, group or user)

def test_invalid(self):
78    def test_invalid(self):
79        """Test Process with invalid arguments"""
80        policy = DummyPolicy.objects.create(result=True, wait_min=0, wait_max=1)
81        binding = PolicyBinding(policy=policy)
82        with self.assertRaises(ValueError):
83            PolicyProcess(binding, None, None)  # type: ignore

Test Process with invalid arguments

def test_true(self):
85    def test_true(self):
86        """Test policy execution"""
87        policy = DummyPolicy.objects.create(result=True, wait_min=0, wait_max=1)
88        binding = PolicyBinding(policy=policy)
89
90        request = PolicyRequest(self.user)
91        response = PolicyProcess(binding, request, None).execute()
92        self.assertEqual(response.passing, True)
93        self.assertEqual(response.messages, ("dummy",))

Test policy execution

def test_false(self):
 95    def test_false(self):
 96        """Test policy execution"""
 97        policy = DummyPolicy.objects.create(result=False, wait_min=0, wait_max=1)
 98        binding = PolicyBinding(policy=policy)
 99
100        request = PolicyRequest(self.user)
101        response = PolicyProcess(binding, request, None).execute()
102        self.assertEqual(response.passing, False)
103        self.assertEqual(response.messages, ("dummy",))

Test policy execution

def test_negate(self):
105    def test_negate(self):
106        """Test policy execution"""
107        policy = DummyPolicy.objects.create(result=False, wait_min=0, wait_max=1)
108        binding = PolicyBinding(policy=policy, negate=True)
109
110        request = PolicyRequest(self.user)
111        response = PolicyProcess(binding, request, None).execute()
112        self.assertEqual(response.passing, True)
113        self.assertEqual(response.messages, ("dummy",))

Test policy execution with a negated binding

def test_exception(self):
115    def test_exception(self):
116        """Test policy execution"""
117        policy = Policy.objects.create(name=generate_id())
118        binding = PolicyBinding(
119            policy=policy, target=Application.objects.create(name=generate_id())
120        )
121
122        request = PolicyRequest(self.user)
123        response = PolicyProcess(binding, request, None).execute()
124        self.assertEqual(response.passing, False)

Test policy execution

def test_execution_logging(self):
126    def test_execution_logging(self):
127        """Test policy execution creates event"""
128        policy = DummyPolicy.objects.create(
129            name=generate_id(),
130            result=False,
131            wait_min=0,
132            wait_max=1,
133            execution_logging=True,
134        )
135        binding = PolicyBinding(
136            policy=policy, target=Application.objects.create(name=generate_id())
137        )
138
139        http_request = self.factory.get(reverse("authentik_api:user-impersonate-end"))
140        http_request.user = self.user
141        http_request.resolver_match = resolve(reverse("authentik_api:user-impersonate-end"))
142
143        password = generate_id()
144        request = PolicyRequest(self.user)
145        request.set_http_request(http_request)
146        request.context = {
147            "complex": {
148                "dict": {"foo": "bar"},
149                "list": ["foo", "bar"],
150                "tuple": ("foo", "bar"),
151                "set": {"foo", "bar"},
152                "password": password,
153            }
154        }
155        response = PolicyProcess(binding, request, None).execute()
156        self.assertEqual(response.passing, False)
157        self.assertEqual(response.messages, ("dummy",))
158
159        events = Event.objects.filter(
160            action=EventAction.POLICY_EXECUTION,
161            context__policy_uuid=policy.policy_uuid.hex,
162        )
163        self.assertTrue(events.exists())
164        self.assertEqual(len(events), 1)
165        event = events.first()
166        self.assertEqual(event.user["username"], self.user.username)
167        self.assertEqual(event.context["result"]["passing"], False)
168        self.assertEqual(event.context["result"]["messages"], ["dummy"])
169        self.assertEqual(event.client_ip, "127.0.0.1")
170        # Python sets don't preserve order when converted to list,
171        # so ensure we sort the converted set
172        event.context["request"]["context"]["complex"]["set"].sort()
173        self.assertEqual(
174            event.context["request"]["context"],
175            {
176                "complex": {
177                    "set": [
178                        "bar",
179                        "foo",
180                    ],
181                    "dict": {"foo": "bar"},
182                    "list": ["foo", "bar"],
183                    "tuple": ["foo", "bar"],
184                    "password": SafeExceptionReporterFilter.cleansed_substitute,
185                }
186            },
187        )

Test policy execution creates event

def test_execution_logging_anonymous(self):
189    def test_execution_logging_anonymous(self):
190        """Test policy execution creates event with anonymous user"""
191        policy = DummyPolicy.objects.create(
192            name=generate_id(),
193            result=False,
194            wait_min=0,
195            wait_max=1,
196            execution_logging=True,
197        )
198        binding = PolicyBinding(
199            policy=policy, target=Application.objects.create(name=generate_id())
200        )
201
202        user = AnonymousUser()
203
204        http_request = self.factory.get("/")
205        http_request.user = user
206
207        request = PolicyRequest(user)
208        request.set_http_request(http_request)
209        response = PolicyProcess(binding, request, None).execute()
210        self.assertEqual(response.passing, False)
211        self.assertEqual(response.messages, ("dummy",))
212
213        events = Event.objects.filter(
214            action=EventAction.POLICY_EXECUTION,
215            context__policy_uuid=policy.policy_uuid.hex,
216        )
217        self.assertTrue(events.exists())
218        self.assertEqual(len(events), 1)
219        event = events.first()
220        self.assertEqual(event.user["username"], "AnonymousUser")
221        self.assertEqual(event.context["result"]["passing"], False)
222        self.assertEqual(event.context["result"]["messages"], ["dummy"])
223        self.assertEqual(event.client_ip, "127.0.0.1")

Test policy execution creates event with anonymous user

def test_raises(self):
225    def test_raises(self):
226        """Test policy that raises error"""
227        policy_raises = ExpressionPolicy.objects.create(name=generate_id(), expression="{{ 0/0 }}")
228        binding = PolicyBinding(
229            policy=policy_raises, target=Application.objects.create(name=generate_id())
230        )
231
232        request = PolicyRequest(self.user)
233        response = PolicyProcess(binding, request, None).execute()
234        self.assertEqual(response.passing, False)
235        self.assertEqual(response.messages, ("division by zero",))
236
237        events = Event.objects.filter(
238            action=EventAction.POLICY_EXCEPTION,
239            context__policy_uuid=policy_raises.policy_uuid.hex,
240        )
241        self.assertTrue(events.exists())
242        self.assertEqual(len(events), 1)
243        event = events.first()
244        self.assertEqual(event.user["username"], self.user.username)
245        self.assertIn("Policy failed to execute", event.context["message"])

Test policy that raises error