authentik.providers.saml.processors.logout_response_processor
LogoutResponse processor
1"""LogoutResponse processor""" 2 3import base64 4from urllib.parse import quote, urlencode 5 6import xmlsec 7from lxml import etree 8from lxml.etree import Element, SubElement 9 10from authentik.common.saml.constants import ( 11 DEFAULT_ISSUER, 12 DIGEST_ALGORITHM_TRANSLATION_MAP, 13 NS_MAP, 14 NS_SAML_ASSERTION, 15 NS_SAML_PROTOCOL, 16 SIGN_ALGORITHM_TRANSFORM_MAP, 17) 18from authentik.providers.saml.models import SAMLProvider 19from authentik.providers.saml.processors.logout_request_parser import LogoutRequest 20from authentik.providers.saml.utils import get_random_id 21from authentik.providers.saml.utils.encoding import deflate_and_base64_encode 22from authentik.providers.saml.utils.time import get_time_string 23 24 25class LogoutResponseProcessor: 26 """Generate a SAML LogoutResponse""" 27 28 provider: SAMLProvider 29 logout_request: LogoutRequest 30 destination: str | None 31 relay_state: str | None 32 issuer: str | None 33 _issue_instant: str 34 _response_id: str 35 36 def __init__( # noqa: PLR0913 37 self, 38 provider: SAMLProvider, 39 logout_request: LogoutRequest, 40 destination: str | None = None, 41 relay_state: str | None = None, 42 issuer: str | None = None, 43 ): 44 self.provider = provider 45 self.logout_request = logout_request 46 self.destination = destination 47 self.relay_state = relay_state or (logout_request.relay_state if logout_request else None) 48 self.issuer = issuer 49 self._issue_instant = get_time_string() 50 self._response_id = get_random_id() 51 52 def _get_issuer_value(self) -> str: 53 """Get issuer value from session, with fallback to provider""" 54 if self.issuer: 55 return self.issuer 56 if self.provider.issuer_override: 57 return self.provider.issuer_override 58 return DEFAULT_ISSUER 59 60 def get_issuer(self) -> Element: 61 """Get Issuer element""" 62 issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer") 63 issuer.text = self._get_issuer_value() 64 return issuer 65 66 def build(self, status: str = "Success") -> Element: 67 """Build a SAML LogoutResponse as etree Element""" 68 response = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutResponse", nsmap=NS_MAP) 69 response.attrib["Version"] = "2.0" 70 response.attrib["IssueInstant"] = self._issue_instant 71 response.attrib["ID"] = self._response_id 72 73 if self.destination: 74 response.attrib["Destination"] = self.destination 75 76 if self.logout_request and self.logout_request.id: 77 response.attrib["InResponseTo"] = self.logout_request.id 78 79 response.append(self.get_issuer()) 80 81 # Add Status element 82 status_element = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status") 83 status_code = SubElement(status_element, f"{{{NS_SAML_PROTOCOL}}}StatusCode") 84 status_code.attrib["Value"] = f"urn:oasis:names:tc:SAML:2.0:status:{status}" 85 86 return response 87 88 def build_response(self, status: str = "Success") -> str: 89 """Build and sign LogoutResponse, return as XML string (not encoded)""" 90 response = self.build(status) 91 if self.provider.signing_kp and self.provider.sign_logout_response: 92 self._add_signature(response) 93 self._sign_response(response) 94 return etree.tostring(response).decode() 95 96 def encode_post(self, status: str = "Success") -> str: 97 """Encode LogoutResponse for POST binding""" 98 response = self.build(status) 99 if self.provider.signing_kp and self.provider.sign_logout_response: 100 self._add_signature(response) 101 self._sign_response(response) 102 return base64.b64encode(etree.tostring(response)).decode() 103 104 def encode_redirect(self, status: str = "Success") -> str: 105 """Encode LogoutResponse for Redirect binding""" 106 response = self.build(status) 107 # Note: For redirect binding, signatures are added as query parameters, not in XML 108 xml_str = etree.tostring(response, encoding="UTF-8", xml_declaration=True) 109 return deflate_and_base64_encode(xml_str.decode("UTF-8")) 110 111 def get_redirect_url(self, status: str = "Success") -> str: 112 """Build complete logout response URL for redirect binding with signature if needed""" 113 encoded_response = self.encode_redirect(status) 114 params = { 115 "SAMLResponse": encoded_response, 116 } 117 118 if self.relay_state: 119 params["RelayState"] = self.relay_state 120 121 if self.provider.signing_kp and self.provider.sign_logout_response: 122 sig_alg = self.provider.signature_algorithm 123 params["SigAlg"] = sig_alg 124 125 # Build the string to sign 126 query_string = self._build_signable_query_string(params) 127 128 signature = self._sign_query_string(query_string) 129 params["Signature"] = base64.b64encode(signature).decode() 130 131 # Some SP's use query params on their sls endpoint 132 if not self.destination: 133 raise ValueError("destination is required for redirect URL") 134 135 separator = "&" if "?" in self.destination else "?" 136 return f"{self.destination}{separator}{urlencode(params)}" 137 138 def _add_signature(self, element: Element): 139 """Add signature placeholder to element""" 140 sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get( 141 self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1 142 ) 143 signature = xmlsec.template.create( 144 element, 145 xmlsec.constants.TransformExclC14N, 146 sign_algorithm_transform, 147 ns=xmlsec.constants.DSigNs, 148 ) 149 element.insert(1, signature) # Insert after Issuer 150 151 def _sign_response(self, response: Element): 152 """Sign the response element""" 153 digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get( 154 self.provider.digest_algorithm, xmlsec.constants.TransformSha1 155 ) 156 157 xmlsec.tree.add_ids(response, ["ID"]) 158 signature_node = xmlsec.tree.find_node(response, xmlsec.constants.NodeSignature) 159 160 ref = xmlsec.template.add_reference( 161 signature_node, 162 digest_algorithm_transform, 163 uri="#" + response.attrib["ID"], 164 ) 165 xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped) 166 xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N) 167 key_info = xmlsec.template.ensure_key_info(signature_node) 168 xmlsec.template.add_x509_data(key_info) 169 170 ctx = xmlsec.SignatureContext() 171 ctx.key = xmlsec.Key.from_memory( 172 self.provider.signing_kp.key_data, # Use key_data for the private key 173 xmlsec.constants.KeyDataFormatPem, 174 ) 175 ctx.key.load_cert_from_memory( 176 self.provider.signing_kp.certificate_data, xmlsec.constants.KeyDataFormatPem 177 ) 178 ctx.sign(signature_node) 179 180 def _build_signable_query_string(self, params: dict) -> str: 181 """Build query string for signing (order matters per SAML spec)""" 182 # SAML spec requires specific order: SAMLResponse, RelayState, SigAlg 183 # Values must be URL-encoded individually before concatenation 184 ordered = [] 185 if "SAMLResponse" in params: 186 ordered.append(f"SAMLResponse={quote(params['SAMLResponse'], safe='')}") 187 if "RelayState" in params: 188 ordered.append(f"RelayState={quote(params['RelayState'], safe='')}") 189 if "SigAlg" in params: 190 ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}") 191 return "&".join(ordered) 192 193 def _sign_query_string(self, query_string: str) -> bytes: 194 """Sign the query string for redirect binding""" 195 signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get( 196 self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256 197 ) 198 199 key = xmlsec.Key.from_memory( 200 self.provider.signing_kp.key_data, 201 xmlsec.constants.KeyDataFormatPem, 202 None, 203 ) 204 205 ctx = xmlsec.SignatureContext() 206 ctx.key = key 207 208 return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)
class
LogoutResponseProcessor:
26class LogoutResponseProcessor: 27 """Generate a SAML LogoutResponse""" 28 29 provider: SAMLProvider 30 logout_request: LogoutRequest 31 destination: str | None 32 relay_state: str | None 33 issuer: str | None 34 _issue_instant: str 35 _response_id: str 36 37 def __init__( # noqa: PLR0913 38 self, 39 provider: SAMLProvider, 40 logout_request: LogoutRequest, 41 destination: str | None = None, 42 relay_state: str | None = None, 43 issuer: str | None = None, 44 ): 45 self.provider = provider 46 self.logout_request = logout_request 47 self.destination = destination 48 self.relay_state = relay_state or (logout_request.relay_state if logout_request else None) 49 self.issuer = issuer 50 self._issue_instant = get_time_string() 51 self._response_id = get_random_id() 52 53 def _get_issuer_value(self) -> str: 54 """Get issuer value from session, with fallback to provider""" 55 if self.issuer: 56 return self.issuer 57 if self.provider.issuer_override: 58 return self.provider.issuer_override 59 return DEFAULT_ISSUER 60 61 def get_issuer(self) -> Element: 62 """Get Issuer element""" 63 issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer") 64 issuer.text = self._get_issuer_value() 65 return issuer 66 67 def build(self, status: str = "Success") -> Element: 68 """Build a SAML LogoutResponse as etree Element""" 69 response = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutResponse", nsmap=NS_MAP) 70 response.attrib["Version"] = "2.0" 71 response.attrib["IssueInstant"] = self._issue_instant 72 response.attrib["ID"] = self._response_id 73 74 if self.destination: 75 response.attrib["Destination"] = self.destination 76 77 if self.logout_request and self.logout_request.id: 78 response.attrib["InResponseTo"] = self.logout_request.id 79 80 response.append(self.get_issuer()) 81 82 # Add Status element 83 status_element = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status") 84 status_code = SubElement(status_element, f"{{{NS_SAML_PROTOCOL}}}StatusCode") 85 status_code.attrib["Value"] = f"urn:oasis:names:tc:SAML:2.0:status:{status}" 86 87 return response 88 89 def build_response(self, status: str = "Success") -> str: 90 """Build and sign LogoutResponse, return as XML string (not encoded)""" 91 response = self.build(status) 92 if self.provider.signing_kp and self.provider.sign_logout_response: 93 self._add_signature(response) 94 self._sign_response(response) 95 return etree.tostring(response).decode() 96 97 def encode_post(self, status: str = "Success") -> str: 98 """Encode LogoutResponse for POST binding""" 99 response = self.build(status) 100 if self.provider.signing_kp and self.provider.sign_logout_response: 101 self._add_signature(response) 102 self._sign_response(response) 103 return base64.b64encode(etree.tostring(response)).decode() 104 105 def encode_redirect(self, status: str = "Success") -> str: 106 """Encode LogoutResponse for Redirect binding""" 107 response = self.build(status) 108 # Note: For redirect binding, signatures are added as query parameters, not in XML 109 xml_str = etree.tostring(response, encoding="UTF-8", xml_declaration=True) 110 return deflate_and_base64_encode(xml_str.decode("UTF-8")) 111 112 def get_redirect_url(self, status: str = "Success") -> str: 113 """Build complete logout response URL for redirect binding with signature if needed""" 114 encoded_response = self.encode_redirect(status) 115 params = { 116 "SAMLResponse": encoded_response, 117 } 118 119 if self.relay_state: 120 params["RelayState"] = self.relay_state 121 122 if self.provider.signing_kp and self.provider.sign_logout_response: 123 sig_alg = self.provider.signature_algorithm 124 params["SigAlg"] = sig_alg 125 126 # Build the string to sign 127 query_string = self._build_signable_query_string(params) 128 129 signature = self._sign_query_string(query_string) 130 params["Signature"] = base64.b64encode(signature).decode() 131 132 # Some SP's use query params on their sls endpoint 133 if not self.destination: 134 raise ValueError("destination is required for redirect URL") 135 136 separator = "&" if "?" in self.destination else "?" 137 return f"{self.destination}{separator}{urlencode(params)}" 138 139 def _add_signature(self, element: Element): 140 """Add signature placeholder to element""" 141 sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get( 142 self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1 143 ) 144 signature = xmlsec.template.create( 145 element, 146 xmlsec.constants.TransformExclC14N, 147 sign_algorithm_transform, 148 ns=xmlsec.constants.DSigNs, 149 ) 150 element.insert(1, signature) # Insert after Issuer 151 152 def _sign_response(self, response: Element): 153 """Sign the response element""" 154 digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get( 155 self.provider.digest_algorithm, xmlsec.constants.TransformSha1 156 ) 157 158 xmlsec.tree.add_ids(response, ["ID"]) 159 signature_node = xmlsec.tree.find_node(response, xmlsec.constants.NodeSignature) 160 161 ref = xmlsec.template.add_reference( 162 signature_node, 163 digest_algorithm_transform, 164 uri="#" + response.attrib["ID"], 165 ) 166 xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped) 167 xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N) 168 key_info = xmlsec.template.ensure_key_info(signature_node) 169 xmlsec.template.add_x509_data(key_info) 170 171 ctx = xmlsec.SignatureContext() 172 ctx.key = xmlsec.Key.from_memory( 173 self.provider.signing_kp.key_data, # Use key_data for the private key 174 xmlsec.constants.KeyDataFormatPem, 175 ) 176 ctx.key.load_cert_from_memory( 177 self.provider.signing_kp.certificate_data, xmlsec.constants.KeyDataFormatPem 178 ) 179 ctx.sign(signature_node) 180 181 def _build_signable_query_string(self, params: dict) -> str: 182 """Build query string for signing (order matters per SAML spec)""" 183 # SAML spec requires specific order: SAMLResponse, RelayState, SigAlg 184 # Values must be URL-encoded individually before concatenation 185 ordered = [] 186 if "SAMLResponse" in params: 187 ordered.append(f"SAMLResponse={quote(params['SAMLResponse'], safe='')}") 188 if "RelayState" in params: 189 ordered.append(f"RelayState={quote(params['RelayState'], safe='')}") 190 if "SigAlg" in params: 191 ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}") 192 return "&".join(ordered) 193 194 def _sign_query_string(self, query_string: str) -> bytes: 195 """Sign the query string for redirect binding""" 196 signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get( 197 self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256 198 ) 199 200 key = xmlsec.Key.from_memory( 201 self.provider.signing_kp.key_data, 202 xmlsec.constants.KeyDataFormatPem, 203 None, 204 ) 205 206 ctx = xmlsec.SignatureContext() 207 ctx.key = key 208 209 return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)
Generate a SAML LogoutResponse
LogoutResponseProcessor( provider: authentik.providers.saml.models.SAMLProvider, logout_request: authentik.providers.saml.processors.logout_request_parser.LogoutRequest, destination: str | None = None, relay_state: str | None = None, issuer: str | None = None)
37 def __init__( # noqa: PLR0913 38 self, 39 provider: SAMLProvider, 40 logout_request: LogoutRequest, 41 destination: str | None = None, 42 relay_state: str | None = None, 43 issuer: str | None = None, 44 ): 45 self.provider = provider 46 self.logout_request = logout_request 47 self.destination = destination 48 self.relay_state = relay_state or (logout_request.relay_state if logout_request else None) 49 self.issuer = issuer 50 self._issue_instant = get_time_string() 51 self._response_id = get_random_id()
def
get_issuer(self) -> lxml.etree.Element:
61 def get_issuer(self) -> Element: 62 """Get Issuer element""" 63 issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer") 64 issuer.text = self._get_issuer_value() 65 return issuer
Get Issuer element
def
build(self, status: str = 'Success') -> lxml.etree.Element:
67 def build(self, status: str = "Success") -> Element: 68 """Build a SAML LogoutResponse as etree Element""" 69 response = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutResponse", nsmap=NS_MAP) 70 response.attrib["Version"] = "2.0" 71 response.attrib["IssueInstant"] = self._issue_instant 72 response.attrib["ID"] = self._response_id 73 74 if self.destination: 75 response.attrib["Destination"] = self.destination 76 77 if self.logout_request and self.logout_request.id: 78 response.attrib["InResponseTo"] = self.logout_request.id 79 80 response.append(self.get_issuer()) 81 82 # Add Status element 83 status_element = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status") 84 status_code = SubElement(status_element, f"{{{NS_SAML_PROTOCOL}}}StatusCode") 85 status_code.attrib["Value"] = f"urn:oasis:names:tc:SAML:2.0:status:{status}" 86 87 return response
Build a SAML LogoutResponse as etree Element
def
build_response(self, status: str = 'Success') -> str:
89 def build_response(self, status: str = "Success") -> str: 90 """Build and sign LogoutResponse, return as XML string (not encoded)""" 91 response = self.build(status) 92 if self.provider.signing_kp and self.provider.sign_logout_response: 93 self._add_signature(response) 94 self._sign_response(response) 95 return etree.tostring(response).decode()
Build and sign LogoutResponse, return as XML string (not encoded)
def
encode_post(self, status: str = 'Success') -> str:
97 def encode_post(self, status: str = "Success") -> str: 98 """Encode LogoutResponse for POST binding""" 99 response = self.build(status) 100 if self.provider.signing_kp and self.provider.sign_logout_response: 101 self._add_signature(response) 102 self._sign_response(response) 103 return base64.b64encode(etree.tostring(response)).decode()
Encode LogoutResponse for POST binding
def
encode_redirect(self, status: str = 'Success') -> str:
105 def encode_redirect(self, status: str = "Success") -> str: 106 """Encode LogoutResponse for Redirect binding""" 107 response = self.build(status) 108 # Note: For redirect binding, signatures are added as query parameters, not in XML 109 xml_str = etree.tostring(response, encoding="UTF-8", xml_declaration=True) 110 return deflate_and_base64_encode(xml_str.decode("UTF-8"))
Encode LogoutResponse for Redirect binding
def
get_redirect_url(self, status: str = 'Success') -> str:
112 def get_redirect_url(self, status: str = "Success") -> str: 113 """Build complete logout response URL for redirect binding with signature if needed""" 114 encoded_response = self.encode_redirect(status) 115 params = { 116 "SAMLResponse": encoded_response, 117 } 118 119 if self.relay_state: 120 params["RelayState"] = self.relay_state 121 122 if self.provider.signing_kp and self.provider.sign_logout_response: 123 sig_alg = self.provider.signature_algorithm 124 params["SigAlg"] = sig_alg 125 126 # Build the string to sign 127 query_string = self._build_signable_query_string(params) 128 129 signature = self._sign_query_string(query_string) 130 params["Signature"] = base64.b64encode(signature).decode() 131 132 # Some SP's use query params on their sls endpoint 133 if not self.destination: 134 raise ValueError("destination is required for redirect URL") 135 136 separator = "&" if "?" in self.destination else "?" 137 return f"{self.destination}{separator}{urlencode(params)}"
Build complete logout response URL for redirect binding with signature if needed