authentik.enterprise.providers.ws_federation.views

  1from django.http import Http404, HttpRequest, HttpResponse
  2from django.shortcuts import get_object_or_404, redirect
  3from django.urls import reverse
  4from django.utils.translation import gettext as _
  5from django.views import View
  6from structlog.stdlib import get_logger
  7
  8from authentik.core.models import Application, AuthenticatedSession
  9from authentik.enterprise.providers.ws_federation.models import WSFederationProvider
 10from authentik.enterprise.providers.ws_federation.processors.constants import (
 11    WS_FED_ACTION_SIGN_IN,
 12    WS_FED_ACTION_SIGN_OUT,
 13)
 14from authentik.enterprise.providers.ws_federation.processors.sign_in import (
 15    SignInProcessor,
 16    SignInRequest,
 17)
 18from authentik.enterprise.providers.ws_federation.processors.sign_out import SignOutRequest
 19from authentik.flows.challenge import (
 20    PLAN_CONTEXT_TITLE,
 21    AutosubmitChallenge,
 22    AutoSubmitChallengeResponse,
 23)
 24from authentik.flows.exceptions import FlowNonApplicableException
 25from authentik.flows.models import in_memory_stage
 26from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO, FlowPlanner
 27from authentik.flows.stage import ChallengeStageView, SessionEndStage
 28from authentik.lib.views import bad_request_message
 29from authentik.policies.views import PolicyAccessView, RequestValidationError
 30from authentik.providers.saml.models import SAMLSession
 31from authentik.stages.consent.stage import (
 32    PLAN_CONTEXT_CONSENT_HEADER,
 33    PLAN_CONTEXT_CONSENT_PERMISSIONS,
 34)
 35
 36PLAN_CONTEXT_WS_FED_REQUEST = "authentik/providers/ws_federation/request"
 37LOGGER = get_logger()
 38
 39
 40class WSFedEntryView(PolicyAccessView):
 41    req: SignInRequest | SignOutRequest
 42
 43    def pre_permission_check(self):
 44        self.action = self.request.GET.get("wa")
 45        try:
 46            if self.action == WS_FED_ACTION_SIGN_IN:
 47                self.req = SignInRequest.parse(self.request)
 48            elif self.action == WS_FED_ACTION_SIGN_OUT:
 49                self.req = SignOutRequest.parse(self.request)
 50            else:
 51                raise RequestValidationError(
 52                    bad_request_message(self.request, "Invalid WS-Federation action")
 53                )
 54        except ValueError as exc:
 55            LOGGER.warning("Invalid WS-Fed request", exc=exc)
 56            raise RequestValidationError(
 57                bad_request_message(self.request, "Invalid WS-Federation request")
 58            ) from None
 59
 60    def resolve_provider_application(self):
 61        self.application, self.provider = self.req.get_app_provider()
 62
 63    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 64        if self.action == WS_FED_ACTION_SIGN_IN:
 65            return self.ws_fed_sign_in()
 66        elif self.action == WS_FED_ACTION_SIGN_OUT:
 67            return self.ws_fed_sign_out()
 68        else:
 69            return HttpResponse("Unsupported WS-Federation action", status=400)
 70
 71    def ws_fed_sign_in(self) -> HttpResponse:
 72        planner = FlowPlanner(self.provider.authorization_flow)
 73        planner.allow_empty_flows = True
 74        try:
 75            plan = planner.plan(
 76                self.request,
 77                {
 78                    PLAN_CONTEXT_SSO: True,
 79                    PLAN_CONTEXT_APPLICATION: self.application,
 80                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
 81                    % {"application": self.application.name},
 82                    PLAN_CONTEXT_CONSENT_PERMISSIONS: [],
 83                    PLAN_CONTEXT_WS_FED_REQUEST: self.req,
 84                },
 85            )
 86        except FlowNonApplicableException:
 87            raise Http404 from None
 88        plan.append_stage(in_memory_stage(WSFedFlowFinalView))
 89        return plan.to_redirect(
 90            self.request,
 91            self.provider.authorization_flow,
 92        )
 93
 94    def ws_fed_sign_out(self) -> HttpResponse:
 95        flow = self.provider.invalidation_flow or self.request.brand.flow_invalidation
 96
 97        planner = FlowPlanner(flow)
 98        planner.allow_empty_flows = True
 99        try:
100            plan = planner.plan(
101                self.request,
102                {
103                    PLAN_CONTEXT_SSO: True,
104                    PLAN_CONTEXT_APPLICATION: self.application,
105                    PLAN_CONTEXT_WS_FED_REQUEST: self.req,
106                },
107            )
108        except FlowNonApplicableException:
109            raise Http404 from None
110        plan.append_stage(in_memory_stage(SessionEndStage))
111        return plan.to_redirect(self.request, flow)
112
113
114class WSFedFlowFinalView(ChallengeStageView):
115    response_class = AutoSubmitChallengeResponse
116
117    def get(self, request, *args, **kwargs):
118        if PLAN_CONTEXT_WS_FED_REQUEST not in self.executor.plan.context:
119            self.logger.warning("No WS-Fed request in context")
120            return self.executor.stage_invalid()
121        return super().get(request, *args, **kwargs)
122
123    def get_challenge(self, *args, **kwargs):
124        application: Application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION]
125        provider: WSFederationProvider = get_object_or_404(
126            WSFederationProvider, pk=application.provider_id
127        )
128        sign_in_req: SignInRequest = self.executor.plan.context[PLAN_CONTEXT_WS_FED_REQUEST]
129        proc = SignInProcessor(provider, self.request, sign_in_req)
130        response = proc.response()
131        saml_processor = proc.saml_processor
132
133        # Create SAMLSession to track this login
134        auth_session = AuthenticatedSession.from_request(self.request, self.request.user)
135        if auth_session:
136            # Since samlsessions should only exist uniquely for an active session and a provider
137            # any existing combination is likely an old, dead session
138            SAMLSession.objects.filter(
139                session_index=saml_processor.session_index, provider=provider
140            ).delete()
141
142            SAMLSession.objects.update_or_create(
143                session_index=saml_processor.session_index,
144                provider=provider,
145                defaults={
146                    "user": self.request.user,
147                    "session": auth_session,
148                    "name_id": saml_processor.name_id,
149                    "name_id_format": saml_processor.name_id_format,
150                    "expires": saml_processor.session_not_on_or_after_datetime,
151                    "expiring": True,
152                },
153            )
154        return AutosubmitChallenge(
155            data={
156                "component": "ak-stage-autosubmit",
157                "title": self.executor.plan.context.get(
158                    PLAN_CONTEXT_TITLE,
159                    _("Redirecting to {app}...".format_map({"app": application.name})),
160                ),
161                "url": sign_in_req.wreply,
162                "attrs": response,
163            },
164        )
165
166
167class MetadataDownload(View):
168    """Redirect to metadata download"""
169
170    def dispatch(self, request: HttpRequest, application_slug: str) -> HttpResponse:
171        app = Application.objects.filter(slug=application_slug).with_provider().first()
172        if not app:
173            raise Http404
174        provider = app.get_provider()
175        if not provider:
176            raise Http404
177        return redirect(
178            reverse(
179                "authentik_api:wsfederationprovider-metadata",
180                kwargs={
181                    "pk": provider.pk,
182                },
183            )
184            + "?download"
185        )
PLAN_CONTEXT_WS_FED_REQUEST = 'authentik/providers/ws_federation/request'
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class WSFedEntryView(authentik.policies.views.PolicyAccessView):
 41class WSFedEntryView(PolicyAccessView):
 42    req: SignInRequest | SignOutRequest
 43
 44    def pre_permission_check(self):
 45        self.action = self.request.GET.get("wa")
 46        try:
 47            if self.action == WS_FED_ACTION_SIGN_IN:
 48                self.req = SignInRequest.parse(self.request)
 49            elif self.action == WS_FED_ACTION_SIGN_OUT:
 50                self.req = SignOutRequest.parse(self.request)
 51            else:
 52                raise RequestValidationError(
 53                    bad_request_message(self.request, "Invalid WS-Federation action")
 54                )
 55        except ValueError as exc:
 56            LOGGER.warning("Invalid WS-Fed request", exc=exc)
 57            raise RequestValidationError(
 58                bad_request_message(self.request, "Invalid WS-Federation request")
 59            ) from None
 60
 61    def resolve_provider_application(self):
 62        self.application, self.provider = self.req.get_app_provider()
 63
 64    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 65        if self.action == WS_FED_ACTION_SIGN_IN:
 66            return self.ws_fed_sign_in()
 67        elif self.action == WS_FED_ACTION_SIGN_OUT:
 68            return self.ws_fed_sign_out()
 69        else:
 70            return HttpResponse("Unsupported WS-Federation action", status=400)
 71
 72    def ws_fed_sign_in(self) -> HttpResponse:
 73        planner = FlowPlanner(self.provider.authorization_flow)
 74        planner.allow_empty_flows = True
 75        try:
 76            plan = planner.plan(
 77                self.request,
 78                {
 79                    PLAN_CONTEXT_SSO: True,
 80                    PLAN_CONTEXT_APPLICATION: self.application,
 81                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
 82                    % {"application": self.application.name},
 83                    PLAN_CONTEXT_CONSENT_PERMISSIONS: [],
 84                    PLAN_CONTEXT_WS_FED_REQUEST: self.req,
 85                },
 86            )
 87        except FlowNonApplicableException:
 88            raise Http404 from None
 89        plan.append_stage(in_memory_stage(WSFedFlowFinalView))
 90        return plan.to_redirect(
 91            self.request,
 92            self.provider.authorization_flow,
 93        )
 94
 95    def ws_fed_sign_out(self) -> HttpResponse:
 96        flow = self.provider.invalidation_flow or self.request.brand.flow_invalidation
 97
 98        planner = FlowPlanner(flow)
 99        planner.allow_empty_flows = True
100        try:
101            plan = planner.plan(
102                self.request,
103                {
104                    PLAN_CONTEXT_SSO: True,
105                    PLAN_CONTEXT_APPLICATION: self.application,
106                    PLAN_CONTEXT_WS_FED_REQUEST: self.req,
107                },
108            )
109        except FlowNonApplicableException:
110            raise Http404 from None
111        plan.append_stage(in_memory_stage(SessionEndStage))
112        return plan.to_redirect(self.request, flow)

Mixin class for usage in Authorization views. Provider functions to check application access, etc

def pre_permission_check(self):
44    def pre_permission_check(self):
45        self.action = self.request.GET.get("wa")
46        try:
47            if self.action == WS_FED_ACTION_SIGN_IN:
48                self.req = SignInRequest.parse(self.request)
49            elif self.action == WS_FED_ACTION_SIGN_OUT:
50                self.req = SignOutRequest.parse(self.request)
51            else:
52                raise RequestValidationError(
53                    bad_request_message(self.request, "Invalid WS-Federation action")
54                )
55        except ValueError as exc:
56            LOGGER.warning("Invalid WS-Fed request", exc=exc)
57            raise RequestValidationError(
58                bad_request_message(self.request, "Invalid WS-Federation request")
59            ) from None

Optionally hook in before permission check to check if a request is valid. Can raise RequestValidationError to return a response.

def resolve_provider_application(self):
61    def resolve_provider_application(self):
62        self.application, self.provider = self.req.get_app_provider()

Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught, and will return directly

def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
64    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
65        if self.action == WS_FED_ACTION_SIGN_IN:
66            return self.ws_fed_sign_in()
67        elif self.action == WS_FED_ACTION_SIGN_OUT:
68            return self.ws_fed_sign_out()
69        else:
70            return HttpResponse("Unsupported WS-Federation action", status=400)
def ws_fed_sign_in(self) -> django.http.response.HttpResponse:
72    def ws_fed_sign_in(self) -> HttpResponse:
73        planner = FlowPlanner(self.provider.authorization_flow)
74        planner.allow_empty_flows = True
75        try:
76            plan = planner.plan(
77                self.request,
78                {
79                    PLAN_CONTEXT_SSO: True,
80                    PLAN_CONTEXT_APPLICATION: self.application,
81                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
82                    % {"application": self.application.name},
83                    PLAN_CONTEXT_CONSENT_PERMISSIONS: [],
84                    PLAN_CONTEXT_WS_FED_REQUEST: self.req,
85                },
86            )
87        except FlowNonApplicableException:
88            raise Http404 from None
89        plan.append_stage(in_memory_stage(WSFedFlowFinalView))
90        return plan.to_redirect(
91            self.request,
92            self.provider.authorization_flow,
93        )
def ws_fed_sign_out(self) -> django.http.response.HttpResponse:
 95    def ws_fed_sign_out(self) -> HttpResponse:
 96        flow = self.provider.invalidation_flow or self.request.brand.flow_invalidation
 97
 98        planner = FlowPlanner(flow)
 99        planner.allow_empty_flows = True
100        try:
101            plan = planner.plan(
102                self.request,
103                {
104                    PLAN_CONTEXT_SSO: True,
105                    PLAN_CONTEXT_APPLICATION: self.application,
106                    PLAN_CONTEXT_WS_FED_REQUEST: self.req,
107                },
108            )
109        except FlowNonApplicableException:
110            raise Http404 from None
111        plan.append_stage(in_memory_stage(SessionEndStage))
112        return plan.to_redirect(self.request, flow)
class WSFedFlowFinalView(authentik.flows.stage.ChallengeStageView):
115class WSFedFlowFinalView(ChallengeStageView):
116    response_class = AutoSubmitChallengeResponse
117
118    def get(self, request, *args, **kwargs):
119        if PLAN_CONTEXT_WS_FED_REQUEST not in self.executor.plan.context:
120            self.logger.warning("No WS-Fed request in context")
121            return self.executor.stage_invalid()
122        return super().get(request, *args, **kwargs)
123
124    def get_challenge(self, *args, **kwargs):
125        application: Application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION]
126        provider: WSFederationProvider = get_object_or_404(
127            WSFederationProvider, pk=application.provider_id
128        )
129        sign_in_req: SignInRequest = self.executor.plan.context[PLAN_CONTEXT_WS_FED_REQUEST]
130        proc = SignInProcessor(provider, self.request, sign_in_req)
131        response = proc.response()
132        saml_processor = proc.saml_processor
133
134        # Create SAMLSession to track this login
135        auth_session = AuthenticatedSession.from_request(self.request, self.request.user)
136        if auth_session:
137            # Since samlsessions should only exist uniquely for an active session and a provider
138            # any existing combination is likely an old, dead session
139            SAMLSession.objects.filter(
140                session_index=saml_processor.session_index, provider=provider
141            ).delete()
142
143            SAMLSession.objects.update_or_create(
144                session_index=saml_processor.session_index,
145                provider=provider,
146                defaults={
147                    "user": self.request.user,
148                    "session": auth_session,
149                    "name_id": saml_processor.name_id,
150                    "name_id_format": saml_processor.name_id_format,
151                    "expires": saml_processor.session_not_on_or_after_datetime,
152                    "expiring": True,
153                },
154            )
155        return AutosubmitChallenge(
156            data={
157                "component": "ak-stage-autosubmit",
158                "title": self.executor.plan.context.get(
159                    PLAN_CONTEXT_TITLE,
160                    _("Redirecting to {app}...".format_map({"app": application.name})),
161                ),
162                "url": sign_in_req.wreply,
163                "attrs": response,
164            },
165        )

Stage view which response with a challenge

def get(self, request, *args, **kwargs):
118    def get(self, request, *args, **kwargs):
119        if PLAN_CONTEXT_WS_FED_REQUEST not in self.executor.plan.context:
120            self.logger.warning("No WS-Fed request in context")
121            return self.executor.stage_invalid()
122        return super().get(request, *args, **kwargs)

Return a challenge for the frontend to solve

def get_challenge(self, *args, **kwargs):
124    def get_challenge(self, *args, **kwargs):
125        application: Application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION]
126        provider: WSFederationProvider = get_object_or_404(
127            WSFederationProvider, pk=application.provider_id
128        )
129        sign_in_req: SignInRequest = self.executor.plan.context[PLAN_CONTEXT_WS_FED_REQUEST]
130        proc = SignInProcessor(provider, self.request, sign_in_req)
131        response = proc.response()
132        saml_processor = proc.saml_processor
133
134        # Create SAMLSession to track this login
135        auth_session = AuthenticatedSession.from_request(self.request, self.request.user)
136        if auth_session:
137            # Since samlsessions should only exist uniquely for an active session and a provider
138            # any existing combination is likely an old, dead session
139            SAMLSession.objects.filter(
140                session_index=saml_processor.session_index, provider=provider
141            ).delete()
142
143            SAMLSession.objects.update_or_create(
144                session_index=saml_processor.session_index,
145                provider=provider,
146                defaults={
147                    "user": self.request.user,
148                    "session": auth_session,
149                    "name_id": saml_processor.name_id,
150                    "name_id_format": saml_processor.name_id_format,
151                    "expires": saml_processor.session_not_on_or_after_datetime,
152                    "expiring": True,
153                },
154            )
155        return AutosubmitChallenge(
156            data={
157                "component": "ak-stage-autosubmit",
158                "title": self.executor.plan.context.get(
159                    PLAN_CONTEXT_TITLE,
160                    _("Redirecting to {app}...".format_map({"app": application.name})),
161                ),
162                "url": sign_in_req.wreply,
163                "attrs": response,
164            },
165        )

Return the challenge that the client should solve

class MetadataDownload(django.views.generic.base.View):
168class MetadataDownload(View):
169    """Redirect to metadata download"""
170
171    def dispatch(self, request: HttpRequest, application_slug: str) -> HttpResponse:
172        app = Application.objects.filter(slug=application_slug).with_provider().first()
173        if not app:
174            raise Http404
175        provider = app.get_provider()
176        if not provider:
177            raise Http404
178        return redirect(
179            reverse(
180                "authentik_api:wsfederationprovider-metadata",
181                kwargs={
182                    "pk": provider.pk,
183                },
184            )
185            + "?download"
186        )

Redirect to metadata download

def dispatch( self, request: django.http.request.HttpRequest, application_slug: str) -> django.http.response.HttpResponse:
171    def dispatch(self, request: HttpRequest, application_slug: str) -> HttpResponse:
172        app = Application.objects.filter(slug=application_slug).with_provider().first()
173        if not app:
174            raise Http404
175        provider = app.get_provider()
176        if not provider:
177            raise Http404
178        return redirect(
179            reverse(
180                "authentik_api:wsfederationprovider-metadata",
181                kwargs={
182                    "pk": provider.pk,
183                },
184            )
185            + "?download"
186        )