authentik.policies.tests.test_engine
policy engine tests
"""policy engine tests"""

from django.core.cache import cache
from django.db import connections
from django.test import TestCase
from django.test.utils import CaptureQueriesContext

from authentik.core.models import Group
from authentik.core.tests.utils import create_test_user
from authentik.lib.generators import generate_id
from authentik.policies.dummy.models import DummyPolicy
from authentik.policies.engine import PolicyEngine
from authentik.policies.exceptions import PolicyEngineException
from authentik.policies.expression.models import ExpressionPolicy
from authentik.policies.models import Policy, PolicyBinding, PolicyBindingModel, PolicyEngineMode
from authentik.policies.tests.test_process import clear_policy_cache
from authentik.policies.types import CACHE_PREFIX


class TestPolicyEngine(TestCase):
    """PolicyEngine tests"""

    def setUp(self):
        """Clear the policy cache and create the fixtures shared by all tests."""
        clear_policy_cache()
        self.user = create_test_user()
        self.policy_false = DummyPolicy.objects.create(
            name=generate_id(), result=False, wait_min=0, wait_max=1
        )
        self.policy_true = DummyPolicy.objects.create(
            name=generate_id(), result=True, wait_min=0, wait_max=1
        )
        # Bare Policy instance; test_engine_policy_type expects the engine to reject it.
        self.policy_wrong_type = Policy.objects.create(name=generate_id())
        # Expression that divides by zero, used to exercise the error path.
        self.policy_raises = ExpressionPolicy.objects.create(
            name=generate_id(), expression="{{ 0/0 }}"
        )
        self.group_member = Group.objects.create(name=generate_id())
        self.user.groups.add(self.group_member)
        self.group_non_member = Group.objects.create(name=generate_id())

    def test_engine_empty(self):
        """Ensure empty policy list passes"""
        pbm = PolicyBindingModel.objects.create()
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, True)
        self.assertEqual(result.messages, ())

    def test_engine_simple(self):
        """Ensure simplest use-case"""
        pbm = PolicyBindingModel.objects.create()
        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=0)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, True)
        self.assertEqual(result.messages, ("dummy",))

    def test_engine_mode_all_dyn(self):
        """Ensure policy bindings are combined with AND in MODE_ALL (false and true -> false)"""
        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ALL)
        PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=1)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, False)
        self.assertEqual(
            result.messages,
            (
                "dummy",
                "dummy",
            ),
        )

    def test_engine_mode_any_dyn(self):
        """Ensure policy bindings are combined with OR in MODE_ANY (false or true -> true)"""
        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ANY)
        PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=1)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, True)
        self.assertEqual(
            result.messages,
            (
                "dummy",
                "dummy",
            ),
        )

    def test_engine_mode_all_static(self):
        """Ensure group bindings are combined with AND in MODE_ALL (member and non-member -> false)"""
        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ALL)
        PolicyBinding.objects.create(target=pbm, group=self.group_member, order=0)
        PolicyBinding.objects.create(target=pbm, group=self.group_non_member, order=1)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, False)
        self.assertEqual(result.messages, ())

    def test_engine_mode_any_static(self):
        """Ensure group bindings are combined with OR in MODE_ANY (member or non-member -> true)"""
        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ANY)
        PolicyBinding.objects.create(target=pbm, group=self.group_member, order=0)
        PolicyBinding.objects.create(target=pbm, group=self.group_non_member, order=1)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, True)
        self.assertEqual(result.messages, ())

    def test_engine_negate(self):
        """Test negate flag"""
        pbm = PolicyBindingModel.objects.create()
        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, negate=True, order=0)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, False)
        self.assertEqual(result.messages, ("dummy",))

    def test_engine_policy_error(self):
        """Test that a policy raising an error does not pass"""
        pbm = PolicyBindingModel.objects.create()
        PolicyBinding.objects.create(target=pbm, policy=self.policy_raises, order=0)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, False)
        self.assertEqual(result.messages, ("division by zero",))

    def test_engine_policy_error_failure(self):
        """Test that a policy raising an error passes when failure_result=True"""
        pbm = PolicyBindingModel.objects.create()
        PolicyBinding.objects.create(
            target=pbm, policy=self.policy_raises, order=0, failure_result=True
        )
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, True)
        self.assertEqual(result.messages, ("division by zero",))

    def test_engine_policy_type(self):
        """Test invalid policy type"""
        pbm = PolicyBindingModel.objects.create()
        PolicyBinding.objects.create(target=pbm, policy=self.policy_wrong_type, order=0)
        with self.assertRaises(PolicyEngineException):
            engine = PolicyEngine(pbm, self.user)
            engine.build()

    def test_engine_cache(self):
        """Ensure a build creates one cache entry per binding and a rebuild adds none"""
        pbm = PolicyBindingModel.objects.create()
        binding = PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
        engine = PolicyEngine(pbm, self.user)
        # Glob matching every cache key written for this binding.
        key_pattern = f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*"
        self.assertEqual(len(cache.keys(key_pattern)), 0)
        self.assertEqual(engine.build().passing, False)
        self.assertEqual(len(cache.keys(key_pattern)), 1)
        self.assertEqual(engine.build().passing, False)
        self.assertEqual(len(cache.keys(key_pattern)), 1)

    def test_engine_static_bindings(self):
        """Test static (group/user) bindings, without one query per binding"""
        group_a = Group.objects.create(name=generate_id())
        group_b = Group.objects.create(name=generate_id())
        group_b.users.add(self.user)
        user = create_test_user()
        binding_count = 1000

        for case in [
            {
                "message": "Group, not member",
                "binding_args": {"group": group_a},
                "passing": False,
            },
            {
                "message": "Group, member",
                "binding_args": {"group": group_b},
                "passing": True,
            },
            {
                "message": "User, other",
                "binding_args": {"user": user},
                "passing": False,
            },
            {
                "message": "User, same",
                "binding_args": {"user": self.user},
                "passing": True,
            },
        ]:
            # Pass the case label so a failing sub-test identifies itself.
            with self.subTest(msg=case["message"]):
                pbm = PolicyBindingModel.objects.create()
                for x in range(binding_count):
                    PolicyBinding.objects.create(target=pbm, order=x, **case["binding_args"])
                engine = PolicyEngine(pbm, self.user)
                engine.use_cache = False
                with CaptureQueriesContext(connections["default"]) as ctx:
                    engine.build()
                # Count only queries issued inside the context; final_queries is the
                # total length of the connection's query log.
                self.assertLess(len(ctx.captured_queries), binding_count)
                self.assertEqual(engine.result.passing, case["passing"])

    def test_engine_group_complex(self):
        """Ensure a member of a child group passes a binding to the parent group"""
        group_a = Group.objects.create(name=generate_id())
        group_b = Group.objects.create(name=generate_id())
        group_b.parents.add(group_a)
        user = create_test_user()
        group_b.users.add(user)
        pbm = PolicyBindingModel.objects.create()
        PolicyBinding.objects.create(target=pbm, order=0, group=group_a)
        engine = PolicyEngine(pbm, user)
        engine.use_cache = False
        with CaptureQueriesContext(connections["default"]) as ctx:
            engine.build()
        # Count only queries issued inside the context; final_queries is the
        # total length of the connection's query log.
        self.assertLess(len(ctx.captured_queries), 1000)
        self.assertTrue(engine.result.passing)
class
TestPolicyEngine(django.test.testcases.TestCase):
class TestPolicyEngine(TestCase):
    """PolicyEngine tests"""

    def setUp(self):
        """Clear the policy cache and create the fixtures shared by all tests."""
        clear_policy_cache()
        self.user = create_test_user()
        self.policy_false = DummyPolicy.objects.create(
            name=generate_id(), result=False, wait_min=0, wait_max=1
        )
        self.policy_true = DummyPolicy.objects.create(
            name=generate_id(), result=True, wait_min=0, wait_max=1
        )
        # Bare Policy instance; test_engine_policy_type expects the engine to reject it.
        self.policy_wrong_type = Policy.objects.create(name=generate_id())
        # Expression that divides by zero, used to exercise the error path.
        self.policy_raises = ExpressionPolicy.objects.create(
            name=generate_id(), expression="{{ 0/0 }}"
        )
        self.group_member = Group.objects.create(name=generate_id())
        self.user.groups.add(self.group_member)
        self.group_non_member = Group.objects.create(name=generate_id())

    def test_engine_empty(self):
        """Ensure empty policy list passes"""
        pbm = PolicyBindingModel.objects.create()
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, True)
        self.assertEqual(result.messages, ())

    def test_engine_simple(self):
        """Ensure simplest use-case"""
        pbm = PolicyBindingModel.objects.create()
        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=0)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, True)
        self.assertEqual(result.messages, ("dummy",))

    def test_engine_mode_all_dyn(self):
        """Ensure policy bindings are combined with AND in MODE_ALL (false and true -> false)"""
        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ALL)
        PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=1)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, False)
        self.assertEqual(
            result.messages,
            (
                "dummy",
                "dummy",
            ),
        )

    def test_engine_mode_any_dyn(self):
        """Ensure policy bindings are combined with OR in MODE_ANY (false or true -> true)"""
        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ANY)
        PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, order=1)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, True)
        self.assertEqual(
            result.messages,
            (
                "dummy",
                "dummy",
            ),
        )

    def test_engine_mode_all_static(self):
        """Ensure group bindings are combined with AND in MODE_ALL (member and non-member -> false)"""
        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ALL)
        PolicyBinding.objects.create(target=pbm, group=self.group_member, order=0)
        PolicyBinding.objects.create(target=pbm, group=self.group_non_member, order=1)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, False)
        self.assertEqual(result.messages, ())

    def test_engine_mode_any_static(self):
        """Ensure group bindings are combined with OR in MODE_ANY (member or non-member -> true)"""
        pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ANY)
        PolicyBinding.objects.create(target=pbm, group=self.group_member, order=0)
        PolicyBinding.objects.create(target=pbm, group=self.group_non_member, order=1)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, True)
        self.assertEqual(result.messages, ())

    def test_engine_negate(self):
        """Test negate flag"""
        pbm = PolicyBindingModel.objects.create()
        PolicyBinding.objects.create(target=pbm, policy=self.policy_true, negate=True, order=0)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, False)
        self.assertEqual(result.messages, ("dummy",))

    def test_engine_policy_error(self):
        """Test that a policy raising an error does not pass"""
        pbm = PolicyBindingModel.objects.create()
        PolicyBinding.objects.create(target=pbm, policy=self.policy_raises, order=0)
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, False)
        self.assertEqual(result.messages, ("division by zero",))

    def test_engine_policy_error_failure(self):
        """Test that a policy raising an error passes when failure_result=True"""
        pbm = PolicyBindingModel.objects.create()
        PolicyBinding.objects.create(
            target=pbm, policy=self.policy_raises, order=0, failure_result=True
        )
        engine = PolicyEngine(pbm, self.user)
        result = engine.build().result
        self.assertEqual(result.passing, True)
        self.assertEqual(result.messages, ("division by zero",))

    def test_engine_policy_type(self):
        """Test invalid policy type"""
        pbm = PolicyBindingModel.objects.create()
        PolicyBinding.objects.create(target=pbm, policy=self.policy_wrong_type, order=0)
        with self.assertRaises(PolicyEngineException):
            engine = PolicyEngine(pbm, self.user)
            engine.build()

    def test_engine_cache(self):
        """Ensure a build creates one cache entry per binding and a rebuild adds none"""
        pbm = PolicyBindingModel.objects.create()
        binding = PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
        engine = PolicyEngine(pbm, self.user)
        # Glob matching every cache key written for this binding.
        key_pattern = f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*"
        self.assertEqual(len(cache.keys(key_pattern)), 0)
        self.assertEqual(engine.build().passing, False)
        self.assertEqual(len(cache.keys(key_pattern)), 1)
        self.assertEqual(engine.build().passing, False)
        self.assertEqual(len(cache.keys(key_pattern)), 1)

    def test_engine_static_bindings(self):
        """Test static (group/user) bindings, without one query per binding"""
        group_a = Group.objects.create(name=generate_id())
        group_b = Group.objects.create(name=generate_id())
        group_b.users.add(self.user)
        user = create_test_user()
        binding_count = 1000

        for case in [
            {
                "message": "Group, not member",
                "binding_args": {"group": group_a},
                "passing": False,
            },
            {
                "message": "Group, member",
                "binding_args": {"group": group_b},
                "passing": True,
            },
            {
                "message": "User, other",
                "binding_args": {"user": user},
                "passing": False,
            },
            {
                "message": "User, same",
                "binding_args": {"user": self.user},
                "passing": True,
            },
        ]:
            # Pass the case label so a failing sub-test identifies itself.
            with self.subTest(msg=case["message"]):
                pbm = PolicyBindingModel.objects.create()
                for x in range(binding_count):
                    PolicyBinding.objects.create(target=pbm, order=x, **case["binding_args"])
                engine = PolicyEngine(pbm, self.user)
                engine.use_cache = False
                with CaptureQueriesContext(connections["default"]) as ctx:
                    engine.build()
                # Count only queries issued inside the context; final_queries is the
                # total length of the connection's query log.
                self.assertLess(len(ctx.captured_queries), binding_count)
                self.assertEqual(engine.result.passing, case["passing"])

    def test_engine_group_complex(self):
        """Ensure a member of a child group passes a binding to the parent group"""
        group_a = Group.objects.create(name=generate_id())
        group_b = Group.objects.create(name=generate_id())
        group_b.parents.add(group_a)
        user = create_test_user()
        group_b.users.add(user)
        pbm = PolicyBindingModel.objects.create()
        PolicyBinding.objects.create(target=pbm, order=0, group=group_a)
        engine = PolicyEngine(pbm, user)
        engine.use_cache = False
        with CaptureQueriesContext(connections["default"]) as ctx:
            engine.build()
        # Count only queries issued inside the context; final_queries is the
        # total length of the connection's query log.
        self.assertLess(len(ctx.captured_queries), 1000)
        self.assertTrue(engine.result.passing)
PolicyEngine tests
def
setUp(self):
def setUp(self):
    """Reset the policy cache and build the user, policies and groups the tests share."""
    clear_policy_cache()
    self.user = create_test_user()

    # The two dummy policies differ only in their configured result.
    dummy_wait = {"wait_min": 0, "wait_max": 1}
    self.policy_false = DummyPolicy.objects.create(
        name=generate_id(), result=False, **dummy_wait
    )
    self.policy_true = DummyPolicy.objects.create(
        name=generate_id(), result=True, **dummy_wait
    )

    # Base-class policy and a divide-by-zero expression for the failure-path tests.
    self.policy_wrong_type = Policy.objects.create(name=generate_id())
    zero_division = "{{ 0/0 }}"
    self.policy_raises = ExpressionPolicy.objects.create(
        name=generate_id(), expression=zero_division
    )

    # One group the user belongs to, one it does not.
    self.group_member = Group.objects.create(name=generate_id())
    self.user.groups.add(self.group_member)
    self.group_non_member = Group.objects.create(name=generate_id())
Hook method for setting up the test fixture before exercising it.
def
test_engine_empty(self):
def test_engine_empty(self):
    """An engine with no bindings passes and produces no messages."""
    target = PolicyBindingModel.objects.create()
    outcome = PolicyEngine(target, self.user).build().result
    self.assertEqual(outcome.passing, True)
    self.assertEqual(outcome.messages, ())
Ensure empty policy list passes
def
test_engine_simple(self):
def test_engine_simple(self):
    """A single passing dummy policy yields a passing result with its message."""
    target = PolicyBindingModel.objects.create()
    PolicyBinding.objects.create(target=target, policy=self.policy_true, order=0)
    outcome = PolicyEngine(target, self.user).build().result
    self.assertEqual(outcome.passing, True)
    self.assertEqual(outcome.messages, ("dummy",))
Ensure simplest use-case
def
test_engine_mode_all_dyn(self):
def test_engine_mode_all_dyn(self):
    """MODE_ALL with one failing and one passing policy fails (false and true -> false)."""
    target = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ALL)
    # Bind the failing policy first, then the passing one.
    for position, policy in enumerate((self.policy_false, self.policy_true)):
        PolicyBinding.objects.create(target=target, policy=policy, order=position)
    outcome = PolicyEngine(target, self.user).build().result
    expected_messages = ("dummy", "dummy")
    self.assertEqual(outcome.passing, False)
    self.assertEqual(outcome.messages, expected_messages)
Ensure all policies passes with AND mode (false and true -> false)
def
test_engine_mode_any_dyn(self):
def test_engine_mode_any_dyn(self):
    """MODE_ANY with one failing and one passing policy passes (false or true -> true)."""
    target = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ANY)
    # Bind the failing policy first, then the passing one.
    for position, policy in enumerate((self.policy_false, self.policy_true)):
        PolicyBinding.objects.create(target=target, policy=policy, order=position)
    outcome = PolicyEngine(target, self.user).build().result
    expected_messages = ("dummy", "dummy")
    self.assertEqual(outcome.passing, True)
    self.assertEqual(outcome.messages, expected_messages)
Ensure all policies passes with OR mode (false and true -> true)
def
test_engine_mode_all_static(self):
def test_engine_mode_all_static(self):
    """Ensure group bindings are combined with AND in MODE_ALL (member and non-member -> false)"""
    pbm = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ALL)
    # The user belongs to group_member only, so the second binding does not match.
    PolicyBinding.objects.create(target=pbm, group=self.group_member, order=0)
    PolicyBinding.objects.create(target=pbm, group=self.group_non_member, order=1)
    engine = PolicyEngine(pbm, self.user)
    result = engine.build().result
    self.assertEqual(result.passing, False)
    # Group bindings are expected to contribute no messages.
    self.assertEqual(result.messages, ())
Ensure group bindings are combined with AND in MODE_ALL (member and non-member -> false)
def
test_engine_mode_any_static(self):
def test_engine_mode_any_static(self):
    """MODE_ANY with a member group and a non-member group passes without messages."""
    target = PolicyBindingModel.objects.create(policy_engine_mode=PolicyEngineMode.MODE_ANY)
    # Bind the group the user is in first, then the one it is not in.
    for position, group in enumerate((self.group_member, self.group_non_member)):
        PolicyBinding.objects.create(target=target, group=group, order=position)
    outcome = PolicyEngine(target, self.user).build().result
    self.assertEqual(outcome.passing, True)
    self.assertEqual(outcome.messages, ())
Ensure all policies passes with OR mode (false and true -> true)
def
test_engine_negate(self):
def test_engine_negate(self):
    """A passing policy bound with negate=True yields a failing result."""
    target = PolicyBindingModel.objects.create()
    PolicyBinding.objects.create(target=target, policy=self.policy_true, negate=True, order=0)
    outcome = PolicyEngine(target, self.user).build().result
    self.assertEqual(outcome.passing, False)
    self.assertEqual(outcome.messages, ("dummy",))
Test negate flag
def
test_engine_policy_error(self):
def test_engine_policy_error(self):
    """A policy that raises yields a failing result carrying the error message."""
    target = PolicyBindingModel.objects.create()
    PolicyBinding.objects.create(target=target, policy=self.policy_raises, order=0)
    outcome = PolicyEngine(target, self.user).build().result
    self.assertEqual(outcome.passing, False)
    self.assertEqual(outcome.messages, ("division by zero",))
Test policy raising an error flag
def
test_engine_policy_error_failure(self):
def test_engine_policy_error_failure(self):
    """A policy that raises still passes when its binding sets failure_result=True."""
    target = PolicyBindingModel.objects.create()
    binding_kwargs = {
        "target": target,
        "policy": self.policy_raises,
        "order": 0,
        "failure_result": True,
    }
    PolicyBinding.objects.create(**binding_kwargs)
    outcome = PolicyEngine(target, self.user).build().result
    self.assertEqual(outcome.passing, True)
    self.assertEqual(outcome.messages, ("division by zero",))
Test that a policy raising an error passes when the binding sets failure_result=True
def
test_engine_policy_type(self):
def test_engine_policy_type(self):
    """Binding a plain Policy instance makes the engine raise PolicyEngineException."""
    target = PolicyBindingModel.objects.create()
    PolicyBinding.objects.create(target=target, policy=self.policy_wrong_type, order=0)
    # Construction and build both stay inside the assertion, as either may raise.
    with self.assertRaises(PolicyEngineException):
        PolicyEngine(target, self.user).build()
Test invalid policy type
def
test_engine_cache(self):
def test_engine_cache(self):
    """Ensure a build creates one cache entry per binding and a rebuild adds none"""
    pbm = PolicyBindingModel.objects.create()
    binding = PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
    engine = PolicyEngine(pbm, self.user)
    # Glob matching every cache key written for this binding.
    key_pattern = f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*"
    # Nothing is cached before the first build.
    self.assertEqual(len(cache.keys(key_pattern)), 0)
    self.assertEqual(engine.build().passing, False)
    self.assertEqual(len(cache.keys(key_pattern)), 1)
    # A second build gives the same result and leaves the entry count unchanged.
    self.assertEqual(engine.build().passing, False)
    self.assertEqual(len(cache.keys(key_pattern)), 1)
Ensure a build creates one cache entry per binding and a rebuild adds none
def
test_engine_static_bindings(self):
def test_engine_static_bindings(self):
    """Test static (group/user) bindings, without one query per binding"""
    group_a = Group.objects.create(name=generate_id())
    group_b = Group.objects.create(name=generate_id())
    group_b.users.add(self.user)
    user = create_test_user()
    # Number of bindings per case; the build must need fewer queries than this.
    binding_count = 1000

    for case in [
        {
            "message": "Group, not member",
            "binding_args": {"group": group_a},
            "passing": False,
        },
        {
            "message": "Group, member",
            "binding_args": {"group": group_b},
            "passing": True,
        },
        {
            "message": "User, other",
            "binding_args": {"user": user},
            "passing": False,
        },
        {
            "message": "User, same",
            "binding_args": {"user": self.user},
            "passing": True,
        },
    ]:
        # Pass the case label so a failing sub-test identifies itself.
        with self.subTest(msg=case["message"]):
            pbm = PolicyBindingModel.objects.create()
            for x in range(binding_count):
                PolicyBinding.objects.create(target=pbm, order=x, **case["binding_args"])
            engine = PolicyEngine(pbm, self.user)
            engine.use_cache = False
            with CaptureQueriesContext(connections["default"]) as ctx:
                engine.build()
            # Count only queries issued inside the context; final_queries is the
            # total length of the connection's query log.
            self.assertLess(len(ctx.captured_queries), binding_count)
            self.assertEqual(engine.result.passing, case["passing"])
Test static bindings
def
test_engine_group_complex(self):
def test_engine_group_complex(self):
    """Ensure a member of a child group passes a binding to the parent group"""
    group_a = Group.objects.create(name=generate_id())
    group_b = Group.objects.create(name=generate_id())
    # group_a is the parent; the user is only added to the child group_b.
    group_b.parents.add(group_a)
    user = create_test_user()
    group_b.users.add(user)
    pbm = PolicyBindingModel.objects.create()
    PolicyBinding.objects.create(target=pbm, order=0, group=group_a)
    engine = PolicyEngine(pbm, user)
    engine.use_cache = False
    with CaptureQueriesContext(connections["default"]) as ctx:
        engine.build()
    # Count only queries issued inside the context; final_queries is the
    # total length of the connection's query log.
    self.assertLess(len(ctx.captured_queries), 1000)
    self.assertTrue(engine.result.passing)
Test more complex group setups