authentik.enterprise.providers.ws_federation.processors.sign_in
1from dataclasses import dataclass 2 3from django.http import HttpRequest 4from django.shortcuts import get_object_or_404 5from lxml import etree # nosec 6from lxml.etree import Element, SubElement, _Element # nosec 7 8from authentik.core.models import Application 9from authentik.enterprise.providers.ws_federation.models import WSFederationProvider 10from authentik.enterprise.providers.ws_federation.processors.constants import ( 11 NS_ADDRESSING, 12 NS_MAP, 13 NS_POLICY, 14 NS_WS_FED_TRUST, 15 NS_WSS_D3P1, 16 NS_WSS_SEC, 17 NS_WSS_UTILITY, 18 WS_FED_ACTION_SIGN_IN, 19 WS_FED_POST_KEY_ACTION, 20 WS_FED_POST_KEY_CONTEXT, 21 WS_FED_POST_KEY_RESULT, 22 WSS_KEY_IDENTIFIER_SAML_ID, 23 WSS_TOKEN_TYPE_SAML2, 24) 25from authentik.lib.utils.time import timedelta_from_string 26from authentik.policies.utils import delete_none_values 27from authentik.providers.saml.processors.assertion import AssertionProcessor 28from authentik.providers.saml.processors.authn_request_parser import AuthNRequest 29from authentik.providers.saml.utils.time import get_time_string 30 31 32@dataclass() 33class SignInRequest: 34 wa: str 35 wtrealm: str 36 wreply: str 37 wctx: str | None 38 39 @staticmethod 40 def parse(request: HttpRequest) -> SignInRequest: 41 action = request.GET.get("wa") 42 if action != WS_FED_ACTION_SIGN_IN: 43 raise ValueError("Invalid action") 44 realm = request.GET.get("wtrealm") 45 if not realm: 46 raise ValueError("Missing Realm") 47 48 req = SignInRequest( 49 wa=action, 50 wtrealm=realm, 51 wreply=request.GET.get("wreply"), 52 wctx=request.GET.get("wctx", ""), 53 ) 54 55 _, provider = req.get_app_provider() 56 if not req.wreply: 57 req.wreply = provider.acs_url 58 if not req.wreply.startswith(provider.acs_url): 59 raise ValueError("Invalid wreply") 60 return req 61 62 def get_app_provider(self): 63 provider: WSFederationProvider = get_object_or_404( 64 WSFederationProvider, audience=self.wtrealm 65 ) 66 application = get_object_or_404(Application, provider=provider) 67 return application, provider 68 69 70class SignInProcessor: 71 provider: WSFederationProvider 72 request: HttpRequest 73 sign_in_request: SignInRequest 74 saml_processor: AssertionProcessor 75 76 def __init__( 77 self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest 78 ): 79 self.provider = provider 80 self.request = request 81 self.sign_in_request = sign_in_request 82 self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest()) 83 self.saml_processor.provider.audience = self.sign_in_request.wtrealm 84 if self.provider.signing_kp: 85 self.saml_processor.provider.sign_assertion = True 86 87 def create_response_token(self): 88 root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP) 89 90 root.append(self.response_add_lifetime()) 91 root.append(self.response_add_applies_to()) 92 root.append(self.response_add_requested_security_token()) 93 root.append( 94 self.response_add_attached_reference( 95 "RequestedAttachedReference", self.saml_processor._assertion_id 96 ) 97 ) 98 root.append( 99 self.response_add_attached_reference( 100 "RequestedUnattachedReference", self.saml_processor._assertion_id 101 ) 102 ) 103 104 token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType") 105 token_type.text = WSS_TOKEN_TYPE_SAML2 106 107 request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType") 108 request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue" 109 110 key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType") 111 key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey" 112 113 return root 114 115 def response_add_lifetime(self) -> _Element: 116 """Add Lifetime element""" 117 lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP) 118 created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created") 119 created.text = get_time_string() 120 expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires") 121 expires.text = get_time_string( 122 timedelta_from_string(self.provider.session_valid_not_on_or_after) 123 ) 124 return lifetime 125 126 def response_add_applies_to(self) -> _Element: 127 """Add AppliesTo element""" 128 applies_to = Element(f"{{{NS_POLICY}}}AppliesTo") 129 endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference") 130 address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address") 131 address.text = self.sign_in_request.wtrealm 132 return applies_to 133 134 def response_add_requested_security_token(self) -> _Element: 135 """Add RequestedSecurityToken and child assertion""" 136 token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken") 137 token.append(self.saml_processor.get_assertion()) 138 return token 139 140 def response_add_attached_reference(self, tag: str, value: str) -> _Element: 141 ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}") 142 sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference") 143 sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2 144 145 key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier") 146 key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID 147 key_identifier.text = value 148 return ref 149 150 def response(self) -> dict[str, str]: 151 root = self.create_response_token() 152 assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0] 153 if self.provider.signing_kp: 154 self.saml_processor._sign(assertion) 155 str_token = etree.tostring(root).decode("utf-8") # nosec 156 return delete_none_values( 157 { 158 WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN, 159 WS_FED_POST_KEY_RESULT: str_token, 160 WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx, 161 } 162 )
@dataclass()
class
SignInRequest:
33@dataclass() 34class SignInRequest: 35 wa: str 36 wtrealm: str 37 wreply: str 38 wctx: str | None 39 40 @staticmethod 41 def parse(request: HttpRequest) -> SignInRequest: 42 action = request.GET.get("wa") 43 if action != WS_FED_ACTION_SIGN_IN: 44 raise ValueError("Invalid action") 45 realm = request.GET.get("wtrealm") 46 if not realm: 47 raise ValueError("Missing Realm") 48 49 req = SignInRequest( 50 wa=action, 51 wtrealm=realm, 52 wreply=request.GET.get("wreply"), 53 wctx=request.GET.get("wctx", ""), 54 ) 55 56 _, provider = req.get_app_provider() 57 if not req.wreply: 58 req.wreply = provider.acs_url 59 if not req.wreply.startswith(provider.acs_url): 60 raise ValueError("Invalid wreply") 61 return req 62 63 def get_app_provider(self): 64 provider: WSFederationProvider = get_object_or_404( 65 WSFederationProvider, audience=self.wtrealm 66 ) 67 application = get_object_or_404(Application, provider=provider) 68 return application, provider
40 @staticmethod 41 def parse(request: HttpRequest) -> SignInRequest: 42 action = request.GET.get("wa") 43 if action != WS_FED_ACTION_SIGN_IN: 44 raise ValueError("Invalid action") 45 realm = request.GET.get("wtrealm") 46 if not realm: 47 raise ValueError("Missing Realm") 48 49 req = SignInRequest( 50 wa=action, 51 wtrealm=realm, 52 wreply=request.GET.get("wreply"), 53 wctx=request.GET.get("wctx", ""), 54 ) 55 56 _, provider = req.get_app_provider() 57 if not req.wreply: 58 req.wreply = provider.acs_url 59 if not req.wreply.startswith(provider.acs_url): 60 raise ValueError("Invalid wreply") 61 return req
class
SignInProcessor:
71class SignInProcessor: 72 provider: WSFederationProvider 73 request: HttpRequest 74 sign_in_request: SignInRequest 75 saml_processor: AssertionProcessor 76 77 def __init__( 78 self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest 79 ): 80 self.provider = provider 81 self.request = request 82 self.sign_in_request = sign_in_request 83 self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest()) 84 self.saml_processor.provider.audience = self.sign_in_request.wtrealm 85 if self.provider.signing_kp: 86 self.saml_processor.provider.sign_assertion = True 87 88 def create_response_token(self): 89 root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP) 90 91 root.append(self.response_add_lifetime()) 92 root.append(self.response_add_applies_to()) 93 root.append(self.response_add_requested_security_token()) 94 root.append( 95 self.response_add_attached_reference( 96 "RequestedAttachedReference", self.saml_processor._assertion_id 97 ) 98 ) 99 root.append( 100 self.response_add_attached_reference( 101 "RequestedUnattachedReference", self.saml_processor._assertion_id 102 ) 103 ) 104 105 token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType") 106 token_type.text = WSS_TOKEN_TYPE_SAML2 107 108 request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType") 109 request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue" 110 111 key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType") 112 key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey" 113 114 return root 115 116 def response_add_lifetime(self) -> _Element: 117 """Add Lifetime element""" 118 lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP) 119 created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created") 120 created.text = get_time_string() 121 expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires") 122 expires.text = get_time_string( 123 timedelta_from_string(self.provider.session_valid_not_on_or_after) 124 ) 125 return lifetime 126 127 def response_add_applies_to(self) -> _Element: 128 """Add AppliesTo element""" 129 applies_to = Element(f"{{{NS_POLICY}}}AppliesTo") 130 endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference") 131 address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address") 132 address.text = self.sign_in_request.wtrealm 133 return applies_to 134 135 def response_add_requested_security_token(self) -> _Element: 136 """Add RequestedSecurityToken and child assertion""" 137 token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken") 138 token.append(self.saml_processor.get_assertion()) 139 return token 140 141 def response_add_attached_reference(self, tag: str, value: str) -> _Element: 142 ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}") 143 sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference") 144 sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2 145 146 key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier") 147 key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID 148 key_identifier.text = value 149 return ref 150 151 def response(self) -> dict[str, str]: 152 root = self.create_response_token() 153 assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0] 154 if self.provider.signing_kp: 155 self.saml_processor._sign(assertion) 156 str_token = etree.tostring(root).decode("utf-8") # nosec 157 return delete_none_values( 158 { 159 WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN, 160 WS_FED_POST_KEY_RESULT: str_token, 161 WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx, 162 } 163 )
SignInProcessor( provider: authentik.enterprise.providers.ws_federation.models.WSFederationProvider, request: django.http.request.HttpRequest, sign_in_request: SignInRequest)
77 def __init__( 78 self, provider: WSFederationProvider, request: HttpRequest, sign_in_request: SignInRequest 79 ): 80 self.provider = provider 81 self.request = request 82 self.sign_in_request = sign_in_request 83 self.saml_processor = AssertionProcessor(self.provider, self.request, AuthNRequest()) 84 self.saml_processor.provider.audience = self.sign_in_request.wtrealm 85 if self.provider.signing_kp: 86 self.saml_processor.provider.sign_assertion = True
sign_in_request: SignInRequest
saml_processor: authentik.providers.saml.processors.assertion.AssertionProcessor
def
create_response_token(self):
88 def create_response_token(self): 89 root = Element(f"{{{NS_WS_FED_TRUST}}}RequestSecurityTokenResponse", nsmap=NS_MAP) 90 91 root.append(self.response_add_lifetime()) 92 root.append(self.response_add_applies_to()) 93 root.append(self.response_add_requested_security_token()) 94 root.append( 95 self.response_add_attached_reference( 96 "RequestedAttachedReference", self.saml_processor._assertion_id 97 ) 98 ) 99 root.append( 100 self.response_add_attached_reference( 101 "RequestedUnattachedReference", self.saml_processor._assertion_id 102 ) 103 ) 104 105 token_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}TokenType") 106 token_type.text = WSS_TOKEN_TYPE_SAML2 107 108 request_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}RequestType") 109 request_type.text = "http://schemas.xmlsoap.org/ws/2005/02/trust/Issue" 110 111 key_type = SubElement(root, f"{{{NS_WS_FED_TRUST}}}KeyType") 112 key_type.text = "http://schemas.xmlsoap.org/ws/2005/05/identity/NoProofKey" 113 114 return root
def
response_add_lifetime(self) -> lxml.etree._Element:
116 def response_add_lifetime(self) -> _Element: 117 """Add Lifetime element""" 118 lifetime = Element(f"{{{NS_WS_FED_TRUST}}}Lifetime", nsmap=NS_MAP) 119 created = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Created") 120 created.text = get_time_string() 121 expires = SubElement(lifetime, f"{{{NS_WSS_UTILITY}}}Expires") 122 expires.text = get_time_string( 123 timedelta_from_string(self.provider.session_valid_not_on_or_after) 124 ) 125 return lifetime
Add Lifetime element
def
response_add_applies_to(self) -> lxml.etree._Element:
127 def response_add_applies_to(self) -> _Element: 128 """Add AppliesTo element""" 129 applies_to = Element(f"{{{NS_POLICY}}}AppliesTo") 130 endpoint_ref = SubElement(applies_to, f"{{{NS_ADDRESSING}}}EndpointReference") 131 address = SubElement(endpoint_ref, f"{{{NS_ADDRESSING}}}Address") 132 address.text = self.sign_in_request.wtrealm 133 return applies_to
Add AppliesTo element
def
response_add_requested_security_token(self) -> lxml.etree._Element:
135 def response_add_requested_security_token(self) -> _Element: 136 """Add RequestedSecurityToken and child assertion""" 137 token = Element(f"{{{NS_WS_FED_TRUST}}}RequestedSecurityToken") 138 token.append(self.saml_processor.get_assertion()) 139 return token
Add RequestedSecurityToken and child assertion
def
response_add_attached_reference(self, tag: str, value: str) -> lxml.etree._Element:
141 def response_add_attached_reference(self, tag: str, value: str) -> _Element: 142 ref = Element(f"{{{NS_WS_FED_TRUST}}}{tag}") 143 sec_token_ref = SubElement(ref, f"{{{NS_WSS_SEC}}}SecurityTokenReference") 144 sec_token_ref.attrib[f"{{{NS_WSS_D3P1}}}TokenType"] = WSS_TOKEN_TYPE_SAML2 145 146 key_identifier = SubElement(sec_token_ref, f"{{{NS_WSS_SEC}}}KeyIdentifier") 147 key_identifier.attrib["ValueType"] = WSS_KEY_IDENTIFIER_SAML_ID 148 key_identifier.text = value 149 return ref
def
response(self) -> dict[str, str]:
151 def response(self) -> dict[str, str]: 152 root = self.create_response_token() 153 assertion = root.xpath("//saml:Assertion", namespaces=NS_MAP)[0] 154 if self.provider.signing_kp: 155 self.saml_processor._sign(assertion) 156 str_token = etree.tostring(root).decode("utf-8") # nosec 157 return delete_none_values( 158 { 159 WS_FED_POST_KEY_ACTION: WS_FED_ACTION_SIGN_IN, 160 WS_FED_POST_KEY_RESULT: str_token, 161 WS_FED_POST_KEY_CONTEXT: self.sign_in_request.wctx, 162 } 163 )