authentik.enterprise.providers.ws_federation.processors.sign_in
1from dataclasses import dataclass 2from urllib.parse import urlparse 3 4from django.http import HttpRequest 5from django.shortcuts import get_object_or_404 6from lxml import etree # nosec 7from lxml.etree import Element, SubElement, _Element # nosec 8 9from authentik.core.models import Application 10from authentik.enterprise.providers.ws_federation.models import WSFederationProvider 11from authentik.enterprise.providers.ws_federation.processors.constants import ( 12 NS_ADDRESSING, 13 NS_MAP, 14 NS_POLICY, 15 NS_WS_FED_TRUST, 16 NS_WSS_D3P1, 17 NS_WSS_SEC, 18 NS_WSS_UTILITY, 19 WS_FED_ACTION_SIGN_IN, 20 WS_FED_POST_KEY_ACTION, 21 WS_FED_POST_KEY_CONTEXT, 22 WS_FED_POST_KEY_RESULT, 23 WSS_KEY_IDENTIFIER_SAML_ID, 24 WSS_TOKEN_TYPE_SAML2, 25) 26from authentik.lib.utils.time import timedelta_from_string 27from authentik.policies.utils import delete_none_values 28from authentik.providers.saml.processors.assertion import AssertionProcessor 29from authentik.providers.saml.processors.authn_request_parser import AuthNRequest 30from authentik.providers.saml.utils.time import get_time_string 31 32 33@dataclass() 34class SignInRequest: 35 wa: str 36 wtrealm: str 37 wreply: str 38 wctx: str | None 39 40 @staticmethod 41 def parse(request: HttpRequest) -> SignInRequest: 42 action = request.GET.get("wa") 43 if action != WS_FED_ACTION_SIGN_IN: 44 raise ValueError("Invalid action") 45 realm = request.GET.get("wtrealm") 46 if not realm: 47 raise ValueError("Missing Realm") 48 49 req = SignInRequest( 50 wa=action, 51 wtrealm=realm, 52 wreply=request.GET.get("wreply"), 53 wctx=request.GET.get("wctx", ""), 54 ) 55 56 _, provider = req.get_app_provider() 57 if not req.wreply: 58 req.wreply = provider.acs_url 59 reply = urlparse(req.wreply) 60 configured = urlparse(provider.acs_url) 61 if not (reply[:2] == configured[:2] and reply.path.startswith(configured.path)): 62 raise ValueError("Invalid wreply") 63 return req 64 65 def get_app_provider(self): 66 provider: WSFederationProvider = get_object_or_404( 67 WSFederationProvider, audience=self.wtrealm 68 ) 69 application = get_object_or_404(Application, provider=provider) 70 return application, provider 71 72 73class SignInProcessor: 74 provider: WSFederationProvider 75 request: HttpRequest 76 sign_in_request: SignInRequest 77 saml_processor: AssertionProcessor 78 79 def __init__( 80 self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest 81 ): 82 self.provider = provider 83 self.request = request 84 self.sign_in_request = sign_in_request 85 self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest()) 86 self.saml_processor.provider.audience = self.sign_in_request.wtrealm 87 if self.provider.signing_kp: 88 self.saml_processor.provider.sign_assertion = True 89 90 def create_response_token(self): 91 root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP) 92 93 root.append(self.response_add_lifetime()) 94 root.append(self.response_add_applies_to()) 95 root.append(self.response_add_requested_security_token()) 96 root.append( 97 self.response_add_attached_reference( 98 "RequestedAttachedReference", self.saml_processor._assertion_id 99 ) 100 ) 101 root.append( 102 self.response_add_attached_reference( 103 "RequestedUnattachedReference", self.saml_processor._assertion_id 104 ) 105 ) 106 107 token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType") 108 token_type.text = WSS_TOKEN_TYPE_SAML2 109 110 request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType") 111 request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue" 112 113 key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType") 114 key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey" 115 116 return root 117 118 def response_add_lifetime(self) -> _Element: 119 """Add Lifetime element""" 120 lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP) 121 created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created") 122 created.text = get_time_string() 123 expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires") 124 expires.text = get_time_string( 125 timedelta_from_string(self.provider.session_valid_not_on_or_after) 126 ) 127 return lifetime 128 129 def response_add_applies_to(self) -> _Element: 130 """Add AppliesTo element""" 131 applies_to = Element(f"{{{NS_POLICY}}}AppliesTo") 132 endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference") 133 address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address") 134 address.text = self.sign_in_request.wtrealm 135 return applies_to 136 137 def response_add_requested_security_token(self) -> _Element: 138 """Add RequestedSecurityToken and child assertion""" 139 token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken") 140 token.append(self.saml_processor.get_assertion()) 141 return token 142 143 def response_add_attached_reference(self, tag: str, value: str) -> _Element: 144 ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}") 145 sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference") 146 sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2 147 148 key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier") 149 key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID 150 key_identifier.text = value 151 return ref 152 153 def response(self) -> dict[str, str]: 154 root = self.create_response_token() 155 assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0] 156 if self.provider.signing_kp: 157 self.saml_processor._sign(assertion) 158 str_token = etree.tostring(root).decode("utf-8") # nosec 159 return delete_none_values( 160 { 161 WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN, 162 WS_FED_POST_KEY_RESULT: str_token, 163 WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx, 164 } 165 )
@dataclass()
class
SignInRequest:
34@dataclass() 35class SignInRequest: 36 wa: str 37 wtrealm: str 38 wreply: str 39 wctx: str | None 40 41 @staticmethod 42 def parse(request: HttpRequest) -> SignInRequest: 43 action = request.GET.get("wa") 44 if action != WS_FED_ACTION_SIGN_IN: 45 raise ValueError("Invalid action") 46 realm = request.GET.get("wtrealm") 47 if not realm: 48 raise ValueError("Missing Realm") 49 50 req = SignInRequest( 51 wa=action, 52 wtrealm=realm, 53 wreply=request.GET.get("wreply"), 54 wctx=request.GET.get("wctx", ""), 55 ) 56 57 _, provider = req.get_app_provider() 58 if not req.wreply: 59 req.wreply = provider.acs_url 60 reply = urlparse(req.wreply) 61 configured = urlparse(provider.acs_url) 62 if not (reply[:2] == configured[:2] and reply.path.startswith(configured.path)): 63 raise ValueError("Invalid wreply") 64 return req 65 66 def get_app_provider(self): 67 provider: WSFederationProvider = get_object_or_404( 68 WSFederationProvider, audience=self.wtrealm 69 ) 70 application = get_object_or_404(Application, provider=provider) 71 return application, provider
41 @staticmethod 42 def parse(request: HttpRequest) -> SignInRequest: 43 action = request.GET.get("wa") 44 if action != WS_FED_ACTION_SIGN_IN: 45 raise ValueError("Invalid action") 46 realm = request.GET.get("wtrealm") 47 if not realm: 48 raise ValueError("Missing Realm") 49 50 req = SignInRequest( 51 wa=action, 52 wtrealm=realm, 53 wreply=request.GET.get("wreply"), 54 wctx=request.GET.get("wctx", ""), 55 ) 56 57 _, provider = req.get_app_provider() 58 if not req.wreply: 59 req.wreply = provider.acs_url 60 reply = urlparse(req.wreply) 61 configured = urlparse(provider.acs_url) 62 if not (reply[:2] == configured[:2] and reply.path.startswith(configured.path)): 63 raise ValueError("Invalid wreply") 64 return req
class
SignInProcessor:
74class SignInProcessor: 75 provider: WSFederationProvider 76 request: HttpRequest 77 sign_in_request: SignInRequest 78 saml_processor: AssertionProcessor 79 80 def __init__( 81 self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest 82 ): 83 self.provider = provider 84 self.request = request 85 self.sign_in_request = sign_in_request 86 self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest()) 87 self.saml_processor.provider.audience = self.sign_in_request.wtrealm 88 if self.provider.signing_kp: 89 self.saml_processor.provider.sign_assertion = True 90 91 def create_response_token(self): 92 root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP) 93 94 root.append(self.response_add_lifetime()) 95 root.append(self.response_add_applies_to()) 96 root.append(self.response_add_requested_security_token()) 97 root.append( 98 self.response_add_attached_reference( 99 "RequestedAttachedReference", self.saml_processor._assertion_id 100 ) 101 ) 102 root.append( 103 self.response_add_attached_reference( 104 "RequestedUnattachedReference", self.saml_processor._assertion_id 105 ) 106 ) 107 108 token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType") 109 token_type.text = WSS_TOKEN_TYPE_SAML2 110 111 request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType") 112 request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue" 113 114 key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType") 115 key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey" 116 117 return root 118 119 def response_add_lifetime(self) -> _Element: 120 """Add Lifetime element""" 121 lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP) 122 created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created") 123 created.text = get_time_string() 124 expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires") 125 expires.text = get_time_string( 126 timedelta_from_string(self.provider.session_valid_not_on_or_after) 127 ) 128 return lifetime 129 130 def response_add_applies_to(self) -> _Element: 131 """Add AppliesTo element""" 132 applies_to = Element(f"{{{NS_POLICY}}}AppliesTo") 133 endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference") 134 address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address") 135 address.text = self.sign_in_request.wtrealm 136 return applies_to 137 138 def response_add_requested_security_token(self) -> _Element: 139 """Add RequestedSecurityToken and child assertion""" 140 token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken") 141 token.append(self.saml_processor.get_assertion()) 142 return token 143 144 def response_add_attached_reference(self, tag: str, value: str) -> _Element: 145 ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}") 146 sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference") 147 sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2 148 149 key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier") 150 key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID 151 key_identifier.text = value 152 return ref 153 154 def response(self) -> dict[str, str]: 155 root = self.create_response_token() 156 assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0] 157 if self.provider.signing_kp: 158 self.saml_processor._sign(assertion) 159 str_token = etree.tostring(root).decode("utf-8") # nosec 160 return delete_none_values( 161 { 162 WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN, 163 WS_FED_POST_KEY_RESULT: str_token, 164 WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx, 165 } 166 )
SignInProcessor( provider: authentik.enterprise.providers.ws_federation.models.WSFederationProvider, request: django.http.request.HttpRequest, sign_in_request: SignInRequest)
80 def __init__( 81 self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest 82 ): 83 self.provider = provider 84 self.request = request 85 self.sign_in_request = sign_in_request 86 self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest()) 87 self.saml_processor.provider.audience = self.sign_in_request.wtrealm 88 if self.provider.signing_kp: 89 self.saml_processor.provider.sign_assertion = True
sign_in_request: SignInRequest
saml_processor: authentik.providers.saml.processors.assertion.AssertionProcessor
def
create_response_token(self):
91 def create_response_token(self): 92 root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP) 93 94 root.append(self.response_add_lifetime()) 95 root.append(self.response_add_applies_to()) 96 root.append(self.response_add_requested_security_token()) 97 root.append( 98 self.response_add_attached_reference( 99 "RequestedAttachedReference", self.saml_processor._assertion_id 100 ) 101 ) 102 root.append( 103 self.response_add_attached_reference( 104 "RequestedUnattachedReference", self.saml_processor._assertion_id 105 ) 106 ) 107 108 token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType") 109 token_type.text = WSS_TOKEN_TYPE_SAML2 110 111 request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType") 112 request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue" 113 114 key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType") 115 key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey" 116 117 return root
def
response_add_lifetime(self) -> lxml.etree._Element:
119 def response_add_lifetime(self) -> _Element: 120 """Add Lifetime element""" 121 lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP) 122 created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created") 123 created.text = get_time_string() 124 expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires") 125 expires.text = get_time_string( 126 timedelta_from_string(self.provider.session_valid_not_on_or_after) 127 ) 128 return lifetime
Add Lifetime element
def
response_add_applies_to(self) -> lxml.etree._Element:
130 def response_add_applies_to(self) -> _Element: 131 """Add AppliesTo element""" 132 applies_to = Element(f"{{{NS_POLICY}}}AppliesTo") 133 endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference") 134 address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address") 135 address.text = self.sign_in_request.wtrealm 136 return applies_to
Add AppliesTo element
def
response_add_requested_security_token(self) -> lxml.etree._Element:
138 def response_add_requested_security_token(self) -> _Element: 139 """Add RequestedSecurityToken and child assertion""" 140 token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken") 141 token.append(self.saml_processor.get_assertion()) 142 return token
Add RequestedSecurityToken and child assertion
def
response_add_attached_reference(self, tag: str, value: str) -> lxml.etree._Element:
144 def response_add_attached_reference(self, tag: str, value: str) -> _Element: 145 ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}") 146 sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference") 147 sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2 148 149 key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier") 150 key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID 151 key_identifier.text = value 152 return ref
def
response(self) -> dict[str, str]:
154 def response(self) -> dict[str, str]: 155 root = self.create_response_token() 156 assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0] 157 if self.provider.signing_kp: 158 self.saml_processor._sign(assertion) 159 str_token = etree.tostring(root).decode("utf-8") # nosec 160 return delete_none_values( 161 { 162 WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN, 163 WS_FED_POST_KEY_RESULT: str_token, 164 WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx, 165 } 166 )