authentik.sources.saml.views

saml sp views

  1"""saml sp views"""
  2
  3from urllib.parse import parse_qsl, urlparse, urlunparse
  4
  5from django.contrib.auth import logout
  6from django.contrib.auth.mixins import LoginRequiredMixin
  7from django.core.exceptions import SuspiciousOperation
  8from django.http import Http404, HttpRequest, HttpResponse
  9from django.http.response import HttpResponseBadRequest
 10from django.shortcuts import get_object_or_404, redirect
 11from django.utils.decorators import method_decorator
 12from django.utils.http import urlencode
 13from django.utils.translation import gettext as _
 14from django.views import View
 15from django.views.decorators.csrf import csrf_exempt
 16from structlog.stdlib import get_logger
 17from xmlsec import InternalError, VerificationError
 18
 19from authentik.flows.challenge import (
 20    PLAN_CONTEXT_ATTRS,
 21    PLAN_CONTEXT_TITLE,
 22    PLAN_CONTEXT_URL,
 23    AutosubmitChallenge,
 24    Challenge,
 25    ChallengeResponse,
 26)
 27from authentik.flows.exceptions import FlowNonApplicableException
 28from authentik.flows.models import in_memory_stage
 29from authentik.flows.planner import (
 30    PLAN_CONTEXT_REDIRECT,
 31    PLAN_CONTEXT_SOURCE,
 32    PLAN_CONTEXT_SSO,
 33    FlowPlan,
 34    FlowPlanner,
 35)
 36from authentik.flows.stage import ChallengeStageView
 37from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET, SESSION_KEY_PLAN
 38from authentik.lib.views import bad_request_message
 39from authentik.providers.saml.utils.encoding import nice64
 40from authentik.sources.saml.exceptions import (
 41    InvalidEncryption,
 42    InvalidSignature,
 43    MismatchedRequestID,
 44    MissingSAMLResponse,
 45    UnsupportedNameIDFormat,
 46)
 47from authentik.sources.saml.models import SAMLBindingTypes, SAMLSource
 48from authentik.sources.saml.processors.metadata import MetadataProcessor
 49from authentik.sources.saml.processors.request import RequestProcessor
 50from authentik.sources.saml.processors.response import ResponseProcessor
 51from authentik.stages.consent.stage import PLAN_CONTEXT_CONSENT_HEADER, ConsentStageView
 52
 53LOGGER = get_logger()
 54
 55
 56class AutosubmitStageView(ChallengeStageView):
 57    """Wrapper stage to create an autosubmit challenge from plan context variables"""
 58
 59    def get_challenge(self, *args, **kwargs) -> Challenge:
 60        return AutosubmitChallenge(
 61            data={
 62                "component": "ak-stage-autosubmit",
 63                "title": self.executor.plan.context.get(PLAN_CONTEXT_TITLE, ""),
 64                "url": self.executor.plan.context.get(PLAN_CONTEXT_URL, ""),
 65                "attrs": self.executor.plan.context.get(PLAN_CONTEXT_ATTRS, ""),
 66            },
 67        )
 68
 69    # Since `ak-stage-autosubmit` redirects off site, we don't have anything to check
 70    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
 71        return HttpResponseBadRequest()
 72
 73
 74class InitiateView(View):
 75    """Get the Form with SAML Request, which sends us to the IDP"""
 76
 77    def handle_login_flow(self, source: SAMLSource, *stages_to_append, **kwargs) -> HttpResponse:
 78        """Prepare Authentication Plan, redirect user FlowExecutor"""
 79        # Ensure redirect is carried through when user was trying to
 80        # authorize application
 81        final_redirect = self.request.session.get(SESSION_KEY_GET, {}).get(
 82            NEXT_ARG_NAME, "authentik_core:if-user"
 83        )
 84        kwargs.update(
 85            {
 86                PLAN_CONTEXT_SSO: True,
 87                PLAN_CONTEXT_SOURCE: source,
 88                PLAN_CONTEXT_REDIRECT: final_redirect,
 89            }
 90        )
 91        # We run the Flow planner here so we can pass the Pending user in the context
 92        planner = FlowPlanner(source.pre_authentication_flow)
 93        planner.allow_empty_flows = True
 94        try:
 95            plan = planner.plan(self.request, kwargs)
 96        except FlowNonApplicableException:
 97            raise Http404 from None
 98        for stage in stages_to_append:
 99            plan.append_stage(stage)
100        return plan.to_redirect(self.request, source.pre_authentication_flow)
101
102    def get(self, request: HttpRequest, source_slug: str) -> HttpResponse:
103        """Replies with an XHTML SSO Request."""
104        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
105        if not source.enabled:
106            raise Http404
107        relay_state = request.GET.get("next", "")
108        auth_n_req = RequestProcessor(source, request, relay_state)
109        # If the source is configured for Redirect bindings, we can just redirect there
110        if source.binding_type == SAMLBindingTypes.REDIRECT:
111            # Parse the initial SSO URL
112            sso_url = urlparse(source.sso_url)
113            # Parse the querystring into a dict...
114            url_kwargs = dict(parse_qsl(sso_url.query))
115            # ... and update it with the SAML args
116            url_kwargs.update(auth_n_req.build_auth_n_detached())
117            # Update the url
118            final_url = urlunparse(sso_url._replace(query=urlencode(url_kwargs)))
119            return redirect(final_url)
120        # As POST Binding we show a form
121        try:
122            saml_request = nice64(auth_n_req.build_auth_n())
123        except InternalError as exc:
124            LOGGER.warning(str(exc))
125            return bad_request_message(request, str(exc))
126        injected_stages = []
127        plan_kwargs = {
128            PLAN_CONTEXT_TITLE: f"Redirecting to {source.name}...",
129            PLAN_CONTEXT_ATTRS: {
130                "SAMLRequest": saml_request,
131                "RelayState": relay_state,
132            },
133            PLAN_CONTEXT_URL: source.sso_url,
134        }
135        # For just POST we add a consent stage,
136        # otherwise we default to POST_AUTO, with direct redirect
137        if source.binding_type == SAMLBindingTypes.POST:
138            injected_stages.append(in_memory_stage(ConsentStageView))
139            plan_kwargs[PLAN_CONTEXT_CONSENT_HEADER] = _(
140                "Continue to {source_name}".format(source_name=source.name)
141            )
142        injected_stages.append(in_memory_stage(AutosubmitStageView))
143        return self.handle_login_flow(
144            source,
145            *injected_stages,
146            **plan_kwargs,
147        )
148
149
150@method_decorator(csrf_exempt, name="dispatch")
151class ACSView(View):
152    """AssertionConsumerService, consume assertion and log user in"""
153
154    def post(self, request: HttpRequest, source_slug: str) -> HttpResponse:
155        """Handles a POSTed SSO Assertion and logs the user in."""
156        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
157        if not source.enabled:
158            raise Http404
159        processor = ResponseProcessor(source, request)
160        try:
161            processor.parse()
162        except (
163            InvalidEncryption,
164            InvalidSignature,
165            MismatchedRequestID,
166            MissingSAMLResponse,
167            SuspiciousOperation,
168            VerificationError,
169            ValueError,
170        ) as exc:
171            return bad_request_message(request, str(exc))
172
173        try:
174            if SESSION_KEY_PLAN in request.session:
175                plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
176                plan_redirect = plan.context.get(PLAN_CONTEXT_REDIRECT)
177                if plan_redirect:
178                    self.request.session[SESSION_KEY_GET] = {NEXT_ARG_NAME: plan_redirect}
179            return processor.prepare_flow_manager().get_flow()
180        except (UnsupportedNameIDFormat, ValueError) as exc:
181            return bad_request_message(request, str(exc))
182
183
184class SLOView(LoginRequiredMixin, View):
185    """Single-Logout-View"""
186
187    def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
188        """Log user out and redirect them to the IdP's SLO URL."""
189        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
190        if not source.enabled:
191            raise Http404
192        logout(request)
193        return redirect(source.slo_url)
194
195
196class MetadataView(View):
197    """Return XML Metadata for IDP"""
198
199    def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
200        """Replies with the XML Metadata SPSSODescriptor."""
201        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
202        metadata = MetadataProcessor(source, request).build_entity_descriptor()
203        return HttpResponse(metadata, content_type="text/xml")
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class AutosubmitStageView(authentik.flows.stage.ChallengeStageView):
57class AutosubmitStageView(ChallengeStageView):
58    """Wrapper stage to create an autosubmit challenge from plan context variables"""
59
60    def get_challenge(self, *args, **kwargs) -> Challenge:
61        return AutosubmitChallenge(
62            data={
63                "component": "ak-stage-autosubmit",
64                "title": self.executor.plan.context.get(PLAN_CONTEXT_TITLE, ""),
65                "url": self.executor.plan.context.get(PLAN_CONTEXT_URL, ""),
66                "attrs": self.executor.plan.context.get(PLAN_CONTEXT_ATTRS, ""),
67            },
68        )
69
70    # Since `ak-stage-autosubmit` redirects off site, we don't have anything to check
71    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
72        return HttpResponseBadRequest()

Wrapper stage to create an autosubmit challenge from plan context variables

def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
60    def get_challenge(self, *args, **kwargs) -> Challenge:
61        return AutosubmitChallenge(
62            data={
63                "component": "ak-stage-autosubmit",
64                "title": self.executor.plan.context.get(PLAN_CONTEXT_TITLE, ""),
65                "url": self.executor.plan.context.get(PLAN_CONTEXT_URL, ""),
66                "attrs": self.executor.plan.context.get(PLAN_CONTEXT_ATTRS, ""),
67            },
68        )

Return the challenge that the client should solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
71    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
72        return HttpResponseBadRequest()

Callback when the challenge has the correct format

class InitiateView(django.views.generic.base.View):
 75class InitiateView(View):
 76    """Get the Form with SAML Request, which sends us to the IDP"""
 77
 78    def handle_login_flow(self, source: SAMLSource, *stages_to_append, **kwargs) -> HttpResponse:
 79        """Prepare Authentication Plan, redirect user FlowExecutor"""
 80        # Ensure redirect is carried through when user was trying to
 81        # authorize application
 82        final_redirect = self.request.session.get(SESSION_KEY_GET, {}).get(
 83            NEXT_ARG_NAME, "authentik_core:if-user"
 84        )
 85        kwargs.update(
 86            {
 87                PLAN_CONTEXT_SSO: True,
 88                PLAN_CONTEXT_SOURCE: source,
 89                PLAN_CONTEXT_REDIRECT: final_redirect,
 90            }
 91        )
 92        # We run the Flow planner here so we can pass the Pending user in the context
 93        planner = FlowPlanner(source.pre_authentication_flow)
 94        planner.allow_empty_flows = True
 95        try:
 96            plan = planner.plan(self.request, kwargs)
 97        except FlowNonApplicableException:
 98            raise Http404 from None
 99        for stage in stages_to_append:
100            plan.append_stage(stage)
101        return plan.to_redirect(self.request, source.pre_authentication_flow)
102
103    def get(self, request: HttpRequest, source_slug: str) -> HttpResponse:
104        """Replies with an XHTML SSO Request."""
105        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
106        if not source.enabled:
107            raise Http404
108        relay_state = request.GET.get("next", "")
109        auth_n_req = RequestProcessor(source, request, relay_state)
110        # If the source is configured for Redirect bindings, we can just redirect there
111        if source.binding_type == SAMLBindingTypes.REDIRECT:
112            # Parse the initial SSO URL
113            sso_url = urlparse(source.sso_url)
114            # Parse the querystring into a dict...
115            url_kwargs = dict(parse_qsl(sso_url.query))
116            # ... and update it with the SAML args
117            url_kwargs.update(auth_n_req.build_auth_n_detached())
118            # Update the url
119            final_url = urlunparse(sso_url._replace(query=urlencode(url_kwargs)))
120            return redirect(final_url)
121        # As POST Binding we show a form
122        try:
123            saml_request = nice64(auth_n_req.build_auth_n())
124        except InternalError as exc:
125            LOGGER.warning(str(exc))
126            return bad_request_message(request, str(exc))
127        injected_stages = []
128        plan_kwargs = {
129            PLAN_CONTEXT_TITLE: f"Redirecting to {source.name}...",
130            PLAN_CONTEXT_ATTRS: {
131                "SAMLRequest": saml_request,
132                "RelayState": relay_state,
133            },
134            PLAN_CONTEXT_URL: source.sso_url,
135        }
136        # For just POST we add a consent stage,
137        # otherwise we default to POST_AUTO, with direct redirect
138        if source.binding_type == SAMLBindingTypes.POST:
139            injected_stages.append(in_memory_stage(ConsentStageView))
140            plan_kwargs[PLAN_CONTEXT_CONSENT_HEADER] = _(
141                "Continue to {source_name}".format(source_name=source.name)
142            )
143        injected_stages.append(in_memory_stage(AutosubmitStageView))
144        return self.handle_login_flow(
145            source,
146            *injected_stages,
147            **plan_kwargs,
148        )

Get the Form with SAML Request, which sends us to the IDP

def handle_login_flow( self, source: authentik.sources.saml.models.SAMLSource, *stages_to_append, **kwargs) -> django.http.response.HttpResponse:
 78    def handle_login_flow(self, source: SAMLSource, *stages_to_append, **kwargs) -> HttpResponse:
 79        """Prepare Authentication Plan, redirect user FlowExecutor"""
 80        # Ensure redirect is carried through when user was trying to
 81        # authorize application
 82        final_redirect = self.request.session.get(SESSION_KEY_GET, {}).get(
 83            NEXT_ARG_NAME, "authentik_core:if-user"
 84        )
 85        kwargs.update(
 86            {
 87                PLAN_CONTEXT_SSO: True,
 88                PLAN_CONTEXT_SOURCE: source,
 89                PLAN_CONTEXT_REDIRECT: final_redirect,
 90            }
 91        )
 92        # We run the Flow planner here so we can pass the Pending user in the context
 93        planner = FlowPlanner(source.pre_authentication_flow)
 94        planner.allow_empty_flows = True
 95        try:
 96            plan = planner.plan(self.request, kwargs)
 97        except FlowNonApplicableException:
 98            raise Http404 from None
 99        for stage in stages_to_append:
100            plan.append_stage(stage)
101        return plan.to_redirect(self.request, source.pre_authentication_flow)

Prepare Authentication Plan, redirect user FlowExecutor

def get( self, request: django.http.request.HttpRequest, source_slug: str) -> django.http.response.HttpResponse:
103    def get(self, request: HttpRequest, source_slug: str) -> HttpResponse:
104        """Replies with an XHTML SSO Request."""
105        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
106        if not source.enabled:
107            raise Http404
108        relay_state = request.GET.get("next", "")
109        auth_n_req = RequestProcessor(source, request, relay_state)
110        # If the source is configured for Redirect bindings, we can just redirect there
111        if source.binding_type == SAMLBindingTypes.REDIRECT:
112            # Parse the initial SSO URL
113            sso_url = urlparse(source.sso_url)
114            # Parse the querystring into a dict...
115            url_kwargs = dict(parse_qsl(sso_url.query))
116            # ... and update it with the SAML args
117            url_kwargs.update(auth_n_req.build_auth_n_detached())
118            # Update the url
119            final_url = urlunparse(sso_url._replace(query=urlencode(url_kwargs)))
120            return redirect(final_url)
121        # As POST Binding we show a form
122        try:
123            saml_request = nice64(auth_n_req.build_auth_n())
124        except InternalError as exc:
125            LOGGER.warning(str(exc))
126            return bad_request_message(request, str(exc))
127        injected_stages = []
128        plan_kwargs = {
129            PLAN_CONTEXT_TITLE: f"Redirecting to {source.name}...",
130            PLAN_CONTEXT_ATTRS: {
131                "SAMLRequest": saml_request,
132                "RelayState": relay_state,
133            },
134            PLAN_CONTEXT_URL: source.sso_url,
135        }
136        # For just POST we add a consent stage,
137        # otherwise we default to POST_AUTO, with direct redirect
138        if source.binding_type == SAMLBindingTypes.POST:
139            injected_stages.append(in_memory_stage(ConsentStageView))
140            plan_kwargs[PLAN_CONTEXT_CONSENT_HEADER] = _(
141                "Continue to {source_name}".format(source_name=source.name)
142            )
143        injected_stages.append(in_memory_stage(AutosubmitStageView))
144        return self.handle_login_flow(
145            source,
146            *injected_stages,
147            **plan_kwargs,
148        )

Replies with an XHTML SSO Request.

@method_decorator(csrf_exempt, name='dispatch')
class ACSView(django.views.generic.base.View):
151@method_decorator(csrf_exempt, name="dispatch")
152class ACSView(View):
153    """AssertionConsumerService, consume assertion and log user in"""
154
155    def post(self, request: HttpRequest, source_slug: str) -> HttpResponse:
156        """Handles a POSTed SSO Assertion and logs the user in."""
157        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
158        if not source.enabled:
159            raise Http404
160        processor = ResponseProcessor(source, request)
161        try:
162            processor.parse()
163        except (
164            InvalidEncryption,
165            InvalidSignature,
166            MismatchedRequestID,
167            MissingSAMLResponse,
168            SuspiciousOperation,
169            VerificationError,
170            ValueError,
171        ) as exc:
172            return bad_request_message(request, str(exc))
173
174        try:
175            if SESSION_KEY_PLAN in request.session:
176                plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
177                plan_redirect = plan.context.get(PLAN_CONTEXT_REDIRECT)
178                if plan_redirect:
179                    self.request.session[SESSION_KEY_GET] = {NEXT_ARG_NAME: plan_redirect}
180            return processor.prepare_flow_manager().get_flow()
181        except (UnsupportedNameIDFormat, ValueError) as exc:
182            return bad_request_message(request, str(exc))

AssertionConsumerService, consume assertion and log user in

def post( self, request: django.http.request.HttpRequest, source_slug: str) -> django.http.response.HttpResponse:
155    def post(self, request: HttpRequest, source_slug: str) -> HttpResponse:
156        """Handles a POSTed SSO Assertion and logs the user in."""
157        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
158        if not source.enabled:
159            raise Http404
160        processor = ResponseProcessor(source, request)
161        try:
162            processor.parse()
163        except (
164            InvalidEncryption,
165            InvalidSignature,
166            MismatchedRequestID,
167            MissingSAMLResponse,
168            SuspiciousOperation,
169            VerificationError,
170            ValueError,
171        ) as exc:
172            return bad_request_message(request, str(exc))
173
174        try:
175            if SESSION_KEY_PLAN in request.session:
176                plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
177                plan_redirect = plan.context.get(PLAN_CONTEXT_REDIRECT)
178                if plan_redirect:
179                    self.request.session[SESSION_KEY_GET] = {NEXT_ARG_NAME: plan_redirect}
180            return processor.prepare_flow_manager().get_flow()
181        except (UnsupportedNameIDFormat, ValueError) as exc:
182            return bad_request_message(request, str(exc))

Handles a POSTed SSO Assertion and logs the user in.

def dispatch(self, request, *args, **kwargs):
135    def dispatch(self, request, *args, **kwargs):
136        # Try to dispatch to the right method; if a method doesn't exist,
137        # defer to the error handler. Also defer to the error handler if the
138        # request method isn't on the approved list.
139        if request.method.lower() in self.http_method_names:
140            handler = getattr(
141                self, request.method.lower(), self.http_method_not_allowed
142            )
143        else:
144            handler = self.http_method_not_allowed
145        return handler(request, *args, **kwargs)
class SLOView(django.contrib.auth.mixins.LoginRequiredMixin, django.views.generic.base.View):
185class SLOView(LoginRequiredMixin, View):
186    """Single-Logout-View"""
187
188    def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
189        """Log user out and redirect them to the IdP's SLO URL."""
190        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
191        if not source.enabled:
192            raise Http404
193        logout(request)
194        return redirect(source.slo_url)

Single-Logout-View

def dispatch( self, request: django.http.request.HttpRequest, source_slug: str) -> django.http.response.HttpResponse:
188    def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
189        """Log user out and redirect them to the IdP's SLO URL."""
190        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
191        if not source.enabled:
192            raise Http404
193        logout(request)
194        return redirect(source.slo_url)

Log user out and redirect them to the IdP's SLO URL.

class MetadataView(django.views.generic.base.View):
197class MetadataView(View):
198    """Return XML Metadata for IDP"""
199
200    def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
201        """Replies with the XML Metadata SPSSODescriptor."""
202        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
203        metadata = MetadataProcessor(source, request).build_entity_descriptor()
204        return HttpResponse(metadata, content_type="text/xml")

Return XML Metadata for IDP

def dispatch( self, request: django.http.request.HttpRequest, source_slug: str) -> django.http.response.HttpResponse:
200    def dispatch(self, request: HttpRequest, source_slug: str) -> HttpResponse:
201        """Replies with the XML Metadata SPSSODescriptor."""
202        source: SAMLSource = get_object_or_404(SAMLSource, slug=source_slug)
203        metadata = MetadataProcessor(source, request).build_entity_descriptor()
204        return HttpResponse(metadata, content_type="text/xml")

Replies with the XML Metadata SPSSODescriptor.