authentik.events.tests.test_event
event tests
1"""event tests""" 2 3from urllib.parse import urlencode 4 5from django.contrib.contenttypes.models import ContentType 6from django.test import RequestFactory, TestCase 7from django.views.debug import SafeExceptionReporterFilter 8from guardian.shortcuts import get_anonymous_user 9 10from authentik.brands.models import Brand 11from authentik.core.models import Group, User 12from authentik.core.tests.utils import create_test_user 13from authentik.events.models import Event 14from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan 15from authentik.flows.views.executor import QS_QUERY, SESSION_KEY_PLAN 16from authentik.lib.generators import generate_id 17from authentik.policies.dummy.models import DummyPolicy 18 19 20class TestEvents(TestCase): 21 """Test Event""" 22 23 def setUp(self) -> None: 24 self.factory = RequestFactory() 25 26 def test_new_with_model(self): 27 """Create a new Event passing a model as kwarg""" 28 test_model = Group.objects.create(name="test") 29 event = Event.new("unittest", test={"model": test_model}) 30 event.save() # We save to ensure nothing is un-saveable 31 model_content_type = ContentType.objects.get_for_model(test_model) 32 self.assertEqual( 33 event.context.get("test").get("model").get("app"), 34 model_content_type.app_label, 35 ) 36 37 def test_new_with_user(self): 38 """Create a new Event passing a user as kwarg""" 39 event = Event.new("unittest", test={"model": get_anonymous_user()}) 40 event.save() # We save to ensure nothing is un-saveable 41 self.assertEqual( 42 event.context.get("test").get("model").get("username"), 43 get_anonymous_user().username, 44 ) 45 46 def test_new_with_uuid_model(self): 47 """Create a new Event passing a model (with UUID PK) as kwarg""" 48 temp_model = DummyPolicy.objects.create(name="test", result=True) 49 event = Event.new("unittest", model=temp_model) 50 event.save() # We save to ensure nothing is un-saveable 51 model_content_type = ContentType.objects.get_for_model(temp_model) 52 self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label) 53 self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex) 54 55 def test_from_http_basic(self): 56 """Test plain from_http""" 57 event = Event.new("unittest").from_http(self.factory.get("/")) 58 self.assertEqual( 59 event.context, 60 { 61 "http_request": { 62 "args": {}, 63 "method": "GET", 64 "path": "/", 65 "user_agent": "", 66 } 67 }, 68 ) 69 70 def test_from_http_clean_querystring(self): 71 """Test cleansing query string""" 72 token = generate_id() 73 request = self.factory.get(f"/?token={token}") 74 event = Event.new("unittest").from_http(request) 75 self.assertEqual( 76 event.context, 77 { 78 "http_request": { 79 "args": {"token": SafeExceptionReporterFilter.cleansed_substitute}, 80 "method": "GET", 81 "path": "/", 82 "user_agent": "", 83 } 84 }, 85 ) 86 87 def test_from_http_clean_querystring_flow(self): 88 """Test cleansing query string (nested query string like flow executor)""" 89 token = generate_id() 90 nested_qs = {"token": token} 91 request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}") 92 event = Event.new("unittest").from_http(request) 93 self.assertEqual( 94 event.context, 95 { 96 "http_request": { 97 "args": {"token": SafeExceptionReporterFilter.cleansed_substitute}, 98 "method": "GET", 99 "path": "/", 100 "user_agent": "", 101 } 102 }, 103 ) 104 105 def test_from_http_brand(self): 106 """Test from_http brand""" 107 # Test brand 108 request = self.factory.get("/") 109 brand = Brand(domain="test-brand") 110 request.brand = brand 111 event = Event.new("unittest").from_http(request) 112 self.assertEqual( 113 event.brand, 114 { 115 "app": "authentik_brands", 116 "model_name": "brand", 117 "name": "Brand test-brand", 118 "pk": brand.pk.hex, 119 }, 120 ) 121 122 def test_from_http_flow_pending_user(self): 123 """Test request from flow request with a pending user""" 124 user = create_test_user() 125 126 session = self.client.session 127 plan = FlowPlan(generate_id()) 128 plan.context[PLAN_CONTEXT_PENDING_USER] = user 129 session[SESSION_KEY_PLAN] = plan 130 session.save() 131 132 request = self.factory.get("/") 133 request.session = session 134 request.user = user 135 136 event = Event.new("unittest").from_http(request) 137 self.assertEqual( 138 event.user, 139 { 140 "email": user.email, 141 "pk": user.pk, 142 "username": user.username, 143 }, 144 ) 145 146 def test_from_http_flow_pending_user_anon(self): 147 """Test request from flow request with a pending user""" 148 user = create_test_user() 149 anon = get_anonymous_user() 150 151 session = self.client.session 152 plan = FlowPlan(generate_id()) 153 plan.context[PLAN_CONTEXT_PENDING_USER] = user 154 session[SESSION_KEY_PLAN] = plan 155 session.save() 156 157 request = self.factory.get("/") 158 request.session = session 159 request.user = anon 160 161 event = Event.new("unittest").from_http(request) 162 self.assertEqual( 163 event.user, 164 { 165 "authenticated_as": { 166 "pk": anon.pk, 167 "is_anonymous": True, 168 "username": "AnonymousUser", 169 "email": "", 170 }, 171 "email": user.email, 172 "pk": user.pk, 173 "username": user.username, 174 }, 175 ) 176 177 def test_from_http_flow_pending_user_fake(self): 178 """Test request from flow request with a pending user""" 179 user = User( 180 username=generate_id(), 181 email=generate_id(), 182 ) 183 anon = get_anonymous_user() 184 185 session = self.client.session 186 plan = FlowPlan(generate_id()) 187 plan.context[PLAN_CONTEXT_PENDING_USER] = user 188 session[SESSION_KEY_PLAN] = plan 189 session.save() 190 191 request = self.factory.get("/") 192 request.session = session 193 request.user = anon 194 195 event = Event.new("unittest").from_http(request) 196 self.assertEqual( 197 event.user, 198 { 199 "authenticated_as": { 200 "pk": anon.pk, 201 "is_anonymous": True, 202 "username": "AnonymousUser", 203 "email": "", 204 }, 205 "email": user.email, 206 "pk": user.pk, 207 "username": user.username, 208 }, 209 ) 210 211 def test_invalid_string(self): 212 """Test creating an event with invalid unicode string data""" 213 event = Event.new("unittest", foo="foo bar \u0000 baz") 214 event.save() 215 self.assertEqual(event.context["foo"], "foo bar baz")
class
TestEvents(django.test.testcases.TestCase):
21class TestEvents(TestCase): 22 """Test Event""" 23 24 def setUp(self) -> None: 25 self.factory = RequestFactory() 26 27 def test_new_with_model(self): 28 """Create a new Event passing a model as kwarg""" 29 test_model = Group.objects.create(name="test") 30 event = Event.new("unittest", test={"model": test_model}) 31 event.save() # We save to ensure nothing is un-saveable 32 model_content_type = ContentType.objects.get_for_model(test_model) 33 self.assertEqual( 34 event.context.get("test").get("model").get("app"), 35 model_content_type.app_label, 36 ) 37 38 def test_new_with_user(self): 39 """Create a new Event passing a user as kwarg""" 40 event = Event.new("unittest", test={"model": get_anonymous_user()}) 41 event.save() # We save to ensure nothing is un-saveable 42 self.assertEqual( 43 event.context.get("test").get("model").get("username"), 44 get_anonymous_user().username, 45 ) 46 47 def test_new_with_uuid_model(self): 48 """Create a new Event passing a model (with UUID PK) as kwarg""" 49 temp_model = DummyPolicy.objects.create(name="test", result=True) 50 event = Event.new("unittest", model=temp_model) 51 event.save() # We save to ensure nothing is un-saveable 52 model_content_type = ContentType.objects.get_for_model(temp_model) 53 self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label) 54 self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex) 55 56 def test_from_http_basic(self): 57 """Test plain from_http""" 58 event = Event.new("unittest").from_http(self.factory.get("/")) 59 self.assertEqual( 60 event.context, 61 { 62 "http_request": { 63 "args": {}, 64 "method": "GET", 65 "path": "/", 66 "user_agent": "", 67 } 68 }, 69 ) 70 71 def test_from_http_clean_querystring(self): 72 """Test cleansing query string""" 73 token = generate_id() 74 request = self.factory.get(f"/?token={token}") 75 event = Event.new("unittest").from_http(request) 76 self.assertEqual( 77 event.context, 78 { 79 "http_request": { 80 "args": {"token": SafeExceptionReporterFilter.cleansed_substitute}, 81 "method": "GET", 82 "path": "/", 83 "user_agent": "", 84 } 85 }, 86 ) 87 88 def test_from_http_clean_querystring_flow(self): 89 """Test cleansing query string (nested query string like flow executor)""" 90 token = generate_id() 91 nested_qs = {"token": token} 92 request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}") 93 event = Event.new("unittest").from_http(request) 94 self.assertEqual( 95 event.context, 96 { 97 "http_request": { 98 "args": {"token": SafeExceptionReporterFilter.cleansed_substitute}, 99 "method": "GET", 100 "path": "/", 101 "user_agent": "", 102 } 103 }, 104 ) 105 106 def test_from_http_brand(self): 107 """Test from_http brand""" 108 # Test brand 109 request = self.factory.get("/") 110 brand = Brand(domain="test-brand") 111 request.brand = brand 112 event = Event.new("unittest").from_http(request) 113 self.assertEqual( 114 event.brand, 115 { 116 "app": "authentik_brands", 117 "model_name": "brand", 118 "name": "Brand test-brand", 119 "pk": brand.pk.hex, 120 }, 121 ) 122 123 def test_from_http_flow_pending_user(self): 124 """Test request from flow request with a pending user""" 125 user = create_test_user() 126 127 session = self.client.session 128 plan = FlowPlan(generate_id()) 129 plan.context[PLAN_CONTEXT_PENDING_USER] = user 130 session[SESSION_KEY_PLAN] = plan 131 session.save() 132 133 request = self.factory.get("/") 134 request.session = session 135 request.user = user 136 137 event = Event.new("unittest").from_http(request) 138 self.assertEqual( 139 event.user, 140 { 141 "email": user.email, 142 "pk": user.pk, 143 "username": user.username, 144 }, 145 ) 146 147 def test_from_http_flow_pending_user_anon(self): 148 """Test request from flow request with a pending user""" 149 user = create_test_user() 150 anon = get_anonymous_user() 151 152 session = self.client.session 153 plan = FlowPlan(generate_id()) 154 plan.context[PLAN_CONTEXT_PENDING_USER] = user 155 session[SESSION_KEY_PLAN] = plan 156 session.save() 157 158 request = self.factory.get("/") 159 request.session = session 160 request.user = anon 161 162 event = Event.new("unittest").from_http(request) 163 self.assertEqual( 164 event.user, 165 { 166 "authenticated_as": { 167 "pk": anon.pk, 168 "is_anonymous": True, 169 "username": "AnonymousUser", 170 "email": "", 171 }, 172 "email": user.email, 173 "pk": user.pk, 174 "username": user.username, 175 }, 176 ) 177 178 def test_from_http_flow_pending_user_fake(self): 179 """Test request from flow request with a pending user""" 180 user = User( 181 username=generate_id(), 182 email=generate_id(), 183 ) 184 anon = get_anonymous_user() 185 186 session = self.client.session 187 plan = FlowPlan(generate_id()) 188 plan.context[PLAN_CONTEXT_PENDING_USER] = user 189 session[SESSION_KEY_PLAN] = plan 190 session.save() 191 192 request = self.factory.get("/") 193 request.session = session 194 request.user = anon 195 196 event = Event.new("unittest").from_http(request) 197 self.assertEqual( 198 event.user, 199 { 200 "authenticated_as": { 201 "pk": anon.pk, 202 "is_anonymous": True, 203 "username": "AnonymousUser", 204 "email": "", 205 }, 206 "email": user.email, 207 "pk": user.pk, 208 "username": user.username, 209 }, 210 ) 211 212 def test_invalid_string(self): 213 """Test creating an event with invalid unicode string data""" 214 event = Event.new("unittest", foo="foo bar \u0000 baz") 215 event.save() 216 self.assertEqual(event.context["foo"], "foo bar baz")
Test Event
def
test_new_with_model(self):
27 def test_new_with_model(self): 28 """Create a new Event passing a model as kwarg""" 29 test_model = Group.objects.create(name="test") 30 event = Event.new("unittest", test={"model": test_model}) 31 event.save() # We save to ensure nothing is un-saveable 32 model_content_type = ContentType.objects.get_for_model(test_model) 33 self.assertEqual( 34 event.context.get("test").get("model").get("app"), 35 model_content_type.app_label, 36 )
Create a new Event passing a model as kwarg
def
test_new_with_user(self):
38 def test_new_with_user(self): 39 """Create a new Event passing a user as kwarg""" 40 event = Event.new("unittest", test={"model": get_anonymous_user()}) 41 event.save() # We save to ensure nothing is un-saveable 42 self.assertEqual( 43 event.context.get("test").get("model").get("username"), 44 get_anonymous_user().username, 45 )
Create a new Event passing a user as kwarg
def
test_new_with_uuid_model(self):
47 def test_new_with_uuid_model(self): 48 """Create a new Event passing a model (with UUID PK) as kwarg""" 49 temp_model = DummyPolicy.objects.create(name="test", result=True) 50 event = Event.new("unittest", model=temp_model) 51 event.save() # We save to ensure nothing is un-saveable 52 model_content_type = ContentType.objects.get_for_model(temp_model) 53 self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label) 54 self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex)
Create a new Event passing a model (with UUID PK) as kwarg
def
test_from_http_basic(self):
56 def test_from_http_basic(self): 57 """Test plain from_http""" 58 event = Event.new("unittest").from_http(self.factory.get("/")) 59 self.assertEqual( 60 event.context, 61 { 62 "http_request": { 63 "args": {}, 64 "method": "GET", 65 "path": "/", 66 "user_agent": "", 67 } 68 }, 69 )
Test plain from_http
def
test_from_http_clean_querystring(self):
71 def test_from_http_clean_querystring(self): 72 """Test cleansing query string""" 73 token = generate_id() 74 request = self.factory.get(f"/?token={token}") 75 event = Event.new("unittest").from_http(request) 76 self.assertEqual( 77 event.context, 78 { 79 "http_request": { 80 "args": {"token": SafeExceptionReporterFilter.cleansed_substitute}, 81 "method": "GET", 82 "path": "/", 83 "user_agent": "", 84 } 85 }, 86 )
Test cleansing query string
def
test_from_http_clean_querystring_flow(self):
88 def test_from_http_clean_querystring_flow(self): 89 """Test cleansing query string (nested query string like flow executor)""" 90 token = generate_id() 91 nested_qs = {"token": token} 92 request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}") 93 event = Event.new("unittest").from_http(request) 94 self.assertEqual( 95 event.context, 96 { 97 "http_request": { 98 "args": {"token": SafeExceptionReporterFilter.cleansed_substitute}, 99 "method": "GET", 100 "path": "/", 101 "user_agent": "", 102 } 103 }, 104 )
Test cleansing query string (nested query string like flow executor)
def
test_from_http_brand(self):
106 def test_from_http_brand(self): 107 """Test from_http brand""" 108 # Test brand 109 request = self.factory.get("/") 110 brand = Brand(domain="test-brand") 111 request.brand = brand 112 event = Event.new("unittest").from_http(request) 113 self.assertEqual( 114 event.brand, 115 { 116 "app": "authentik_brands", 117 "model_name": "brand", 118 "name": "Brand test-brand", 119 "pk": brand.pk.hex, 120 }, 121 )
Test from_http brand
def
test_from_http_flow_pending_user(self):
123 def test_from_http_flow_pending_user(self): 124 """Test request from flow request with a pending user""" 125 user = create_test_user() 126 127 session = self.client.session 128 plan = FlowPlan(generate_id()) 129 plan.context[PLAN_CONTEXT_PENDING_USER] = user 130 session[SESSION_KEY_PLAN] = plan 131 session.save() 132 133 request = self.factory.get("/") 134 request.session = session 135 request.user = user 136 137 event = Event.new("unittest").from_http(request) 138 self.assertEqual( 139 event.user, 140 { 141 "email": user.email, 142 "pk": user.pk, 143 "username": user.username, 144 }, 145 )
Test request from flow request with a pending user
def
test_from_http_flow_pending_user_anon(self):
147 def test_from_http_flow_pending_user_anon(self): 148 """Test request from flow request with a pending user""" 149 user = create_test_user() 150 anon = get_anonymous_user() 151 152 session = self.client.session 153 plan = FlowPlan(generate_id()) 154 plan.context[PLAN_CONTEXT_PENDING_USER] = user 155 session[SESSION_KEY_PLAN] = plan 156 session.save() 157 158 request = self.factory.get("/") 159 request.session = session 160 request.user = anon 161 162 event = Event.new("unittest").from_http(request) 163 self.assertEqual( 164 event.user, 165 { 166 "authenticated_as": { 167 "pk": anon.pk, 168 "is_anonymous": True, 169 "username": "AnonymousUser", 170 "email": "", 171 }, 172 "email": user.email, 173 "pk": user.pk, 174 "username": user.username, 175 }, 176 )
Test request from flow request with a pending user
def
test_from_http_flow_pending_user_fake(self):
178 def test_from_http_flow_pending_user_fake(self): 179 """Test request from flow request with a pending user""" 180 user = User( 181 username=generate_id(), 182 email=generate_id(), 183 ) 184 anon = get_anonymous_user() 185 186 session = self.client.session 187 plan = FlowPlan(generate_id()) 188 plan.context[PLAN_CONTEXT_PENDING_USER] = user 189 session[SESSION_KEY_PLAN] = plan 190 session.save() 191 192 request = self.factory.get("/") 193 request.session = session 194 request.user = anon 195 196 event = Event.new("unittest").from_http(request) 197 self.assertEqual( 198 event.user, 199 { 200 "authenticated_as": { 201 "pk": anon.pk, 202 "is_anonymous": True, 203 "username": "AnonymousUser", 204 "email": "", 205 }, 206 "email": user.email, 207 "pk": user.pk, 208 "username": user.username, 209 }, 210 )
Test request from flow request with a pending user
def
test_invalid_string(self):
212 def test_invalid_string(self): 213 """Test creating an event with invalid unicode string data""" 214 event = Event.new("unittest", foo="foo bar \u0000 baz") 215 event.save() 216 self.assertEqual(event.context["foo"], "foo bar baz")
Test creating an event with invalid unicode string data