authentik.enterprise.providers.ws_federation.views
1from django.http import Http404, HttpRequest, HttpResponse 2from django.shortcuts import get_object_or_404, redirect 3from django.urls import reverse 4from django.utils.translation import gettext as _ 5from django.views import View 6from structlog.stdlib import get_logger 7 8from authentik.core.models import Application, AuthenticatedSession 9from authentik.enterprise.providers.ws_federation.models import WSFederationProvider 10from authentik.enterprise.providers.ws_federation.processors.constants import ( 11 WS_FED_ACTION_SIGN_IN, 12 WS_FED_ACTION_SIGN_OUT, 13) 14from authentik.enterprise.providers.ws_federation.processors.sign_in import ( 15 SignInProcessor, 16 SignInRequest, 17) 18from authentik.enterprise.providers.ws_federation.processors.sign_out import SignOutRequest 19from authentik.flows.challenge import ( 20 PLAN_CONTEXT_TITLE, 21 AutosubmitChallenge, 22 AutoSubmitChallengeResponse, 23) 24from authentik.flows.exceptions import FlowNonApplicableException 25from authentik.flows.models import in_memory_stage 26from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO, FlowPlanner 27from authentik.flows.stage import ChallengeStageView, SessionEndStage 28from authentik.lib.views import bad_request_message 29from authentik.policies.views import PolicyAccessView, RequestValidationError 30from authentik.providers.saml.models import SAMLSession 31from authentik.stages.consent.stage import ( 32 PLAN_CONTEXT_CONSENT_HEADER, 33 PLAN_CONTEXT_CONSENT_PERMISSIONS, 34) 35 36PLAN_CONTEXT_WS_FED_REQUEST = "authentik/providers/ws_federation/request" 37LOGGER = get_logger() 38 39 40class WSFedEntryView(PolicyAccessView): 41 req: SignInRequest | SignOutRequest 42 43 def pre_permission_check(self): 44 self.action = self.request.GET.get("wa") 45 try: 46 if self.action == WS_FED_ACTION_SIGN_IN: 47 self.req = SignInRequest.parse(self.request) 48 elif self.action == WS_FED_ACTION_SIGN_OUT: 49 self.req = SignOutRequest.parse(self.request) 50 else: 51 raise RequestValidationError( 52 bad_request_message(self.request, "Invalid WS-Federation action") 53 ) 54 except ValueError as exc: 55 LOGGER.warning("Invalid WS-Fed request", exc=exc) 56 raise RequestValidationError( 57 bad_request_message(self.request, "Invalid WS-Federation request") 58 ) from None 59 60 def resolve_provider_application(self): 61 self.application, self.provider = self.req.get_app_provider() 62 63 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 64 if self.action == WS_FED_ACTION_SIGN_IN: 65 return self.ws_fed_sign_in() 66 elif self.action == WS_FED_ACTION_SIGN_OUT: 67 return self.ws_fed_sign_out() 68 else: 69 return HttpResponse("Unsupported WS-Federation action", status=400) 70 71 def ws_fed_sign_in(self) -> HttpResponse: 72 planner = FlowPlanner(self.provider.authorization_flow) 73 planner.allow_empty_flows = True 74 try: 75 plan = planner.plan( 76 self.request, 77 { 78 PLAN_CONTEXT_SSO: True, 79 PLAN_CONTEXT_APPLICATION: self.application, 80 PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.") 81 % {"application": self.application.name}, 82 PLAN_CONTEXT_CONSENT_PERMISSIONS: [], 83 PLAN_CONTEXT_WS_FED_REQUEST: self.req, 84 }, 85 ) 86 except FlowNonApplicableException: 87 raise Http404 from None 88 plan.append_stage(in_memory_stage(WSFedFlowFinalView)) 89 return plan.to_redirect( 90 self.request, 91 self.provider.authorization_flow, 92 ) 93 94 def ws_fed_sign_out(self) -> HttpResponse: 95 flow = self.provider.invalidation_flow or self.request.brand.flow_invalidation 96 97 planner = FlowPlanner(flow) 98 planner.allow_empty_flows = True 99 try: 100 plan = planner.plan( 101 self.request, 102 { 103 PLAN_CONTEXT_SSO: True, 104 PLAN_CONTEXT_APPLICATION: self.application, 105 PLAN_CONTEXT_WS_FED_REQUEST: self.req, 106 }, 107 ) 108 except FlowNonApplicableException: 109 raise Http404 from None 110 plan.append_stage(in_memory_stage(SessionEndStage)) 111 return plan.to_redirect(self.request, flow) 112 113 114class WSFedFlowFinalView(ChallengeStageView): 115 response_class = AutoSubmitChallengeResponse 116 117 def get(self, request, *args, **kwargs): 118 if PLAN_CONTEXT_WS_FED_REQUEST not in self.executor.plan.context: 119 self.logger.warning("No WS-Fed request in context") 120 return self.executor.stage_invalid() 121 return super().get(request, *args, **kwargs) 122 123 def get_challenge(self, *args, **kwargs): 124 application: Application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION] 125 provider: WSFederationProvider = get_object_or_404( 126 WSFederationProvider, pk=application.provider_id 127 ) 128 sign_in_req: SignInRequest = self.executor.plan.context[PLAN_CONTEXT_WS_FED_REQUEST] 129 proc = SignInProcessor(provider, self.request, sign_in_req) 130 response = proc.response() 131 saml_processor = proc.saml_processor 132 133 # Create SAMLSession to track this login 134 auth_session = AuthenticatedSession.from_request(self.request, self.request.user) 135 if auth_session: 136 # Since samlsessions should only exist uniquely for an active session and a provider 137 # any existing combination is likely an old, dead session 138 SAMLSession.objects.filter( 139 session_index=saml_processor.session_index, provider=provider 140 ).delete() 141 142 SAMLSession.objects.update_or_create( 143 session_index=saml_processor.session_index, 144 provider=provider, 145 defaults={ 146 "user": self.request.user, 147 "session": auth_session, 148 "name_id": saml_processor.name_id, 149 "name_id_format": saml_processor.name_id_format, 150 "expires": saml_processor.session_not_on_or_after_datetime, 151 "expiring": True, 152 }, 153 ) 154 return AutosubmitChallenge( 155 data={ 156 "component": "ak-stage-autosubmit", 157 "title": self.executor.plan.context.get( 158 PLAN_CONTEXT_TITLE, 159 _("Redirecting to {app}...".format_map({"app": application.name})), 160 ), 161 "url": sign_in_req.wreply, 162 "attrs": response, 163 }, 164 ) 165 166 167class MetadataDownload(View): 168 """Redirect to metadata download""" 169 170 def dispatch(self, request: HttpRequest, application_slug: str) -> HttpResponse: 171 app = Application.objects.filter(slug=application_slug).with_provider().first() 172 if not app: 173 raise Http404 174 provider = app.get_provider() 175 if not provider: 176 raise Http404 177 return redirect( 178 reverse( 179 "authentik_api:wsfederationprovider-metadata", 180 kwargs={ 181 "pk": provider.pk, 182 }, 183 ) 184 + "?download" 185 )
PLAN_CONTEXT_WS_FED_REQUEST =
'authentik/providers/ws_federation/request'
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
41class WSFedEntryView(PolicyAccessView): 42 req: SignInRequest | SignOutRequest 43 44 def pre_permission_check(self): 45 self.action = self.request.GET.get("wa") 46 try: 47 if self.action == WS_FED_ACTION_SIGN_IN: 48 self.req = SignInRequest.parse(self.request) 49 elif self.action == WS_FED_ACTION_SIGN_OUT: 50 self.req = SignOutRequest.parse(self.request) 51 else: 52 raise RequestValidationError( 53 bad_request_message(self.request, "Invalid WS-Federation action") 54 ) 55 except ValueError as exc: 56 LOGGER.warning("Invalid WS-Fed request", exc=exc) 57 raise RequestValidationError( 58 bad_request_message(self.request, "Invalid WS-Federation request") 59 ) from None 60 61 def resolve_provider_application(self): 62 self.application, self.provider = self.req.get_app_provider() 63 64 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 65 if self.action == WS_FED_ACTION_SIGN_IN: 66 return self.ws_fed_sign_in() 67 elif self.action == WS_FED_ACTION_SIGN_OUT: 68 return self.ws_fed_sign_out() 69 else: 70 return HttpResponse("Unsupported WS-Federation action", status=400) 71 72 def ws_fed_sign_in(self) -> HttpResponse: 73 planner = FlowPlanner(self.provider.authorization_flow) 74 planner.allow_empty_flows = True 75 try: 76 plan = planner.plan( 77 self.request, 78 { 79 PLAN_CONTEXT_SSO: True, 80 PLAN_CONTEXT_APPLICATION: self.application, 81 PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.") 82 % {"application": self.application.name}, 83 PLAN_CONTEXT_CONSENT_PERMISSIONS: [], 84 PLAN_CONTEXT_WS_FED_REQUEST: self.req, 85 }, 86 ) 87 except FlowNonApplicableException: 88 raise Http404 from None 89 plan.append_stage(in_memory_stage(WSFedFlowFinalView)) 90 return plan.to_redirect( 91 self.request, 92 self.provider.authorization_flow, 93 ) 94 95 def ws_fed_sign_out(self) -> HttpResponse: 96 flow = self.provider.invalidation_flow or self.request.brand.flow_invalidation 97 98 planner = FlowPlanner(flow) 99 planner.allow_empty_flows = True 100 try: 101 plan = planner.plan( 102 self.request, 103 { 104 PLAN_CONTEXT_SSO: True, 105 PLAN_CONTEXT_APPLICATION: self.application, 106 PLAN_CONTEXT_WS_FED_REQUEST: self.req, 107 }, 108 ) 109 except FlowNonApplicableException: 110 raise Http404 from None 111 plan.append_stage(in_memory_stage(SessionEndStage)) 112 return plan.to_redirect(self.request, flow)
Mixin class for usage in Authorization views. Provider functions to check application access, etc
req: authentik.enterprise.providers.ws_federation.processors.sign_in.SignInRequest | authentik.enterprise.providers.ws_federation.processors.sign_out.SignOutRequest
def
pre_permission_check(self):
44 def pre_permission_check(self): 45 self.action = self.request.GET.get("wa") 46 try: 47 if self.action == WS_FED_ACTION_SIGN_IN: 48 self.req = SignInRequest.parse(self.request) 49 elif self.action == WS_FED_ACTION_SIGN_OUT: 50 self.req = SignOutRequest.parse(self.request) 51 else: 52 raise RequestValidationError( 53 bad_request_message(self.request, "Invalid WS-Federation action") 54 ) 55 except ValueError as exc: 56 LOGGER.warning("Invalid WS-Fed request", exc=exc) 57 raise RequestValidationError( 58 bad_request_message(self.request, "Invalid WS-Federation request") 59 ) from None
Optionally hook in before permission check to check if a request is valid.
Can raise RequestValidationError to return a response.
def
resolve_provider_application(self):
61 def resolve_provider_application(self): 62 self.application, self.provider = self.req.get_app_provider()
Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught, and will return directly
def
get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
64 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 65 if self.action == WS_FED_ACTION_SIGN_IN: 66 return self.ws_fed_sign_in() 67 elif self.action == WS_FED_ACTION_SIGN_OUT: 68 return self.ws_fed_sign_out() 69 else: 70 return HttpResponse("Unsupported WS-Federation action", status=400)
def
ws_fed_sign_in(self) -> django.http.response.HttpResponse:
72 def ws_fed_sign_in(self) -> HttpResponse: 73 planner = FlowPlanner(self.provider.authorization_flow) 74 planner.allow_empty_flows = True 75 try: 76 plan = planner.plan( 77 self.request, 78 { 79 PLAN_CONTEXT_SSO: True, 80 PLAN_CONTEXT_APPLICATION: self.application, 81 PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.") 82 % {"application": self.application.name}, 83 PLAN_CONTEXT_CONSENT_PERMISSIONS: [], 84 PLAN_CONTEXT_WS_FED_REQUEST: self.req, 85 }, 86 ) 87 except FlowNonApplicableException: 88 raise Http404 from None 89 plan.append_stage(in_memory_stage(WSFedFlowFinalView)) 90 return plan.to_redirect( 91 self.request, 92 self.provider.authorization_flow, 93 )
def
ws_fed_sign_out(self) -> django.http.response.HttpResponse:
95 def ws_fed_sign_out(self) -> HttpResponse: 96 flow = self.provider.invalidation_flow or self.request.brand.flow_invalidation 97 98 planner = FlowPlanner(flow) 99 planner.allow_empty_flows = True 100 try: 101 plan = planner.plan( 102 self.request, 103 { 104 PLAN_CONTEXT_SSO: True, 105 PLAN_CONTEXT_APPLICATION: self.application, 106 PLAN_CONTEXT_WS_FED_REQUEST: self.req, 107 }, 108 ) 109 except FlowNonApplicableException: 110 raise Http404 from None 111 plan.append_stage(in_memory_stage(SessionEndStage)) 112 return plan.to_redirect(self.request, flow)
115class WSFedFlowFinalView(ChallengeStageView): 116 response_class = AutoSubmitChallengeResponse 117 118 def get(self, request, *args, **kwargs): 119 if PLAN_CONTEXT_WS_FED_REQUEST not in self.executor.plan.context: 120 self.logger.warning("No WS-Fed request in context") 121 return self.executor.stage_invalid() 122 return super().get(request, *args, **kwargs) 123 124 def get_challenge(self, *args, **kwargs): 125 application: Application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION] 126 provider: WSFederationProvider = get_object_or_404( 127 WSFederationProvider, pk=application.provider_id 128 ) 129 sign_in_req: SignInRequest = self.executor.plan.context[PLAN_CONTEXT_WS_FED_REQUEST] 130 proc = SignInProcessor(provider, self.request, sign_in_req) 131 response = proc.response() 132 saml_processor = proc.saml_processor 133 134 # Create SAMLSession to track this login 135 auth_session = AuthenticatedSession.from_request(self.request, self.request.user) 136 if auth_session: 137 # Since samlsessions should only exist uniquely for an active session and a provider 138 # any existing combination is likely an old, dead session 139 SAMLSession.objects.filter( 140 session_index=saml_processor.session_index, provider=provider 141 ).delete() 142 143 SAMLSession.objects.update_or_create( 144 session_index=saml_processor.session_index, 145 provider=provider, 146 defaults={ 147 "user": self.request.user, 148 "session": auth_session, 149 "name_id": saml_processor.name_id, 150 "name_id_format": saml_processor.name_id_format, 151 "expires": saml_processor.session_not_on_or_after_datetime, 152 "expiring": True, 153 }, 154 ) 155 return AutosubmitChallenge( 156 data={ 157 "component": "ak-stage-autosubmit", 158 "title": self.executor.plan.context.get( 159 PLAN_CONTEXT_TITLE, 160 _("Redirecting to {app}...".format_map({"app": application.name})), 161 ), 162 "url": sign_in_req.wreply, 163 "attrs": response, 164 }, 165 )
Stage view which response with a challenge
response_class =
<class 'authentik.flows.challenge.AutoSubmitChallengeResponse'>
def
get(self, request, *args, **kwargs):
118 def get(self, request, *args, **kwargs): 119 if PLAN_CONTEXT_WS_FED_REQUEST not in self.executor.plan.context: 120 self.logger.warning("No WS-Fed request in context") 121 return self.executor.stage_invalid() 122 return super().get(request, *args, **kwargs)
Return a challenge for the frontend to solve
def
get_challenge(self, *args, **kwargs):
124 def get_challenge(self, *args, **kwargs): 125 application: Application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION] 126 provider: WSFederationProvider = get_object_or_404( 127 WSFederationProvider, pk=application.provider_id 128 ) 129 sign_in_req: SignInRequest = self.executor.plan.context[PLAN_CONTEXT_WS_FED_REQUEST] 130 proc = SignInProcessor(provider, self.request, sign_in_req) 131 response = proc.response() 132 saml_processor = proc.saml_processor 133 134 # Create SAMLSession to track this login 135 auth_session = AuthenticatedSession.from_request(self.request, self.request.user) 136 if auth_session: 137 # Since samlsessions should only exist uniquely for an active session and a provider 138 # any existing combination is likely an old, dead session 139 SAMLSession.objects.filter( 140 session_index=saml_processor.session_index, provider=provider 141 ).delete() 142 143 SAMLSession.objects.update_or_create( 144 session_index=saml_processor.session_index, 145 provider=provider, 146 defaults={ 147 "user": self.request.user, 148 "session": auth_session, 149 "name_id": saml_processor.name_id, 150 "name_id_format": saml_processor.name_id_format, 151 "expires": saml_processor.session_not_on_or_after_datetime, 152 "expiring": True, 153 }, 154 ) 155 return AutosubmitChallenge( 156 data={ 157 "component": "ak-stage-autosubmit", 158 "title": self.executor.plan.context.get( 159 PLAN_CONTEXT_TITLE, 160 _("Redirecting to {app}...".format_map({"app": application.name})), 161 ), 162 "url": sign_in_req.wreply, 163 "attrs": response, 164 }, 165 )
Return the challenge that the client should solve
class
MetadataDownload(django.views.generic.base.View):
168class MetadataDownload(View): 169 """Redirect to metadata download""" 170 171 def dispatch(self, request: HttpRequest, application_slug: str) -> HttpResponse: 172 app = Application.objects.filter(slug=application_slug).with_provider().first() 173 if not app: 174 raise Http404 175 provider = app.get_provider() 176 if not provider: 177 raise Http404 178 return redirect( 179 reverse( 180 "authentik_api:wsfederationprovider-metadata", 181 kwargs={ 182 "pk": provider.pk, 183 }, 184 ) 185 + "?download" 186 )
Redirect to metadata download
def
dispatch( self, request: django.http.request.HttpRequest, application_slug: str) -> django.http.response.HttpResponse:
171 def dispatch(self, request: HttpRequest, application_slug: str) -> HttpResponse: 172 app = Application.objects.filter(slug=application_slug).with_provider().first() 173 if not app: 174 raise Http404 175 provider = app.get_provider() 176 if not provider: 177 raise Http404 178 return redirect( 179 reverse( 180 "authentik_api:wsfederationprovider-metadata", 181 kwargs={ 182 "pk": provider.pk, 183 }, 184 ) 185 + "?download" 186 )