authentik.enterprise.providers.ws_federation.processors.sign_in

  1from dataclasses import dataclass
  2from urllib.parse import urlparse
  3
  4from django.http import HttpRequest
  5from django.shortcuts import get_object_or_404
  6from lxml import etree  # nosec
  7from lxml.etree import Element, SubElement, _Element  # nosec
  8
  9from authentik.core.models import Application
 10from authentik.enterprise.providers.ws_federation.models import WSFederationProvider
 11from authentik.enterprise.providers.ws_federation.processors.constants import (
 12    NS_ADDRESSING,
 13    NS_MAP,
 14    NS_POLICY,
 15    NS_WS_FED_TRUST,
 16    NS_WSS_D3P1,
 17    NS_WSS_SEC,
 18    NS_WSS_UTILITY,
 19    WS_FED_ACTION_SIGN_IN,
 20    WS_FED_POST_KEY_ACTION,
 21    WS_FED_POST_KEY_CONTEXT,
 22    WS_FED_POST_KEY_RESULT,
 23    WSS_KEY_IDENTIFIER_SAML_ID,
 24    WSS_TOKEN_TYPE_SAML2,
 25)
 26from authentik.lib.utils.time import timedelta_from_string
 27from authentik.policies.utils import delete_none_values
 28from authentik.providers.saml.processors.assertion import AssertionProcessor
 29from authentik.providers.saml.processors.authn_request_parser import AuthNRequest
 30from authentik.providers.saml.utils.time import get_time_string
 31
 32
 33@dataclass()
 34class SignInRequest:
 35    wa: str
 36    wtrealm: str
 37    wreply: str
 38    wctx: str | None
 39
 40    @staticmethod
 41    def parse(request: HttpRequest) -> SignInRequest:
 42        action = request.GET.get("wa")
 43        if action != WS_FED_ACTION_SIGN_IN:
 44            raise ValueError("Invalid action")
 45        realm = request.GET.get("wtrealm")
 46        if not realm:
 47            raise ValueError("Missing Realm")
 48
 49        req = SignInRequest(
 50            wa=action,
 51            wtrealm=realm,
 52            wreply=request.GET.get("wreply"),
 53            wctx=request.GET.get("wctx", ""),
 54        )
 55
 56        _, provider = req.get_app_provider()
 57        if not req.wreply:
 58            req.wreply = provider.acs_url
 59        reply = urlparse(req.wreply)
 60        configured = urlparse(provider.acs_url)
 61        if not (reply[:2] == configured[:2] and reply.path.startswith(configured.path)):
 62            raise ValueError("Invalid wreply")
 63        return req
 64
 65    def get_app_provider(self):
 66        provider: WSFederationProvider = get_object_or_404(
 67            WSFederationProvider, audience=self.wtrealm
 68        )
 69        application = get_object_or_404(Application, provider=provider)
 70        return application, provider
 71
 72
 73class SignInProcessor:
 74    provider: WSFederationProvider
 75    request: HttpRequest
 76    sign_in_request: SignInRequest
 77    saml_processor: AssertionProcessor
 78
 79    def __init__(
 80        self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest
 81    ):
 82        self.provider = provider
 83        self.request = request
 84        self.sign_in_request = sign_in_request
 85        self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest())
 86        self.saml_processor.provider.audience = self.sign_in_request.wtrealm
 87        if self.provider.signing_kp:
 88            self.saml_processor.provider.sign_assertion = True
 89
 90    def create_response_token(self):
 91        root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP)
 92
 93        root.append(self.response_add_lifetime())
 94        root.append(self.response_add_applies_to())
 95        root.append(self.response_add_requested_security_token())
 96        root.append(
 97            self.response_add_attached_reference(
 98                "RequestedAttachedReference", self.saml_processor._assertion_id
 99            )
100        )
101        root.append(
102            self.response_add_attached_reference(
103                "RequestedUnattachedReference", self.saml_processor._assertion_id
104            )
105        )
106
107        token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType")
108        token_type.text = WSS_TOKEN_TYPE_SAML2
109
110        request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType")
111        request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue"
112
113        key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType")
114        key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey"
115
116        return root
117
118    def response_add_lifetime(self) -> _Element:
119        """Add Lifetime element"""
120        lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP)
121        created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created")
122        created.text = get_time_string()
123        expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires")
124        expires.text = get_time_string(
125            timedelta_from_string(self.provider.session_valid_not_on_or_after)
126        )
127        return lifetime
128
129    def response_add_applies_to(self) -> _Element:
130        """Add AppliesTo element"""
131        applies_to = Element(f"{{{NS_POLICY}}}AppliesTo")
132        endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference")
133        address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address")
134        address.text = self.sign_in_request.wtrealm
135        return applies_to
136
137    def response_add_requested_security_token(self) -> _Element:
138        """Add RequestedSecurityToken and child assertion"""
139        token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken")
140        token.append(self.saml_processor.get_assertion())
141        return token
142
143    def response_add_attached_reference(self, tag: str, value: str) -> _Element:
144        ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}")
145        sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference")
146        sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2
147
148        key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier")
149        key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID
150        key_identifier.text = value
151        return ref
152
153    def response(self) -> dict[str, str]:
154        root = self.create_response_token()
155        assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
156        if self.provider.signing_kp:
157            self.saml_processor._sign(assertion)
158        str_token = etree.tostring(root).decode("utf-8")  # nosec
159        return delete_none_values(
160            {
161                WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN,
162                WS_FED_POST_KEY_RESULT: str_token,
163                WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx,
164            }
165        )
@dataclass()
class SignInRequest:
34@dataclass()
35class SignInRequest:
36    wa: str
37    wtrealm: str
38    wreply: str
39    wctx: str | None
40
41    @staticmethod
42    def parse(request: HttpRequest) -> SignInRequest:
43        action = request.GET.get("wa")
44        if action != WS_FED_ACTION_SIGN_IN:
45            raise ValueError("Invalid action")
46        realm = request.GET.get("wtrealm")
47        if not realm:
48            raise ValueError("Missing Realm")
49
50        req = SignInRequest(
51            wa=action,
52            wtrealm=realm,
53            wreply=request.GET.get("wreply"),
54            wctx=request.GET.get("wctx", ""),
55        )
56
57        _, provider = req.get_app_provider()
58        if not req.wreply:
59            req.wreply = provider.acs_url
60        reply = urlparse(req.wreply)
61        configured = urlparse(provider.acs_url)
62        if not (reply[:2] == configured[:2] and reply.path.startswith(configured.path)):
63            raise ValueError("Invalid wreply")
64        return req
65
66    def get_app_provider(self):
67        provider: WSFederationProvider = get_object_or_404(
68            WSFederationProvider, audience=self.wtrealm
69        )
70        application = get_object_or_404(Application, provider=provider)
71        return application, provider
SignInRequest(wa: str, wtrealm: str, wreply: str, wctx: str | None)
wa: str
wtrealm: str
wreply: str
wctx: str | None
@staticmethod
def parse( request: django.http.request.HttpRequest) -> SignInRequest:
41    @staticmethod
42    def parse(request: HttpRequest) -> SignInRequest:
43        action = request.GET.get("wa")
44        if action != WS_FED_ACTION_SIGN_IN:
45            raise ValueError("Invalid action")
46        realm = request.GET.get("wtrealm")
47        if not realm:
48            raise ValueError("Missing Realm")
49
50        req = SignInRequest(
51            wa=action,
52            wtrealm=realm,
53            wreply=request.GET.get("wreply"),
54            wctx=request.GET.get("wctx", ""),
55        )
56
57        _, provider = req.get_app_provider()
58        if not req.wreply:
59            req.wreply = provider.acs_url
60        reply = urlparse(req.wreply)
61        configured = urlparse(provider.acs_url)
62        if not (reply[:2] == configured[:2] and reply.path.startswith(configured.path)):
63            raise ValueError("Invalid wreply")
64        return req
def get_app_provider(self):
66    def get_app_provider(self):
67        provider: WSFederationProvider = get_object_or_404(
68            WSFederationProvider, audience=self.wtrealm
69        )
70        application = get_object_or_404(Application, provider=provider)
71        return application, provider
class SignInProcessor:
 74class SignInProcessor:
 75    provider: WSFederationProvider
 76    request: HttpRequest
 77    sign_in_request: SignInRequest
 78    saml_processor: AssertionProcessor
 79
 80    def __init__(
 81        self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest
 82    ):
 83        self.provider = provider
 84        self.request = request
 85        self.sign_in_request = sign_in_request
 86        self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest())
 87        self.saml_processor.provider.audience = self.sign_in_request.wtrealm
 88        if self.provider.signing_kp:
 89            self.saml_processor.provider.sign_assertion = True
 90
 91    def create_response_token(self):
 92        root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP)
 93
 94        root.append(self.response_add_lifetime())
 95        root.append(self.response_add_applies_to())
 96        root.append(self.response_add_requested_security_token())
 97        root.append(
 98            self.response_add_attached_reference(
 99                "RequestedAttachedReference", self.saml_processor._assertion_id
100            )
101        )
102        root.append(
103            self.response_add_attached_reference(
104                "RequestedUnattachedReference", self.saml_processor._assertion_id
105            )
106        )
107
108        token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType")
109        token_type.text = WSS_TOKEN_TYPE_SAML2
110
111        request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType")
112        request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue"
113
114        key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType")
115        key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey"
116
117        return root
118
119    def response_add_lifetime(self) -> _Element:
120        """Add Lifetime element"""
121        lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP)
122        created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created")
123        created.text = get_time_string()
124        expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires")
125        expires.text = get_time_string(
126            timedelta_from_string(self.provider.session_valid_not_on_or_after)
127        )
128        return lifetime
129
130    def response_add_applies_to(self) -> _Element:
131        """Add AppliesTo element"""
132        applies_to = Element(f"{{{NS_POLICY}}}AppliesTo")
133        endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference")
134        address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address")
135        address.text = self.sign_in_request.wtrealm
136        return applies_to
137
138    def response_add_requested_security_token(self) -> _Element:
139        """Add RequestedSecurityToken and child assertion"""
140        token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken")
141        token.append(self.saml_processor.get_assertion())
142        return token
143
144    def response_add_attached_reference(self, tag: str, value: str) -> _Element:
145        ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}")
146        sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference")
147        sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2
148
149        key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier")
150        key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID
151        key_identifier.text = value
152        return ref
153
154    def response(self) -> dict[str, str]:
155        root = self.create_response_token()
156        assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
157        if self.provider.signing_kp:
158            self.saml_processor._sign(assertion)
159        str_token = etree.tostring(root).decode("utf-8")  # nosec
160        return delete_none_values(
161            {
162                WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN,
163                WS_FED_POST_KEY_RESULT: str_token,
164                WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx,
165            }
166        )
SignInProcessor( provider: authentik.enterprise.providers.ws_federation.models.WSFederationProvider, request: django.http.request.HttpRequest, sign_in_request: SignInRequest)
80    def __init__(
81        self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest
82    ):
83        self.provider = provider
84        self.request = request
85        self.sign_in_request = sign_in_request
86        self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest())
87        self.saml_processor.provider.audience = self.sign_in_request.wtrealm
88        if self.provider.signing_kp:
89            self.saml_processor.provider.sign_assertion = True
request: django.http.request.HttpRequest
sign_in_request: SignInRequest
def create_response_token(self):
 91    def create_response_token(self):
 92        root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP)
 93
 94        root.append(self.response_add_lifetime())
 95        root.append(self.response_add_applies_to())
 96        root.append(self.response_add_requested_security_token())
 97        root.append(
 98            self.response_add_attached_reference(
 99                "RequestedAttachedReference", self.saml_processor._assertion_id
100            )
101        )
102        root.append(
103            self.response_add_attached_reference(
104                "RequestedUnattachedReference", self.saml_processor._assertion_id
105            )
106        )
107
108        token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType")
109        token_type.text = WSS_TOKEN_TYPE_SAML2
110
111        request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType")
112        request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue"
113
114        key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType")
115        key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey"
116
117        return root
def response_add_lifetime(self) -> lxml.etree._Element:
119    def response_add_lifetime(self) -> _Element:
120        """Add Lifetime element"""
121        lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP)
122        created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created")
123        created.text = get_time_string()
124        expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires")
125        expires.text = get_time_string(
126            timedelta_from_string(self.provider.session_valid_not_on_or_after)
127        )
128        return lifetime

Add Lifetime element

def response_add_applies_to(self) -> lxml.etree._Element:
130    def response_add_applies_to(self) -> _Element:
131        """Add AppliesTo element"""
132        applies_to = Element(f"{{{NS_POLICY}}}AppliesTo")
133        endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference")
134        address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address")
135        address.text = self.sign_in_request.wtrealm
136        return applies_to

Add AppliesTo element

def response_add_requested_security_token(self) -> lxml.etree._Element:
138    def response_add_requested_security_token(self) -> _Element:
139        """Add RequestedSecurityToken and child assertion"""
140        token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken")
141        token.append(self.saml_processor.get_assertion())
142        return token

Add RequestedSecurityToken and child assertion

def response_add_attached_reference(self, tag: str, value: str) -> lxml.etree._Element:
144    def response_add_attached_reference(self, tag: str, value: str) -> _Element:
145        ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}")
146        sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference")
147        sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2
148
149        key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier")
150        key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID
151        key_identifier.text = value
152        return ref
def response(self) -> dict[str, str]:
154    def response(self) -> dict[str, str]:
155        root = self.create_response_token()
156        assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
157        if self.provider.signing_kp:
158            self.saml_processor._sign(assertion)
159        str_token = etree.tostring(root).decode("utf-8")  # nosec
160        return delete_none_values(
161            {
162                WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN,
163                WS_FED_POST_KEY_RESULT: str_token,
164                WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx,
165            }
166        )