authentik.sources.saml.views
SAML SP (service provider) views
"""saml sp views"""

from urllib.parse import parse_qsl, urlparse, urlunparse

from django.contrib.auth import logout
from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.exceptions import SuspiciousOperation
from django.http import Http404, HttpRequest, HttpResponse
from django.http.response import HttpResponseBadRequest
from django.shortcuts import get_object_or_404, redirect
from django.utils.decorators import method_decorator
from django.utils.http import urlencode
from django.utils.translation import gettext as _
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from structlog.stdlib import get_logger
from xmlsec import InternalError, VerificationError

from authentik.flows.challenge import (
    PLAN_CONTEXT_ATTRS,
    PLAN_CONTEXT_TITLE,
    PLAN_CONTEXT_URL,
    AutosubmitChallenge,
    Challenge,
    ChallengeResponse,
)
from authentik.flows.exceptions import FlowNonApplicableException
from authentik.flows.models import in_memory_stage
from authentik.flows.planner import (
    PLAN_CONTEXT_REDIRECT,
    PLAN_CONTEXT_SOURCE,
    PLAN_CONTEXT_SSO,
    FlowPlan,
    FlowPlanner,
)
from authentik.flows.stage import ChallengeStageView
from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET, SESSION_KEY_PLAN
from authentik.lib.views import bad_request_message
from authentik.providers.saml.utils.encoding import nice64
from authentik.sources.saml.exceptions import (
    InvalidEncryption,
    InvalidSignature,
    MismatchedRequestID,
    MissingSAMLResponse,
    UnsupportedNameIDFormat,
)
from authentik.sources.saml.models import SAMLBindingTypes, SAMLSource
from authentik.sources.saml.processors.metadata import MetadataProcessor
from authentik.sources.saml.processors.request import RequestProcessor
from authentik.sources.saml.processors.response import ResponseProcessor
from authentik.stages.consent.stage import PLAN_CONTEXT_CONSENT_HEADER, ConsentStageView

LOGGER = get_logger()


class AutosubmitStageView(ChallengeStageView):
    """Wrapper stage to create an autosubmit challenge from plan context variables"""

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the autosubmit challenge from the title, URL and attrs in the plan context."""
        context = self.executor.plan.context
        return AutosubmitChallenge(
            data={
                "component": "ak-stage-autosubmit",
                "title": context.get(PLAN_CONTEXT_TITLE, ""),
                "url": context.get(PLAN_CONTEXT_URL, ""),
                # NOTE(review): falls back to "" although InitiateView stores a dict
                # under PLAN_CONTEXT_ATTRS -- confirm the intended default.
                "attrs": context.get(PLAN_CONTEXT_ATTRS, ""),
            },
        )

    # Since `ak-stage-autosubmit` redirects off site, we don't have anything to check
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Always answer with HTTP 400; no response is expected for this stage."""
        return HttpResponseBadRequest()


class InitiateView(View):
    """Get the Form with SAML Request, which sends us to the IDP"""

    def handle_login_flow(self, source: SAMLSource, *stages_to_append, **kwargs) -> HttpResponse:
        """Plan the source's pre-authentication flow and redirect the user into it.

        `kwargs` is used as the plan context; every stage in `stages_to_append` is
        appended to the resulting plan. Raises Http404 when the flow is not applicable.
        """
        # Ensure redirect is carried through when user was trying to
        # authorize application
        session_get = self.request.session.get(SESSION_KEY_GET, {})
        final_redirect = session_get.get(NEXT_ARG_NAME, "authentik_core:if-user")
        kwargs[PLAN_CONTEXT_SSO] = True
        kwargs[PLAN_CONTEXT_SOURCE] = source
        kwargs[PLAN_CONTEXT_REDIRECT] = final_redirect
        # We run the Flow planner here so we can pass the Pending user in the context
        flow = source.pre_authentication_flow
        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(self.request, kwargs)
        except FlowNonApplicableException:
            raise Http404 from None
        for stage in stages_to_append:
            plan.append_stage(stage)
        return plan.to_redirect(self.request, flow)

    def get(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Start a SAML login at the IdP for the source identified by `source_slug`.

        With the REDIRECT binding the user is redirected to the source's SSO URL with
        the SAML arguments merged into its query string. Otherwise the request is
        handed to a flow ending in an autosubmit stage; for the plain POST binding a
        consent stage is injected before it. Raises Http404 for unknown or disabled
        sources; returns a bad-request message if building the request fails with an
        xmlsec InternalError.
        """
        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
        if not source.enabled:
            raise Http404
        relay_state = request.GET.get("next", "")
        auth_n_req = RequestProcessor(source, request, relay_state)
        # If the source is configured for Redirect bindings, we can just redirect there
        if source.binding_type == SAMLBindingTypes.REDIRECT:
            sso_url = urlparse(source.sso_url)
            # Keep whatever query arguments the SSO URL already has, then add the SAML ones
            url_kwargs = dict(parse_qsl(sso_url.query))
            url_kwargs.update(auth_n_req.build_auth_n_detached())
            final_url = urlunparse(sso_url._replace(query=urlencode(url_kwargs)))
            return redirect(final_url)
        # As POST Binding we show a form
        try:
            saml_request = nice64(auth_n_req.build_auth_n())
        except InternalError as exc:
            LOGGER.warning(str(exc))
            return bad_request_message(request, str(exc))
        injected_stages = []
        plan_kwargs = {
            PLAN_CONTEXT_TITLE: f"Redirecting to {source.name}...",
            PLAN_CONTEXT_ATTRS: {
                "SAMLRequest": saml_request,
                "RelayState": relay_state,
            },
            PLAN_CONTEXT_URL: source.sso_url,
        }
        # For just POST we add a consent stage,
        # otherwise we default to POST_AUTO, with direct redirect
        if source.binding_type == SAMLBindingTypes.POST:
            injected_stages.append(in_memory_stage(ConsentStageView))
            # Translate the template first and interpolate afterwards; formatting
            # inside _() would look up the already-filled string, which is never
            # present in the message catalog.
            plan_kwargs[PLAN_CONTEXT_CONSENT_HEADER] = _("Continue to {source_name}").format(
                source_name=source.name
            )
        injected_stages.append(in_memory_stage(AutosubmitStageView))
        return self.handle_login_flow(
            source,
            *injected_stages,
            **plan_kwargs,
        )


@method_decorator(csrf_exempt, name="dispatch")
class ACSView(View):
    """AssertionConsumerService, consume assertion and log user in"""

    def post(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Handles a POSTed SSO Assertion and logs the user in.

        Raises Http404 for unknown or disabled sources. Any of the handled parsing
        or flow-preparation errors is turned into a bad-request message.
        """
        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
        if not source.enabled:
            raise Http404
        processor = ResponseProcessor(source, request)
        try:
            processor.parse()
        except (
            InvalidEncryption,
            InvalidSignature,
            MismatchedRequestID,
            MissingSAMLResponse,
            SuspiciousOperation,
            VerificationError,
            ValueError,
        ) as exc:
            return bad_request_message(request, str(exc))

        try:
            if SESSION_KEY_PLAN in request.session:
                # Re-expose the redirect stored in the pending plan as the "next"
                # argument in the session.
                plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
                plan_redirect = plan.context.get(PLAN_CONTEXT_REDIRECT)
                if plan_redirect:
                    self.request.session[SESSION_KEY_GET] = {NEXT_ARG_NAME: plan_redirect}
            return processor.prepare_flow_manager().get_flow()
        except (UnsupportedNameIDFormat, ValueError) as exc:
            return bad_request_message(request, str(exc))


class SLOView(LoginRequiredMixin, View):
    """Single-Logout-View"""

    # NOTE(review): overriding dispatch() without calling super() bypasses
    # LoginRequiredMixin.dispatch(), so anonymous requests are not sent to the
    # login page -- confirm whether that is intended.
    def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Log user out and redirect them to the IdP's SLO URL."""
        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
        if not source.enabled:
            raise Http404
        logout(request)
        return redirect(source.slo_url)


class MetadataView(View):
    """Return XML Metadata for IDP"""

    def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Replies with the XML Metadata SPSSODescriptor."""
        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
        metadata = MetadataProcessor(source, request).build_entity_descriptor()
        return HttpResponse(metadata, content_type="text/xml")
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class AutosubmitStageView(ChallengeStageView):
    """Wrapper stage to create an autosubmit challenge from plan context variables"""

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the autosubmit challenge from the title, URL and attrs in the plan context."""
        context = self.executor.plan.context
        payload = {
            "component": "ak-stage-autosubmit",
            "title": context.get(PLAN_CONTEXT_TITLE, ""),
            "url": context.get(PLAN_CONTEXT_URL, ""),
            "attrs": context.get(PLAN_CONTEXT_ATTRS, ""),
        }
        return AutosubmitChallenge(data=payload)

    # Since `ak-stage-autosubmit` redirects off site, we don't have anything to check
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Always answer with HTTP 400; no response is expected for this stage."""
        rejection = HttpResponseBadRequest()
        return rejection
Wrapper stage to create an autosubmit challenge from plan context variables
def get_challenge(self, *args, **kwargs) -> Challenge:
    """Build the autosubmit challenge from the title, URL and attrs in the plan context.

    Positional and keyword arguments are accepted but not used.
    """
    context = self.executor.plan.context
    payload = {
        "component": "ak-stage-autosubmit",
        "title": context.get(PLAN_CONTEXT_TITLE, ""),
        "url": context.get(PLAN_CONTEXT_URL, ""),
        # NOTE(review): falls back to "" although InitiateView stores a dict under
        # PLAN_CONTEXT_ATTRS -- confirm the intended default.
        "attrs": context.get(PLAN_CONTEXT_ATTRS, ""),
    }
    return AutosubmitChallenge(data=payload)
Return the challenge that the client should solve
def
challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
    """Always answer with HTTP 400.

    `ak-stage-autosubmit` redirects off site, so there is nothing to check here and
    `response` is not inspected.
    """
    rejection = HttpResponseBadRequest()
    return rejection
Callback when the challenge has the correct format
class
InitiateView(django.views.generic.base.View):
class InitiateView(View):
    """Get the Form with SAML Request, which sends us to the IDP"""

    def handle_login_flow(self, source: SAMLSource, *stages_to_append, **kwargs) -> HttpResponse:
        """Plan the source's pre-authentication flow and redirect the user into it.

        `kwargs` is used as the plan context; every stage in `stages_to_append` is
        appended to the resulting plan. Raises Http404 when the flow is not applicable.
        """
        # Ensure redirect is carried through when user was trying to
        # authorize application
        final_redirect = self.request.session.get(SESSION_KEY_GET, {}).get(
            NEXT_ARG_NAME, "authentik_core:if-user"
        )
        kwargs.update(
            {
                PLAN_CONTEXT_SSO: True,
                PLAN_CONTEXT_SOURCE: source,
                PLAN_CONTEXT_REDIRECT: final_redirect,
            }
        )
        # We run the Flow planner here so we can pass the Pending user in the context
        planner = FlowPlanner(source.pre_authentication_flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(self.request, kwargs)
        except FlowNonApplicableException:
            raise Http404 from None
        for stage in stages_to_append:
            plan.append_stage(stage)
        return plan.to_redirect(self.request, source.pre_authentication_flow)

    def get(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Start a SAML login at the IdP for the source identified by `source_slug`.

        With the REDIRECT binding the user is redirected to the source's SSO URL with
        the SAML arguments merged into its query string. Otherwise the request is
        handed to a flow ending in an autosubmit stage; for the plain POST binding a
        consent stage is injected before it. Raises Http404 for unknown or disabled
        sources; returns a bad-request message if building the request fails with an
        xmlsec InternalError.
        """
        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
        if not source.enabled:
            raise Http404
        relay_state = request.GET.get("next", "")
        auth_n_req = RequestProcessor(source, request, relay_state)
        # If the source is configured for Redirect bindings, we can just redirect there
        if source.binding_type == SAMLBindingTypes.REDIRECT:
            sso_url = urlparse(source.sso_url)
            # Keep whatever query arguments the SSO URL already has, then add the SAML ones
            url_kwargs = dict(parse_qsl(sso_url.query))
            url_kwargs.update(auth_n_req.build_auth_n_detached())
            final_url = urlunparse(sso_url._replace(query=urlencode(url_kwargs)))
            return redirect(final_url)
        # As POST Binding we show a form
        try:
            saml_request = nice64(auth_n_req.build_auth_n())
        except InternalError as exc:
            LOGGER.warning(str(exc))
            return bad_request_message(request, str(exc))
        injected_stages = []
        plan_kwargs = {
            PLAN_CONTEXT_TITLE: f"Redirecting to {source.name}...",
            PLAN_CONTEXT_ATTRS: {
                "SAMLRequest": saml_request,
                "RelayState": relay_state,
            },
            PLAN_CONTEXT_URL: source.sso_url,
        }
        # For just POST we add a consent stage,
        # otherwise we default to POST_AUTO, with direct redirect
        if source.binding_type == SAMLBindingTypes.POST:
            injected_stages.append(in_memory_stage(ConsentStageView))
            # Translate the template first and interpolate afterwards; formatting
            # inside _() would look up the already-filled string, which is never
            # present in the message catalog.
            plan_kwargs[PLAN_CONTEXT_CONSENT_HEADER] = _("Continue to {source_name}").format(
                source_name=source.name
            )
        injected_stages.append(in_memory_stage(AutosubmitStageView))
        return self.handle_login_flow(
            source,
            *injected_stages,
            **plan_kwargs,
        )
Get the Form with SAML Request, which sends us to the IDP
def
handle_login_flow( self, source: authentik.sources.saml.models.SAMLSource, *stages_to_append, **kwargs) -> django.http.response.HttpResponse:
def handle_login_flow(self, source: SAMLSource, *stages_to_append, **kwargs) -> HttpResponse:
    """Plan the source's pre-authentication flow and redirect the user into it.

    `kwargs` is used as the plan context after the SSO flag, the source and the
    final redirect target have been added to it. Every stage in `stages_to_append`
    is appended to the resulting plan. Raises Http404 when the flow is not
    applicable.
    """
    # Carry the originally requested destination through the flow, so a user who
    # was trying to authorize an application still ends up there.
    session_get = self.request.session.get(SESSION_KEY_GET, {})
    final_redirect = session_get.get(NEXT_ARG_NAME, "authentik_core:if-user")
    kwargs[PLAN_CONTEXT_SSO] = True
    kwargs[PLAN_CONTEXT_SOURCE] = source
    kwargs[PLAN_CONTEXT_REDIRECT] = final_redirect
    # The planner is run here so the context (including the pending user) can be
    # passed in directly.
    flow = source.pre_authentication_flow
    planner = FlowPlanner(flow)
    planner.allow_empty_flows = True
    try:
        plan = planner.plan(self.request, kwargs)
    except FlowNonApplicableException:
        raise Http404 from None
    for extra_stage in stages_to_append:
        plan.append_stage(extra_stage)
    return plan.to_redirect(self.request, flow)
Prepare the authentication plan and redirect the user to the FlowExecutor.
def
get( self, request: django.http.request.HttpRequest, source_slug: str) -> django.http.response.HttpResponse:
def get(self, request: HttpRequest, source_slug: str) -> HttpResponse:
    """Start a SAML login at the IdP for the source identified by `source_slug`.

    With the REDIRECT binding the user is redirected to the source's SSO URL with
    the SAML arguments merged into its query string. Otherwise the request is
    handed to a flow ending in an autosubmit stage; for the plain POST binding a
    consent stage is injected before it. Raises Http404 for unknown or disabled
    sources; returns a bad-request message if building the request fails with an
    xmlsec InternalError.
    """
    source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
    if not source.enabled:
        raise Http404
    relay_state = request.GET.get("next", "")
    auth_n_req = RequestProcessor(source, request, relay_state)
    # If the source is configured for Redirect bindings, we can just redirect there
    if source.binding_type == SAMLBindingTypes.REDIRECT:
        sso_url = urlparse(source.sso_url)
        # Keep whatever query arguments the SSO URL already has, then add the SAML ones
        url_kwargs = dict(parse_qsl(sso_url.query))
        url_kwargs.update(auth_n_req.build_auth_n_detached())
        final_url = urlunparse(sso_url._replace(query=urlencode(url_kwargs)))
        return redirect(final_url)
    # As POST Binding we show a form
    try:
        saml_request = nice64(auth_n_req.build_auth_n())
    except InternalError as exc:
        LOGGER.warning(str(exc))
        return bad_request_message(request, str(exc))
    injected_stages = []
    plan_kwargs = {
        PLAN_CONTEXT_TITLE: f"Redirecting to {source.name}...",
        PLAN_CONTEXT_ATTRS: {
            "SAMLRequest": saml_request,
            "RelayState": relay_state,
        },
        PLAN_CONTEXT_URL: source.sso_url,
    }
    # For just POST we add a consent stage,
    # otherwise we default to POST_AUTO, with direct redirect
    if source.binding_type == SAMLBindingTypes.POST:
        injected_stages.append(in_memory_stage(ConsentStageView))
        # Translate the template first and interpolate afterwards; formatting
        # inside _() would look up the already-filled string, which is never
        # present in the message catalog.
        plan_kwargs[PLAN_CONTEXT_CONSENT_HEADER] = _("Continue to {source_name}").format(
            source_name=source.name
        )
    injected_stages.append(in_memory_stage(AutosubmitStageView))
    return self.handle_login_flow(
        source,
        *injected_stages,
        **plan_kwargs,
    )
Replies with an XHTML SSO Request.
@method_decorator(csrf_exempt, name='dispatch')
class
@method_decorator(csrf_exempt, name="dispatch")
class ACSView(View):
    """AssertionConsumerService, consume assertion and log user in"""

    def post(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Handles a POSTed SSO Assertion and logs the user in.

        Raises Http404 for unknown or disabled sources. Any of the handled parsing
        or flow-preparation errors is turned into a bad-request message.
        """
        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
        if not source.enabled:
            raise Http404
        processor = ResponseProcessor(source, request)
        parse_errors = (
            InvalidEncryption,
            InvalidSignature,
            MismatchedRequestID,
            MissingSAMLResponse,
            SuspiciousOperation,
            VerificationError,
            ValueError,
        )
        try:
            processor.parse()
        except parse_errors as exc:
            return bad_request_message(request, str(exc))

        try:
            if SESSION_KEY_PLAN in request.session:
                # Re-expose the redirect stored in the pending plan as the "next"
                # argument in the session.
                session = self.request.session
                pending_plan: FlowPlan = session[SESSION_KEY_PLAN]
                next_url = pending_plan.context.get(PLAN_CONTEXT_REDIRECT)
                if next_url:
                    session[SESSION_KEY_GET] = {NEXT_ARG_NAME: next_url}
            return processor.prepare_flow_manager().get_flow()
        except (UnsupportedNameIDFormat, ValueError) as exc:
            return bad_request_message(request, str(exc))
AssertionConsumerService, consume assertion and log user in
def
post( self, request: django.http.request.HttpRequest, source_slug: str) -> django.http.response.HttpResponse:
def post(self, request: HttpRequest, source_slug: str) -> HttpResponse:
    """Handles a POSTed SSO Assertion and logs the user in.

    Raises Http404 for unknown or disabled sources. Any of the handled parsing or
    flow-preparation errors is turned into a bad-request message.
    """
    source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
    if not source.enabled:
        raise Http404
    processor = ResponseProcessor(source, request)
    parse_errors = (
        InvalidEncryption,
        InvalidSignature,
        MismatchedRequestID,
        MissingSAMLResponse,
        SuspiciousOperation,
        VerificationError,
        ValueError,
    )
    try:
        processor.parse()
    except parse_errors as exc:
        return bad_request_message(request, str(exc))

    try:
        if SESSION_KEY_PLAN in request.session:
            # Re-expose the redirect stored in the pending plan as the "next"
            # argument in the session.
            session = self.request.session
            pending_plan: FlowPlan = session[SESSION_KEY_PLAN]
            next_url = pending_plan.context.get(PLAN_CONTEXT_REDIRECT)
            if next_url:
                session[SESSION_KEY_GET] = {NEXT_ARG_NAME: next_url}
        return processor.prepare_flow_manager().get_flow()
    except (UnsupportedNameIDFormat, ValueError) as exc:
        return bad_request_message(request, str(exc))
Handles a POSTed SSO Assertion and logs the user in.
def
dispatch(self, request, *args, **kwargs):
def dispatch(self, request, *args, **kwargs):
    """Route the request to the handler method named after its HTTP verb.

    The lower-cased request method must appear in `self.http_method_names`; if it
    does not, or if no attribute with that name exists on the view, the call is
    deferred to `self.http_method_not_allowed`.
    """
    verb = request.method.lower()
    if verb not in self.http_method_names:
        # Verb is not on the approved list: defer to the error handler.
        return self.http_method_not_allowed(request, *args, **kwargs)
    # Approved verb: use its handler if the view defines one, else the error handler.
    handler = getattr(self, verb, self.http_method_not_allowed)
    return handler(request, *args, **kwargs)
class
SLOView(django.contrib.auth.mixins.LoginRequiredMixin, django.views.generic.base.View):
class SLOView(LoginRequiredMixin, View):
    """Single-Logout-View"""

    # NOTE(review): overriding dispatch() without calling super() bypasses
    # LoginRequiredMixin.dispatch(), so anonymous requests are not sent to the
    # login page -- confirm whether that is intended.
    def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Log user out and redirect them to the IdP's SLO URL.

        Raises Http404 for unknown or disabled sources.
        """
        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
        if source.enabled:
            logout(request)
            return redirect(source.slo_url)
        raise Http404
Single-Logout-View
def
dispatch( self, request: django.http.request.HttpRequest, source_slug: str) -> django.http.response.HttpResponse:
# NOTE(review): overriding dispatch() without calling super() bypasses
# LoginRequiredMixin.dispatch(), so anonymous requests are not sent to the
# login page -- confirm whether that is intended.
def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
    """Log user out and redirect them to the IdP's SLO URL.

    Raises Http404 for unknown or disabled sources.
    """
    source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
    if source.enabled:
        logout(request)
        return redirect(source.slo_url)
    raise Http404
Log user out and redirect them to the IdP's SLO URL.
class
MetadataView(django.views.generic.base.View):
class MetadataView(View):
    """Return XML Metadata for IDP"""

    def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
        """Replies with the XML Metadata SPSSODescriptor.

        Raises Http404 when no source with `source_slug` exists.
        """
        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
        processor = MetadataProcessor(source, request)
        return HttpResponse(processor.build_entity_descriptor(), content_type="text/xml")
Return XML Metadata for IDP
def
dispatch( self, request: django.http.request.HttpRequest, source_slug: str) -> django.http.response.HttpResponse:
def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
    """Replies with the XML Metadata SPSSODescriptor.

    Raises Http404 when no source with `source_slug` exists.
    """
    source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
    processor = MetadataProcessor(source, request)
    return HttpResponse(processor.build_entity_descriptor(), content_type="text/xml")
Replies with the XML Metadata SPSSODescriptor.