authentik.enterprise.providers.ws_federation.processors.sign_in

  1from dataclasses import dataclass
  2
  3from django.http import HttpRequest
  4from django.shortcuts import get_object_or_404
  5from lxml import etree  # nosec
  6from lxml.etree import Element, SubElement, _Element  # nosec
  7
  8from authentik.core.models import Application
  9from authentik.enterprise.providers.ws_federation.models import WSFederationProvider
 10from authentik.enterprise.providers.ws_federation.processors.constants import (
 11    NS_ADDRESSING,
 12    NS_MAP,
 13    NS_POLICY,
 14    NS_WS_FED_TRUST,
 15    NS_WSS_D3P1,
 16    NS_WSS_SEC,
 17    NS_WSS_UTILITY,
 18    WS_FED_ACTION_SIGN_IN,
 19    WS_FED_POST_KEY_ACTION,
 20    WS_FED_POST_KEY_CONTEXT,
 21    WS_FED_POST_KEY_RESULT,
 22    WSS_KEY_IDENTIFIER_SAML_ID,
 23    WSS_TOKEN_TYPE_SAML2,
 24)
 25from authentik.lib.utils.time import timedelta_from_string
 26from authentik.policies.utils import delete_none_values
 27from authentik.providers.saml.processors.assertion import AssertionProcessor
 28from authentik.providers.saml.processors.authn_request_parser import AuthNRequest
 29from authentik.providers.saml.utils.time import get_time_string
 30
 31
 32@dataclass()
 33class SignInRequest:
 34    wa: str
 35    wtrealm: str
 36    wreply: str
 37    wctx: str | None
 38
 39    @staticmethod
 40    def parse(request: HttpRequest) -> SignInRequest:
 41        action = request.GET.get("wa")
 42        if action != WS_FED_ACTION_SIGN_IN:
 43            raise ValueError("Invalid action")
 44        realm = request.GET.get("wtrealm")
 45        if not realm:
 46            raise ValueError("Missing Realm")
 47
 48        req = SignInRequest(
 49            wa=action,
 50            wtrealm=realm,
 51            wreply=request.GET.get("wreply"),
 52            wctx=request.GET.get("wctx", ""),
 53        )
 54
 55        _, provider = req.get_app_provider()
 56        if not req.wreply:
 57            req.wreply = provider.acs_url
 58        if not req.wreply.startswith(provider.acs_url):
 59            raise ValueError("Invalid wreply")
 60        return req
 61
 62    def get_app_provider(self):
 63        provider: WSFederationProvider = get_object_or_404(
 64            WSFederationProvider, audience=self.wtrealm
 65        )
 66        application = get_object_or_404(Application, provider=provider)
 67        return application, provider
 68
 69
 70class SignInProcessor:
 71    provider: WSFederationProvider
 72    request: HttpRequest
 73    sign_in_request: SignInRequest
 74    saml_processor: AssertionProcessor
 75
 76    def __init__(
 77        self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest
 78    ):
 79        self.provider = provider
 80        self.request = request
 81        self.sign_in_request = sign_in_request
 82        self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest())
 83        self.saml_processor.provider.audience = self.sign_in_request.wtrealm
 84        if self.provider.signing_kp:
 85            self.saml_processor.provider.sign_assertion = True
 86
 87    def create_response_token(self):
 88        root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP)
 89
 90        root.append(self.response_add_lifetime())
 91        root.append(self.response_add_applies_to())
 92        root.append(self.response_add_requested_security_token())
 93        root.append(
 94            self.response_add_attached_reference(
 95                "RequestedAttachedReference", self.saml_processor._assertion_id
 96            )
 97        )
 98        root.append(
 99            self.response_add_attached_reference(
100                "RequestedUnattachedReference", self.saml_processor._assertion_id
101            )
102        )
103
104        token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType")
105        token_type.text = WSS_TOKEN_TYPE_SAML2
106
107        request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType")
108        request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue"
109
110        key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType")
111        key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey"
112
113        return root
114
115    def response_add_lifetime(self) -> _Element:
116        """Add Lifetime element"""
117        lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP)
118        created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created")
119        created.text = get_time_string()
120        expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires")
121        expires.text = get_time_string(
122            timedelta_from_string(self.provider.session_valid_not_on_or_after)
123        )
124        return lifetime
125
126    def response_add_applies_to(self) -> _Element:
127        """Add AppliesTo element"""
128        applies_to = Element(f"{{{NS_POLICY}}}AppliesTo")
129        endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference")
130        address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address")
131        address.text = self.sign_in_request.wtrealm
132        return applies_to
133
134    def response_add_requested_security_token(self) -> _Element:
135        """Add RequestedSecurityToken and child assertion"""
136        token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken")
137        token.append(self.saml_processor.get_assertion())
138        return token
139
140    def response_add_attached_reference(self, tag: str, value: str) -> _Element:
141        ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}")
142        sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference")
143        sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2
144
145        key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier")
146        key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID
147        key_identifier.text = value
148        return ref
149
150    def response(self) -> dict[str, str]:
151        root = self.create_response_token()
152        assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
153        if self.provider.signing_kp:
154            self.saml_processor._sign(assertion)
155        str_token = etree.tostring(root).decode("utf-8")  # nosec
156        return delete_none_values(
157            {
158                WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN,
159                WS_FED_POST_KEY_RESULT: str_token,
160                WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx,
161            }
162        )
@dataclass()
class SignInRequest:
33@dataclass()
34class SignInRequest:
35    wa: str
36    wtrealm: str
37    wreply: str
38    wctx: str | None
39
40    @staticmethod
41    def parse(request: HttpRequest) -> SignInRequest:
42        action = request.GET.get("wa")
43        if action != WS_FED_ACTION_SIGN_IN:
44            raise ValueError("Invalid action")
45        realm = request.GET.get("wtrealm")
46        if not realm:
47            raise ValueError("Missing Realm")
48
49        req = SignInRequest(
50            wa=action,
51            wtrealm=realm,
52            wreply=request.GET.get("wreply"),
53            wctx=request.GET.get("wctx", ""),
54        )
55
56        _, provider = req.get_app_provider()
57        if not req.wreply:
58            req.wreply = provider.acs_url
59        if not req.wreply.startswith(provider.acs_url):
60            raise ValueError("Invalid wreply")
61        return req
62
63    def get_app_provider(self):
64        provider: WSFederationProvider = get_object_or_404(
65            WSFederationProvider, audience=self.wtrealm
66        )
67        application = get_object_or_404(Application, provider=provider)
68        return application, provider
SignInRequest(wa: str, wtrealm: str, wreply: str, wctx: str | None)
wa: str
wtrealm: str
wreply: str
wctx: str | None
@staticmethod
def parse( request: django.http.request.HttpRequest) -> SignInRequest:
40    @staticmethod
41    def parse(request: HttpRequest) -> SignInRequest:
42        action = request.GET.get("wa")
43        if action != WS_FED_ACTION_SIGN_IN:
44            raise ValueError("Invalid action")
45        realm = request.GET.get("wtrealm")
46        if not realm:
47            raise ValueError("Missing Realm")
48
49        req = SignInRequest(
50            wa=action,
51            wtrealm=realm,
52            wreply=request.GET.get("wreply"),
53            wctx=request.GET.get("wctx", ""),
54        )
55
56        _, provider = req.get_app_provider()
57        if not req.wreply:
58            req.wreply = provider.acs_url
59        if not req.wreply.startswith(provider.acs_url):
60            raise ValueError("Invalid wreply")
61        return req
def get_app_provider(self):
63    def get_app_provider(self):
64        provider: WSFederationProvider = get_object_or_404(
65            WSFederationProvider, audience=self.wtrealm
66        )
67        application = get_object_or_404(Application, provider=provider)
68        return application, provider
class SignInProcessor:
 71class SignInProcessor:
 72    provider: WSFederationProvider
 73    request: HttpRequest
 74    sign_in_request: SignInRequest
 75    saml_processor: AssertionProcessor
 76
 77    def __init__(
 78        self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest
 79    ):
 80        self.provider = provider
 81        self.request = request
 82        self.sign_in_request = sign_in_request
 83        self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest())
 84        self.saml_processor.provider.audience = self.sign_in_request.wtrealm
 85        if self.provider.signing_kp:
 86            self.saml_processor.provider.sign_assertion = True
 87
 88    def create_response_token(self):
 89        root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP)
 90
 91        root.append(self.response_add_lifetime())
 92        root.append(self.response_add_applies_to())
 93        root.append(self.response_add_requested_security_token())
 94        root.append(
 95            self.response_add_attached_reference(
 96                "RequestedAttachedReference", self.saml_processor._assertion_id
 97            )
 98        )
 99        root.append(
100            self.response_add_attached_reference(
101                "RequestedUnattachedReference", self.saml_processor._assertion_id
102            )
103        )
104
105        token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType")
106        token_type.text = WSS_TOKEN_TYPE_SAML2
107
108        request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType")
109        request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue"
110
111        key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType")
112        key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey"
113
114        return root
115
116    def response_add_lifetime(self) -> _Element:
117        """Add Lifetime element"""
118        lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP)
119        created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created")
120        created.text = get_time_string()
121        expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires")
122        expires.text = get_time_string(
123            timedelta_from_string(self.provider.session_valid_not_on_or_after)
124        )
125        return lifetime
126
127    def response_add_applies_to(self) -> _Element:
128        """Add AppliesTo element"""
129        applies_to = Element(f"{{{NS_POLICY}}}AppliesTo")
130        endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference")
131        address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address")
132        address.text = self.sign_in_request.wtrealm
133        return applies_to
134
135    def response_add_requested_security_token(self) -> _Element:
136        """Add RequestedSecurityToken and child assertion"""
137        token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken")
138        token.append(self.saml_processor.get_assertion())
139        return token
140
141    def response_add_attached_reference(self, tag: str, value: str) -> _Element:
142        ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}")
143        sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference")
144        sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2
145
146        key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier")
147        key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID
148        key_identifier.text = value
149        return ref
150
151    def response(self) -> dict[str, str]:
152        root = self.create_response_token()
153        assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
154        if self.provider.signing_kp:
155            self.saml_processor._sign(assertion)
156        str_token = etree.tostring(root).decode("utf-8")  # nosec
157        return delete_none_values(
158            {
159                WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN,
160                WS_FED_POST_KEY_RESULT: str_token,
161                WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx,
162            }
163        )
SignInProcessor( provider: authentik.enterprise.providers.ws_federation.models.WSFederationProvider, request: django.http.request.HttpRequest, sign_in_request: SignInRequest)
77    def __init__(
78        self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest
79    ):
80        self.provider = provider
81        self.request = request
82        self.sign_in_request = sign_in_request
83        self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest())
84        self.saml_processor.provider.audience = self.sign_in_request.wtrealm
85        if self.provider.signing_kp:
86            self.saml_processor.provider.sign_assertion = True
request: django.http.request.HttpRequest
sign_in_request: SignInRequest
def create_response_token(self):
 88    def create_response_token(self):
 89        root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP)
 90
 91        root.append(self.response_add_lifetime())
 92        root.append(self.response_add_applies_to())
 93        root.append(self.response_add_requested_security_token())
 94        root.append(
 95            self.response_add_attached_reference(
 96                "RequestedAttachedReference", self.saml_processor._assertion_id
 97            )
 98        )
 99        root.append(
100            self.response_add_attached_reference(
101                "RequestedUnattachedReference", self.saml_processor._assertion_id
102            )
103        )
104
105        token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType")
106        token_type.text = WSS_TOKEN_TYPE_SAML2
107
108        request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType")
109        request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue"
110
111        key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType")
112        key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey"
113
114        return root
def response_add_lifetime(self) -> lxml.etree._Element:
116    def response_add_lifetime(self) -> _Element:
117        """Add Lifetime element"""
118        lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP)
119        created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created")
120        created.text = get_time_string()
121        expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires")
122        expires.text = get_time_string(
123            timedelta_from_string(self.provider.session_valid_not_on_or_after)
124        )
125        return lifetime

Add Lifetime element

def response_add_applies_to(self) -> lxml.etree._Element:
127    def response_add_applies_to(self) -> _Element:
128        """Add AppliesTo element"""
129        applies_to = Element(f"{{{NS_POLICY}}}AppliesTo")
130        endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference")
131        address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address")
132        address.text = self.sign_in_request.wtrealm
133        return applies_to

Add AppliesTo element

def response_add_requested_security_token(self) -> lxml.etree._Element:
135    def response_add_requested_security_token(self) -> _Element:
136        """Add RequestedSecurityToken and child assertion"""
137        token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken")
138        token.append(self.saml_processor.get_assertion())
139        return token

Add RequestedSecurityToken and child assertion

def response_add_attached_reference(self, tag: str, value: str) -> lxml.etree._Element:
141    def response_add_attached_reference(self, tag: str, value: str) -> _Element:
142        ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}")
143        sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference")
144        sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2
145
146        key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier")
147        key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID
148        key_identifier.text = value
149        return ref
def response(self) -> dict[str, str]:
151    def response(self) -> dict[str, str]:
152        root = self.create_response_token()
153        assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
154        if self.provider.signing_kp:
155            self.saml_processor._sign(assertion)
156        str_token = etree.tostring(root).decode("utf-8")  # nosec
157        return delete_none_values(
158            {
159                WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN,
160                WS_FED_POST_KEY_RESULT: str_token,
161                WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx,
162            }
163        )