authentik.sources.telegram.tests

Telegram source tests

  1"""Telegram source tests"""
  2
  3import hashlib
  4import hmac
  5from datetime import datetime, timedelta
  6from unittest.mock import Mock
  7
  8from django.test import TestCase
  9from django.urls import reverse
 10from rest_framework.exceptions import ValidationError
 11
 12from authentik.core.tests.utils import create_test_flow, create_test_user
 13from authentik.flows.models import FlowDesignation, FlowStageBinding
 14from authentik.flows.tests import FlowTestCase
 15from authentik.sources.telegram.models import UserTelegramSourceConnection
 16from authentik.sources.telegram.stage import TelegramChallengeResponse
 17from authentik.stages.identification.models import IdentificationStage, UserFields
 18
 19
 20class MockTelegramResponseMixin:
 21    def _add_hash(self, response):
 22        to_hash = "\n".join([f"{key}={value}" for key, value in sorted(response.items())])
 23        response["hash"] = hmac.new(
 24            hashlib.sha256(self.source.bot_token.encode("utf-8")).digest(),
 25            to_hash.encode("utf-8"),
 26            "sha256",
 27        ).hexdigest()
 28
 29    def _make_valid_response(self):
 30        resp = {
 31            "id": "123456789",
 32            "first_name": "Test",
 33            "last_name": "User",
 34            "username": "testuser",
 35            "auth_date": str(int(datetime.now().timestamp())),
 36        }
 37        self._add_hash(resp)
 38        return resp
 39
 40    def _make_outdated_response(self):
 41        resp = self._make_valid_response()
 42        resp["auth_date"] = str(int((datetime.now() - timedelta(days=1)).timestamp()))
 43        self._add_hash(resp)
 44        return resp
 45
 46
 47class TestTelegramSource(MockTelegramResponseMixin, TestCase):
 48    """Telegram Source tests"""
 49
 50    def setUp(self):
 51        from authentik.sources.telegram.models import TelegramSource
 52
 53        self.source = TelegramSource.objects.create(
 54            name="test",
 55            slug="test",
 56            bot_username="test_bot",
 57            bot_token="modern_token",  # nosec
 58            request_message_access=True,
 59            pre_authentication_flow=create_test_flow(),
 60        )
 61        self.mock_stage = Mock()
 62        self.mock_stage.source = self.source
 63
 64    def test_ui_login_button(self):
 65        """Test UI login button"""
 66        ui_login_button = self.source.ui_login_button(None)
 67        self.assertIsNotNone(ui_login_button)
 68        self.assertEqual(ui_login_button.name, "test")
 69        self.assertTrue(ui_login_button.challenge.is_valid(raise_exception=True))
 70
 71    def test_challenge_response(self):
 72        """Test correct Telegram response validation"""
 73        cr = TelegramChallengeResponse(data=self._make_valid_response())
 74        cr.stage = self.mock_stage
 75        self.assertTrue(cr.is_valid(raise_exception=True))
 76
 77    def test_outdated_challenge_response(self):
 78        """Test outdated Telegram response validation"""
 79        cr = TelegramChallengeResponse(data=self._make_outdated_response())
 80        cr.stage = self.mock_stage
 81        with self.assertRaises(ValidationError):
 82            cr.is_valid(raise_exception=True)
 83
 84    def test_invalid_hash_challenge_response(self):
 85        """Test invalid hash in Telegram response validation"""
 86        resp = self._make_valid_response()
 87        resp["hash"] = "invalid_hash"
 88        cr = TelegramChallengeResponse(data=resp)
 89        cr.stage = self.mock_stage
 90        with self.assertRaises(ValidationError):
 91            cr.is_valid(raise_exception=True)
 92
 93    def test_user_base_properties(self):
 94        """Test user base properties"""
 95        cr = TelegramChallengeResponse(data=self._make_valid_response())
 96        cr.stage = self.mock_stage
 97        cr.is_valid(raise_exception=True)
 98        properties = self.source.get_base_user_properties(info=cr.validated_data)
 99        self.assertEqual(
100            properties,
101            {
102                "username": "testuser",
103                "name": "Test User",
104                "email": None,
105            },
106        )
107
108    def test_group_base_properties(self):
109        """Test group base properties"""
110        for group_id in ["group 1", "group 2"]:
111            properties = self.source.get_base_group_properties(group_id=group_id)
112            self.assertEqual(properties, {"name": group_id})
113
114
class TestTelegramViews(MockTelegramResponseMixin, FlowTestCase):
    """Test Telegram source views"""

    def setUp(self):
        """Create a Telegram source and an authentication flow whose identification stage lists it."""
        super().setUp()
        from authentik.sources.telegram.models import TelegramSource

        self.pre_auth_flow = create_test_flow()
        enrollment_flow = create_test_flow()
        self.source = TelegramSource.objects.create(
            name="test",
            slug="test",
            bot_username="test_bot",
            bot_token="modern_token",  # nosec
            request_message_access=True,
            enrollment_flow=enrollment_flow,
            pre_authentication_flow=self.pre_auth_flow,
        )

        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        identification = IdentificationStage.objects.create(
            name="identification",
            user_fields=[UserFields.E_MAIL],
            pretend_user_exists=False,
        )
        identification.sources.set([self.source])
        identification.save()
        self.stage = identification
        FlowStageBinding.objects.create(target=self.flow, stage=identification, order=0)

    def _make_initial_request(self):
        """GET the flow executor API endpoint for the authentication flow."""
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        return self.client.get(executor_url)

    def _make_start_request(self):
        """GET the Telegram start view for the source, following redirects."""
        start_url = reverse(
            "authentik_sources_telegram:start", kwargs={"source_slug": self.source.slug}
        )
        return self.client.get(start_url, follow=True)

    def test_start_view(self):
        """Test TelegramStartView"""
        initial = self._make_initial_request()
        self.assertEqual(initial.status_code, 200)

        response = self._make_start_request()
        self.assertEqual(response.status_code, 200)
        first_redirect_url = response.redirect_chain[0][0]
        expected_url = reverse(
            "authentik_core:if-flow", kwargs={"flow_slug": self.pre_auth_flow.slug}
        )
        self.assertEqual(first_redirect_url, expected_url)

    def test_challenge_view(self):
        """Test TelegramLoginView"""
        self._make_initial_request()
        self._make_start_request()
        executor_url = reverse(
            "authentik_api:flow-executor", kwargs={"flow_slug": self.pre_auth_flow.slug}
        )
        challenge = self.client.get(executor_url)
        self.assertEqual(challenge.status_code, 200)

        payload = {**self._make_valid_response(), "component": "ak-source-telegram"}
        response = self.client.post(executor_url, payload)
        self.assertEqual(response.status_code, 200)
        enrollment_url = reverse(
            "authentik_core:if-flow", kwargs={"flow_slug": self.source.enrollment_flow.slug}
        )
        self.assertStageRedirects(response, enrollment_url)

    def test_connect_user(self):
        """Test the connect-user API: 201 for the first user, 403 for a second user
        posting the same Telegram id"""
        first_user = create_test_user("testuser")
        second_user = create_test_user("testuser2")
        connect_url = reverse("authentik_api:telegramsource-connect-user", args=[self.source.slug])

        self.client.force_login(first_user)
        response = self.client.post(connect_url, self._make_valid_response())
        self.assertEqual(response.status_code, 201)
        connection_exists = UserTelegramSourceConnection.objects.filter(
            user=first_user, source=self.source
        ).exists()
        self.assertTrue(connection_exists)

        self.client.logout()
        self.client.force_login(second_user)
        response = self.client.post(connect_url, self._make_valid_response())
        self.assertEqual(response.status_code, 403)
class MockTelegramResponseMixin:
class MockTelegramResponseMixin:
    """Helpers that build signed Telegram login payloads for tests.

    The host class must expose ``self.source`` with a ``bot_token`` attribute;
    the token is hashed with SHA-256 to derive the HMAC key.
    """

    def _add_hash(self, response):
        """Sign ``response`` in place, storing the hex digest under ``"hash"``.

        The signed string is the sorted ``key=value`` pairs joined by newlines.
        An existing ``"hash"`` entry is skipped so that re-signing a payload does
        not fold the previous signature into the new one.
        """
        to_hash = "\n".join(
            f"{key}={value}" for key, value in sorted(response.items()) if key != "hash"
        )
        secret_key = hashlib.sha256(self.source.bot_token.encode("utf-8")).digest()
        response["hash"] = hmac.new(
            secret_key,
            to_hash.encode("utf-8"),
            "sha256",
        ).hexdigest()

    def _make_valid_response(self):
        """Return a signed payload whose ``auth_date`` is the current time."""
        resp = {
            "id": "123456789",
            "first_name": "Test",
            "last_name": "User",
            "username": "testuser",
            "auth_date": str(int(datetime.now().timestamp())),
        }
        self._add_hash(resp)
        return resp

    def _make_outdated_response(self):
        """Return a signed payload whose ``auth_date`` lies one day in the past."""
        resp = self._make_valid_response()
        resp["auth_date"] = str(int((datetime.now() - timedelta(days=1)).timestamp()))
        # Re-sign after changing auth_date; _add_hash ignores the stale "hash" entry.
        self._add_hash(resp)
        return resp
class TestTelegramSource(MockTelegramResponseMixin, django.test.testcases.TestCase):
 48class TestTelegramSource(MockTelegramResponseMixin, TestCase):
 49    """Telegram Source tests"""
 50
 51    def setUp(self):
 52        from authentik.sources.telegram.models import TelegramSource
 53
 54        self.source = TelegramSource.objects.create(
 55            name="test",
 56            slug="test",
 57            bot_username="test_bot",
 58            bot_token="modern_token",  # nosec
 59            request_message_access=True,
 60            pre_authentication_flow=create_test_flow(),
 61        )
 62        self.mock_stage = Mock()
 63        self.mock_stage.source = self.source
 64
 65    def test_ui_login_button(self):
 66        """Test UI login button"""
 67        ui_login_button = self.source.ui_login_button(None)
 68        self.assertIsNotNone(ui_login_button)
 69        self.assertEqual(ui_login_button.name, "test")
 70        self.assertTrue(ui_login_button.challenge.is_valid(raise_exception=True))
 71
 72    def test_challenge_response(self):
 73        """Test correct Telegram response validation"""
 74        cr = TelegramChallengeResponse(data=self._make_valid_response())
 75        cr.stage = self.mock_stage
 76        self.assertTrue(cr.is_valid(raise_exception=True))
 77
 78    def test_outdated_challenge_response(self):
 79        """Test outdated Telegram response validation"""
 80        cr = TelegramChallengeResponse(data=self._make_outdated_response())
 81        cr.stage = self.mock_stage
 82        with self.assertRaises(ValidationError):
 83            cr.is_valid(raise_exception=True)
 84
 85    def test_invalid_hash_challenge_response(self):
 86        """Test invalid hash in Telegram response validation"""
 87        resp = self._make_valid_response()
 88        resp["hash"] = "invalid_hash"
 89        cr = TelegramChallengeResponse(data=resp)
 90        cr.stage = self.mock_stage
 91        with self.assertRaises(ValidationError):
 92            cr.is_valid(raise_exception=True)
 93
 94    def test_user_base_properties(self):
 95        """Test user base properties"""
 96        cr = TelegramChallengeResponse(data=self._make_valid_response())
 97        cr.stage = self.mock_stage
 98        cr.is_valid(raise_exception=True)
 99        properties = self.source.get_base_user_properties(info=cr.validated_data)
100        self.assertEqual(
101            properties,
102            {
103                "username": "testuser",
104                "name": "Test User",
105                "email": None,
106            },
107        )
108
109    def test_group_base_properties(self):
110        """Test group base properties"""
111        for group_id in ["group 1", "group 2"]:
112            properties = self.source.get_base_group_properties(group_id=group_id)
113            self.assertEqual(properties, {"name": group_id})

Telegram Source tests

def setUp(self):
51    def setUp(self):
52        from authentik.sources.telegram.models import TelegramSource
53
54        self.source = TelegramSource.objects.create(
55            name="test",
56            slug="test",
57            bot_username="test_bot",
58            bot_token="modern_token",  # nosec
59            request_message_access=True,
60            pre_authentication_flow=create_test_flow(),
61        )
62        self.mock_stage = Mock()
63        self.mock_stage.source = self.source

Hook method for setting up the test fixture before exercising it.

def test_ui_login_button(self):
65    def test_ui_login_button(self):
66        """Test UI login button"""
67        ui_login_button = self.source.ui_login_button(None)
68        self.assertIsNotNone(ui_login_button)
69        self.assertEqual(ui_login_button.name, "test")
70        self.assertTrue(ui_login_button.challenge.is_valid(raise_exception=True))

Test UI login button

def test_challenge_response(self):
72    def test_challenge_response(self):
73        """Test correct Telegram response validation"""
74        cr = TelegramChallengeResponse(data=self._make_valid_response())
75        cr.stage = self.mock_stage
76        self.assertTrue(cr.is_valid(raise_exception=True))

Test correct Telegram response validation

def test_outdated_challenge_response(self):
78    def test_outdated_challenge_response(self):
79        """Test outdated Telegram response validation"""
80        cr = TelegramChallengeResponse(data=self._make_outdated_response())
81        cr.stage = self.mock_stage
82        with self.assertRaises(ValidationError):
83            cr.is_valid(raise_exception=True)

Test outdated Telegram response validation

def test_invalid_hash_challenge_response(self):
85    def test_invalid_hash_challenge_response(self):
86        """Test invalid hash in Telegram response validation"""
87        resp = self._make_valid_response()
88        resp["hash"] = "invalid_hash"
89        cr = TelegramChallengeResponse(data=resp)
90        cr.stage = self.mock_stage
91        with self.assertRaises(ValidationError):
92            cr.is_valid(raise_exception=True)

Test invalid hash in Telegram response validation

def test_user_base_properties(self):
 94    def test_user_base_properties(self):
 95        """Test user base properties"""
 96        cr = TelegramChallengeResponse(data=self._make_valid_response())
 97        cr.stage = self.mock_stage
 98        cr.is_valid(raise_exception=True)
 99        properties = self.source.get_base_user_properties(info=cr.validated_data)
100        self.assertEqual(
101            properties,
102            {
103                "username": "testuser",
104                "name": "Test User",
105                "email": None,
106            },
107        )

Test user base properties

def test_group_base_properties(self):
109    def test_group_base_properties(self):
110        """Test group base properties"""
111        for group_id in ["group 1", "group 2"]:
112            properties = self.source.get_base_group_properties(group_id=group_id)
113            self.assertEqual(properties, {"name": group_id})

Test group base properties

class TestTelegramViews(MockTelegramResponseMixin, authentik.flows.tests.FlowTestCase):
class TestTelegramViews(MockTelegramResponseMixin, FlowTestCase):
    """Test Telegram source views"""

    def setUp(self):
        """Create a Telegram source and an authentication flow whose identification stage lists it."""
        super().setUp()
        from authentik.sources.telegram.models import TelegramSource

        self.pre_auth_flow = create_test_flow()
        enrollment_flow = create_test_flow()
        self.source = TelegramSource.objects.create(
            name="test",
            slug="test",
            bot_username="test_bot",
            bot_token="modern_token",  # nosec
            request_message_access=True,
            enrollment_flow=enrollment_flow,
            pre_authentication_flow=self.pre_auth_flow,
        )

        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        identification = IdentificationStage.objects.create(
            name="identification",
            user_fields=[UserFields.E_MAIL],
            pretend_user_exists=False,
        )
        identification.sources.set([self.source])
        identification.save()
        self.stage = identification
        FlowStageBinding.objects.create(target=self.flow, stage=identification, order=0)

    def _make_initial_request(self):
        """GET the flow executor API endpoint for the authentication flow."""
        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        return self.client.get(executor_url)

    def _make_start_request(self):
        """GET the Telegram start view for the source, following redirects."""
        start_url = reverse(
            "authentik_sources_telegram:start", kwargs={"source_slug": self.source.slug}
        )
        return self.client.get(start_url, follow=True)

    def test_start_view(self):
        """Test TelegramStartView"""
        initial = self._make_initial_request()
        self.assertEqual(initial.status_code, 200)

        response = self._make_start_request()
        self.assertEqual(response.status_code, 200)
        first_redirect_url = response.redirect_chain[0][0]
        expected_url = reverse(
            "authentik_core:if-flow", kwargs={"flow_slug": self.pre_auth_flow.slug}
        )
        self.assertEqual(first_redirect_url, expected_url)

    def test_challenge_view(self):
        """Test TelegramLoginView"""
        self._make_initial_request()
        self._make_start_request()
        executor_url = reverse(
            "authentik_api:flow-executor", kwargs={"flow_slug": self.pre_auth_flow.slug}
        )
        challenge = self.client.get(executor_url)
        self.assertEqual(challenge.status_code, 200)

        payload = {**self._make_valid_response(), "component": "ak-source-telegram"}
        response = self.client.post(executor_url, payload)
        self.assertEqual(response.status_code, 200)
        enrollment_url = reverse(
            "authentik_core:if-flow", kwargs={"flow_slug": self.source.enrollment_flow.slug}
        )
        self.assertStageRedirects(response, enrollment_url)

    def test_connect_user(self):
        """Test the connect-user API: 201 for the first user, 403 for a second user
        posting the same Telegram id"""
        first_user = create_test_user("testuser")
        second_user = create_test_user("testuser2")
        connect_url = reverse("authentik_api:telegramsource-connect-user", args=[self.source.slug])

        self.client.force_login(first_user)
        response = self.client.post(connect_url, self._make_valid_response())
        self.assertEqual(response.status_code, 201)
        connection_exists = UserTelegramSourceConnection.objects.filter(
            user=first_user, source=self.source
        ).exists()
        self.assertTrue(connection_exists)

        self.client.logout()
        self.client.force_login(second_user)
        response = self.client.post(connect_url, self._make_valid_response())
        self.assertEqual(response.status_code, 403)

Test Telegram source views

def setUp(self):
119    def setUp(self):
120        super().setUp()
121        from authentik.sources.telegram.models import TelegramSource
122
123        self.pre_auth_flow = create_test_flow()
124
125        self.source = TelegramSource.objects.create(
126            name="test",
127            slug="test",
128            bot_username="test_bot",
129            bot_token="modern_token",  # nosec
130            request_message_access=True,
131            enrollment_flow=create_test_flow(),
132            pre_authentication_flow=self.pre_auth_flow,
133        )
134
135        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
136        self.stage = IdentificationStage.objects.create(
137            name="identification",
138            user_fields=[UserFields.E_MAIL],
139            pretend_user_exists=False,
140        )
141        self.stage.sources.set([self.source])
142        self.stage.save()
143        FlowStageBinding.objects.create(
144            target=self.flow,
145            stage=self.stage,
146            order=0,
147        )

Hook method for setting up the test fixture before exercising it.

def test_start_view(self):
160    def test_start_view(self):
161        """Test TelegramStartView"""
162        self.assertEqual(self._make_initial_request().status_code, 200)
163
164        response = self._make_start_request()
165        self.assertEqual(response.status_code, 200)
166        self.assertEqual(
167            response.redirect_chain[0][0],
168            reverse("authentik_core:if-flow", kwargs={"flow_slug": self.pre_auth_flow.slug}),
169        )

Test TelegramStartView

def test_challenge_view(self):
171    def test_challenge_view(self):
172        """Test TelegramLoginView"""
173        self._make_initial_request()
174        self._make_start_request()
175        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.pre_auth_flow.slug})
176        get_response = self.client.get(url)
177        self.assertEqual(get_response.status_code, 200)
178        form_data = self._make_valid_response()
179        form_data["component"] = "ak-source-telegram"
180        response = self.client.post(url, form_data)
181        self.assertEqual(response.status_code, 200)
182        self.assertStageRedirects(
183            response,
184            reverse(
185                "authentik_core:if-flow", kwargs={"flow_slug": self.source.enrollment_flow.slug}
186            ),
187        )

Test TelegramLoginView

def test_connect_user(self):
189    def test_connect_user(self):
190        user = create_test_user("testuser")
191        user2 = create_test_user("testuser2")
192        self.client.force_login(user)
193        response = self.client.post(
194            reverse("authentik_api:telegramsource-connect-user", args=[self.source.slug]),
195            self._make_valid_response(),
196        )
197        self.assertEqual(response.status_code, 201)
198        self.assertTrue(
199            UserTelegramSourceConnection.objects.filter(user=user, source=self.source).exists()
200        )
201        self.client.logout()
202        self.client.force_login(user2)
203        response = self.client.post(
204            reverse("authentik_api:telegramsource-connect-user", args=[self.source.slug]),
205            self._make_valid_response(),
206        )
207        self.assertEqual(response.status_code, 403)