authentik.events.tests.test_event

event tests

  1"""event tests"""
  2
  3from urllib.parse import urlencode
  4
  5from django.contrib.auth.hashers import make_password
  6from django.contrib.contenttypes.models import ContentType
  7from django.test import RequestFactory, TestCase
  8from django.views.debug import SafeExceptionReporterFilter
  9from guardian.shortcuts import get_anonymous_user
 10
 11from authentik.brands.models import Brand
 12from authentik.core.models import Group, User
 13from authentik.core.tests.utils import create_test_user
 14from authentik.events.models import Event, EventAction
 15from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
 16from authentik.flows.views.executor import QS_QUERY, SESSION_KEY_PLAN
 17from authentik.lib.generators import generate_id
 18from authentik.policies.dummy.models import DummyPolicy
 19
 20
 21class TestEvents(TestCase):
 22    """Test Event"""
 23
 24    def setUp(self) -> None:
 25        self.factory = RequestFactory()
 26
 27    def test_new_with_model(self):
 28        """Create a new Event passing a model as kwarg"""
 29        test_model = Group.objects.create(name="test")
 30        event = Event.new("unittest", test={"model": test_model})
 31        event.save()  # We save to ensure nothing is un-saveable
 32        model_content_type = ContentType.objects.get_for_model(test_model)
 33        self.assertEqual(
 34            event.context.get("test").get("model").get("app"),
 35            model_content_type.app_label,
 36        )
 37
 38    def test_new_with_user(self):
 39        """Create a new Event passing a user as kwarg"""
 40        event = Event.new("unittest", test={"model": get_anonymous_user()})
 41        event.save()  # We save to ensure nothing is un-saveable
 42        self.assertEqual(
 43            event.context.get("test").get("model").get("username"),
 44            get_anonymous_user().username,
 45        )
 46
 47    def test_new_with_uuid_model(self):
 48        """Create a new Event passing a model (with UUID PK) as kwarg"""
 49        temp_model = DummyPolicy.objects.create(name="test", result=True)
 50        event = Event.new("unittest", model=temp_model)
 51        event.save()  # We save to ensure nothing is un-saveable
 52        model_content_type = ContentType.objects.get_for_model(temp_model)
 53        self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label)
 54        self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex)
 55
 56    def test_from_http_basic(self):
 57        """Test plain from_http"""
 58        event = Event.new("unittest").from_http(self.factory.get("/"))
 59        self.assertEqual(
 60            event.context,
 61            {
 62                "http_request": {
 63                    "args": {},
 64                    "method": "GET",
 65                    "path": "/",
 66                    "user_agent": "",
 67                }
 68            },
 69        )
 70
 71    def test_from_http_clean_querystring(self):
 72        """Test cleansing query string"""
 73        token = generate_id()
 74        request = self.factory.get(f"/?token={token}")
 75        event = Event.new("unittest").from_http(request)
 76        self.assertEqual(
 77            event.context,
 78            {
 79                "http_request": {
 80                    "args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
 81                    "method": "GET",
 82                    "path": "/",
 83                    "user_agent": "",
 84                }
 85            },
 86        )
 87
 88    def test_from_http_clean_querystring_flow(self):
 89        """Test cleansing query string (nested query string like flow executor)"""
 90        token = generate_id()
 91        nested_qs = {"token": token}
 92        request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}")
 93        event = Event.new("unittest").from_http(request)
 94        self.assertEqual(
 95            event.context,
 96            {
 97                "http_request": {
 98                    "args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
 99                    "method": "GET",
100                    "path": "/",
101                    "user_agent": "",
102                }
103            },
104        )
105
106    def test_from_http_brand(self):
107        """Test from_http brand"""
108        # Test brand
109        request = self.factory.get("/")
110        brand = Brand(domain="test-brand")
111        request.brand = brand
112        event = Event.new("unittest").from_http(request)
113        self.assertEqual(
114            event.brand,
115            {
116                "app": "authentik_brands",
117                "model_name": "brand",
118                "name": "Brand test-brand",
119                "pk": brand.pk.hex,
120            },
121        )
122
123    def test_from_http_flow_pending_user(self):
124        """Test request from flow request with a pending user"""
125        user = create_test_user()
126
127        session = self.client.session
128        plan = FlowPlan(generate_id())
129        plan.context[PLAN_CONTEXT_PENDING_USER] = user
130        session[SESSION_KEY_PLAN] = plan
131        session.save()
132
133        request = self.factory.get("/")
134        request.session = session
135        request.user = user
136
137        event = Event.new("unittest").from_http(request)
138        self.assertEqual(
139            event.user,
140            {
141                "email": user.email,
142                "pk": user.pk,
143                "username": user.username,
144            },
145        )
146
147    def test_from_http_flow_pending_user_anon(self):
148        """Test request from flow request with a pending user"""
149        user = create_test_user()
150        anon = get_anonymous_user()
151
152        session = self.client.session
153        plan = FlowPlan(generate_id())
154        plan.context[PLAN_CONTEXT_PENDING_USER] = user
155        session[SESSION_KEY_PLAN] = plan
156        session.save()
157
158        request = self.factory.get("/")
159        request.session = session
160        request.user = anon
161
162        event = Event.new("unittest").from_http(request)
163        self.assertEqual(
164            event.user,
165            {
166                "authenticated_as": {
167                    "pk": anon.pk,
168                    "is_anonymous": True,
169                    "username": "AnonymousUser",
170                    "email": "",
171                },
172                "email": user.email,
173                "pk": user.pk,
174                "username": user.username,
175            },
176        )
177
178    def test_from_http_flow_pending_user_fake(self):
179        """Test request from flow request with a pending user"""
180        user = User(
181            username=generate_id(),
182            email=generate_id(),
183        )
184        anon = get_anonymous_user()
185
186        session = self.client.session
187        plan = FlowPlan(generate_id())
188        plan.context[PLAN_CONTEXT_PENDING_USER] = user
189        session[SESSION_KEY_PLAN] = plan
190        session.save()
191
192        request = self.factory.get("/")
193        request.session = session
194        request.user = anon
195
196        event = Event.new("unittest").from_http(request)
197        self.assertEqual(
198            event.user,
199            {
200                "authenticated_as": {
201                    "pk": anon.pk,
202                    "is_anonymous": True,
203                    "username": "AnonymousUser",
204                    "email": "",
205                },
206                "email": user.email,
207                "pk": user.pk,
208                "username": user.username,
209            },
210        )
211
212    def test_invalid_string(self):
213        """Test creating an event with invalid unicode string data"""
214        event = Event.new("unittest", foo="foo bar \u0000 baz")
215        event.save()
216        self.assertEqual(event.context["foo"], "foo bar  baz")
217
218    def test_password_set_signal_on_set_password_from_hash(self):
219        """Changing password from hash should still emit an audit event."""
220        user = create_test_user()
221        old_count = Event.objects.filter(action=EventAction.PASSWORD_SET, user__pk=user.pk).count()
222
223        user.set_password_from_hash(make_password(generate_id()))
224        user.save()
225
226        new_count = Event.objects.filter(action=EventAction.PASSWORD_SET, user__pk=user.pk).count()
227        self.assertEqual(new_count, old_count + 1)
228
229    def test_log_deprecation(self):
230        """Test Event.log_deprecation"""
231        Event.log_deprecation(self.__module__, "Test deprecation")
232        Event.log_deprecation(self.__module__, "Test deprecation")
233        Event.log_deprecation(self.__module__, "Test deprecation")
234        Event.log_deprecation(self.__module__, "Test deprecation", cause=create_test_user())
235        logs = Event.objects.filter(
236            action=EventAction.CONFIGURATION_WARNING, context__deprecation=self.__module__
237        )
238        self.assertEqual(logs.count(), 2)
class TestEvents(django.test.testcases.TestCase):
 22class TestEvents(TestCase):
 23    """Test Event"""
 24
 25    def setUp(self) -> None:
 26        self.factory = RequestFactory()
 27
 28    def test_new_with_model(self):
 29        """Create a new Event passing a model as kwarg"""
 30        test_model = Group.objects.create(name="test")
 31        event = Event.new("unittest", test={"model": test_model})
 32        event.save()  # We save to ensure nothing is un-saveable
 33        model_content_type = ContentType.objects.get_for_model(test_model)
 34        self.assertEqual(
 35            event.context.get("test").get("model").get("app"),
 36            model_content_type.app_label,
 37        )
 38
 39    def test_new_with_user(self):
 40        """Create a new Event passing a user as kwarg"""
 41        event = Event.new("unittest", test={"model": get_anonymous_user()})
 42        event.save()  # We save to ensure nothing is un-saveable
 43        self.assertEqual(
 44            event.context.get("test").get("model").get("username"),
 45            get_anonymous_user().username,
 46        )
 47
 48    def test_new_with_uuid_model(self):
 49        """Create a new Event passing a model (with UUID PK) as kwarg"""
 50        temp_model = DummyPolicy.objects.create(name="test", result=True)
 51        event = Event.new("unittest", model=temp_model)
 52        event.save()  # We save to ensure nothing is un-saveable
 53        model_content_type = ContentType.objects.get_for_model(temp_model)
 54        self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label)
 55        self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex)
 56
 57    def test_from_http_basic(self):
 58        """Test plain from_http"""
 59        event = Event.new("unittest").from_http(self.factory.get("/"))
 60        self.assertEqual(
 61            event.context,
 62            {
 63                "http_request": {
 64                    "args": {},
 65                    "method": "GET",
 66                    "path": "/",
 67                    "user_agent": "",
 68                }
 69            },
 70        )
 71
 72    def test_from_http_clean_querystring(self):
 73        """Test cleansing query string"""
 74        token = generate_id()
 75        request = self.factory.get(f"/?token={token}")
 76        event = Event.new("unittest").from_http(request)
 77        self.assertEqual(
 78            event.context,
 79            {
 80                "http_request": {
 81                    "args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
 82                    "method": "GET",
 83                    "path": "/",
 84                    "user_agent": "",
 85                }
 86            },
 87        )
 88
 89    def test_from_http_clean_querystring_flow(self):
 90        """Test cleansing query string (nested query string like flow executor)"""
 91        token = generate_id()
 92        nested_qs = {"token": token}
 93        request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}")
 94        event = Event.new("unittest").from_http(request)
 95        self.assertEqual(
 96            event.context,
 97            {
 98                "http_request": {
 99                    "args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
100                    "method": "GET",
101                    "path": "/",
102                    "user_agent": "",
103                }
104            },
105        )
106
107    def test_from_http_brand(self):
108        """Test from_http brand"""
109        # Test brand
110        request = self.factory.get("/")
111        brand = Brand(domain="test-brand")
112        request.brand = brand
113        event = Event.new("unittest").from_http(request)
114        self.assertEqual(
115            event.brand,
116            {
117                "app": "authentik_brands",
118                "model_name": "brand",
119                "name": "Brand test-brand",
120                "pk": brand.pk.hex,
121            },
122        )
123
124    def test_from_http_flow_pending_user(self):
125        """Test request from flow request with a pending user"""
126        user = create_test_user()
127
128        session = self.client.session
129        plan = FlowPlan(generate_id())
130        plan.context[PLAN_CONTEXT_PENDING_USER] = user
131        session[SESSION_KEY_PLAN] = plan
132        session.save()
133
134        request = self.factory.get("/")
135        request.session = session
136        request.user = user
137
138        event = Event.new("unittest").from_http(request)
139        self.assertEqual(
140            event.user,
141            {
142                "email": user.email,
143                "pk": user.pk,
144                "username": user.username,
145            },
146        )
147
148    def test_from_http_flow_pending_user_anon(self):
149        """Test request from flow request with a pending user"""
150        user = create_test_user()
151        anon = get_anonymous_user()
152
153        session = self.client.session
154        plan = FlowPlan(generate_id())
155        plan.context[PLAN_CONTEXT_PENDING_USER] = user
156        session[SESSION_KEY_PLAN] = plan
157        session.save()
158
159        request = self.factory.get("/")
160        request.session = session
161        request.user = anon
162
163        event = Event.new("unittest").from_http(request)
164        self.assertEqual(
165            event.user,
166            {
167                "authenticated_as": {
168                    "pk": anon.pk,
169                    "is_anonymous": True,
170                    "username": "AnonymousUser",
171                    "email": "",
172                },
173                "email": user.email,
174                "pk": user.pk,
175                "username": user.username,
176            },
177        )
178
179    def test_from_http_flow_pending_user_fake(self):
180        """Test request from flow request with a pending user"""
181        user = User(
182            username=generate_id(),
183            email=generate_id(),
184        )
185        anon = get_anonymous_user()
186
187        session = self.client.session
188        plan = FlowPlan(generate_id())
189        plan.context[PLAN_CONTEXT_PENDING_USER] = user
190        session[SESSION_KEY_PLAN] = plan
191        session.save()
192
193        request = self.factory.get("/")
194        request.session = session
195        request.user = anon
196
197        event = Event.new("unittest").from_http(request)
198        self.assertEqual(
199            event.user,
200            {
201                "authenticated_as": {
202                    "pk": anon.pk,
203                    "is_anonymous": True,
204                    "username": "AnonymousUser",
205                    "email": "",
206                },
207                "email": user.email,
208                "pk": user.pk,
209                "username": user.username,
210            },
211        )
212
213    def test_invalid_string(self):
214        """Test creating an event with invalid unicode string data"""
215        event = Event.new("unittest", foo="foo bar \u0000 baz")
216        event.save()
217        self.assertEqual(event.context["foo"], "foo bar  baz")
218
219    def test_password_set_signal_on_set_password_from_hash(self):
220        """Changing password from hash should still emit an audit event."""
221        user = create_test_user()
222        old_count = Event.objects.filter(action=EventAction.PASSWORD_SET, user__pk=user.pk).count()
223
224        user.set_password_from_hash(make_password(generate_id()))
225        user.save()
226
227        new_count = Event.objects.filter(action=EventAction.PASSWORD_SET, user__pk=user.pk).count()
228        self.assertEqual(new_count, old_count + 1)
229
230    def test_log_deprecation(self):
231        """Test Event.log_deprecation"""
232        Event.log_deprecation(self.__module__, "Test deprecation")
233        Event.log_deprecation(self.__module__, "Test deprecation")
234        Event.log_deprecation(self.__module__, "Test deprecation")
235        Event.log_deprecation(self.__module__, "Test deprecation", cause=create_test_user())
236        logs = Event.objects.filter(
237            action=EventAction.CONFIGURATION_WARNING, context__deprecation=self.__module__
238        )
239        self.assertEqual(logs.count(), 2)

Test Event

def setUp(self) -> None:
25    def setUp(self) -> None:
26        self.factory = RequestFactory()

Hook method for setting up the test fixture before exercising it.

def test_new_with_model(self):
28    def test_new_with_model(self):
29        """Create a new Event passing a model as kwarg"""
30        test_model = Group.objects.create(name="test")
31        event = Event.new("unittest", test={"model": test_model})
32        event.save()  # We save to ensure nothing is un-saveable
33        model_content_type = ContentType.objects.get_for_model(test_model)
34        self.assertEqual(
35            event.context.get("test").get("model").get("app"),
36            model_content_type.app_label,
37        )

Create a new Event passing a model as kwarg

def test_new_with_user(self):
39    def test_new_with_user(self):
40        """Create a new Event passing a user as kwarg"""
41        event = Event.new("unittest", test={"model": get_anonymous_user()})
42        event.save()  # We save to ensure nothing is un-saveable
43        self.assertEqual(
44            event.context.get("test").get("model").get("username"),
45            get_anonymous_user().username,
46        )

Create a new Event passing a user as kwarg

def test_new_with_uuid_model(self):
48    def test_new_with_uuid_model(self):
49        """Create a new Event passing a model (with UUID PK) as kwarg"""
50        temp_model = DummyPolicy.objects.create(name="test", result=True)
51        event = Event.new("unittest", model=temp_model)
52        event.save()  # We save to ensure nothing is un-saveable
53        model_content_type = ContentType.objects.get_for_model(temp_model)
54        self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label)
55        self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex)

Create a new Event passing a model (with UUID PK) as kwarg

def test_from_http_basic(self):
57    def test_from_http_basic(self):
58        """Test plain from_http"""
59        event = Event.new("unittest").from_http(self.factory.get("/"))
60        self.assertEqual(
61            event.context,
62            {
63                "http_request": {
64                    "args": {},
65                    "method": "GET",
66                    "path": "/",
67                    "user_agent": "",
68                }
69            },
70        )

Test plain from_http

def test_from_http_clean_querystring(self):
72    def test_from_http_clean_querystring(self):
73        """Test cleansing query string"""
74        token = generate_id()
75        request = self.factory.get(f"/?token={token}")
76        event = Event.new("unittest").from_http(request)
77        self.assertEqual(
78            event.context,
79            {
80                "http_request": {
81                    "args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
82                    "method": "GET",
83                    "path": "/",
84                    "user_agent": "",
85                }
86            },
87        )

Test cleansing query string

def test_from_http_clean_querystring_flow(self):
 89    def test_from_http_clean_querystring_flow(self):
 90        """Test cleansing query string (nested query string like flow executor)"""
 91        token = generate_id()
 92        nested_qs = {"token": token}
 93        request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}")
 94        event = Event.new("unittest").from_http(request)
 95        self.assertEqual(
 96            event.context,
 97            {
 98                "http_request": {
 99                    "args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
100                    "method": "GET",
101                    "path": "/",
102                    "user_agent": "",
103                }
104            },
105        )

Test cleansing query string (nested query string like flow executor)

def test_from_http_brand(self):
107    def test_from_http_brand(self):
108        """Test from_http brand"""
109        # Test brand
110        request = self.factory.get("/")
111        brand = Brand(domain="test-brand")
112        request.brand = brand
113        event = Event.new("unittest").from_http(request)
114        self.assertEqual(
115            event.brand,
116            {
117                "app": "authentik_brands",
118                "model_name": "brand",
119                "name": "Brand test-brand",
120                "pk": brand.pk.hex,
121            },
122        )

Test from_http brand

def test_from_http_flow_pending_user(self):
124    def test_from_http_flow_pending_user(self):
125        """Test request from flow request with a pending user"""
126        user = create_test_user()
127
128        session = self.client.session
129        plan = FlowPlan(generate_id())
130        plan.context[PLAN_CONTEXT_PENDING_USER] = user
131        session[SESSION_KEY_PLAN] = plan
132        session.save()
133
134        request = self.factory.get("/")
135        request.session = session
136        request.user = user
137
138        event = Event.new("unittest").from_http(request)
139        self.assertEqual(
140            event.user,
141            {
142                "email": user.email,
143                "pk": user.pk,
144                "username": user.username,
145            },
146        )

Test request from flow request with a pending user

def test_from_http_flow_pending_user_anon(self):
148    def test_from_http_flow_pending_user_anon(self):
149        """Test request from flow request with a pending user"""
150        user = create_test_user()
151        anon = get_anonymous_user()
152
153        session = self.client.session
154        plan = FlowPlan(generate_id())
155        plan.context[PLAN_CONTEXT_PENDING_USER] = user
156        session[SESSION_KEY_PLAN] = plan
157        session.save()
158
159        request = self.factory.get("/")
160        request.session = session
161        request.user = anon
162
163        event = Event.new("unittest").from_http(request)
164        self.assertEqual(
165            event.user,
166            {
167                "authenticated_as": {
168                    "pk": anon.pk,
169                    "is_anonymous": True,
170                    "username": "AnonymousUser",
171                    "email": "",
172                },
173                "email": user.email,
174                "pk": user.pk,
175                "username": user.username,
176            },
177        )

Test request from flow request with a pending user

def test_from_http_flow_pending_user_fake(self):
179    def test_from_http_flow_pending_user_fake(self):
180        """Test request from flow request with a pending user"""
181        user = User(
182            username=generate_id(),
183            email=generate_id(),
184        )
185        anon = get_anonymous_user()
186
187        session = self.client.session
188        plan = FlowPlan(generate_id())
189        plan.context[PLAN_CONTEXT_PENDING_USER] = user
190        session[SESSION_KEY_PLAN] = plan
191        session.save()
192
193        request = self.factory.get("/")
194        request.session = session
195        request.user = anon
196
197        event = Event.new("unittest").from_http(request)
198        self.assertEqual(
199            event.user,
200            {
201                "authenticated_as": {
202                    "pk": anon.pk,
203                    "is_anonymous": True,
204                    "username": "AnonymousUser",
205                    "email": "",
206                },
207                "email": user.email,
208                "pk": user.pk,
209                "username": user.username,
210            },
211        )

Test request from flow request with a pending user

def test_invalid_string(self):
213    def test_invalid_string(self):
214        """Test creating an event with invalid unicode string data"""
215        event = Event.new("unittest", foo="foo bar \u0000 baz")
216        event.save()
217        self.assertEqual(event.context["foo"], "foo bar  baz")

Test creating an event with invalid unicode string data

def test_password_set_signal_on_set_password_from_hash(self):
219    def test_password_set_signal_on_set_password_from_hash(self):
220        """Changing password from hash should still emit an audit event."""
221        user = create_test_user()
222        old_count = Event.objects.filter(action=EventAction.PASSWORD_SET, user__pk=user.pk).count()
223
224        user.set_password_from_hash(make_password(generate_id()))
225        user.save()
226
227        new_count = Event.objects.filter(action=EventAction.PASSWORD_SET, user__pk=user.pk).count()
228        self.assertEqual(new_count, old_count + 1)

Changing password from hash should still emit an audit event.

def test_log_deprecation(self):
230    def test_log_deprecation(self):
231        """Test Event.log_deprecation"""
232        Event.log_deprecation(self.__module__, "Test deprecation")
233        Event.log_deprecation(self.__module__, "Test deprecation")
234        Event.log_deprecation(self.__module__, "Test deprecation")
235        Event.log_deprecation(self.__module__, "Test deprecation", cause=create_test_user())
236        logs = Event.objects.filter(
237            action=EventAction.CONFIGURATION_WARNING, context__deprecation=self.__module__
238        )
239        self.assertEqual(logs.count(), 2)

Test Event.log_deprecation