authentik.sources.telegram.views

 1from django.http import Http404, HttpRequest, HttpResponse
 2from django.shortcuts import get_object_or_404
 3from django.views import View
 4
 5from authentik.core.sources.flow_manager import SourceFlowManager
 6from authentik.flows.challenge import Challenge
 7from authentik.flows.exceptions import FlowNonApplicableException
 8from authentik.flows.models import in_memory_stage
 9from authentik.flows.planner import (
10    PLAN_CONTEXT_REDIRECT,
11    PLAN_CONTEXT_SOURCE,
12    PLAN_CONTEXT_SSO,
13    FlowPlanner,
14)
15from authentik.flows.stage import ChallengeStageView
16from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET
17from authentik.sources.telegram.models import (
18    GroupTelegramSourceConnection,
19    TelegramSource,
20    UserTelegramSourceConnection,
21)
22from authentik.sources.telegram.stage import TelegramChallengeResponse, TelegramLoginChallenge
23
24
class TelegramStartView(View):
    """View that starts the login flow of an enabled Telegram source."""

    def handle_login_flow(
        self, source: TelegramSource, *stages_to_append, **kwargs
    ) -> HttpResponse:
        """Plan the source's pre-authentication flow and redirect the user into it.

        ``kwargs`` is used as the plan context after the SSO flag, the source and
        the final redirect target have been added to it. The redirect target is the
        ``NEXT_ARG_NAME`` entry of the mapping stored in the session under
        ``SESSION_KEY_GET``, falling back to ``"authentik_core:if-user"``.
        Every stage in ``stages_to_append`` is appended to the plan, in order.

        Raises ``Http404`` when the planner raises ``FlowNonApplicableException``.
        """
        # Carry the redirect through when the user was trying to authorize
        # an application before being sent here
        stored_query = self.request.session.get(SESSION_KEY_GET, {})
        next_url = stored_query.get(NEXT_ARG_NAME, "authentik_core:if-user")

        kwargs[PLAN_CONTEXT_SSO] = True
        kwargs[PLAN_CONTEXT_SOURCE] = source
        kwargs[PLAN_CONTEXT_REDIRECT] = next_url

        # The planner is run here so it receives the context built above
        flow = source.pre_authentication_flow
        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(self.request, kwargs)
        except FlowNonApplicableException:
            raise Http404 from None

        for extra_stage in stages_to_append:
            plan.append_stage(extra_stage)
        return plan.to_redirect(self.request, flow)

    def get(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Start the login flow for the enabled Telegram source with the given slug.

        Responds with a 404 when no enabled source has that slug.
        """
        source = get_object_or_404(TelegramSource, slug=source_slug, enabled=True)
        # The Telegram login challenge is appended to the plan as an in-memory stage
        return self.handle_login_flow(source, in_memory_stage(TelegramLoginView))
58
59
class TelegramSourceFlowManager(SourceFlowManager):
    """SourceFlowManager configured with the Telegram connection models"""

    # Connection models used for groups and users of a Telegram source
    group_connection_type = GroupTelegramSourceConnection
    user_connection_type = UserTelegramSourceConnection
65
66
class TelegramLoginView(ChallengeStageView):
    """Challenge stage that sends the Telegram login challenge and handles its response."""

    response_class = TelegramChallengeResponse

    def dispatch(self, request, *args, **kwargs):
        """Store the source from the plan context on the view, then dispatch as usual."""
        plan_context = self.executor.plan.context
        self.source = plan_context[PLAN_CONTEXT_SOURCE]
        return super().dispatch(request, *args, **kwargs)

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the login challenge from the source's bot settings."""
        challenge_data = {
            "bot_username": self.source.bot_username,
            "request_message_access": self.source.request_message_access,
        }
        return TelegramLoginChallenge(data=challenge_data)

    def challenge_valid(self, response: TelegramChallengeResponse) -> HttpResponse:
        """Hand the validated Telegram user data to the source flow manager.

        ``component``, ``hash`` and ``auth_date`` are removed from a copy of the
        validated data. The remaining fields are passed on as user info, policy
        context and raw info, and the ``id`` field is used as the identifier.
        """
        telegram_user = response.validated_data.copy()
        # These keys are not forwarded as user information
        for transport_key in ("component", "hash", "auth_date"):
            telegram_user.pop(transport_key)

        manager = TelegramSourceFlowManager(
            source=self.source,
            request=self.request,
            identifier=telegram_user["id"],
            user_info={"info": telegram_user},
            policy_context={"telegram": telegram_user},
        )
        return manager.get_flow(raw_info=telegram_user)
class TelegramStartView(django.views.generic.base.View):
class TelegramStartView(View):
    """View that starts the login flow of an enabled Telegram source."""

    def handle_login_flow(
        self, source: TelegramSource, *stages_to_append, **kwargs
    ) -> HttpResponse:
        """Plan the source's pre-authentication flow and redirect the user into it.

        ``kwargs`` is used as the plan context after the SSO flag, the source and
        the final redirect target have been added to it. The redirect target is the
        ``NEXT_ARG_NAME`` entry of the mapping stored in the session under
        ``SESSION_KEY_GET``, falling back to ``"authentik_core:if-user"``.
        Every stage in ``stages_to_append`` is appended to the plan, in order.

        Raises ``Http404`` when the planner raises ``FlowNonApplicableException``.
        """
        # Carry the redirect through when the user was trying to authorize
        # an application before being sent here
        stored_query = self.request.session.get(SESSION_KEY_GET, {})
        next_url = stored_query.get(NEXT_ARG_NAME, "authentik_core:if-user")

        kwargs[PLAN_CONTEXT_SSO] = True
        kwargs[PLAN_CONTEXT_SOURCE] = source
        kwargs[PLAN_CONTEXT_REDIRECT] = next_url

        # The planner is run here so it receives the context built above
        flow = source.pre_authentication_flow
        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(self.request, kwargs)
        except FlowNonApplicableException:
            raise Http404 from None

        for extra_stage in stages_to_append:
            plan.append_stage(extra_stage)
        return plan.to_redirect(self.request, flow)

    def get(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Start the login flow for the enabled Telegram source with the given slug.

        Responds with a 404 when no enabled source has that slug.
        """
        source = get_object_or_404(TelegramSource, slug=source_slug, enabled=True)
        # The Telegram login challenge is appended to the plan as an in-memory stage
        return self.handle_login_flow(source, in_memory_stage(TelegramLoginView))

Intentionally simple parent class for all views. Only implements dispatch-by-method and simple sanity checking.

def handle_login_flow( self, source: authentik.sources.telegram.models.TelegramSource, *stages_to_append, **kwargs) -> django.http.response.HttpResponse:
27    def handle_login_flow(
28        self, source: TelegramSource, *stages_to_append, **kwargs
29    ) -> HttpResponse:
30        """Prepare Authentication Plan, redirect user FlowExecutor"""
31        # Ensure redirect is carried through when user was trying to
32        # authorize application
33        final_redirect = self.request.session.get(SESSION_KEY_GET, {}).get(
34            NEXT_ARG_NAME, "authentik_core:if-user"
35        )
36        kwargs.update(
37            {
38                PLAN_CONTEXT_SSO: True,
39                PLAN_CONTEXT_SOURCE: source,
40                PLAN_CONTEXT_REDIRECT: final_redirect,
41            }
42        )
43        # We run the Flow planner here so we can pass the Pending user in the context
44        planner = FlowPlanner(source.pre_authentication_flow)
45        planner.allow_empty_flows = True
46        try:
47            plan = planner.plan(self.request, kwargs)
48        except FlowNonApplicableException:
49            raise Http404 from None
50        for stage in stages_to_append:
51            plan.append_stage(stage)
52        return plan.to_redirect(self.request, source.pre_authentication_flow)

Prepare the authentication plan and redirect the user to the FlowExecutor.

def get( self, request: django.http.request.HttpRequest, source_slug: str) -> django.http.response.HttpResponse:
54    def get(self, request: HttpRequest, source_slug: str) -> HttpResponse:
55        source = get_object_or_404(TelegramSource, slug=source_slug, enabled=True)
56        telegram_login_stage = in_memory_stage(TelegramLoginView)
57
58        return self.handle_login_flow(source, telegram_login_stage)
class TelegramSourceFlowManager(authentik.core.sources.flow_manager.SourceFlowManager):
class TelegramSourceFlowManager(SourceFlowManager):
    """SourceFlowManager configured with the Telegram connection models"""

    # Connection models used for groups and users of a Telegram source
    group_connection_type = GroupTelegramSourceConnection
    user_connection_type = UserTelegramSourceConnection

Flow manager for Telegram source

class TelegramLoginView(authentik.flows.stage.ChallengeStageView):
class TelegramLoginView(ChallengeStageView):
    """Challenge stage that sends the Telegram login challenge and handles its response."""

    response_class = TelegramChallengeResponse

    def dispatch(self, request, *args, **kwargs):
        """Store the source from the plan context on the view, then dispatch as usual."""
        plan_context = self.executor.plan.context
        self.source = plan_context[PLAN_CONTEXT_SOURCE]
        return super().dispatch(request, *args, **kwargs)

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the login challenge from the source's bot settings."""
        challenge_data = {
            "bot_username": self.source.bot_username,
            "request_message_access": self.source.request_message_access,
        }
        return TelegramLoginChallenge(data=challenge_data)

    def challenge_valid(self, response: TelegramChallengeResponse) -> HttpResponse:
        """Hand the validated Telegram user data to the source flow manager.

        ``component``, ``hash`` and ``auth_date`` are removed from a copy of the
        validated data. The remaining fields are passed on as user info, policy
        context and raw info, and the ``id`` field is used as the identifier.
        """
        telegram_user = response.validated_data.copy()
        # These keys are not forwarded as user information
        for transport_key in ("component", "hash", "auth_date"):
            telegram_user.pop(transport_key)

        manager = TelegramSourceFlowManager(
            source=self.source,
            request=self.request,
            identifier=telegram_user["id"],
            user_info={"info": telegram_user},
            policy_context={"telegram": telegram_user},
        )
        return manager.get_flow(raw_info=telegram_user)

Stage view which responds with a challenge

def dispatch(self, request, *args, **kwargs):
72    def dispatch(self, request, *args, **kwargs):
73        self.source = self.executor.plan.context[PLAN_CONTEXT_SOURCE]
74        return super().dispatch(request, *args, **kwargs)
def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
76    def get_challenge(self, *args, **kwargs) -> Challenge:
77        return TelegramLoginChallenge(
78            data={
79                "bot_username": self.source.bot_username,
80                "request_message_access": self.source.request_message_access,
81            },
82        )

Return the challenge that the client should solve

def challenge_valid( self, response: authentik.sources.telegram.stage.TelegramChallengeResponse) -> django.http.response.HttpResponse:
84    def challenge_valid(self, response: TelegramChallengeResponse) -> HttpResponse:
85        raw_info = response.validated_data.copy()
86        raw_info.pop("component")
87        raw_info.pop("hash")
88        raw_info.pop("auth_date")
89        source = self.source
90        sfm = TelegramSourceFlowManager(
91            source=source,
92            request=self.request,
93            identifier=raw_info["id"],
94            user_info={"info": raw_info},
95            policy_context={"telegram": raw_info},
96        )
97        return sfm.get_flow(
98            raw_info=raw_info,
99        )

Callback when the challenge has the correct format