authentik.providers.rac.views
RAC Views
1"""RAC Views""" 2 3from typing import Any 4 5from django.http import Http404, HttpRequest, HttpResponse 6from django.shortcuts import get_object_or_404, redirect 7from django.urls import reverse 8from django.utils.timezone import now 9from django.utils.translation import gettext as _ 10 11from authentik.core.models import Application 12from authentik.core.views.interface import InterfaceView 13from authentik.events.models import Event, EventAction 14from authentik.flows.challenge import RedirectChallenge 15from authentik.flows.exceptions import FlowNonApplicableException 16from authentik.flows.models import in_memory_stage 17from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, FlowPlanner 18from authentik.flows.stage import RedirectStage 19from authentik.lib.utils.time import timedelta_from_string 20from authentik.policies.engine import PolicyEngine 21from authentik.policies.views import PolicyAccessView 22from authentik.providers.rac.models import ConnectionToken, Endpoint, RACProvider 23from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT 24 25PLAN_CONNECTION_SETTINGS = "connection_settings" 26 27 28class RACStartView(PolicyAccessView): 29 """Start a RAC connection by checking access and creating a connection token""" 30 31 endpoint: Endpoint 32 33 def resolve_provider_application(self): 34 self.application = get_object_or_404(Application, slug=self.kwargs["app"]) 35 # Endpoint permissions are validated in the RACFinalStage below 36 self.endpoint = get_object_or_404(Endpoint, pk=self.kwargs["endpoint"]) 37 self.provider = RACProvider.objects.get(application=self.application) 38 39 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 40 """Start flow planner for RAC provider""" 41 planner = FlowPlanner(self.provider.authorization_flow) 42 planner.allow_empty_flows = True 43 try: 44 plan = planner.plan( 45 self.request, 46 { 47 PLAN_CONTEXT_APPLICATION: self.application, 48 }, 49 ) 50 except FlowNonApplicableException: 51 raise Http404 from None 52 plan.append_stage( 53 in_memory_stage( 54 RACFinalStage, 55 application=self.application, 56 endpoint=self.endpoint, 57 provider=self.provider, 58 ) 59 ) 60 return plan.to_redirect(request, self.provider.authorization_flow) 61 62 63class RACInterface(InterfaceView): 64 """Start RAC connection""" 65 66 template_name = "if/rac.html" 67 token: ConnectionToken 68 69 def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: 70 # Early sanity check to ensure token still exists 71 token = ConnectionToken.objects.filter( 72 token=self.kwargs["token"], 73 session__session__session_key=request.session.session_key, 74 ).first() 75 if not token: 76 return redirect("authentik_core:if-user") 77 self.token = token 78 return super().dispatch(request, *args, **kwargs) 79 80 def get_context_data(self, **kwargs: Any) -> dict[str, Any]: 81 kwargs["token"] = self.token 82 return super().get_context_data(**kwargs) 83 84 85class RACFinalStage(RedirectStage): 86 """RAC Connection final stage, set the connection token in the stage""" 87 88 endpoint: Endpoint 89 provider: RACProvider 90 application: Application 91 92 def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: 93 self.endpoint = self.executor.current_stage.endpoint 94 self.provider = self.executor.current_stage.provider 95 self.application = self.executor.current_stage.application 96 # Check policies bound to endpoint directly 97 engine = PolicyEngine(self.endpoint, self.request.user, self.request) 98 engine.use_cache = False 99 engine.build() 100 passing = engine.result 101 if not passing.passing: 102 return self.executor.stage_invalid(", ".join(passing.messages)) 103 # Check if we're already at the maximum connection limit 104 all_tokens = ConnectionToken.objects.filter( 105 endpoint=self.endpoint, 106 ) 107 if self.endpoint.maximum_connections > -1: 108 if all_tokens.count() >= self.endpoint.maximum_connections: 109 msg = [_("Maximum connection limit reached.")] 110 # Check if any other tokens exist for the current user, and inform them 111 # they are already connected 112 if all_tokens.filter(session__user=self.request.user).exists(): 113 msg.append(_("(You are already connected in another tab/window)")) 114 return self.executor.stage_invalid(" ".join(msg)) 115 return super().dispatch(request, *args, **kwargs) 116 117 def get_challenge(self, *args, **kwargs) -> RedirectChallenge: 118 settings = self.executor.plan.context.get(PLAN_CONNECTION_SETTINGS) 119 if not settings: 120 settings = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).get( 121 PLAN_CONNECTION_SETTINGS 122 ) 123 token = ConnectionToken.objects.create( 124 provider=self.provider, 125 endpoint=self.endpoint, 126 settings=settings or {}, 127 session=self.request.session["authenticatedsession"], 128 expires=now() + timedelta_from_string(self.provider.connection_expiry), 129 expiring=True, 130 ) 131 Event.new( 132 EventAction.AUTHORIZE_APPLICATION, 133 authorized_application=self.application, 134 flow=self.executor.plan.flow_pk, 135 endpoint=self.endpoint.name, 136 ).from_http(self.request) 137 self.executor.current_stage.destination = self.request.build_absolute_uri( 138 reverse("authentik_providers_rac:if-rac", kwargs={"token": str(token.token)}) 139 ) 140 return super().get_challenge(*args, **kwargs)
PLAN_CONNECTION_SETTINGS =
'connection_settings'
29class RACStartView(PolicyAccessView): 30 """Start a RAC connection by checking access and creating a connection token""" 31 32 endpoint: Endpoint 33 34 def resolve_provider_application(self): 35 self.application = get_object_or_404(Application, slug=self.kwargs["app"]) 36 # Endpoint permissions are validated in the RACFinalStage below 37 self.endpoint = get_object_or_404(Endpoint, pk=self.kwargs["endpoint"]) 38 self.provider = RACProvider.objects.get(application=self.application) 39 40 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 41 """Start flow planner for RAC provider""" 42 planner = FlowPlanner(self.provider.authorization_flow) 43 planner.allow_empty_flows = True 44 try: 45 plan = planner.plan( 46 self.request, 47 { 48 PLAN_CONTEXT_APPLICATION: self.application, 49 }, 50 ) 51 except FlowNonApplicableException: 52 raise Http404 from None 53 plan.append_stage( 54 in_memory_stage( 55 RACFinalStage, 56 application=self.application, 57 endpoint=self.endpoint, 58 provider=self.provider, 59 ) 60 ) 61 return plan.to_redirect(request, self.provider.authorization_flow)
Start a RAC connection by checking access and creating a connection token
endpoint: authentik.providers.rac.models.Endpoint
def
resolve_provider_application(self):
34 def resolve_provider_application(self): 35 self.application = get_object_or_404(Application, slug=self.kwargs["app"]) 36 # Endpoint permissions are validated in the RACFinalStage below 37 self.endpoint = get_object_or_404(Endpoint, pk=self.kwargs["endpoint"]) 38 self.provider = RACProvider.objects.get(application=self.application)
Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught, and will return directly
def
get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
40 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 41 """Start flow planner for RAC provider""" 42 planner = FlowPlanner(self.provider.authorization_flow) 43 planner.allow_empty_flows = True 44 try: 45 plan = planner.plan( 46 self.request, 47 { 48 PLAN_CONTEXT_APPLICATION: self.application, 49 }, 50 ) 51 except FlowNonApplicableException: 52 raise Http404 from None 53 plan.append_stage( 54 in_memory_stage( 55 RACFinalStage, 56 application=self.application, 57 endpoint=self.endpoint, 58 provider=self.provider, 59 ) 60 ) 61 return plan.to_redirect(request, self.provider.authorization_flow)
Start flow planner for RAC provider
64class RACInterface(InterfaceView): 65 """Start RAC connection""" 66 67 template_name = "if/rac.html" 68 token: ConnectionToken 69 70 def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: 71 # Early sanity check to ensure token still exists 72 token = ConnectionToken.objects.filter( 73 token=self.kwargs["token"], 74 session__session__session_key=request.session.session_key, 75 ).first() 76 if not token: 77 return redirect("authentik_core:if-user") 78 self.token = token 79 return super().dispatch(request, *args, **kwargs) 80 81 def get_context_data(self, **kwargs: Any) -> dict[str, Any]: 82 kwargs["token"] = self.token 83 return super().get_context_data(**kwargs)
Start RAC connection
def
dispatch( self, request: django.http.request.HttpRequest, *args: Any, **kwargs: Any) -> django.http.response.HttpResponse:
70 def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: 71 # Early sanity check to ensure token still exists 72 token = ConnectionToken.objects.filter( 73 token=self.kwargs["token"], 74 session__session__session_key=request.session.session_key, 75 ).first() 76 if not token: 77 return redirect("authentik_core:if-user") 78 self.token = token 79 return super().dispatch(request, *args, **kwargs)
86class RACFinalStage(RedirectStage): 87 """RAC Connection final stage, set the connection token in the stage""" 88 89 endpoint: Endpoint 90 provider: RACProvider 91 application: Application 92 93 def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: 94 self.endpoint = self.executor.current_stage.endpoint 95 self.provider = self.executor.current_stage.provider 96 self.application = self.executor.current_stage.application 97 # Check policies bound to endpoint directly 98 engine = PolicyEngine(self.endpoint, self.request.user, self.request) 99 engine.use_cache = False 100 engine.build() 101 passing = engine.result 102 if not passing.passing: 103 return self.executor.stage_invalid(", ".join(passing.messages)) 104 # Check if we're already at the maximum connection limit 105 all_tokens = ConnectionToken.objects.filter( 106 endpoint=self.endpoint, 107 ) 108 if self.endpoint.maximum_connections > -1: 109 if all_tokens.count() >= self.endpoint.maximum_connections: 110 msg = [_("Maximum connection limit reached.")] 111 # Check if any other tokens exist for the current user, and inform them 112 # they are already connected 113 if all_tokens.filter(session__user=self.request.user).exists(): 114 msg.append(_("(You are already connected in another tab/window)")) 115 return self.executor.stage_invalid(" ".join(msg)) 116 return super().dispatch(request, *args, **kwargs) 117 118 def get_challenge(self, *args, **kwargs) -> RedirectChallenge: 119 settings = self.executor.plan.context.get(PLAN_CONNECTION_SETTINGS) 120 if not settings: 121 settings = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).get( 122 PLAN_CONNECTION_SETTINGS 123 ) 124 token = ConnectionToken.objects.create( 125 provider=self.provider, 126 endpoint=self.endpoint, 127 settings=settings or {}, 128 session=self.request.session["authenticatedsession"], 129 expires=now() + timedelta_from_string(self.provider.connection_expiry), 130 expiring=True, 131 ) 132 Event.new( 133 EventAction.AUTHORIZE_APPLICATION, 134 authorized_application=self.application, 135 flow=self.executor.plan.flow_pk, 136 endpoint=self.endpoint.name, 137 ).from_http(self.request) 138 self.executor.current_stage.destination = self.request.build_absolute_uri( 139 reverse("authentik_providers_rac:if-rac", kwargs={"token": str(token.token)}) 140 ) 141 return super().get_challenge(*args, **kwargs)
RAC Connection final stage, set the connection token in the stage
endpoint: authentik.providers.rac.models.Endpoint
application: authentik.core.models.Application
def
dispatch( self, request: django.http.request.HttpRequest, *args: Any, **kwargs: Any) -> django.http.response.HttpResponse:
93 def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: 94 self.endpoint = self.executor.current_stage.endpoint 95 self.provider = self.executor.current_stage.provider 96 self.application = self.executor.current_stage.application 97 # Check policies bound to endpoint directly 98 engine = PolicyEngine(self.endpoint, self.request.user, self.request) 99 engine.use_cache = False 100 engine.build() 101 passing = engine.result 102 if not passing.passing: 103 return self.executor.stage_invalid(", ".join(passing.messages)) 104 # Check if we're already at the maximum connection limit 105 all_tokens = ConnectionToken.objects.filter( 106 endpoint=self.endpoint, 107 ) 108 if self.endpoint.maximum_connections > -1: 109 if all_tokens.count() >= self.endpoint.maximum_connections: 110 msg = [_("Maximum connection limit reached.")] 111 # Check if any other tokens exist for the current user, and inform them 112 # they are already connected 113 if all_tokens.filter(session__user=self.request.user).exists(): 114 msg.append(_("(You are already connected in another tab/window)")) 115 return self.executor.stage_invalid(" ".join(msg)) 116 return super().dispatch(request, *args, **kwargs)
118 def get_challenge(self, *args, **kwargs) -> RedirectChallenge: 119 settings = self.executor.plan.context.get(PLAN_CONNECTION_SETTINGS) 120 if not settings: 121 settings = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).get( 122 PLAN_CONNECTION_SETTINGS 123 ) 124 token = ConnectionToken.objects.create( 125 provider=self.provider, 126 endpoint=self.endpoint, 127 settings=settings or {}, 128 session=self.request.session["authenticatedsession"], 129 expires=now() + timedelta_from_string(self.provider.connection_expiry), 130 expiring=True, 131 ) 132 Event.new( 133 EventAction.AUTHORIZE_APPLICATION, 134 authorized_application=self.application, 135 flow=self.executor.plan.flow_pk, 136 endpoint=self.endpoint.name, 137 ).from_http(self.request) 138 self.executor.current_stage.destination = self.request.build_absolute_uri( 139 reverse("authentik_providers_rac:if-rac", kwargs={"token": str(token.token)}) 140 ) 141 return super().get_challenge(*args, **kwargs)
Return the challenge that the client should solve