authentik.providers.saml.processors.logout_response_processor
LogoutResponse processor
"""LogoutResponse processor"""

import base64
from urllib.parse import quote, urlencode

import xmlsec
from lxml import etree
from lxml.etree import Element, SubElement

from authentik.common.saml.constants import (
    DIGEST_ALGORITHM_TRANSLATION_MAP,
    NS_MAP,
    NS_SAML_ASSERTION,
    NS_SAML_PROTOCOL,
    SIGN_ALGORITHM_TRANSFORM_MAP,
)
from authentik.providers.saml.models import SAMLProvider
from authentik.providers.saml.processors.logout_request_parser import LogoutRequest
from authentik.providers.saml.utils import get_random_id
from authentik.providers.saml.utils.encoding import deflate_and_base64_encode
from authentik.providers.saml.utils.time import get_time_string


class LogoutResponseProcessor:
    """Generate a SAML LogoutResponse"""

    provider: SAMLProvider
    logout_request: LogoutRequest
    destination: str | None
    relay_state: str | None
    _issue_instant: str
    _response_id: str

    def __init__(
        self,
        provider: SAMLProvider,
        logout_request: LogoutRequest,
        destination: str | None = None,
        relay_state: str | None = None,
    ):
        """Store the inputs and generate the response ID and issue instant.

        An explicit (truthy) ``relay_state`` takes precedence; otherwise the
        relay state of ``logout_request`` is used when a request is given.
        """
        self.provider = provider
        self.logout_request = logout_request
        self.destination = destination
        self.relay_state = relay_state or (logout_request.relay_state if logout_request else None)
        # Generated once so every build/encode call on this instance shares them
        self._issue_instant = get_time_string()
        self._response_id = get_random_id()

    def get_issuer(self) -> Element:
        """Get Issuer element"""
        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
        issuer.text = self.provider.issuer
        return issuer

    def build(self, status: str = "Success") -> Element:
        """Build an unsigned SAML LogoutResponse as etree Element.

        ``status`` is the last segment of the SAML status URN
        (``urn:oasis:names:tc:SAML:2.0:status:<status>``).
        """
        response = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutResponse", nsmap=NS_MAP)
        response.attrib["Version"] = "2.0"
        response.attrib["IssueInstant"] = self._issue_instant
        response.attrib["ID"] = self._response_id

        if self.destination:
            response.attrib["Destination"] = self.destination

        if self.logout_request and self.logout_request.id:
            response.attrib["InResponseTo"] = self.logout_request.id

        response.append(self.get_issuer())

        # Add Status element
        status_element = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
        status_code = SubElement(status_element, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
        status_code.attrib["Value"] = f"urn:oasis:names:tc:SAML:2.0:status:{status}"

        return response

    def _build_signed(self, status: str) -> Element:
        """Build the response and embed an XML signature when signing is enabled."""
        response = self.build(status)
        if self.provider.signing_kp and self.provider.sign_logout_response:
            self._add_signature(response)
            self._sign_response(response)
        return response

    def build_response(self, status: str = "Success") -> str:
        """Build and sign LogoutResponse, return as XML string (not encoded)"""
        return etree.tostring(self._build_signed(status)).decode()

    def encode_post(self, status: str = "Success") -> str:
        """Encode LogoutResponse for POST binding (base64 of the signed XML)"""
        return base64.b64encode(etree.tostring(self._build_signed(status))).decode()

    def encode_redirect(self, status: str = "Success") -> str:
        """Encode LogoutResponse for Redirect binding"""
        response = self.build(status)
        # Note: For redirect binding, signatures are added as query parameters, not in XML
        xml_str = etree.tostring(response, encoding="UTF-8", xml_declaration=True)
        return deflate_and_base64_encode(xml_str.decode("UTF-8"))

    def get_redirect_url(self, status: str = "Success") -> str:
        """Build complete logout response URL for redirect binding with signature if needed.

        Raises:
            ValueError: if no destination was configured.
        """
        # Fail fast: without a destination there is no URL to build or sign for
        if not self.destination:
            raise ValueError("destination is required for redirect URL")

        params = {
            "SAMLResponse": self.encode_redirect(status),
        }

        if self.relay_state:
            params["RelayState"] = self.relay_state

        if self.provider.signing_kp and self.provider.sign_logout_response:
            params["SigAlg"] = self.provider.signature_algorithm

            # Build the string to sign
            query_string = self._build_signable_query_string(params)

            signature = self._sign_query_string(query_string)
            params["Signature"] = base64.b64encode(signature).decode()

        # Some SP's use query params on their sls endpoint
        separator = "&" if "?" in self.destination else "?"
        # quote_via=quote keeps the transmitted query byte-identical to the string that
        # was signed; the default quote_plus would send spaces as "+" instead of "%20"
        return f"{self.destination}{separator}{urlencode(params, quote_via=quote)}"

    def _add_signature(self, element: Element):
        """Add signature placeholder to element"""
        # NOTE(review): the fallback here is RSA-SHA1 while _sign_query_string falls
        # back to RSA-SHA256 -- confirm whether the difference is intentional
        sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
        )
        signature = xmlsec.template.create(
            element,
            xmlsec.constants.TransformExclC14N,
            sign_algorithm_transform,
            ns=xmlsec.constants.DSigNs,
        )
        element.insert(1, signature)  # Insert after Issuer

    def _sign_response(self, response: Element):
        """Sign the response element in place.

        Expects the signature template added by ``_add_signature`` to be present.
        """
        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
        )

        # Register "ID" as an ID attribute so the "#<ID>" reference below can be resolved
        xmlsec.tree.add_ids(response, ["ID"])
        signature_node = xmlsec.tree.find_node(response, xmlsec.constants.NodeSignature)

        ref = xmlsec.template.add_reference(
            signature_node,
            digest_algorithm_transform,
            uri="#" + response.attrib["ID"],
        )
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
        key_info = xmlsec.template.ensure_key_info(signature_node)
        xmlsec.template.add_x509_data(key_info)

        ctx = xmlsec.SignatureContext()
        ctx.key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,  # Use key_data for the private key
            xmlsec.constants.KeyDataFormatPem,
        )
        ctx.key.load_cert_from_memory(
            self.provider.signing_kp.certificate_data, xmlsec.constants.KeyDataFormatPem
        )
        ctx.sign(signature_node)

    def _build_signable_query_string(self, params: dict) -> str:
        """Build query string for signing (order matters per SAML spec)"""
        # SAML spec requires specific order: SAMLResponse, RelayState, SigAlg
        # Values must be URL-encoded individually before concatenation
        ordered = []
        if "SAMLResponse" in params:
            ordered.append(f"SAMLResponse={quote(params['SAMLResponse'], safe='')}")
        if "RelayState" in params:
            ordered.append(f"RelayState={quote(params['RelayState'], safe='')}")
        if "SigAlg" in params:
            ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}")
        return "&".join(ordered)

    def _sign_query_string(self, query_string: str) -> bytes:
        """Sign the query string for redirect binding and return the raw signature bytes"""
        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256
        )

        key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,
            xmlsec.constants.KeyDataFormatPem,
            None,
        )

        ctx = xmlsec.SignatureContext()
        ctx.key = key

        return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)
class
LogoutResponseProcessor:
class LogoutResponseProcessor:
    """Generate a SAML LogoutResponse"""

    provider: SAMLProvider
    logout_request: LogoutRequest
    destination: str | None
    relay_state: str | None
    _issue_instant: str
    _response_id: str

    def __init__(
        self,
        provider: SAMLProvider,
        logout_request: LogoutRequest,
        destination: str | None = None,
        relay_state: str | None = None,
    ):
        """Store the inputs and generate the response ID and issue instant.

        An explicit (truthy) ``relay_state`` takes precedence; otherwise the
        relay state of ``logout_request`` is used when a request is given.
        """
        self.provider = provider
        self.logout_request = logout_request
        self.destination = destination
        self.relay_state = relay_state or (logout_request.relay_state if logout_request else None)
        # Generated once so every build/encode call on this instance shares them
        self._issue_instant = get_time_string()
        self._response_id = get_random_id()

    def get_issuer(self) -> Element:
        """Get Issuer element"""
        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
        issuer.text = self.provider.issuer
        return issuer

    def build(self, status: str = "Success") -> Element:
        """Build an unsigned SAML LogoutResponse as etree Element.

        ``status`` is the last segment of the SAML status URN
        (``urn:oasis:names:tc:SAML:2.0:status:<status>``).
        """
        response = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutResponse", nsmap=NS_MAP)
        response.attrib["Version"] = "2.0"
        response.attrib["IssueInstant"] = self._issue_instant
        response.attrib["ID"] = self._response_id

        if self.destination:
            response.attrib["Destination"] = self.destination

        if self.logout_request and self.logout_request.id:
            response.attrib["InResponseTo"] = self.logout_request.id

        response.append(self.get_issuer())

        # Add Status element
        status_element = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
        status_code = SubElement(status_element, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
        status_code.attrib["Value"] = f"urn:oasis:names:tc:SAML:2.0:status:{status}"

        return response

    def _build_signed(self, status: str) -> Element:
        """Build the response and embed an XML signature when signing is enabled."""
        response = self.build(status)
        if self.provider.signing_kp and self.provider.sign_logout_response:
            self._add_signature(response)
            self._sign_response(response)
        return response

    def build_response(self, status: str = "Success") -> str:
        """Build and sign LogoutResponse, return as XML string (not encoded)"""
        return etree.tostring(self._build_signed(status)).decode()

    def encode_post(self, status: str = "Success") -> str:
        """Encode LogoutResponse for POST binding (base64 of the signed XML)"""
        return base64.b64encode(etree.tostring(self._build_signed(status))).decode()

    def encode_redirect(self, status: str = "Success") -> str:
        """Encode LogoutResponse for Redirect binding"""
        response = self.build(status)
        # Note: For redirect binding, signatures are added as query parameters, not in XML
        xml_str = etree.tostring(response, encoding="UTF-8", xml_declaration=True)
        return deflate_and_base64_encode(xml_str.decode("UTF-8"))

    def get_redirect_url(self, status: str = "Success") -> str:
        """Build complete logout response URL for redirect binding with signature if needed.

        Raises:
            ValueError: if no destination was configured.
        """
        # Fail fast: without a destination there is no URL to build or sign for
        if not self.destination:
            raise ValueError("destination is required for redirect URL")

        params = {
            "SAMLResponse": self.encode_redirect(status),
        }

        if self.relay_state:
            params["RelayState"] = self.relay_state

        if self.provider.signing_kp and self.provider.sign_logout_response:
            params["SigAlg"] = self.provider.signature_algorithm

            # Build the string to sign
            query_string = self._build_signable_query_string(params)

            signature = self._sign_query_string(query_string)
            params["Signature"] = base64.b64encode(signature).decode()

        # Some SP's use query params on their sls endpoint
        separator = "&" if "?" in self.destination else "?"
        # quote_via=quote keeps the transmitted query byte-identical to the string that
        # was signed; the default quote_plus would send spaces as "+" instead of "%20"
        return f"{self.destination}{separator}{urlencode(params, quote_via=quote)}"

    def _add_signature(self, element: Element):
        """Add signature placeholder to element"""
        # NOTE(review): the fallback here is RSA-SHA1 while _sign_query_string falls
        # back to RSA-SHA256 -- confirm whether the difference is intentional
        sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
        )
        signature = xmlsec.template.create(
            element,
            xmlsec.constants.TransformExclC14N,
            sign_algorithm_transform,
            ns=xmlsec.constants.DSigNs,
        )
        element.insert(1, signature)  # Insert after Issuer

    def _sign_response(self, response: Element):
        """Sign the response element in place.

        Expects the signature template added by ``_add_signature`` to be present.
        """
        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
        )

        # Register "ID" as an ID attribute so the "#<ID>" reference below can be resolved
        xmlsec.tree.add_ids(response, ["ID"])
        signature_node = xmlsec.tree.find_node(response, xmlsec.constants.NodeSignature)

        ref = xmlsec.template.add_reference(
            signature_node,
            digest_algorithm_transform,
            uri="#" + response.attrib["ID"],
        )
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
        key_info = xmlsec.template.ensure_key_info(signature_node)
        xmlsec.template.add_x509_data(key_info)

        ctx = xmlsec.SignatureContext()
        ctx.key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,  # Use key_data for the private key
            xmlsec.constants.KeyDataFormatPem,
        )
        ctx.key.load_cert_from_memory(
            self.provider.signing_kp.certificate_data, xmlsec.constants.KeyDataFormatPem
        )
        ctx.sign(signature_node)

    def _build_signable_query_string(self, params: dict) -> str:
        """Build query string for signing (order matters per SAML spec)"""
        # SAML spec requires specific order: SAMLResponse, RelayState, SigAlg
        # Values must be URL-encoded individually before concatenation
        ordered = []
        if "SAMLResponse" in params:
            ordered.append(f"SAMLResponse={quote(params['SAMLResponse'], safe='')}")
        if "RelayState" in params:
            ordered.append(f"RelayState={quote(params['RelayState'], safe='')}")
        if "SigAlg" in params:
            ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}")
        return "&".join(ordered)

    def _sign_query_string(self, query_string: str) -> bytes:
        """Sign the query string for redirect binding and return the raw signature bytes"""
        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256
        )

        key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,
            xmlsec.constants.KeyDataFormatPem,
            None,
        )

        ctx = xmlsec.SignatureContext()
        ctx.key = key

        return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)
Generate a SAML LogoutResponse
LogoutResponseProcessor( provider: authentik.providers.saml.models.SAMLProvider, logout_request: authentik.providers.saml.processors.logout_request_parser.LogoutRequest, destination: str | None = None, relay_state: str | None = None)
def __init__(
    self,
    provider: SAMLProvider,
    logout_request: LogoutRequest,
    destination: str | None = None,
    relay_state: str | None = None,
):
    """Store the inputs and generate the response ID and issue instant.

    A truthy ``relay_state`` argument wins; otherwise the relay state of
    ``logout_request`` is used when a request is given, else ``None``.
    """
    self.provider = provider
    self.logout_request = logout_request
    self.destination = destination

    if relay_state:
        effective_relay_state = relay_state
    elif logout_request:
        effective_relay_state = logout_request.relay_state
    else:
        effective_relay_state = None
    self.relay_state = effective_relay_state

    # Generated once per instance so repeated builds share the same values
    self._issue_instant = get_time_string()
    self._response_id = get_random_id()
def
get_issuer(self) -> lxml.etree.Element:
def get_issuer(self) -> Element:
    """Create the Issuer element whose text is the provider's issuer."""
    issuer_tag = f"{{{NS_SAML_ASSERTION}}}Issuer"
    node = Element(issuer_tag)
    node.text = self.provider.issuer
    return node
Get Issuer element
def
build(self, status: str = 'Success') -> lxml.etree.Element:
def build(self, status: str = "Success") -> Element:
    """Assemble the unsigned LogoutResponse element tree.

    ``status`` is appended to ``urn:oasis:names:tc:SAML:2.0:status:`` to form
    the StatusCode value.
    """
    root = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutResponse", nsmap=NS_MAP)
    root.set("Version", "2.0")
    root.set("IssueInstant", self._issue_instant)
    root.set("ID", self._response_id)

    # Optional attributes are only emitted when a value is available
    if self.destination:
        root.set("Destination", self.destination)
    request = self.logout_request
    if request and request.id:
        root.set("InResponseTo", request.id)

    root.append(self.get_issuer())

    # Status / StatusCode follow the Issuer
    status_node = SubElement(root, f"{{{NS_SAML_PROTOCOL}}}Status")
    code_node = SubElement(status_node, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
    code_node.set("Value", f"urn:oasis:names:tc:SAML:2.0:status:{status}")

    return root
Build a SAML LogoutResponse as etree Element
def
build_response(self, status: str = 'Success') -> str:
def build_response(self, status: str = "Success") -> str:
    """Return the LogoutResponse as a plain XML string, signed when enabled."""
    document = self.build(status)
    # Sign only when a signing keypair exists and response signing is switched on
    signing_enabled = self.provider.signing_kp and self.provider.sign_logout_response
    if signing_enabled:
        self._add_signature(document)
        self._sign_response(document)
    serialized = etree.tostring(document)
    return serialized.decode()
Build and sign LogoutResponse, return as XML string (not encoded)
def
encode_post(self, status: str = 'Success') -> str:
def encode_post(self, status: str = "Success") -> str:
    """Return the base64-encoded LogoutResponse for the POST binding, signed when enabled."""
    document = self.build(status)
    # Sign only when a signing keypair exists and response signing is switched on
    signing_enabled = self.provider.signing_kp and self.provider.sign_logout_response
    if signing_enabled:
        self._add_signature(document)
        self._sign_response(document)
    raw_xml = etree.tostring(document)
    return base64.b64encode(raw_xml).decode()
Encode LogoutResponse for POST binding
def
encode_redirect(self, status: str = 'Success') -> str:
def encode_redirect(self, status: str = "Success") -> str:
    """Return the deflated, base64-encoded LogoutResponse for the Redirect binding."""
    # No XML signature here: for the redirect binding the signature travels
    # as query parameters instead
    xml_bytes = etree.tostring(
        self.build(status),
        encoding="UTF-8",
        xml_declaration=True,
    )
    xml_text = xml_bytes.decode("UTF-8")
    return deflate_and_base64_encode(xml_text)
Encode LogoutResponse for Redirect binding
def
get_redirect_url(self, status: str = 'Success') -> str:
def get_redirect_url(self, status: str = "Success") -> str:
    """Build complete logout response URL for redirect binding with signature if needed.

    The query carries ``SAMLResponse``, then ``RelayState`` (when set), then
    ``SigAlg`` and ``Signature`` (when the provider has a signing keypair and
    response signing is enabled).

    Raises:
        ValueError: if no destination was configured.
    """
    # Fail fast: without a destination there is no URL to build or sign for
    if not self.destination:
        raise ValueError("destination is required for redirect URL")

    params = {
        "SAMLResponse": self.encode_redirect(status),
    }

    if self.relay_state:
        params["RelayState"] = self.relay_state

    if self.provider.signing_kp and self.provider.sign_logout_response:
        params["SigAlg"] = self.provider.signature_algorithm

        # Build the string to sign
        query_string = self._build_signable_query_string(params)

        signature = self._sign_query_string(query_string)
        params["Signature"] = base64.b64encode(signature).decode()

    # Some SP's use query params on their sls endpoint
    separator = "&" if "?" in self.destination else "?"
    # quote_via=quote percent-encodes every value (space -> "%20"); the default
    # quote_plus would send spaces as "+", so a receiver verifying the signature
    # against the raw query string would see different bytes than were signed
    return f"{self.destination}{separator}{urlencode(params, quote_via=quote)}"
Build complete logout response URL for redirect binding with signature if needed