authentik.providers.rac.views

RAC Views

  1"""RAC Views"""
  2
  3from typing import Any
  4
  5from django.http import Http404, HttpRequest, HttpResponse
  6from django.shortcuts import get_object_or_404, redirect
  7from django.urls import reverse
  8from django.utils.timezone import now
  9from django.utils.translation import gettext as _
 10
 11from authentik.core.models import Application
 12from authentik.core.views.interface import InterfaceView
 13from authentik.events.models import Event, EventAction
 14from authentik.flows.challenge import RedirectChallenge
 15from authentik.flows.exceptions import FlowNonApplicableException
 16from authentik.flows.models import in_memory_stage
 17from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, FlowPlanner
 18from authentik.flows.stage import RedirectStage
 19from authentik.lib.utils.time import timedelta_from_string
 20from authentik.policies.engine import PolicyEngine
 21from authentik.policies.views import PolicyAccessView
 22from authentik.providers.rac.models import ConnectionToken, Endpoint, RACProvider
 23from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 24
 25PLAN_CONNECTION_SETTINGS = "connection_settings"
 26
 27
 28class RACStartView(PolicyAccessView):
 29    """Start a RAC connection by checking access and creating a connection token"""
 30
 31    endpoint: Endpoint
 32
 33    def resolve_provider_application(self):
 34        self.application = get_object_or_404(Application, slug=self.kwargs["app"])
 35        # Endpoint permissions are validated in the RACFinalStage below
 36        self.endpoint = get_object_or_404(Endpoint, pk=self.kwargs["endpoint"])
 37        self.provider = RACProvider.objects.get(application=self.application)
 38
 39    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 40        """Start flow planner for RAC provider"""
 41        planner = FlowPlanner(self.provider.authorization_flow)
 42        planner.allow_empty_flows = True
 43        try:
 44            plan = planner.plan(
 45                self.request,
 46                {
 47                    PLAN_CONTEXT_APPLICATION: self.application,
 48                },
 49            )
 50        except FlowNonApplicableException:
 51            raise Http404 from None
 52        plan.append_stage(
 53            in_memory_stage(
 54                RACFinalStage,
 55                application=self.application,
 56                endpoint=self.endpoint,
 57                provider=self.provider,
 58            )
 59        )
 60        return plan.to_redirect(request, self.provider.authorization_flow)
 61
 62
 63class RACInterface(InterfaceView):
 64    """Start RAC connection"""
 65
 66    template_name = "if/rac.html"
 67    token: ConnectionToken
 68
 69    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
 70        # Early sanity check to ensure token still exists
 71        token = ConnectionToken.objects.filter(
 72            token=self.kwargs["token"],
 73            session__session__session_key=request.session.session_key,
 74        ).first()
 75        if not token:
 76            return redirect("authentik_core:if-user")
 77        self.token = token
 78        return super().dispatch(request, *args, **kwargs)
 79
 80    def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
 81        kwargs["token"] = self.token
 82        return super().get_context_data(**kwargs)
 83
 84
 85class RACFinalStage(RedirectStage):
 86    """RAC Connection final stage, set the connection token in the stage"""
 87
 88    endpoint: Endpoint
 89    provider: RACProvider
 90    application: Application
 91
 92    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
 93        self.endpoint = self.executor.current_stage.endpoint
 94        self.provider = self.executor.current_stage.provider
 95        self.application = self.executor.current_stage.application
 96        # Check policies bound to endpoint directly
 97        engine = PolicyEngine(self.endpoint, self.request.user, self.request)
 98        engine.use_cache = False
 99        engine.build()
100        passing = engine.result
101        if not passing.passing:
102            return self.executor.stage_invalid(", ".join(passing.messages))
103        # Check if we're already at the maximum connection limit
104        all_tokens = ConnectionToken.objects.filter(
105            endpoint=self.endpoint,
106        )
107        if self.endpoint.maximum_connections > -1:
108            if all_tokens.count() >= self.endpoint.maximum_connections:
109                msg = [_("Maximum connection limit reached.")]
110                # Check if any other tokens exist for the current user, and inform them
111                # they are already connected
112                if all_tokens.filter(session__user=self.request.user).exists():
113                    msg.append(_("(You are already connected in another tab/window)"))
114                return self.executor.stage_invalid(" ".join(msg))
115        return super().dispatch(request, *args, **kwargs)
116
117    def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
118        settings = self.executor.plan.context.get(PLAN_CONNECTION_SETTINGS)
119        if not settings:
120            settings = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).get(
121                PLAN_CONNECTION_SETTINGS
122            )
123        token = ConnectionToken.objects.create(
124            provider=self.provider,
125            endpoint=self.endpoint,
126            settings=settings or {},
127            session=self.request.session["authenticatedsession"],
128            expires=now() + timedelta_from_string(self.provider.connection_expiry),
129            expiring=True,
130        )
131        Event.new(
132            EventAction.AUTHORIZE_APPLICATION,
133            authorized_application=self.application,
134            flow=self.executor.plan.flow_pk,
135            endpoint=self.endpoint.name,
136        ).from_http(self.request)
137        self.executor.current_stage.destination = self.request.build_absolute_uri(
138            reverse("authentik_providers_rac:if-rac", kwargs={"token": str(token.token)})
139        )
140        return super().get_challenge(*args, **kwargs)
PLAN_CONNECTION_SETTINGS = 'connection_settings'
class RACStartView(authentik.policies.views.PolicyAccessView):
29class RACStartView(PolicyAccessView):
30    """Start a RAC connection by checking access and creating a connection token"""
31
32    endpoint: Endpoint
33
34    def resolve_provider_application(self):
35        self.application = get_object_or_404(Application, slug=self.kwargs["app"])
36        # Endpoint permissions are validated in the RACFinalStage below
37        self.endpoint = get_object_or_404(Endpoint, pk=self.kwargs["endpoint"])
38        self.provider = RACProvider.objects.get(application=self.application)
39
40    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
41        """Start flow planner for RAC provider"""
42        planner = FlowPlanner(self.provider.authorization_flow)
43        planner.allow_empty_flows = True
44        try:
45            plan = planner.plan(
46                self.request,
47                {
48                    PLAN_CONTEXT_APPLICATION: self.application,
49                },
50            )
51        except FlowNonApplicableException:
52            raise Http404 from None
53        plan.append_stage(
54            in_memory_stage(
55                RACFinalStage,
56                application=self.application,
57                endpoint=self.endpoint,
58                provider=self.provider,
59            )
60        )
61        return plan.to_redirect(request, self.provider.authorization_flow)

Start a RAC connection by checking access and creating a connection token

def resolve_provider_application(self):
34    def resolve_provider_application(self):
35        self.application = get_object_or_404(Application, slug=self.kwargs["app"])
36        # Endpoint permissions are validated in the RACFinalStage below
37        self.endpoint = get_object_or_404(Endpoint, pk=self.kwargs["endpoint"])
38        self.provider = RACProvider.objects.get(application=self.application)

Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught, and will return directly

def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
40    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
41        """Start flow planner for RAC provider"""
42        planner = FlowPlanner(self.provider.authorization_flow)
43        planner.allow_empty_flows = True
44        try:
45            plan = planner.plan(
46                self.request,
47                {
48                    PLAN_CONTEXT_APPLICATION: self.application,
49                },
50            )
51        except FlowNonApplicableException:
52            raise Http404 from None
53        plan.append_stage(
54            in_memory_stage(
55                RACFinalStage,
56                application=self.application,
57                endpoint=self.endpoint,
58                provider=self.provider,
59            )
60        )
61        return plan.to_redirect(request, self.provider.authorization_flow)

Start flow planner for RAC provider

class RACInterface(authentik.core.views.interface.InterfaceView):
64class RACInterface(InterfaceView):
65    """Start RAC connection"""
66
67    template_name = "if/rac.html"
68    token: ConnectionToken
69
70    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
71        # Early sanity check to ensure token still exists
72        token = ConnectionToken.objects.filter(
73            token=self.kwargs["token"],
74            session__session__session_key=request.session.session_key,
75        ).first()
76        if not token:
77            return redirect("authentik_core:if-user")
78        self.token = token
79        return super().dispatch(request, *args, **kwargs)
80
81    def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
82        kwargs["token"] = self.token
83        return super().get_context_data(**kwargs)

Start RAC connection

template_name = 'if/rac.html'
def dispatch( self, request: django.http.request.HttpRequest, *args: Any, **kwargs: Any) -> django.http.response.HttpResponse:
70    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
71        # Early sanity check to ensure token still exists
72        token = ConnectionToken.objects.filter(
73            token=self.kwargs["token"],
74            session__session__session_key=request.session.session_key,
75        ).first()
76        if not token:
77            return redirect("authentik_core:if-user")
78        self.token = token
79        return super().dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs: Any) -> dict[str, typing.Any]:
81    def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
82        kwargs["token"] = self.token
83        return super().get_context_data(**kwargs)
class RACFinalStage(authentik.flows.stage.RedirectStage):
 86class RACFinalStage(RedirectStage):
 87    """RAC Connection final stage, set the connection token in the stage"""
 88
 89    endpoint: Endpoint
 90    provider: RACProvider
 91    application: Application
 92
 93    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
 94        self.endpoint = self.executor.current_stage.endpoint
 95        self.provider = self.executor.current_stage.provider
 96        self.application = self.executor.current_stage.application
 97        # Check policies bound to endpoint directly
 98        engine = PolicyEngine(self.endpoint, self.request.user, self.request)
 99        engine.use_cache = False
100        engine.build()
101        passing = engine.result
102        if not passing.passing:
103            return self.executor.stage_invalid(", ".join(passing.messages))
104        # Check if we're already at the maximum connection limit
105        all_tokens = ConnectionToken.objects.filter(
106            endpoint=self.endpoint,
107        )
108        if self.endpoint.maximum_connections > -1:
109            if all_tokens.count() >= self.endpoint.maximum_connections:
110                msg = [_("Maximum connection limit reached.")]
111                # Check if any other tokens exist for the current user, and inform them
112                # they are already connected
113                if all_tokens.filter(session__user=self.request.user).exists():
114                    msg.append(_("(You are already connected in another tab/window)"))
115                return self.executor.stage_invalid(" ".join(msg))
116        return super().dispatch(request, *args, **kwargs)
117
118    def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
119        settings = self.executor.plan.context.get(PLAN_CONNECTION_SETTINGS)
120        if not settings:
121            settings = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).get(
122                PLAN_CONNECTION_SETTINGS
123            )
124        token = ConnectionToken.objects.create(
125            provider=self.provider,
126            endpoint=self.endpoint,
127            settings=settings or {},
128            session=self.request.session["authenticatedsession"],
129            expires=now() + timedelta_from_string(self.provider.connection_expiry),
130            expiring=True,
131        )
132        Event.new(
133            EventAction.AUTHORIZE_APPLICATION,
134            authorized_application=self.application,
135            flow=self.executor.plan.flow_pk,
136            endpoint=self.endpoint.name,
137        ).from_http(self.request)
138        self.executor.current_stage.destination = self.request.build_absolute_uri(
139            reverse("authentik_providers_rac:if-rac", kwargs={"token": str(token.token)})
140        )
141        return super().get_challenge(*args, **kwargs)

RAC Connection final stage, set the connection token in the stage

def dispatch( self, request: django.http.request.HttpRequest, *args: Any, **kwargs: Any) -> django.http.response.HttpResponse:
 93    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
 94        self.endpoint = self.executor.current_stage.endpoint
 95        self.provider = self.executor.current_stage.provider
 96        self.application = self.executor.current_stage.application
 97        # Check policies bound to endpoint directly
 98        engine = PolicyEngine(self.endpoint, self.request.user, self.request)
 99        engine.use_cache = False
100        engine.build()
101        passing = engine.result
102        if not passing.passing:
103            return self.executor.stage_invalid(", ".join(passing.messages))
104        # Check if we're already at the maximum connection limit
105        all_tokens = ConnectionToken.objects.filter(
106            endpoint=self.endpoint,
107        )
108        if self.endpoint.maximum_connections > -1:
109            if all_tokens.count() >= self.endpoint.maximum_connections:
110                msg = [_("Maximum connection limit reached.")]
111                # Check if any other tokens exist for the current user, and inform them
112                # they are already connected
113                if all_tokens.filter(session__user=self.request.user).exists():
114                    msg.append(_("(You are already connected in another tab/window)"))
115                return self.executor.stage_invalid(" ".join(msg))
116        return super().dispatch(request, *args, **kwargs)
def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.RedirectChallenge:
118    def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
119        settings = self.executor.plan.context.get(PLAN_CONNECTION_SETTINGS)
120        if not settings:
121            settings = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).get(
122                PLAN_CONNECTION_SETTINGS
123            )
124        token = ConnectionToken.objects.create(
125            provider=self.provider,
126            endpoint=self.endpoint,
127            settings=settings or {},
128            session=self.request.session["authenticatedsession"],
129            expires=now() + timedelta_from_string(self.provider.connection_expiry),
130            expiring=True,
131        )
132        Event.new(
133            EventAction.AUTHORIZE_APPLICATION,
134            authorized_application=self.application,
135            flow=self.executor.plan.flow_pk,
136            endpoint=self.endpoint.name,
137        ).from_http(self.request)
138        self.executor.current_stage.destination = self.request.build_absolute_uri(
139            reverse("authentik_providers_rac:if-rac", kwargs={"token": str(token.token)})
140        )
141        return super().get_challenge(*args, **kwargs)

Return the challenge that the client should solve