authentik.policies.tests.test_engine

policy engine tests

  1"""policy engine tests"""
  2
  3from django.core.cache import cache
  4from django.db import connections
  5from django.test import TestCase
  6from django.test.utils import CaptureQueriesContext
  7
  8from authentik.core.models import Group
  9from authentik.core.tests.utils import create_test_user
 10from authentik.lib.generators import generate_id
 11from authentik.policies.dummy.models import DummyPolicy
 12from authentik.policies.engine import PolicyEngine
 13from authentik.policies.exceptions import PolicyEngineException
 14from authentik.policies.expression.models import ExpressionPolicy
 15from authentik.policies.models import Policy, PolicyBinding, PolicyBindingModel, PolicyEngineMode
 16from authentik.policies.tests.test_process import clear_policy_cache
 17from authentik.policies.types import CACHE_PREFIX
 18
 19
 20class TestPolicyEngine(TestCase):
 21    """PolicyEngine tests"""
 22
 23    def setUp(self):
 24        clear_policy_cache()
 25        self.user = create_test_user()
 26        self.policy_false = DummyPolicy.objects.create(
 27            name=generate_id(), result=False, wait_min=0, wait_max=1
 28        )
 29        self.policy_true = DummyPolicy.objects.create(
 30            name=generate_id(), result=True, wait_min=0, wait_max=1
 31        )
 32        self.policy_wrong_type = Policy.objects.create(name=generate_id())
 33        self.policy_raises = ExpressionPolicy.objects.create(
 34            name=generate_id(), expression="{{ 0/0 }}"
 35        )
 36        self.group_member = Group.objects.create(name=generate_id())
 37        self.user.groups.add(self.group_member)
 38        self.group_non_member = Group.objects.create(name=generate_id())
 39
 40    def test_engine_empty(self):
 41        """Ensure empty policy list passes"""
 42        pbm = PolicyBindingModel.objects.create()
 43        engine = PolicyEngine(pbm, self.user)
 44        result = engine.build().result
 45        self.assertEqual(result.passing, True)
 46        self.assertEqual(result.messages, ())
 47
 48    def test_engine_simple(self):
 49        """Ensure simplest use-case"""
 50        pbm = PolicyBindingModel.objects.create()
 51        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=0)
 52        engine = PolicyEngine(pbm, self.user)
 53        result = engine.build().result
 54        self.assertEqual(result.passing, True)
 55        self.assertEqual(result.messages, ("dummy",))
 56
 57    def test_engine_mode_all_dyn(self):
 58        """Ensure all policies passes with AND mode (false and true -> false)"""
 59        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ALL)
 60        PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
 61        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=1)
 62        engine = PolicyEngine(pbm, self.user)
 63        result = engine.build().result
 64        self.assertEqual(result.passing, False)
 65        self.assertEqual(
 66            result.messages,
 67            (
 68                "dummy",
 69                "dummy",
 70            ),
 71        )
 72
 73    def test_engine_mode_any_dyn(self):
 74        """Ensure all policies passes with OR mode (false and true -> true)"""
 75        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ANY)
 76        PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
 77        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=1)
 78        engine = PolicyEngine(pbm, self.user)
 79        result = engine.build().result
 80        self.assertEqual(result.passing, True)
 81        self.assertEqual(
 82            result.messages,
 83            (
 84                "dummy",
 85                "dummy",
 86            ),
 87        )
 88
 89    def test_engine_mode_all_static(self):
 90        """Ensure all policies passes with OR mode (false and true -> true)"""
 91        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ALL)
 92        PolicyBinding.objects.create(target=pbm, group=self.group_member, order=0)
 93        PolicyBinding.objects.create(target=pbm, group=self.group_non_member, order=1)
 94        engine = PolicyEngine(pbm, self.user)
 95        result = engine.build().result
 96        self.assertEqual(result.passing, False)
 97        self.assertEqual(result.messages, ())
 98
 99    def test_engine_mode_any_static(self):
100        """Ensure all policies passes with OR mode (false and true -> true)"""
101        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ANY)
102        PolicyBinding.objects.create(target=pbm, group=self.group_member, order=0)
103        PolicyBinding.objects.create(target=pbm, group=self.group_non_member, order=1)
104        engine = PolicyEngine(pbm, self.user)
105        result = engine.build().result
106        self.assertEqual(result.passing, True)
107        self.assertEqual(result.messages, ())
108
109    def test_engine_negate(self):
110        """Test negate flag"""
111        pbm = PolicyBindingModel.objects.create()
112        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, negate=True, order=0)
113        engine = PolicyEngine(pbm, self.user)
114        result = engine.build().result
115        self.assertEqual(result.passing, False)
116        self.assertEqual(result.messages, ("dummy",))
117
118    def test_engine_policy_error(self):
119        """Test policy raising an error flag"""
120        pbm = PolicyBindingModel.objects.create()
121        PolicyBinding.objects.create(target=pbm, policy=self.policy_raises, order=0)
122        engine = PolicyEngine(pbm, self.user)
123        result = engine.build().result
124        self.assertEqual(result.passing, False)
125        self.assertEqual(result.messages, ("division by zero",))
126
127    def test_engine_policy_error_failure(self):
128        """Test policy raising an error flag"""
129        pbm = PolicyBindingModel.objects.create()
130        PolicyBinding.objects.create(
131            target=pbm, policy=self.policy_raises, order=0, failure_result=True
132        )
133        engine = PolicyEngine(pbm, self.user)
134        result = engine.build().result
135        self.assertEqual(result.passing, True)
136        self.assertEqual(result.messages, ("division by zero",))
137
138    def test_engine_policy_type(self):
139        """Test invalid policy type"""
140        pbm = PolicyBindingModel.objects.create()
141        PolicyBinding.objects.create(target=pbm, policy=self.policy_wrong_type, order=0)
142        with self.assertRaises(PolicyEngineException):
143            engine = PolicyEngine(pbm, self.user)
144            engine.build()
145
146    def test_engine_cache(self):
147        """Ensure empty policy list passes"""
148        pbm = PolicyBindingModel.objects.create()
149        binding = PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
150        engine = PolicyEngine(pbm, self.user)
151        self.assertEqual(len(cache.keys(f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*")), 0)
152        self.assertEqual(engine.build().passing, False)
153        self.assertEqual(len(cache.keys(f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*")), 1)
154        self.assertEqual(engine.build().passing, False)
155        self.assertEqual(len(cache.keys(f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*")), 1)
156
157    def test_engine_static_bindings(self):
158        """Test static bindings"""
159        group_a = Group.objects.create(name=generate_id())
160        group_b = Group.objects.create(name=generate_id())
161        group_b.users.add(self.user)
162        user = create_test_user()
163
164        for case in [
165            {
166                "message": "Group, not member",
167                "binding_args": {"group": group_a},
168                "passing": False,
169            },
170            {
171                "message": "Group, member",
172                "binding_args": {"group": group_b},
173                "passing": True,
174            },
175            {
176                "message": "User, other",
177                "binding_args": {"user": user},
178                "passing": False,
179            },
180            {
181                "message": "User, same",
182                "binding_args": {"user": self.user},
183                "passing": True,
184            },
185        ]:
186            with self.subTest():
187                pbm = PolicyBindingModel.objects.create()
188                for x in range(1000):
189                    PolicyBinding.objects.create(target=pbm, order=x, **case["binding_args"])
190                engine = PolicyEngine(pbm, self.user)
191                engine.use_cache = False
192                with CaptureQueriesContext(connections["default"]) as ctx:
193                    engine.build()
194                self.assertLess(ctx.final_queries, 1000)
195                self.assertEqual(engine.result.passing, case["passing"])
196
197    def test_engine_group_complex(self):
198        """Test more complex group setups"""
199        group_a = Group.objects.create(name=generate_id())
200        group_b = Group.objects.create(name=generate_id())
201        group_b.parents.add(group_a)
202        user = create_test_user()
203        group_b.users.add(user)
204        pbm = PolicyBindingModel.objects.create()
205        PolicyBinding.objects.create(target=pbm, order=0, group=group_a)
206        engine = PolicyEngine(pbm, user)
207        engine.use_cache = False
208        with CaptureQueriesContext(connections["default"]) as ctx:
209            engine.build()
210        self.assertLess(ctx.final_queries, 1000)
211        self.assertTrue(engine.result.passing)
class TestPolicyEngine(django.test.testcases.TestCase):
 21class TestPolicyEngine(TestCase):
 22    """PolicyEngine tests"""
 23
 24    def setUp(self):
 25        clear_policy_cache()
 26        self.user = create_test_user()
 27        self.policy_false = DummyPolicy.objects.create(
 28            name=generate_id(), result=False, wait_min=0, wait_max=1
 29        )
 30        self.policy_true = DummyPolicy.objects.create(
 31            name=generate_id(), result=True, wait_min=0, wait_max=1
 32        )
 33        self.policy_wrong_type = Policy.objects.create(name=generate_id())
 34        self.policy_raises = ExpressionPolicy.objects.create(
 35            name=generate_id(), expression="{{ 0/0 }}"
 36        )
 37        self.group_member = Group.objects.create(name=generate_id())
 38        self.user.groups.add(self.group_member)
 39        self.group_non_member = Group.objects.create(name=generate_id())
 40
 41    def test_engine_empty(self):
 42        """Ensure empty policy list passes"""
 43        pbm = PolicyBindingModel.objects.create()
 44        engine = PolicyEngine(pbm, self.user)
 45        result = engine.build().result
 46        self.assertEqual(result.passing, True)
 47        self.assertEqual(result.messages, ())
 48
 49    def test_engine_simple(self):
 50        """Ensure simplest use-case"""
 51        pbm = PolicyBindingModel.objects.create()
 52        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=0)
 53        engine = PolicyEngine(pbm, self.user)
 54        result = engine.build().result
 55        self.assertEqual(result.passing, True)
 56        self.assertEqual(result.messages, ("dummy",))
 57
 58    def test_engine_mode_all_dyn(self):
 59        """Ensure all policies passes with AND mode (false and true -> false)"""
 60        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ALL)
 61        PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
 62        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=1)
 63        engine = PolicyEngine(pbm, self.user)
 64        result = engine.build().result
 65        self.assertEqual(result.passing, False)
 66        self.assertEqual(
 67            result.messages,
 68            (
 69                "dummy",
 70                "dummy",
 71            ),
 72        )
 73
 74    def test_engine_mode_any_dyn(self):
 75        """Ensure all policies passes with OR mode (false and true -> true)"""
 76        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ANY)
 77        PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
 78        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=1)
 79        engine = PolicyEngine(pbm, self.user)
 80        result = engine.build().result
 81        self.assertEqual(result.passing, True)
 82        self.assertEqual(
 83            result.messages,
 84            (
 85                "dummy",
 86                "dummy",
 87            ),
 88        )
 89
 90    def test_engine_mode_all_static(self):
 91        """Ensure all policies passes with OR mode (false and true -> true)"""
 92        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ALL)
 93        PolicyBinding.objects.create(target=pbm, group=self.group_member, order=0)
 94        PolicyBinding.objects.create(target=pbm, group=self.group_non_member, order=1)
 95        engine = PolicyEngine(pbm, self.user)
 96        result = engine.build().result
 97        self.assertEqual(result.passing, False)
 98        self.assertEqual(result.messages, ())
 99
100    def test_engine_mode_any_static(self):
101        """Ensure all policies passes with OR mode (false and true -> true)"""
102        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ANY)
103        PolicyBinding.objects.create(target=pbm, group=self.group_member, order=0)
104        PolicyBinding.objects.create(target=pbm, group=self.group_non_member, order=1)
105        engine = PolicyEngine(pbm, self.user)
106        result = engine.build().result
107        self.assertEqual(result.passing, True)
108        self.assertEqual(result.messages, ())
109
110    def test_engine_negate(self):
111        """Test negate flag"""
112        pbm = PolicyBindingModel.objects.create()
113        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, negate=True, order=0)
114        engine = PolicyEngine(pbm, self.user)
115        result = engine.build().result
116        self.assertEqual(result.passing, False)
117        self.assertEqual(result.messages, ("dummy",))
118
119    def test_engine_policy_error(self):
120        """Test policy raising an error flag"""
121        pbm = PolicyBindingModel.objects.create()
122        PolicyBinding.objects.create(target=pbm, policy=self.policy_raises, order=0)
123        engine = PolicyEngine(pbm, self.user)
124        result = engine.build().result
125        self.assertEqual(result.passing, False)
126        self.assertEqual(result.messages, ("division by zero",))
127
128    def test_engine_policy_error_failure(self):
129        """Test policy raising an error flag"""
130        pbm = PolicyBindingModel.objects.create()
131        PolicyBinding.objects.create(
132            target=pbm, policy=self.policy_raises, order=0, failure_result=True
133        )
134        engine = PolicyEngine(pbm, self.user)
135        result = engine.build().result
136        self.assertEqual(result.passing, True)
137        self.assertEqual(result.messages, ("division by zero",))
138
139    def test_engine_policy_type(self):
140        """Test invalid policy type"""
141        pbm = PolicyBindingModel.objects.create()
142        PolicyBinding.objects.create(target=pbm, policy=self.policy_wrong_type, order=0)
143        with self.assertRaises(PolicyEngineException):
144            engine = PolicyEngine(pbm, self.user)
145            engine.build()
146
147    def test_engine_cache(self):
148        """Ensure empty policy list passes"""
149        pbm = PolicyBindingModel.objects.create()
150        binding = PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
151        engine = PolicyEngine(pbm, self.user)
152        self.assertEqual(len(cache.keys(f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*")), 0)
153        self.assertEqual(engine.build().passing, False)
154        self.assertEqual(len(cache.keys(f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*")), 1)
155        self.assertEqual(engine.build().passing, False)
156        self.assertEqual(len(cache.keys(f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*")), 1)
157
158    def test_engine_static_bindings(self):
159        """Test static bindings"""
160        group_a = Group.objects.create(name=generate_id())
161        group_b = Group.objects.create(name=generate_id())
162        group_b.users.add(self.user)
163        user = create_test_user()
164
165        for case in [
166            {
167                "message": "Group, not member",
168                "binding_args": {"group": group_a},
169                "passing": False,
170            },
171            {
172                "message": "Group, member",
173                "binding_args": {"group": group_b},
174                "passing": True,
175            },
176            {
177                "message": "User, other",
178                "binding_args": {"user": user},
179                "passing": False,
180            },
181            {
182                "message": "User, same",
183                "binding_args": {"user": self.user},
184                "passing": True,
185            },
186        ]:
187            with self.subTest():
188                pbm = PolicyBindingModel.objects.create()
189                for x in range(1000):
190                    PolicyBinding.objects.create(target=pbm, order=x, **case["binding_args"])
191                engine = PolicyEngine(pbm, self.user)
192                engine.use_cache = False
193                with CaptureQueriesContext(connections["default"]) as ctx:
194                    engine.build()
195                self.assertLess(ctx.final_queries, 1000)
196                self.assertEqual(engine.result.passing, case["passing"])
197
198    def test_engine_group_complex(self):
199        """Test more complex group setups"""
200        group_a = Group.objects.create(name=generate_id())
201        group_b = Group.objects.create(name=generate_id())
202        group_b.parents.add(group_a)
203        user = create_test_user()
204        group_b.users.add(user)
205        pbm = PolicyBindingModel.objects.create()
206        PolicyBinding.objects.create(target=pbm, order=0, group=group_a)
207        engine = PolicyEngine(pbm, user)
208        engine.use_cache = False
209        with CaptureQueriesContext(connections["default"]) as ctx:
210            engine.build()
211        self.assertLess(ctx.final_queries, 1000)
212        self.assertTrue(engine.result.passing)

PolicyEngine tests

def setUp(self):
24    def setUp(self):
25        clear_policy_cache()
26        self.user = create_test_user()
27        self.policy_false = DummyPolicy.objects.create(
28            name=generate_id(), result=False, wait_min=0, wait_max=1
29        )
30        self.policy_true = DummyPolicy.objects.create(
31            name=generate_id(), result=True, wait_min=0, wait_max=1
32        )
33        self.policy_wrong_type = Policy.objects.create(name=generate_id())
34        self.policy_raises = ExpressionPolicy.objects.create(
35            name=generate_id(), expression="{{ 0/0 }}"
36        )
37        self.group_member = Group.objects.create(name=generate_id())
38        self.user.groups.add(self.group_member)
39        self.group_non_member = Group.objects.create(name=generate_id())

Hook method for setting up the test fixture before exercising it.

def test_engine_empty(self):
41    def test_engine_empty(self):
42        """Ensure empty policy list passes"""
43        pbm = PolicyBindingModel.objects.create()
44        engine = PolicyEngine(pbm, self.user)
45        result = engine.build().result
46        self.assertEqual(result.passing, True)
47        self.assertEqual(result.messages, ())

Ensure empty policy list passes

def test_engine_simple(self):
49    def test_engine_simple(self):
50        """Ensure simplest use-case"""
51        pbm = PolicyBindingModel.objects.create()
52        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=0)
53        engine = PolicyEngine(pbm, self.user)
54        result = engine.build().result
55        self.assertEqual(result.passing, True)
56        self.assertEqual(result.messages, ("dummy",))

Ensure simplest use-case

def test_engine_mode_all_dyn(self):
58    def test_engine_mode_all_dyn(self):
59        """Ensure all policies passes with AND mode (false and true -> false)"""
60        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ALL)
61        PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
62        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=1)
63        engine = PolicyEngine(pbm, self.user)
64        result = engine.build().result
65        self.assertEqual(result.passing, False)
66        self.assertEqual(
67            result.messages,
68            (
69                "dummy",
70                "dummy",
71            ),
72        )

Ensure all policies passes with AND mode (false and true -> false)

def test_engine_mode_any_dyn(self):
74    def test_engine_mode_any_dyn(self):
75        """Ensure all policies passes with OR mode (false and true -> true)"""
76        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ANY)
77        PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
78        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=1)
79        engine = PolicyEngine(pbm, self.user)
80        result = engine.build().result
81        self.assertEqual(result.passing, True)
82        self.assertEqual(
83            result.messages,
84            (
85                "dummy",
86                "dummy",
87            ),
88        )

Ensure all policies passes with OR mode (false and true -> true)

def test_engine_mode_all_static(self):
90    def test_engine_mode_all_static(self):
91        """Ensure all policies passes with OR mode (false and true -> true)"""
92        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ALL)
93        PolicyBinding.objects.create(target=pbm, group=self.group_member, order=0)
94        PolicyBinding.objects.create(target=pbm, group=self.group_non_member, order=1)
95        engine = PolicyEngine(pbm, self.user)
96        result = engine.build().result
97        self.assertEqual(result.passing, False)
98        self.assertEqual(result.messages, ())

Ensure all policies passes with OR mode (false and true -> true)

def test_engine_mode_any_static(self):
100    def test_engine_mode_any_static(self):
101        """Ensure all policies passes with OR mode (false and true -> true)"""
102        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ANY)
103        PolicyBinding.objects.create(target=pbm, group=self.group_member, order=0)
104        PolicyBinding.objects.create(target=pbm, group=self.group_non_member, order=1)
105        engine = PolicyEngine(pbm, self.user)
106        result = engine.build().result
107        self.assertEqual(result.passing, True)
108        self.assertEqual(result.messages, ())

Ensure all policies passes with OR mode (false and true -> true)

def test_engine_negate(self):
110    def test_engine_negate(self):
111        """Test negate flag"""
112        pbm = PolicyBindingModel.objects.create()
113        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, negate=True, order=0)
114        engine = PolicyEngine(pbm, self.user)
115        result = engine.build().result
116        self.assertEqual(result.passing, False)
117        self.assertEqual(result.messages, ("dummy",))

Test negate flag

def test_engine_policy_error(self):
119    def test_engine_policy_error(self):
120        """Test policy raising an error flag"""
121        pbm = PolicyBindingModel.objects.create()
122        PolicyBinding.objects.create(target=pbm, policy=self.policy_raises, order=0)
123        engine = PolicyEngine(pbm, self.user)
124        result = engine.build().result
125        self.assertEqual(result.passing, False)
126        self.assertEqual(result.messages, ("division by zero",))

Test policy raising an error flag

def test_engine_policy_error_failure(self):
128    def test_engine_policy_error_failure(self):
129        """Test policy raising an error flag"""
130        pbm = PolicyBindingModel.objects.create()
131        PolicyBinding.objects.create(
132            target=pbm, policy=self.policy_raises, order=0, failure_result=True
133        )
134        engine = PolicyEngine(pbm, self.user)
135        result = engine.build().result
136        self.assertEqual(result.passing, True)
137        self.assertEqual(result.messages, ("division by zero",))

Test policy raising an error flag

def test_engine_policy_type(self):
139    def test_engine_policy_type(self):
140        """Test invalid policy type"""
141        pbm = PolicyBindingModel.objects.create()
142        PolicyBinding.objects.create(target=pbm, policy=self.policy_wrong_type, order=0)
143        with self.assertRaises(PolicyEngineException):
144            engine = PolicyEngine(pbm, self.user)
145            engine.build()

Test invalid policy type

def test_engine_cache(self):
147    def test_engine_cache(self):
148        """Ensure empty policy list passes"""
149        pbm = PolicyBindingModel.objects.create()
150        binding = PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
151        engine = PolicyEngine(pbm, self.user)
152        self.assertEqual(len(cache.keys(f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*")), 0)
153        self.assertEqual(engine.build().passing, False)
154        self.assertEqual(len(cache.keys(f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*")), 1)
155        self.assertEqual(engine.build().passing, False)
156        self.assertEqual(len(cache.keys(f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*")), 1)

Ensure empty policy list passes

def test_engine_static_bindings(self):
158    def test_engine_static_bindings(self):
159        """Test static bindings"""
160        group_a = Group.objects.create(name=generate_id())
161        group_b = Group.objects.create(name=generate_id())
162        group_b.users.add(self.user)
163        user = create_test_user()
164
165        for case in [
166            {
167                "message": "Group, not member",
168                "binding_args": {"group": group_a},
169                "passing": False,
170            },
171            {
172                "message": "Group, member",
173                "binding_args": {"group": group_b},
174                "passing": True,
175            },
176            {
177                "message": "User, other",
178                "binding_args": {"user": user},
179                "passing": False,
180            },
181            {
182                "message": "User, same",
183                "binding_args": {"user": self.user},
184                "passing": True,
185            },
186        ]:
187            with self.subTest():
188                pbm = PolicyBindingModel.objects.create()
189                for x in range(1000):
190                    PolicyBinding.objects.create(target=pbm, order=x, **case["binding_args"])
191                engine = PolicyEngine(pbm, self.user)
192                engine.use_cache = False
193                with CaptureQueriesContext(connections["default"]) as ctx:
194                    engine.build()
195                self.assertLess(ctx.final_queries, 1000)
196                self.assertEqual(engine.result.passing, case["passing"])

Test static bindings

def test_engine_group_complex(self):
198    def test_engine_group_complex(self):
199        """Test more complex group setups"""
200        group_a = Group.objects.create(name=generate_id())
201        group_b = Group.objects.create(name=generate_id())
202        group_b.parents.add(group_a)
203        user = create_test_user()
204        group_b.users.add(user)
205        pbm = PolicyBindingModel.objects.create()
206        PolicyBinding.objects.create(target=pbm, order=0, group=group_a)
207        engine = PolicyEngine(pbm, user)
208        engine.use_cache = False
209        with CaptureQueriesContext(connections["default"]) as ctx:
210            engine.build()
211        self.assertLess(ctx.final_queries, 1000)
212        self.assertTrue(engine.result.passing)

Test more complex group setups