authentik.sources.telegram.tests
Telegram source tests
"""Telegram source tests"""

import hashlib
import hmac
from datetime import datetime, timedelta
from unittest.mock import Mock

from django.test import TestCase
from django.urls import reverse
from rest_framework.exceptions import ValidationError

from authentik.core.tests.utils import create_test_flow, create_test_user
from authentik.flows.models import FlowDesignation, FlowStageBinding
from authentik.flows.tests import FlowTestCase
from authentik.sources.telegram.models import UserTelegramSourceConnection
from authentik.sources.telegram.stage import TelegramChallengeResponse
from authentik.stages.identification.models import IdentificationStage, UserFields


class MockTelegramResponseMixin:
    """Build signed Telegram login payloads for tests.

    The host test case must set ``self.source`` to an object exposing
    ``bot_token`` before calling any helper.
    """

    def _add_hash(self, response):
        """Sign ``response`` in place and store the hex digest under ``"hash"``.

        The signed string is the ``key=value`` pairs sorted by key and joined
        with newlines. It is signed with HMAC-SHA256, keyed by the SHA-256
        digest of the bot token. An existing ``"hash"`` entry is excluded so
        an already signed payload can be re-signed after it is modified.
        """
        to_hash = "\n".join(
            f"{key}={value}" for key, value in sorted(response.items()) if key != "hash"
        )
        response["hash"] = hmac.new(
            hashlib.sha256(self.source.bot_token.encode("utf-8")).digest(),
            to_hash.encode("utf-8"),
            "sha256",
        ).hexdigest()

    def _make_valid_response(self):
        """Return a signed payload whose ``auth_date`` is the current time."""
        resp = {
            "id": "123456789",
            "first_name": "Test",
            "last_name": "User",
            "username": "testuser",
            "auth_date": str(int(datetime.now().timestamp())),
        }
        self._add_hash(resp)
        return resp

    def _make_outdated_response(self):
        """Return a correctly signed payload whose ``auth_date`` is one day old."""
        resp = self._make_valid_response()
        resp["auth_date"] = str(int((datetime.now() - timedelta(days=1)).timestamp()))
        # Re-sign so the stale date is the only thing wrong with the payload.
        self._add_hash(resp)
        return resp


class TestTelegramSource(MockTelegramResponseMixin, TestCase):
    """Telegram Source tests"""

    def setUp(self):
        """Create a Telegram source and a mock stage that references it."""
        from authentik.sources.telegram.models import TelegramSource

        self.source = TelegramSource.objects.create(
            name="test",
            slug="test",
            bot_username="test_bot",
            bot_token="modern_token",  # nosec
            request_message_access=True,
            pre_authentication_flow=create_test_flow(),
        )
        self.mock_stage = Mock()
        self.mock_stage.source = self.source

    def test_ui_login_button(self):
        """Test UI login button"""
        ui_login_button = self.source.ui_login_button(None)
        self.assertIsNotNone(ui_login_button)
        self.assertEqual(ui_login_button.name, "test")
        self.assertTrue(ui_login_button.challenge.is_valid(raise_exception=True))

    def test_challenge_response(self):
        """Test correct Telegram response validation"""
        cr = TelegramChallengeResponse(data=self._make_valid_response())
        cr.stage = self.mock_stage
        self.assertTrue(cr.is_valid(raise_exception=True))

    def test_outdated_challenge_response(self):
        """Test outdated Telegram response validation"""
        cr = TelegramChallengeResponse(data=self._make_outdated_response())
        cr.stage = self.mock_stage
        with self.assertRaises(ValidationError):
            cr.is_valid(raise_exception=True)

    def test_invalid_hash_challenge_response(self):
        """Test invalid hash in Telegram response validation"""
        resp = self._make_valid_response()
        resp["hash"] = "invalid_hash"
        cr = TelegramChallengeResponse(data=resp)
        cr.stage = self.mock_stage
        with self.assertRaises(ValidationError):
            cr.is_valid(raise_exception=True)

    def test_user_base_properties(self):
        """Test user base properties"""
        cr = TelegramChallengeResponse(data=self._make_valid_response())
        cr.stage = self.mock_stage
        cr.is_valid(raise_exception=True)
        properties = self.source.get_base_user_properties(info=cr.validated_data)
        self.assertEqual(
            properties,
            {
                "username": "testuser",
                "name": "Test User",
                "email": None,
            },
        )

    def test_group_base_properties(self):
        """Test group base properties"""
        for group_id in ["group 1", "group 2"]:
            properties = self.source.get_base_group_properties(group_id=group_id)
            self.assertEqual(properties, {"name": group_id})


class TestTelegramViews(MockTelegramResponseMixin, FlowTestCase):
    """Test Telegram source views"""

    def setUp(self):
        """Create a Telegram source and an authentication flow that offers it."""
        super().setUp()
        from authentik.sources.telegram.models import TelegramSource

        self.pre_auth_flow = create_test_flow()

        self.source = TelegramSource.objects.create(
            name="test",
            slug="test",
            bot_username="test_bot",
            bot_token="modern_token",  # nosec
            request_message_access=True,
            enrollment_flow=create_test_flow(),
            pre_authentication_flow=self.pre_auth_flow,
        )

        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        self.stage = IdentificationStage.objects.create(
            name="identification",
            user_fields=[UserFields.E_MAIL],
            pretend_user_exists=False,
        )
        self.stage.sources.set([self.source])
        self.stage.save()
        FlowStageBinding.objects.create(
            target=self.flow,
            stage=self.stage,
            order=0,
        )

    def _make_initial_request(self):
        """GET the flow executor for the authentication flow."""
        return self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        )

    def _make_start_request(self):
        """GET the Telegram start view for the source, following redirects."""
        return self.client.get(
            reverse("authentik_sources_telegram:start", kwargs={"source_slug": self.source.slug}),
            follow=True,
        )

    def test_start_view(self):
        """Test TelegramStartView"""
        self.assertEqual(self._make_initial_request().status_code, 200)

        response = self._make_start_request()
        self.assertEqual(response.status_code, 200)
        self.assertEqual(
            response.redirect_chain[0][0],
            reverse("authentik_core:if-flow", kwargs={"flow_slug": self.pre_auth_flow.slug}),
        )

    def test_challenge_view(self):
        """Test TelegramLoginView"""
        self._make_initial_request()
        self._make_start_request()
        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.pre_auth_flow.slug})
        get_response = self.client.get(url)
        self.assertEqual(get_response.status_code, 200)
        form_data = self._make_valid_response()
        form_data["component"] = "ak-source-telegram"
        response = self.client.post(url, form_data)
        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(
            response,
            reverse(
                "authentik_core:if-flow", kwargs={"flow_slug": self.source.enrollment_flow.slug}
            ),
        )

    def test_connect_user(self):
        """Test connecting a Telegram account: 201 for the first user, 403 for a second."""
        user = create_test_user("testuser")
        user2 = create_test_user("testuser2")
        self.client.force_login(user)
        response = self.client.post(
            reverse("authentik_api:telegramsource-connect-user", args=[self.source.slug]),
            self._make_valid_response(),
        )
        self.assertEqual(response.status_code, 201)
        self.assertTrue(
            UserTelegramSourceConnection.objects.filter(user=user, source=self.source).exists()
        )
        self.client.logout()
        self.client.force_login(user2)
        response = self.client.post(
            reverse("authentik_api:telegramsource-connect-user", args=[self.source.slug]),
            self._make_valid_response(),
        )
        self.assertEqual(response.status_code, 403)
class
MockTelegramResponseMixin:
class MockTelegramResponseMixin:
    """Build signed Telegram login payloads for tests.

    The host test case must set ``self.source`` to an object exposing
    ``bot_token`` before calling any helper.
    """

    def _add_hash(self, response):
        """Sign ``response`` in place and store the hex digest under ``"hash"``.

        The signed string is the ``key=value`` pairs sorted by key and joined
        with newlines. It is signed with HMAC-SHA256, keyed by the SHA-256
        digest of the bot token. An existing ``"hash"`` entry is excluded so
        an already signed payload can be re-signed after it is modified.
        """
        to_hash = "\n".join(
            f"{key}={value}" for key, value in sorted(response.items()) if key != "hash"
        )
        response["hash"] = hmac.new(
            hashlib.sha256(self.source.bot_token.encode("utf-8")).digest(),
            to_hash.encode("utf-8"),
            "sha256",
        ).hexdigest()

    def _make_valid_response(self):
        """Return a signed payload whose ``auth_date`` is the current time."""
        resp = {
            "id": "123456789",
            "first_name": "Test",
            "last_name": "User",
            "username": "testuser",
            "auth_date": str(int(datetime.now().timestamp())),
        }
        self._add_hash(resp)
        return resp

    def _make_outdated_response(self):
        """Return a correctly signed payload whose ``auth_date`` is one day old."""
        resp = self._make_valid_response()
        resp["auth_date"] = str(int((datetime.now() - timedelta(days=1)).timestamp()))
        # Re-sign so the stale date is the only thing wrong with the payload.
        self._add_hash(resp)
        return resp
class TestTelegramSource(MockTelegramResponseMixin, TestCase):
    """Unit tests for the Telegram source model and its challenge response."""

    def setUp(self):
        """Create a Telegram source and a mock stage that references it."""
        from authentik.sources.telegram.models import TelegramSource

        pre_auth_flow = create_test_flow()
        self.source = TelegramSource.objects.create(
            name="test",
            slug="test",
            bot_username="test_bot",
            bot_token="modern_token",  # nosec
            request_message_access=True,
            pre_authentication_flow=pre_auth_flow,
        )
        self.mock_stage = Mock(source=self.source)

    def _challenge_for(self, payload):
        """Wrap ``payload`` in a challenge response bound to the mock stage."""
        challenge_response = TelegramChallengeResponse(data=payload)
        challenge_response.stage = self.mock_stage
        return challenge_response

    def test_ui_login_button(self):
        """The source exposes a login button named after it with a valid challenge."""
        button = self.source.ui_login_button(None)
        self.assertIsNotNone(button)
        self.assertEqual(button.name, "test")
        challenge_ok = button.challenge.is_valid(raise_exception=True)
        self.assertTrue(challenge_ok)

    def test_challenge_response(self):
        """A freshly signed payload passes validation."""
        challenge_response = self._challenge_for(self._make_valid_response())
        self.assertTrue(challenge_response.is_valid(raise_exception=True))

    def test_outdated_challenge_response(self):
        """The payload built by ``_make_outdated_response`` fails validation."""
        challenge_response = self._challenge_for(self._make_outdated_response())
        with self.assertRaises(ValidationError):
            challenge_response.is_valid(raise_exception=True)

    def test_invalid_hash_challenge_response(self):
        """A payload whose hash was overwritten fails validation."""
        tampered = self._make_valid_response()
        tampered["hash"] = "invalid_hash"
        challenge_response = self._challenge_for(tampered)
        with self.assertRaises(ValidationError):
            challenge_response.is_valid(raise_exception=True)

    def test_user_base_properties(self):
        """Validated payload data maps to the expected base user properties."""
        expected = {
            "username": "testuser",
            "name": "Test User",
            "email": None,
        }
        challenge_response = self._challenge_for(self._make_valid_response())
        challenge_response.is_valid(raise_exception=True)
        actual = self.source.get_base_user_properties(info=challenge_response.validated_data)
        self.assertEqual(actual, expected)

    def test_group_base_properties(self):
        """Each group id maps to base properties holding it as the name."""
        for group_id in ("group 1", "group 2"):
            expected = {"name": group_id}
            actual = self.source.get_base_group_properties(group_id=group_id)
            self.assertEqual(actual, expected)
Telegram Source tests
def
setUp(self):
def setUp(self):
    """Create a Telegram source and a mock stage that references it."""
    from authentik.sources.telegram.models import TelegramSource

    pre_auth_flow = create_test_flow()
    self.source = TelegramSource.objects.create(
        name="test",
        slug="test",
        bot_username="test_bot",
        bot_token="modern_token",  # nosec
        request_message_access=True,
        pre_authentication_flow=pre_auth_flow,
    )
    self.mock_stage = Mock(source=self.source)
Hook method for setting up the test fixture before exercising it.
def
test_challenge_response(self):
def test_challenge_response(self):
    """A freshly signed payload passes challenge-response validation."""
    payload = self._make_valid_response()
    challenge_response = TelegramChallengeResponse(data=payload)
    challenge_response.stage = self.mock_stage
    is_valid = challenge_response.is_valid(raise_exception=True)
    self.assertTrue(is_valid)
Test correct Telegram response validation
def
test_outdated_challenge_response(self):
def test_outdated_challenge_response(self):
    """The payload built by ``_make_outdated_response`` raises ValidationError."""
    payload = self._make_outdated_response()
    challenge_response = TelegramChallengeResponse(data=payload)
    challenge_response.stage = self.mock_stage
    with self.assertRaises(ValidationError):
        challenge_response.is_valid(raise_exception=True)
Test outdated Telegram response validation
def
test_invalid_hash_challenge_response(self):
def test_invalid_hash_challenge_response(self):
    """A payload whose hash was overwritten raises ValidationError."""
    tampered = self._make_valid_response()
    tampered["hash"] = "invalid_hash"
    challenge_response = TelegramChallengeResponse(data=tampered)
    challenge_response.stage = self.mock_stage
    with self.assertRaises(ValidationError):
        challenge_response.is_valid(raise_exception=True)
Test invalid hash in Telegram response validation
def
test_user_base_properties(self):
def test_user_base_properties(self):
    """Validated payload data maps to the expected base user properties."""
    expected = {
        "username": "testuser",
        "name": "Test User",
        "email": None,
    }
    challenge_response = TelegramChallengeResponse(data=self._make_valid_response())
    challenge_response.stage = self.mock_stage
    challenge_response.is_valid(raise_exception=True)
    actual = self.source.get_base_user_properties(info=challenge_response.validated_data)
    self.assertEqual(actual, expected)
Test user base properties
def
test_group_base_properties(self):
def test_group_base_properties(self):
    """Each group id maps to base properties holding it as the name."""
    for group_id in ("group 1", "group 2"):
        expected = {"name": group_id}
        actual = self.source.get_base_group_properties(group_id=group_id)
        self.assertEqual(actual, expected)
Test group base properties
class TestTelegramViews(MockTelegramResponseMixin, FlowTestCase):
    """View-level tests for the Telegram source (start, challenge, connect)."""

    def setUp(self):
        """Create a Telegram source and an authentication flow that offers it."""
        super().setUp()
        from authentik.sources.telegram.models import TelegramSource

        self.pre_auth_flow = create_test_flow()
        enrollment_flow = create_test_flow()
        self.source = TelegramSource.objects.create(
            name="test",
            slug="test",
            bot_username="test_bot",
            bot_token="modern_token",  # nosec
            request_message_access=True,
            enrollment_flow=enrollment_flow,
            pre_authentication_flow=self.pre_auth_flow,
        )

        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        identification = IdentificationStage.objects.create(
            name="identification",
            user_fields=[UserFields.E_MAIL],
            pretend_user_exists=False,
        )
        self.stage = identification
        identification.sources.set([self.source])
        identification.save()
        FlowStageBinding.objects.create(target=self.flow, stage=identification, order=0)

    def _make_initial_request(self):
        """GET the flow executor for the authentication flow."""
        executor_url = reverse(
            "authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}
        )
        return self.client.get(executor_url)

    def _make_start_request(self):
        """GET the Telegram start view for the source, following redirects."""
        start_url = reverse(
            "authentik_sources_telegram:start", kwargs={"source_slug": self.source.slug}
        )
        return self.client.get(start_url, follow=True)

    def test_start_view(self):
        """The start view first redirects to the pre-authentication flow UI."""
        initial = self._make_initial_request()
        self.assertEqual(initial.status_code, 200)

        started = self._make_start_request()
        self.assertEqual(started.status_code, 200)
        first_hop = started.redirect_chain[0][0]
        expected_target = reverse(
            "authentik_core:if-flow", kwargs={"flow_slug": self.pre_auth_flow.slug}
        )
        self.assertEqual(first_hop, expected_target)

    def test_challenge_view(self):
        """Posting a valid payload to the pre-auth flow redirects to enrollment."""
        self._make_initial_request()
        self._make_start_request()
        executor_url = reverse(
            "authentik_api:flow-executor", kwargs={"flow_slug": self.pre_auth_flow.slug}
        )
        self.assertEqual(self.client.get(executor_url).status_code, 200)

        form_data = {**self._make_valid_response(), "component": "ak-source-telegram"}
        posted = self.client.post(executor_url, form_data)
        self.assertEqual(posted.status_code, 200)
        enrollment_url = reverse(
            "authentik_core:if-flow", kwargs={"flow_slug": self.source.enrollment_flow.slug}
        )
        self.assertStageRedirects(posted, enrollment_url)

    def test_connect_user(self):
        """The first user's connect call returns 201; a second user's returns 403."""
        first_user = create_test_user("testuser")
        second_user = create_test_user("testuser2")

        self.client.force_login(first_user)
        connect_url = reverse("authentik_api:telegramsource-connect-user", args=[self.source.slug])
        created = self.client.post(connect_url, self._make_valid_response())
        self.assertEqual(created.status_code, 201)
        connection_exists = UserTelegramSourceConnection.objects.filter(
            user=first_user, source=self.source
        ).exists()
        self.assertTrue(connection_exists)

        self.client.logout()
        self.client.force_login(second_user)
        connect_url = reverse("authentik_api:telegramsource-connect-user", args=[self.source.slug])
        denied = self.client.post(connect_url, self._make_valid_response())
        self.assertEqual(denied.status_code, 403)
Test Telegram source views
def
setUp(self):
def setUp(self):
    """Create a Telegram source and an authentication flow that offers it."""
    super().setUp()
    from authentik.sources.telegram.models import TelegramSource

    self.pre_auth_flow = create_test_flow()
    enrollment_flow = create_test_flow()
    self.source = TelegramSource.objects.create(
        name="test",
        slug="test",
        bot_username="test_bot",
        bot_token="modern_token",  # nosec
        request_message_access=True,
        enrollment_flow=enrollment_flow,
        pre_authentication_flow=self.pre_auth_flow,
    )

    self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    identification = IdentificationStage.objects.create(
        name="identification",
        user_fields=[UserFields.E_MAIL],
        pretend_user_exists=False,
    )
    self.stage = identification
    identification.sources.set([self.source])
    identification.save()
    FlowStageBinding.objects.create(target=self.flow, stage=identification, order=0)
Hook method for setting up the test fixture before exercising it.
def
test_start_view(self):
def test_start_view(self):
    """The start view first redirects to the pre-authentication flow UI."""
    initial = self._make_initial_request()
    self.assertEqual(initial.status_code, 200)

    started = self._make_start_request()
    self.assertEqual(started.status_code, 200)
    first_hop = started.redirect_chain[0][0]
    expected_target = reverse(
        "authentik_core:if-flow", kwargs={"flow_slug": self.pre_auth_flow.slug}
    )
    self.assertEqual(first_hop, expected_target)
Test TelegramStartView
def
test_challenge_view(self):
def test_challenge_view(self):
    """Posting a valid payload to the pre-auth flow redirects to enrollment."""
    self._make_initial_request()
    self._make_start_request()
    executor_url = reverse(
        "authentik_api:flow-executor", kwargs={"flow_slug": self.pre_auth_flow.slug}
    )
    self.assertEqual(self.client.get(executor_url).status_code, 200)

    form_data = {**self._make_valid_response(), "component": "ak-source-telegram"}
    posted = self.client.post(executor_url, form_data)
    self.assertEqual(posted.status_code, 200)
    enrollment_url = reverse(
        "authentik_core:if-flow", kwargs={"flow_slug": self.source.enrollment_flow.slug}
    )
    self.assertStageRedirects(posted, enrollment_url)
Test TelegramLoginView
def
test_connect_user(self):
def test_connect_user(self):
    """The first user's connect call returns 201; a second user's returns 403."""
    first_user = create_test_user("testuser")
    second_user = create_test_user("testuser2")

    self.client.force_login(first_user)
    connect_url = reverse("authentik_api:telegramsource-connect-user", args=[self.source.slug])
    created = self.client.post(connect_url, self._make_valid_response())
    self.assertEqual(created.status_code, 201)
    connection_exists = UserTelegramSourceConnection.objects.filter(
        user=first_user, source=self.source
    ).exists()
    self.assertTrue(connection_exists)

    self.client.logout()
    self.client.force_login(second_user)
    connect_url = reverse("authentik_api:telegramsource-connect-user", args=[self.source.slug])
    denied = self.client.post(connect_url, self._make_valid_response())
    self.assertEqual(denied.status_code, 403)