authentik.events.tests.test_event
event tests
1"""event tests""" 2 3from urllib.parse import urlencode 4 5from django.contrib.auth.hashers import make_password 6from django.contrib.contenttypes.models import ContentType 7from django.test import RequestFactory, TestCase 8from django.views.debug import SafeExceptionReporterFilter 9from guardian.shortcuts import get_anonymous_user 10 11from authentik.brands.models import Brand 12from authentik.core.models import Group, User 13from authentik.core.tests.utils import create_test_user 14from authentik.events.models import Event, EventAction 15from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan 16from authentik.flows.views.executor import QS_QUERY, SESSION_KEY_PLAN 17from authentik.lib.generators import generate_id 18from authentik.policies.dummy.models import DummyPolicy 19 20 21class TestEvents(TestCase): 22 """Test Event""" 23 24 def setUp(self) -> None: 25 self.factory = RequestFactory() 26 27 def test_new_with_model(self): 28 """Create a new Event passing a model as kwarg""" 29 test_model = Group.objects.create(name="test") 30 event = Event.new("unittest", test={"model": test_model}) 31 event.save() # We save to ensure nothing is un-saveable 32 model_content_type = ContentType.objects.get_for_model(test_model) 33 self.assertEqual( 34 event.context.get("test").get("model").get("app"), 35 model_content_type.app_label, 36 ) 37 38 def test_new_with_user(self): 39 """Create a new Event passing a user as kwarg""" 40 event = Event.new("unittest", test={"model": get_anonymous_user()}) 41 event.save() # We save to ensure nothing is un-saveable 42 self.assertEqual( 43 event.context.get("test").get("model").get("username"), 44 get_anonymous_user().username, 45 ) 46 47 def test_new_with_uuid_model(self): 48 """Create a new Event passing a model (with UUID PK) as kwarg""" 49 temp_model = DummyPolicy.objects.create(name="test", result=True) 50 event = Event.new("unittest", model=temp_model) 51 event.save() # We save to ensure nothing is un-saveable 52 model_content_type = ContentType.objects.get_for_model(temp_model) 53 self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label) 54 self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex) 55 56 def test_from_http_basic(self): 57 """Test plain from_http""" 58 event = Event.new("unittest").from_http(self.factory.get("/")) 59 self.assertEqual( 60 event.context, 61 { 62 "http_request": { 63 "args": {}, 64 "method": "GET", 65 "path": "/", 66 "user_agent": "", 67 } 68 }, 69 ) 70 71 def test_from_http_clean_querystring(self): 72 """Test cleansing query string""" 73 token = generate_id() 74 request = self.factory.get(f"/?token={token}") 75 event = Event.new("unittest").from_http(request) 76 self.assertEqual( 77 event.context, 78 { 79 "http_request": { 80 "args": {"token": SafeExceptionReporterFilter.cleansed_substitute}, 81 "method": "GET", 82 "path": "/", 83 "user_agent": "", 84 } 85 }, 86 ) 87 88 def test_from_http_clean_querystring_flow(self): 89 """Test cleansing query string (nested query string like flow executor)""" 90 token = generate_id() 91 nested_qs = {"token": token} 92 request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}") 93 event = Event.new("unittest").from_http(request) 94 self.assertEqual( 95 event.context, 96 { 97 "http_request": { 98 "args": {"token": SafeExceptionReporterFilter.cleansed_substitute}, 99 "method": "GET", 100 "path": "/", 101 "user_agent": "", 102 } 103 }, 104 ) 105 106 def test_from_http_brand(self): 107 """Test from_http brand""" 108 # Test brand 109 request = self.factory.get("/") 110 brand = Brand(domain="test-brand") 111 request.brand = brand 112 event = Event.new("unittest").from_http(request) 113 self.assertEqual( 114 event.brand, 115 { 116 "app": "authentik_brands", 117 "model_name": "brand", 118 "name": "Brand test-brand", 119 "pk": brand.pk.hex, 120 }, 121 ) 122 123 def test_from_http_flow_pending_user(self): 124 """Test request from flow request with a pending user""" 125 user = create_test_user() 126 127 session = self.client.session 128 plan = FlowPlan(generate_id()) 129 plan.context[PLAN_CONTEXT_PENDING_USER] = user 130 session[SESSION_KEY_PLAN] = plan 131 session.save() 132 133 request = self.factory.get("/") 134 request.session = session 135 request.user = user 136 137 event = Event.new("unittest").from_http(request) 138 self.assertEqual( 139 event.user, 140 { 141 "email": user.email, 142 "pk": user.pk, 143 "username": user.username, 144 }, 145 ) 146 147 def test_from_http_flow_pending_user_anon(self): 148 """Test request from flow request with a pending user""" 149 user = create_test_user() 150 anon = get_anonymous_user() 151 152 session = self.client.session 153 plan = FlowPlan(generate_id()) 154 plan.context[PLAN_CONTEXT_PENDING_USER] = user 155 session[SESSION_KEY_PLAN] = plan 156 session.save() 157 158 request = self.factory.get("/") 159 request.session = session 160 request.user = anon 161 162 event = Event.new("unittest").from_http(request) 163 self.assertEqual( 164 event.user, 165 { 166 "authenticated_as": { 167 "pk": anon.pk, 168 "is_anonymous": True, 169 "username": "AnonymousUser", 170 "email": "", 171 }, 172 "email": user.email, 173 "pk": user.pk, 174 "username": user.username, 175 }, 176 ) 177 178 def test_from_http_flow_pending_user_fake(self): 179 """Test request from flow request with a pending user""" 180 user = User( 181 username=generate_id(), 182 email=generate_id(), 183 ) 184 anon = get_anonymous_user() 185 186 session = self.client.session 187 plan = FlowPlan(generate_id()) 188 plan.context[PLAN_CONTEXT_PENDING_USER] = user 189 session[SESSION_KEY_PLAN] = plan 190 session.save() 191 192 request = self.factory.get("/") 193 request.session = session 194 request.user = anon 195 196 event = Event.new("unittest").from_http(request) 197 self.assertEqual( 198 event.user, 199 { 200 "authenticated_as": { 201 "pk": anon.pk, 202 "is_anonymous": True, 203 "username": "AnonymousUser", 204 "email": "", 205 }, 206 "email": user.email, 207 "pk": user.pk, 208 "username": user.username, 209 }, 210 ) 211 212 def test_invalid_string(self): 213 """Test creating an event with invalid unicode string data""" 214 event = Event.new("unittest", foo="foo bar \u0000 baz") 215 event.save() 216 self.assertEqual(event.context["foo"], "foo bar baz") 217 218 def test_password_set_signal_on_set_password_from_hash(self): 219 """Changing password from hash should still emit an audit event.""" 220 user = create_test_user() 221 old_count = Event.objects.filter(action=EventAction.PASSWORD_SET, user__pk=user.pk).count() 222 223 user.set_password_from_hash(make_password(generate_id())) 224 user.save() 225 226 new_count = Event.objects.filter(action=EventAction.PASSWORD_SET, user__pk=user.pk).count() 227 self.assertEqual(new_count, old_count + 1) 228 229 def test_log_deprecation(self): 230 """Test Event.log_deprecation""" 231 Event.log_deprecation(self.__module__, "Test deprecation") 232 Event.log_deprecation(self.__module__, "Test deprecation") 233 Event.log_deprecation(self.__module__, "Test deprecation") 234 Event.log_deprecation(self.__module__, "Test deprecation", cause=create_test_user()) 235 logs = Event.objects.filter( 236 action=EventAction.CONFIGURATION_WARNING, context__deprecation=self.__module__ 237 ) 238 self.assertEqual(logs.count(), 2)
class
TestEvents(django.test.testcases.TestCase):
22class TestEvents(TestCase): 23 """Test Event""" 24 25 def setUp(self) -> None: 26 self.factory = RequestFactory() 27 28 def test_new_with_model(self): 29 """Create a new Event passing a model as kwarg""" 30 test_model = Group.objects.create(name="test") 31 event = Event.new("unittest", test={"model": test_model}) 32 event.save() # We save to ensure nothing is un-saveable 33 model_content_type = ContentType.objects.get_for_model(test_model) 34 self.assertEqual( 35 event.context.get("test").get("model").get("app"), 36 model_content_type.app_label, 37 ) 38 39 def test_new_with_user(self): 40 """Create a new Event passing a user as kwarg""" 41 event = Event.new("unittest", test={"model": get_anonymous_user()}) 42 event.save() # We save to ensure nothing is un-saveable 43 self.assertEqual( 44 event.context.get("test").get("model").get("username"), 45 get_anonymous_user().username, 46 ) 47 48 def test_new_with_uuid_model(self): 49 """Create a new Event passing a model (with UUID PK) as kwarg""" 50 temp_model = DummyPolicy.objects.create(name="test", result=True) 51 event = Event.new("unittest", model=temp_model) 52 event.save() # We save to ensure nothing is un-saveable 53 model_content_type = ContentType.objects.get_for_model(temp_model) 54 self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label) 55 self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex) 56 57 def test_from_http_basic(self): 58 """Test plain from_http""" 59 event = Event.new("unittest").from_http(self.factory.get("/")) 60 self.assertEqual( 61 event.context, 62 { 63 "http_request": { 64 "args": {}, 65 "method": "GET", 66 "path": "/", 67 "user_agent": "", 68 } 69 }, 70 ) 71 72 def test_from_http_clean_querystring(self): 73 """Test cleansing query string""" 74 token = generate_id() 75 request = self.factory.get(f"/?token={token}") 76 event = Event.new("unittest").from_http(request) 77 self.assertEqual( 78 event.context, 79 { 80 "http_request": { 81 "args": {"token": SafeExceptionReporterFilter.cleansed_substitute}, 82 "method": "GET", 83 "path": "/", 84 "user_agent": "", 85 } 86 }, 87 ) 88 89 def test_from_http_clean_querystring_flow(self): 90 """Test cleansing query string (nested query string like flow executor)""" 91 token = generate_id() 92 nested_qs = {"token": token} 93 request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}") 94 event = Event.new("unittest").from_http(request) 95 self.assertEqual( 96 event.context, 97 { 98 "http_request": { 99 "args": {"token": SafeExceptionReporterFilter.cleansed_substitute}, 100 "method": "GET", 101 "path": "/", 102 "user_agent": "", 103 } 104 }, 105 ) 106 107 def test_from_http_brand(self): 108 """Test from_http brand""" 109 # Test brand 110 request = self.factory.get("/") 111 brand = Brand(domain="test-brand") 112 request.brand = brand 113 event = Event.new("unittest").from_http(request) 114 self.assertEqual( 115 event.brand, 116 { 117 "app": "authentik_brands", 118 "model_name": "brand", 119 "name": "Brand test-brand", 120 "pk": brand.pk.hex, 121 }, 122 ) 123 124 def test_from_http_flow_pending_user(self): 125 """Test request from flow request with a pending user""" 126 user = create_test_user() 127 128 session = self.client.session 129 plan = FlowPlan(generate_id()) 130 plan.context[PLAN_CONTEXT_PENDING_USER] = user 131 session[SESSION_KEY_PLAN] = plan 132 session.save() 133 134 request = self.factory.get("/") 135 request.session = session 136 request.user = user 137 138 event = Event.new("unittest").from_http(request) 139 self.assertEqual( 140 event.user, 141 { 142 "email": user.email, 143 "pk": user.pk, 144 "username": user.username, 145 }, 146 ) 147 148 def test_from_http_flow_pending_user_anon(self): 149 """Test request from flow request with a pending user""" 150 user = create_test_user() 151 anon = get_anonymous_user() 152 153 session = self.client.session 154 plan = FlowPlan(generate_id()) 155 plan.context[PLAN_CONTEXT_PENDING_USER] = user 156 session[SESSION_KEY_PLAN] = plan 157 session.save() 158 159 request = self.factory.get("/") 160 request.session = session 161 request.user = anon 162 163 event = Event.new("unittest").from_http(request) 164 self.assertEqual( 165 event.user, 166 { 167 "authenticated_as": { 168 "pk": anon.pk, 169 "is_anonymous": True, 170 "username": "AnonymousUser", 171 "email": "", 172 }, 173 "email": user.email, 174 "pk": user.pk, 175 "username": user.username, 176 }, 177 ) 178 179 def test_from_http_flow_pending_user_fake(self): 180 """Test request from flow request with a pending user""" 181 user = User( 182 username=generate_id(), 183 email=generate_id(), 184 ) 185 anon = get_anonymous_user() 186 187 session = self.client.session 188 plan = FlowPlan(generate_id()) 189 plan.context[PLAN_CONTEXT_PENDING_USER] = user 190 session[SESSION_KEY_PLAN] = plan 191 session.save() 192 193 request = self.factory.get("/") 194 request.session = session 195 request.user = anon 196 197 event = Event.new("unittest").from_http(request) 198 self.assertEqual( 199 event.user, 200 { 201 "authenticated_as": { 202 "pk": anon.pk, 203 "is_anonymous": True, 204 "username": "AnonymousUser", 205 "email": "", 206 }, 207 "email": user.email, 208 "pk": user.pk, 209 "username": user.username, 210 }, 211 ) 212 213 def test_invalid_string(self): 214 """Test creating an event with invalid unicode string data""" 215 event = Event.new("unittest", foo="foo bar \u0000 baz") 216 event.save() 217 self.assertEqual(event.context["foo"], "foo bar baz") 218 219 def test_password_set_signal_on_set_password_from_hash(self): 220 """Changing password from hash should still emit an audit event.""" 221 user = create_test_user() 222 old_count = Event.objects.filter(action=EventAction.PASSWORD_SET, user__pk=user.pk).count() 223 224 user.set_password_from_hash(make_password(generate_id())) 225 user.save() 226 227 new_count = Event.objects.filter(action=EventAction.PASSWORD_SET, user__pk=user.pk).count() 228 self.assertEqual(new_count, old_count + 1) 229 230 def test_log_deprecation(self): 231 """Test Event.log_deprecation""" 232 Event.log_deprecation(self.__module__, "Test deprecation") 233 Event.log_deprecation(self.__module__, "Test deprecation") 234 Event.log_deprecation(self.__module__, "Test deprecation") 235 Event.log_deprecation(self.__module__, "Test deprecation", cause=create_test_user()) 236 logs = Event.objects.filter( 237 action=EventAction.CONFIGURATION_WARNING, context__deprecation=self.__module__ 238 ) 239 self.assertEqual(logs.count(), 2)
Test Event
def
test_new_with_model(self):
28 def test_new_with_model(self): 29 """Create a new Event passing a model as kwarg""" 30 test_model = Group.objects.create(name="test") 31 event = Event.new("unittest", test={"model": test_model}) 32 event.save() # We save to ensure nothing is un-saveable 33 model_content_type = ContentType.objects.get_for_model(test_model) 34 self.assertEqual( 35 event.context.get("test").get("model").get("app"), 36 model_content_type.app_label, 37 )
Create a new Event passing a model as kwarg
def
test_new_with_user(self):
39 def test_new_with_user(self): 40 """Create a new Event passing a user as kwarg""" 41 event = Event.new("unittest", test={"model": get_anonymous_user()}) 42 event.save() # We save to ensure nothing is un-saveable 43 self.assertEqual( 44 event.context.get("test").get("model").get("username"), 45 get_anonymous_user().username, 46 )
Create a new Event passing a user as kwarg
def
test_new_with_uuid_model(self):
48 def test_new_with_uuid_model(self): 49 """Create a new Event passing a model (with UUID PK) as kwarg""" 50 temp_model = DummyPolicy.objects.create(name="test", result=True) 51 event = Event.new("unittest", model=temp_model) 52 event.save() # We save to ensure nothing is un-saveable 53 model_content_type = ContentType.objects.get_for_model(temp_model) 54 self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label) 55 self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex)
Create a new Event passing a model (with UUID PK) as kwarg
def
test_from_http_basic(self):
57 def test_from_http_basic(self): 58 """Test plain from_http""" 59 event = Event.new("unittest").from_http(self.factory.get("/")) 60 self.assertEqual( 61 event.context, 62 { 63 "http_request": { 64 "args": {}, 65 "method": "GET", 66 "path": "/", 67 "user_agent": "", 68 } 69 }, 70 )
Test plain from_http
def
test_from_http_clean_querystring(self):
72 def test_from_http_clean_querystring(self): 73 """Test cleansing query string""" 74 token = generate_id() 75 request = self.factory.get(f"/?token={token}") 76 event = Event.new("unittest").from_http(request) 77 self.assertEqual( 78 event.context, 79 { 80 "http_request": { 81 "args": {"token": SafeExceptionReporterFilter.cleansed_substitute}, 82 "method": "GET", 83 "path": "/", 84 "user_agent": "", 85 } 86 }, 87 )
Test cleansing query string
def
test_from_http_clean_querystring_flow(self):
89 def test_from_http_clean_querystring_flow(self): 90 """Test cleansing query string (nested query string like flow executor)""" 91 token = generate_id() 92 nested_qs = {"token": token} 93 request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}") 94 event = Event.new("unittest").from_http(request) 95 self.assertEqual( 96 event.context, 97 { 98 "http_request": { 99 "args": {"token": SafeExceptionReporterFilter.cleansed_substitute}, 100 "method": "GET", 101 "path": "/", 102 "user_agent": "", 103 } 104 }, 105 )
Test cleansing query string (nested query string like flow executor)
def
test_from_http_brand(self):
107 def test_from_http_brand(self): 108 """Test from_http brand""" 109 # Test brand 110 request = self.factory.get("/") 111 brand = Brand(domain="test-brand") 112 request.brand = brand 113 event = Event.new("unittest").from_http(request) 114 self.assertEqual( 115 event.brand, 116 { 117 "app": "authentik_brands", 118 "model_name": "brand", 119 "name": "Brand test-brand", 120 "pk": brand.pk.hex, 121 }, 122 )
Test from_http brand
def
test_from_http_flow_pending_user(self):
124 def test_from_http_flow_pending_user(self): 125 """Test request from flow request with a pending user""" 126 user = create_test_user() 127 128 session = self.client.session 129 plan = FlowPlan(generate_id()) 130 plan.context[PLAN_CONTEXT_PENDING_USER] = user 131 session[SESSION_KEY_PLAN] = plan 132 session.save() 133 134 request = self.factory.get("/") 135 request.session = session 136 request.user = user 137 138 event = Event.new("unittest").from_http(request) 139 self.assertEqual( 140 event.user, 141 { 142 "email": user.email, 143 "pk": user.pk, 144 "username": user.username, 145 }, 146 )
Test request from flow request with a pending user
def
test_from_http_flow_pending_user_anon(self):
148 def test_from_http_flow_pending_user_anon(self): 149 """Test request from flow request with a pending user""" 150 user = create_test_user() 151 anon = get_anonymous_user() 152 153 session = self.client.session 154 plan = FlowPlan(generate_id()) 155 plan.context[PLAN_CONTEXT_PENDING_USER] = user 156 session[SESSION_KEY_PLAN] = plan 157 session.save() 158 159 request = self.factory.get("/") 160 request.session = session 161 request.user = anon 162 163 event = Event.new("unittest").from_http(request) 164 self.assertEqual( 165 event.user, 166 { 167 "authenticated_as": { 168 "pk": anon.pk, 169 "is_anonymous": True, 170 "username": "AnonymousUser", 171 "email": "", 172 }, 173 "email": user.email, 174 "pk": user.pk, 175 "username": user.username, 176 }, 177 )
Test request from flow request with a pending user
def
test_from_http_flow_pending_user_fake(self):
179 def test_from_http_flow_pending_user_fake(self): 180 """Test request from flow request with a pending user""" 181 user = User( 182 username=generate_id(), 183 email=generate_id(), 184 ) 185 anon = get_anonymous_user() 186 187 session = self.client.session 188 plan = FlowPlan(generate_id()) 189 plan.context[PLAN_CONTEXT_PENDING_USER] = user 190 session[SESSION_KEY_PLAN] = plan 191 session.save() 192 193 request = self.factory.get("/") 194 request.session = session 195 request.user = anon 196 197 event = Event.new("unittest").from_http(request) 198 self.assertEqual( 199 event.user, 200 { 201 "authenticated_as": { 202 "pk": anon.pk, 203 "is_anonymous": True, 204 "username": "AnonymousUser", 205 "email": "", 206 }, 207 "email": user.email, 208 "pk": user.pk, 209 "username": user.username, 210 }, 211 )
Test request from flow request with a pending user
def
test_invalid_string(self):
213 def test_invalid_string(self): 214 """Test creating an event with invalid unicode string data""" 215 event = Event.new("unittest", foo="foo bar \u0000 baz") 216 event.save() 217 self.assertEqual(event.context["foo"], "foo bar baz")
Test creating an event with invalid unicode string data
def
test_password_set_signal_on_set_password_from_hash(self):
219 def test_password_set_signal_on_set_password_from_hash(self): 220 """Changing password from hash should still emit an audit event.""" 221 user = create_test_user() 222 old_count = Event.objects.filter(action=EventAction.PASSWORD_SET, user__pk=user.pk).count() 223 224 user.set_password_from_hash(make_password(generate_id())) 225 user.save() 226 227 new_count = Event.objects.filter(action=EventAction.PASSWORD_SET, user__pk=user.pk).count() 228 self.assertEqual(new_count, old_count + 1)
Changing password from hash should still emit an audit event.
def
test_log_deprecation(self):
230 def test_log_deprecation(self): 231 """Test Event.log_deprecation""" 232 Event.log_deprecation(self.__module__, "Test deprecation") 233 Event.log_deprecation(self.__module__, "Test deprecation") 234 Event.log_deprecation(self.__module__, "Test deprecation") 235 Event.log_deprecation(self.__module__, "Test deprecation", cause=create_test_user()) 236 logs = Event.objects.filter( 237 action=EventAction.CONFIGURATION_WARNING, context__deprecation=self.__module__ 238 ) 239 self.assertEqual(logs.count(), 2)
Test Event.log_deprecation