authentik.events.tests.test_event

event tests

  1"""event tests"""
  2
  3from urllib.parse import urlencode
  4
  5from django.contrib.contenttypes.models import ContentType
  6from django.test import RequestFactory, TestCase
  7from django.views.debug import SafeExceptionReporterFilter
  8from guardian.shortcuts import get_anonymous_user
  9
 10from authentik.brands.models import Brand
 11from authentik.core.models import Group, User
 12from authentik.core.tests.utils import create_test_user
 13from authentik.events.models import Event
 14from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
 15from authentik.flows.views.executor import QS_QUERY, SESSION_KEY_PLAN
 16from authentik.lib.generators import generate_id
 17from authentik.policies.dummy.models import DummyPolicy
 18
 19
 20class TestEvents(TestCase):
 21    """Test Event"""
 22
 23    def setUp(self) -> None:
 24        self.factory = RequestFactory()
 25
 26    def test_new_with_model(self):
 27        """Create a new Event passing a model as kwarg"""
 28        test_model = Group.objects.create(name="test")
 29        event = Event.new("unittest", test={"model": test_model})
 30        event.save()  # We save to ensure nothing is un-saveable
 31        model_content_type = ContentType.objects.get_for_model(test_model)
 32        self.assertEqual(
 33            event.context.get("test").get("model").get("app"),
 34            model_content_type.app_label,
 35        )
 36
 37    def test_new_with_user(self):
 38        """Create a new Event passing a user as kwarg"""
 39        event = Event.new("unittest", test={"model": get_anonymous_user()})
 40        event.save()  # We save to ensure nothing is un-saveable
 41        self.assertEqual(
 42            event.context.get("test").get("model").get("username"),
 43            get_anonymous_user().username,
 44        )
 45
 46    def test_new_with_uuid_model(self):
 47        """Create a new Event passing a model (with UUID PK) as kwarg"""
 48        temp_model = DummyPolicy.objects.create(name="test", result=True)
 49        event = Event.new("unittest", model=temp_model)
 50        event.save()  # We save to ensure nothing is un-saveable
 51        model_content_type = ContentType.objects.get_for_model(temp_model)
 52        self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label)
 53        self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex)
 54
 55    def test_from_http_basic(self):
 56        """Test plain from_http"""
 57        event = Event.new("unittest").from_http(self.factory.get("/"))
 58        self.assertEqual(
 59            event.context,
 60            {
 61                "http_request": {
 62                    "args": {},
 63                    "method": "GET",
 64                    "path": "/",
 65                    "user_agent": "",
 66                }
 67            },
 68        )
 69
 70    def test_from_http_clean_querystring(self):
 71        """Test cleansing query string"""
 72        token = generate_id()
 73        request = self.factory.get(f"/?token={token}")
 74        event = Event.new("unittest").from_http(request)
 75        self.assertEqual(
 76            event.context,
 77            {
 78                "http_request": {
 79                    "args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
 80                    "method": "GET",
 81                    "path": "/",
 82                    "user_agent": "",
 83                }
 84            },
 85        )
 86
 87    def test_from_http_clean_querystring_flow(self):
 88        """Test cleansing query string (nested query string like flow executor)"""
 89        token = generate_id()
 90        nested_qs = {"token": token}
 91        request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}")
 92        event = Event.new("unittest").from_http(request)
 93        self.assertEqual(
 94            event.context,
 95            {
 96                "http_request": {
 97                    "args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
 98                    "method": "GET",
 99                    "path": "/",
100                    "user_agent": "",
101                }
102            },
103        )
104
105    def test_from_http_brand(self):
106        """Test from_http brand"""
107        # Test brand
108        request = self.factory.get("/")
109        brand = Brand(domain="test-brand")
110        request.brand = brand
111        event = Event.new("unittest").from_http(request)
112        self.assertEqual(
113            event.brand,
114            {
115                "app": "authentik_brands",
116                "model_name": "brand",
117                "name": "Brand test-brand",
118                "pk": brand.pk.hex,
119            },
120        )
121
122    def test_from_http_flow_pending_user(self):
123        """Test request from flow request with a pending user"""
124        user = create_test_user()
125
126        session = self.client.session
127        plan = FlowPlan(generate_id())
128        plan.context[PLAN_CONTEXT_PENDING_USER] = user
129        session[SESSION_KEY_PLAN] = plan
130        session.save()
131
132        request = self.factory.get("/")
133        request.session = session
134        request.user = user
135
136        event = Event.new("unittest").from_http(request)
137        self.assertEqual(
138            event.user,
139            {
140                "email": user.email,
141                "pk": user.pk,
142                "username": user.username,
143            },
144        )
145
146    def test_from_http_flow_pending_user_anon(self):
147        """Test request from flow request with a pending user"""
148        user = create_test_user()
149        anon = get_anonymous_user()
150
151        session = self.client.session
152        plan = FlowPlan(generate_id())
153        plan.context[PLAN_CONTEXT_PENDING_USER] = user
154        session[SESSION_KEY_PLAN] = plan
155        session.save()
156
157        request = self.factory.get("/")
158        request.session = session
159        request.user = anon
160
161        event = Event.new("unittest").from_http(request)
162        self.assertEqual(
163            event.user,
164            {
165                "authenticated_as": {
166                    "pk": anon.pk,
167                    "is_anonymous": True,
168                    "username": "AnonymousUser",
169                    "email": "",
170                },
171                "email": user.email,
172                "pk": user.pk,
173                "username": user.username,
174            },
175        )
176
177    def test_from_http_flow_pending_user_fake(self):
178        """Test request from flow request with a pending user"""
179        user = User(
180            username=generate_id(),
181            email=generate_id(),
182        )
183        anon = get_anonymous_user()
184
185        session = self.client.session
186        plan = FlowPlan(generate_id())
187        plan.context[PLAN_CONTEXT_PENDING_USER] = user
188        session[SESSION_KEY_PLAN] = plan
189        session.save()
190
191        request = self.factory.get("/")
192        request.session = session
193        request.user = anon
194
195        event = Event.new("unittest").from_http(request)
196        self.assertEqual(
197            event.user,
198            {
199                "authenticated_as": {
200                    "pk": anon.pk,
201                    "is_anonymous": True,
202                    "username": "AnonymousUser",
203                    "email": "",
204                },
205                "email": user.email,
206                "pk": user.pk,
207                "username": user.username,
208            },
209        )
210
211    def test_invalid_string(self):
212        """Test creating an event with invalid unicode string data"""
213        event = Event.new("unittest", foo="foo bar \u0000 baz")
214        event.save()
215        self.assertEqual(event.context["foo"], "foo bar  baz")
class TestEvents(django.test.testcases.TestCase):
 21class TestEvents(TestCase):
 22    """Test Event"""
 23
 24    def setUp(self) -> None:
 25        self.factory = RequestFactory()
 26
 27    def test_new_with_model(self):
 28        """Create a new Event passing a model as kwarg"""
 29        test_model = Group.objects.create(name="test")
 30        event = Event.new("unittest", test={"model": test_model})
 31        event.save()  # We save to ensure nothing is un-saveable
 32        model_content_type = ContentType.objects.get_for_model(test_model)
 33        self.assertEqual(
 34            event.context.get("test").get("model").get("app"),
 35            model_content_type.app_label,
 36        )
 37
 38    def test_new_with_user(self):
 39        """Create a new Event passing a user as kwarg"""
 40        event = Event.new("unittest", test={"model": get_anonymous_user()})
 41        event.save()  # We save to ensure nothing is un-saveable
 42        self.assertEqual(
 43            event.context.get("test").get("model").get("username"),
 44            get_anonymous_user().username,
 45        )
 46
 47    def test_new_with_uuid_model(self):
 48        """Create a new Event passing a model (with UUID PK) as kwarg"""
 49        temp_model = DummyPolicy.objects.create(name="test", result=True)
 50        event = Event.new("unittest", model=temp_model)
 51        event.save()  # We save to ensure nothing is un-saveable
 52        model_content_type = ContentType.objects.get_for_model(temp_model)
 53        self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label)
 54        self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex)
 55
 56    def test_from_http_basic(self):
 57        """Test plain from_http"""
 58        event = Event.new("unittest").from_http(self.factory.get("/"))
 59        self.assertEqual(
 60            event.context,
 61            {
 62                "http_request": {
 63                    "args": {},
 64                    "method": "GET",
 65                    "path": "/",
 66                    "user_agent": "",
 67                }
 68            },
 69        )
 70
 71    def test_from_http_clean_querystring(self):
 72        """Test cleansing query string"""
 73        token = generate_id()
 74        request = self.factory.get(f"/?token={token}")
 75        event = Event.new("unittest").from_http(request)
 76        self.assertEqual(
 77            event.context,
 78            {
 79                "http_request": {
 80                    "args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
 81                    "method": "GET",
 82                    "path": "/",
 83                    "user_agent": "",
 84                }
 85            },
 86        )
 87
 88    def test_from_http_clean_querystring_flow(self):
 89        """Test cleansing query string (nested query string like flow executor)"""
 90        token = generate_id()
 91        nested_qs = {"token": token}
 92        request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}")
 93        event = Event.new("unittest").from_http(request)
 94        self.assertEqual(
 95            event.context,
 96            {
 97                "http_request": {
 98                    "args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
 99                    "method": "GET",
100                    "path": "/",
101                    "user_agent": "",
102                }
103            },
104        )
105
106    def test_from_http_brand(self):
107        """Test from_http brand"""
108        # Test brand
109        request = self.factory.get("/")
110        brand = Brand(domain="test-brand")
111        request.brand = brand
112        event = Event.new("unittest").from_http(request)
113        self.assertEqual(
114            event.brand,
115            {
116                "app": "authentik_brands",
117                "model_name": "brand",
118                "name": "Brand test-brand",
119                "pk": brand.pk.hex,
120            },
121        )
122
123    def test_from_http_flow_pending_user(self):
124        """Test request from flow request with a pending user"""
125        user = create_test_user()
126
127        session = self.client.session
128        plan = FlowPlan(generate_id())
129        plan.context[PLAN_CONTEXT_PENDING_USER] = user
130        session[SESSION_KEY_PLAN] = plan
131        session.save()
132
133        request = self.factory.get("/")
134        request.session = session
135        request.user = user
136
137        event = Event.new("unittest").from_http(request)
138        self.assertEqual(
139            event.user,
140            {
141                "email": user.email,
142                "pk": user.pk,
143                "username": user.username,
144            },
145        )
146
147    def test_from_http_flow_pending_user_anon(self):
148        """Test request from flow request with a pending user"""
149        user = create_test_user()
150        anon = get_anonymous_user()
151
152        session = self.client.session
153        plan = FlowPlan(generate_id())
154        plan.context[PLAN_CONTEXT_PENDING_USER] = user
155        session[SESSION_KEY_PLAN] = plan
156        session.save()
157
158        request = self.factory.get("/")
159        request.session = session
160        request.user = anon
161
162        event = Event.new("unittest").from_http(request)
163        self.assertEqual(
164            event.user,
165            {
166                "authenticated_as": {
167                    "pk": anon.pk,
168                    "is_anonymous": True,
169                    "username": "AnonymousUser",
170                    "email": "",
171                },
172                "email": user.email,
173                "pk": user.pk,
174                "username": user.username,
175            },
176        )
177
178    def test_from_http_flow_pending_user_fake(self):
179        """Test request from flow request with a pending user"""
180        user = User(
181            username=generate_id(),
182            email=generate_id(),
183        )
184        anon = get_anonymous_user()
185
186        session = self.client.session
187        plan = FlowPlan(generate_id())
188        plan.context[PLAN_CONTEXT_PENDING_USER] = user
189        session[SESSION_KEY_PLAN] = plan
190        session.save()
191
192        request = self.factory.get("/")
193        request.session = session
194        request.user = anon
195
196        event = Event.new("unittest").from_http(request)
197        self.assertEqual(
198            event.user,
199            {
200                "authenticated_as": {
201                    "pk": anon.pk,
202                    "is_anonymous": True,
203                    "username": "AnonymousUser",
204                    "email": "",
205                },
206                "email": user.email,
207                "pk": user.pk,
208                "username": user.username,
209            },
210        )
211
212    def test_invalid_string(self):
213        """Test creating an event with invalid unicode string data"""
214        event = Event.new("unittest", foo="foo bar \u0000 baz")
215        event.save()
216        self.assertEqual(event.context["foo"], "foo bar  baz")

Test Event

def setUp(self) -> None:
24    def setUp(self) -> None:
25        self.factory = RequestFactory()

Hook method for setting up the test fixture before exercising it.

def test_new_with_model(self):
27    def test_new_with_model(self):
28        """Create a new Event passing a model as kwarg"""
29        test_model = Group.objects.create(name="test")
30        event = Event.new("unittest", test={"model": test_model})
31        event.save()  # We save to ensure nothing is un-saveable
32        model_content_type = ContentType.objects.get_for_model(test_model)
33        self.assertEqual(
34            event.context.get("test").get("model").get("app"),
35            model_content_type.app_label,
36        )

Create a new Event passing a model as kwarg

def test_new_with_user(self):
38    def test_new_with_user(self):
39        """Create a new Event passing a user as kwarg"""
40        event = Event.new("unittest", test={"model": get_anonymous_user()})
41        event.save()  # We save to ensure nothing is un-saveable
42        self.assertEqual(
43            event.context.get("test").get("model").get("username"),
44            get_anonymous_user().username,
45        )

Create a new Event passing a user as kwarg

def test_new_with_uuid_model(self):
47    def test_new_with_uuid_model(self):
48        """Create a new Event passing a model (with UUID PK) as kwarg"""
49        temp_model = DummyPolicy.objects.create(name="test", result=True)
50        event = Event.new("unittest", model=temp_model)
51        event.save()  # We save to ensure nothing is un-saveable
52        model_content_type = ContentType.objects.get_for_model(temp_model)
53        self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label)
54        self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex)

Create a new Event passing a model (with UUID PK) as kwarg

def test_from_http_basic(self):
56    def test_from_http_basic(self):
57        """Test plain from_http"""
58        event = Event.new("unittest").from_http(self.factory.get("/"))
59        self.assertEqual(
60            event.context,
61            {
62                "http_request": {
63                    "args": {},
64                    "method": "GET",
65                    "path": "/",
66                    "user_agent": "",
67                }
68            },
69        )

Test plain from_http

def test_from_http_clean_querystring(self):
71    def test_from_http_clean_querystring(self):
72        """Test cleansing query string"""
73        token = generate_id()
74        request = self.factory.get(f"/?token={token}")
75        event = Event.new("unittest").from_http(request)
76        self.assertEqual(
77            event.context,
78            {
79                "http_request": {
80                    "args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
81                    "method": "GET",
82                    "path": "/",
83                    "user_agent": "",
84                }
85            },
86        )

Test cleansing query string

def test_from_http_clean_querystring_flow(self):
 88    def test_from_http_clean_querystring_flow(self):
 89        """Test cleansing query string (nested query string like flow executor)"""
 90        token = generate_id()
 91        nested_qs = {"token": token}
 92        request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}")
 93        event = Event.new("unittest").from_http(request)
 94        self.assertEqual(
 95            event.context,
 96            {
 97                "http_request": {
 98                    "args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
 99                    "method": "GET",
100                    "path": "/",
101                    "user_agent": "",
102                }
103            },
104        )

Test cleansing query string (nested query string like flow executor)

def test_from_http_brand(self):
106    def test_from_http_brand(self):
107        """Test from_http brand"""
108        # Test brand
109        request = self.factory.get("/")
110        brand = Brand(domain="test-brand")
111        request.brand = brand
112        event = Event.new("unittest").from_http(request)
113        self.assertEqual(
114            event.brand,
115            {
116                "app": "authentik_brands",
117                "model_name": "brand",
118                "name": "Brand test-brand",
119                "pk": brand.pk.hex,
120            },
121        )

Test from_http brand

def test_from_http_flow_pending_user(self):
123    def test_from_http_flow_pending_user(self):
124        """Test request from flow request with a pending user"""
125        user = create_test_user()
126
127        session = self.client.session
128        plan = FlowPlan(generate_id())
129        plan.context[PLAN_CONTEXT_PENDING_USER] = user
130        session[SESSION_KEY_PLAN] = plan
131        session.save()
132
133        request = self.factory.get("/")
134        request.session = session
135        request.user = user
136
137        event = Event.new("unittest").from_http(request)
138        self.assertEqual(
139            event.user,
140            {
141                "email": user.email,
142                "pk": user.pk,
143                "username": user.username,
144            },
145        )

Test request from flow request with a pending user

def test_from_http_flow_pending_user_anon(self):
147    def test_from_http_flow_pending_user_anon(self):
148        """Test request from flow request with a pending user"""
149        user = create_test_user()
150        anon = get_anonymous_user()
151
152        session = self.client.session
153        plan = FlowPlan(generate_id())
154        plan.context[PLAN_CONTEXT_PENDING_USER] = user
155        session[SESSION_KEY_PLAN] = plan
156        session.save()
157
158        request = self.factory.get("/")
159        request.session = session
160        request.user = anon
161
162        event = Event.new("unittest").from_http(request)
163        self.assertEqual(
164            event.user,
165            {
166                "authenticated_as": {
167                    "pk": anon.pk,
168                    "is_anonymous": True,
169                    "username": "AnonymousUser",
170                    "email": "",
171                },
172                "email": user.email,
173                "pk": user.pk,
174                "username": user.username,
175            },
176        )

Test request from flow request with a pending user

def test_from_http_flow_pending_user_fake(self):
178    def test_from_http_flow_pending_user_fake(self):
179        """Test request from flow request with a pending user"""
180        user = User(
181            username=generate_id(),
182            email=generate_id(),
183        )
184        anon = get_anonymous_user()
185
186        session = self.client.session
187        plan = FlowPlan(generate_id())
188        plan.context[PLAN_CONTEXT_PENDING_USER] = user
189        session[SESSION_KEY_PLAN] = plan
190        session.save()
191
192        request = self.factory.get("/")
193        request.session = session
194        request.user = anon
195
196        event = Event.new("unittest").from_http(request)
197        self.assertEqual(
198            event.user,
199            {
200                "authenticated_as": {
201                    "pk": anon.pk,
202                    "is_anonymous": True,
203                    "username": "AnonymousUser",
204                    "email": "",
205                },
206                "email": user.email,
207                "pk": user.pk,
208                "username": user.username,
209            },
210        )

Test request from flow request with a pending user

def test_invalid_string(self):
212    def test_invalid_string(self):
213        """Test creating an event with invalid unicode string data"""
214        event = Event.new("unittest", foo="foo bar \u0000 baz")
215        event.save()
216        self.assertEqual(event.context["foo"], "foo bar  baz")

Test creating an event with invalid unicode string data