authentik.providers.saml.processors.logout_request
SAML LogoutRequest Processor
"""SAML LogoutRequest Processor"""

import base64
from urllib.parse import quote, urlencode

import xmlsec
from lxml import etree  # nosec
from lxml.etree import Element, _Element

from authentik.common.saml.constants import (
    DEFAULT_ISSUER,
    DIGEST_ALGORITHM_TRANSLATION_MAP,
    NS_MAP,
    NS_SAML_ASSERTION,
    NS_SAML_PROTOCOL,
    SAML_NAME_ID_FORMAT_EMAIL,
    SIGN_ALGORITHM_TRANSFORM_MAP,
)
from authentik.core.models import User
from authentik.lib.xml import remove_xml_newlines
from authentik.providers.saml.models import SAMLProvider
from authentik.providers.saml.utils import get_random_id
from authentik.providers.saml.utils.encoding import deflate_and_base64_encode
from authentik.providers.saml.utils.time import get_time_string


class LogoutRequestProcessor:
    """Generate SAML LogoutRequest messages

    One instance represents one request: the request ID and issue instant are
    generated once in the constructor and reused by every encoding method.
    """

    provider: SAMLProvider
    user: User | None
    destination: str
    name_id: str | None
    name_id_format: str
    session_index: str | None
    relay_state: str | None
    issuer: str | None

    _issue_instant: str
    _request_id: str

    def __init__(  # noqa: PLR0913
        self,
        provider: SAMLProvider,
        user: User | None,
        destination: str,
        name_id: str | None = None,
        name_id_format: str = SAML_NAME_ID_FORMAT_EMAIL,
        session_index: str | None = None,
        relay_state: str | None = None,
        issuer: str | None = None,
    ):
        """Store the request parameters and generate the request ID and timestamp.

        If `name_id` is empty, the user's email is used when a user is given;
        otherwise the NameID stays None.
        """
        self.provider = provider
        self.user = user
        self.destination = destination
        self.name_id = name_id or (user.email if user else None)
        self.name_id_format = name_id_format
        self.session_index = session_index
        self.relay_state = relay_state
        self.issuer = issuer

        self._issue_instant = get_time_string()
        self._request_id = get_random_id()

    def _get_issuer_value(self) -> str:
        """Resolve the issuer string.

        Order of precedence: the `issuer` given to the constructor, then the
        provider's `issuer_override`, then DEFAULT_ISSUER.
        """
        if self.issuer:
            return self.issuer
        if self.provider.issuer_override:
            return self.provider.issuer_override
        return DEFAULT_ISSUER

    def get_issuer(self) -> Element:
        """Get Issuer element"""
        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
        issuer.text = self._get_issuer_value()
        return issuer

    def get_name_id(self) -> Element:
        """Get NameID element, with the configured Format attribute"""
        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
        name_id.attrib["Format"] = self.name_id_format
        name_id.text = self.name_id
        return name_id

    def build(self) -> Element:
        """Build a SAML LogoutRequest as etree Element

        Children are appended in the order Issuer, NameID and, only when a
        session index was given, SessionIndex.
        """
        logout_request = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutRequest", nsmap=NS_MAP)
        logout_request.attrib["ID"] = self._request_id
        logout_request.attrib["Version"] = "2.0"
        logout_request.attrib["IssueInstant"] = self._issue_instant
        logout_request.attrib["Destination"] = self.destination

        logout_request.append(self.get_issuer())
        logout_request.append(self.get_name_id())

        if self.session_index:
            session_index_element = Element(f"{{{NS_SAML_PROTOCOL}}}SessionIndex")
            session_index_element.text = self.session_index
            logout_request.append(session_index_element)

        return logout_request

    def encode_post(self) -> str:
        """Encode LogoutRequest for POST binding

        The XML is signed in place when the provider has a signing keypair and
        `sign_logout_request` is set, then serialized and base64-encoded.
        """
        logout_request = self.build()
        if self.provider.signing_kp and self.provider.sign_logout_request:
            self._sign_logout_request(logout_request)
        return base64.b64encode(etree.tostring(logout_request)).decode()

    def encode_redirect(self) -> str:
        """Encode LogoutRequest for Redirect binding"""
        logout_request = self.build()
        # Note: For redirect binding, signatures are added as query parameters, not in XML
        # Ensure proper XML serialization with encoding declaration
        xml_str = etree.tostring(logout_request, encoding="UTF-8", xml_declaration=True)
        return deflate_and_base64_encode(xml_str.decode("UTF-8"))

    def get_redirect_url(self) -> str:
        """Build complete logout URL for redirect binding with signature if needed

        Query parameters are emitted in the order SAMLRequest, RelayState,
        SigAlg, Signature; the last two only when signing is enabled.
        """
        encoded_request = self.encode_redirect()
        params = {
            "SAMLRequest": encoded_request,
        }

        if self.relay_state:
            params["RelayState"] = self.relay_state

        if self.provider.signing_kp and self.provider.sign_logout_request:
            sig_alg = self.provider.signature_algorithm
            params["SigAlg"] = sig_alg

            # Build the string to sign
            query_string = self._build_signable_query_string(params)

            signature = self._sign_query_string(query_string)
            params["Signature"] = base64.b64encode(signature).decode()

        # Some SP's use query params on their sls endpoint
        separator = "&" if "?" in self.destination else "?"
        # Encode with `quote` (space -> %20), not urlencode's default `quote_plus`
        # (space -> +), so the transmitted query string is exactly the one that
        # _build_signable_query_string() produced and that was signed.
        return f"{self.destination}{separator}{urlencode(params, quote_via=quote)}"

    def get_post_form_data(self) -> dict:
        """Get form data for POST binding

        RelayState is always present; it is an empty string when unset.
        """
        return {
            "SAMLRequest": self.encode_post(),
            "RelayState": self.relay_state or "",
        }

    def _sign_logout_request(self, logout_request: _Element):
        """Sign the LogoutRequest element

        Inserts a signature template right after the Issuer element (or as the
        first child if there is none) and then signs the element in place.
        """
        # NOTE(review): the fallback here is RSA-SHA1 while _sign_query_string
        # falls back to RSA-SHA256 - confirm whether the difference is intended.
        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
        )
        signature = xmlsec.template.create(
            logout_request,
            xmlsec.constants.TransformExclC14N,
            signature_algorithm_transform,
            ns=xmlsec.constants.DSigNs,
        )

        issuer = logout_request.find(f"{{{NS_SAML_ASSERTION}}}Issuer")
        if issuer is not None:
            issuer.addnext(signature)
        else:
            logout_request.insert(0, signature)

        self._sign(logout_request)

    def _sign(self, element: _Element):
        """Sign an XML element based on the providers' configured signing settings

        Expects `element` to already contain a signature template node and to
        carry an "ID" attribute, which the signature reference points at.
        """
        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
        )
        xmlsec.tree.add_ids(element, ["ID"])
        signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature)
        ref = xmlsec.template.add_reference(
            signature_node,
            digest_algorithm_transform,
            uri="#" + element.attrib["ID"],
        )
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
        key_info = xmlsec.template.ensure_key_info(signature_node)
        xmlsec.template.add_x509_data(key_info)

        ctx = xmlsec.SignatureContext()

        # Load the private key and attach the certificate from the signing keypair
        key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,
            xmlsec.constants.KeyDataFormatPem,
            None,
        )
        key.load_cert_from_memory(
            self.provider.signing_kp.certificate_data,
            xmlsec.constants.KeyDataFormatCertPem,
        )
        ctx.key = key
        ctx.sign(remove_xml_newlines(element, signature_node))

    def _build_signable_query_string(self, params: dict) -> str:
        """Build query string for signing (order matters per SAML spec)

        Only SAMLRequest, RelayState and SigAlg are included, in that order,
        and only when present in `params`.
        """
        # SAML spec requires specific order: SAMLRequest, RelayState, SigAlg
        # Values must be URL-encoded individually before concatenation
        ordered = []
        if "SAMLRequest" in params:
            ordered.append(f"SAMLRequest={quote(params['SAMLRequest'], safe='')}")
        if "RelayState" in params:
            ordered.append(f"RelayState={quote(params['RelayState'], safe='')}")
        if "SigAlg" in params:
            ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}")
        return "&".join(ordered)

    def _sign_query_string(self, query_string: str) -> bytes:
        """Sign the query string for redirect binding

        Returns the raw signature bytes over the UTF-8 encoded query string,
        made with the provider's signing key.
        """
        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256
        )

        key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,
            xmlsec.constants.KeyDataFormatPem,
            None,
        )

        ctx = xmlsec.SignatureContext()
        ctx.key = key

        return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)
class
LogoutRequestProcessor:
class LogoutRequestProcessor:
    """Generate SAML LogoutRequest messages

    One instance represents one request: the request ID and issue instant are
    generated once in the constructor and reused by every encoding method.
    """

    provider: SAMLProvider
    user: User | None
    destination: str
    name_id: str | None
    name_id_format: str
    session_index: str | None
    relay_state: str | None
    issuer: str | None

    _issue_instant: str
    _request_id: str

    def __init__(  # noqa: PLR0913
        self,
        provider: SAMLProvider,
        user: User | None,
        destination: str,
        name_id: str | None = None,
        name_id_format: str = SAML_NAME_ID_FORMAT_EMAIL,
        session_index: str | None = None,
        relay_state: str | None = None,
        issuer: str | None = None,
    ):
        """Store the request parameters and generate the request ID and timestamp.

        If `name_id` is empty, the user's email is used when a user is given;
        otherwise the NameID stays None.
        """
        self.provider = provider
        self.user = user
        self.destination = destination
        self.name_id = name_id or (user.email if user else None)
        self.name_id_format = name_id_format
        self.session_index = session_index
        self.relay_state = relay_state
        self.issuer = issuer

        self._issue_instant = get_time_string()
        self._request_id = get_random_id()

    def _get_issuer_value(self) -> str:
        """Resolve the issuer string.

        Order of precedence: the `issuer` given to the constructor, then the
        provider's `issuer_override`, then DEFAULT_ISSUER.
        """
        if self.issuer:
            return self.issuer
        if self.provider.issuer_override:
            return self.provider.issuer_override
        return DEFAULT_ISSUER

    def get_issuer(self) -> Element:
        """Get Issuer element"""
        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
        issuer.text = self._get_issuer_value()
        return issuer

    def get_name_id(self) -> Element:
        """Get NameID element, with the configured Format attribute"""
        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
        name_id.attrib["Format"] = self.name_id_format
        name_id.text = self.name_id
        return name_id

    def build(self) -> Element:
        """Build a SAML LogoutRequest as etree Element

        Children are appended in the order Issuer, NameID and, only when a
        session index was given, SessionIndex.
        """
        logout_request = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutRequest", nsmap=NS_MAP)
        logout_request.attrib["ID"] = self._request_id
        logout_request.attrib["Version"] = "2.0"
        logout_request.attrib["IssueInstant"] = self._issue_instant
        logout_request.attrib["Destination"] = self.destination

        logout_request.append(self.get_issuer())
        logout_request.append(self.get_name_id())

        if self.session_index:
            session_index_element = Element(f"{{{NS_SAML_PROTOCOL}}}SessionIndex")
            session_index_element.text = self.session_index
            logout_request.append(session_index_element)

        return logout_request

    def encode_post(self) -> str:
        """Encode LogoutRequest for POST binding

        The XML is signed in place when the provider has a signing keypair and
        `sign_logout_request` is set, then serialized and base64-encoded.
        """
        logout_request = self.build()
        if self.provider.signing_kp and self.provider.sign_logout_request:
            self._sign_logout_request(logout_request)
        return base64.b64encode(etree.tostring(logout_request)).decode()

    def encode_redirect(self) -> str:
        """Encode LogoutRequest for Redirect binding"""
        logout_request = self.build()
        # Note: For redirect binding, signatures are added as query parameters, not in XML
        # Ensure proper XML serialization with encoding declaration
        xml_str = etree.tostring(logout_request, encoding="UTF-8", xml_declaration=True)
        return deflate_and_base64_encode(xml_str.decode("UTF-8"))

    def get_redirect_url(self) -> str:
        """Build complete logout URL for redirect binding with signature if needed

        Query parameters are emitted in the order SAMLRequest, RelayState,
        SigAlg, Signature; the last two only when signing is enabled.
        """
        encoded_request = self.encode_redirect()
        params = {
            "SAMLRequest": encoded_request,
        }

        if self.relay_state:
            params["RelayState"] = self.relay_state

        if self.provider.signing_kp and self.provider.sign_logout_request:
            sig_alg = self.provider.signature_algorithm
            params["SigAlg"] = sig_alg

            # Build the string to sign
            query_string = self._build_signable_query_string(params)

            signature = self._sign_query_string(query_string)
            params["Signature"] = base64.b64encode(signature).decode()

        # Some SP's use query params on their sls endpoint
        separator = "&" if "?" in self.destination else "?"
        # Encode with `quote` (space -> %20), not urlencode's default `quote_plus`
        # (space -> +), so the transmitted query string is exactly the one that
        # _build_signable_query_string() produced and that was signed.
        return f"{self.destination}{separator}{urlencode(params, quote_via=quote)}"

    def get_post_form_data(self) -> dict:
        """Get form data for POST binding

        RelayState is always present; it is an empty string when unset.
        """
        return {
            "SAMLRequest": self.encode_post(),
            "RelayState": self.relay_state or "",
        }

    def _sign_logout_request(self, logout_request: _Element):
        """Sign the LogoutRequest element

        Inserts a signature template right after the Issuer element (or as the
        first child if there is none) and then signs the element in place.
        """
        # NOTE(review): the fallback here is RSA-SHA1 while _sign_query_string
        # falls back to RSA-SHA256 - confirm whether the difference is intended.
        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
        )
        signature = xmlsec.template.create(
            logout_request,
            xmlsec.constants.TransformExclC14N,
            signature_algorithm_transform,
            ns=xmlsec.constants.DSigNs,
        )

        issuer = logout_request.find(f"{{{NS_SAML_ASSERTION}}}Issuer")
        if issuer is not None:
            issuer.addnext(signature)
        else:
            logout_request.insert(0, signature)

        self._sign(logout_request)

    def _sign(self, element: _Element):
        """Sign an XML element based on the providers' configured signing settings

        Expects `element` to already contain a signature template node and to
        carry an "ID" attribute, which the signature reference points at.
        """
        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
        )
        xmlsec.tree.add_ids(element, ["ID"])
        signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature)
        ref = xmlsec.template.add_reference(
            signature_node,
            digest_algorithm_transform,
            uri="#" + element.attrib["ID"],
        )
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
        key_info = xmlsec.template.ensure_key_info(signature_node)
        xmlsec.template.add_x509_data(key_info)

        ctx = xmlsec.SignatureContext()

        # Load the private key and attach the certificate from the signing keypair
        key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,
            xmlsec.constants.KeyDataFormatPem,
            None,
        )
        key.load_cert_from_memory(
            self.provider.signing_kp.certificate_data,
            xmlsec.constants.KeyDataFormatCertPem,
        )
        ctx.key = key
        ctx.sign(remove_xml_newlines(element, signature_node))

    def _build_signable_query_string(self, params: dict) -> str:
        """Build query string for signing (order matters per SAML spec)

        Only SAMLRequest, RelayState and SigAlg are included, in that order,
        and only when present in `params`.
        """
        # SAML spec requires specific order: SAMLRequest, RelayState, SigAlg
        # Values must be URL-encoded individually before concatenation
        ordered = []
        if "SAMLRequest" in params:
            ordered.append(f"SAMLRequest={quote(params['SAMLRequest'], safe='')}")
        if "RelayState" in params:
            ordered.append(f"RelayState={quote(params['RelayState'], safe='')}")
        if "SigAlg" in params:
            ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}")
        return "&".join(ordered)

    def _sign_query_string(self, query_string: str) -> bytes:
        """Sign the query string for redirect binding

        Returns the raw signature bytes over the UTF-8 encoded query string,
        made with the provider's signing key.
        """
        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256
        )

        key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,
            xmlsec.constants.KeyDataFormatPem,
            None,
        )

        ctx = xmlsec.SignatureContext()
        ctx.key = key

        return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)
Generate SAML LogoutRequest messages
LogoutRequestProcessor( provider: authentik.providers.saml.models.SAMLProvider, user: authentik.core.models.User | None, destination: str, name_id: str | None = None, name_id_format: str = 'urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress', session_index: str | None = None, relay_state: str | None = None, issuer: str | None = None)
def __init__(  # noqa: PLR0913
    self,
    provider: SAMLProvider,
    user: User | None,
    destination: str,
    name_id: str | None = None,
    name_id_format: str = SAML_NAME_ID_FORMAT_EMAIL,
    session_index: str | None = None,
    relay_state: str | None = None,
    issuer: str | None = None,
):
    """Record the request parameters and stamp the request with an ID and time.

    An empty `name_id` falls back to the user's email when a user is given,
    and to None otherwise.
    """
    self.provider = provider
    self.user = user
    self.destination = destination

    # Resolve the NameID: explicit value first, then the user's email
    if name_id:
        self.name_id = name_id
    else:
        self.name_id = user.email if user else None

    self.name_id_format = name_id_format
    self.session_index = session_index
    self.relay_state = relay_state
    self.issuer = issuer

    # Generated once so every encoding of this request shares them
    self._issue_instant = get_time_string()
    self._request_id = get_random_id()
user: authentik.core.models.User | None
def
get_issuer(self) -> lxml.etree.Element:
def get_issuer(self) -> Element:
    """Create the Issuer element holding the resolved issuer value"""
    tag = f"{{{NS_SAML_ASSERTION}}}Issuer"
    element = Element(tag)
    element.text = self._get_issuer_value()
    return element
Get Issuer element
def
get_name_id(self) -> lxml.etree.Element:
def get_name_id(self) -> Element:
    """Create the NameID element with its Format attribute and text value"""
    tag = f"{{{NS_SAML_ASSERTION}}}NameID"
    element = Element(tag)
    element.set("Format", self.name_id_format)
    element.text = self.name_id
    return element
Get NameID element
def
build(self) -> lxml.etree.Element:
def build(self) -> Element:
    """Assemble the LogoutRequest element tree.

    The root carries ID, Version, IssueInstant and Destination; its children
    are Issuer, NameID and, only if a session index is set, SessionIndex.
    """
    root = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutRequest", nsmap=NS_MAP)
    # Set attributes one by one so they keep this exact order
    for attribute, value in (
        ("ID", self._request_id),
        ("Version", "2.0"),
        ("IssueInstant", self._issue_instant),
        ("Destination", self.destination),
    ):
        root.set(attribute, value)

    root.append(self.get_issuer())
    root.append(self.get_name_id())

    if not self.session_index:
        return root

    session_node = Element(f"{{{NS_SAML_PROTOCOL}}}SessionIndex")
    session_node.text = self.session_index
    root.append(session_node)
    return root
Build a SAML LogoutRequest as etree Element
def
encode_post(self) -> str:
def encode_post(self) -> str:
    """Serialize the LogoutRequest for the POST binding as base64 text.

    The XML is signed in place first when the provider has a signing keypair
    and `sign_logout_request` is set.
    """
    document = self.build()
    signing_enabled = self.provider.signing_kp and self.provider.sign_logout_request
    if signing_enabled:
        self._sign_logout_request(document)
    serialized = etree.tostring(document)
    return base64.b64encode(serialized).decode()
Encode LogoutRequest for POST binding
def
encode_redirect(self) -> str:
def encode_redirect(self) -> str:
    """Serialize the LogoutRequest for the Redirect binding.

    The XML is never signed here; for this binding the signature travels as
    query parameters instead.
    """
    document = self.build()
    # Serialize with an explicit XML declaration and UTF-8 encoding
    serialized = etree.tostring(document, encoding="UTF-8", xml_declaration=True)
    xml_text = serialized.decode("UTF-8")
    return deflate_and_base64_encode(xml_text)
Encode LogoutRequest for Redirect binding
def
get_redirect_url(self) -> str:
def get_redirect_url(self) -> str:
    """Build complete logout URL for redirect binding with signature if needed

    Query parameters are emitted in the order SAMLRequest, RelayState, SigAlg,
    Signature. RelayState is included only when set; SigAlg and Signature only
    when the provider has a signing keypair and `sign_logout_request` is set.

    Returns:
        The destination URL with the encoded parameters appended, using "&" as
        the separator if the destination already contains a query string.
    """
    encoded_request = self.encode_redirect()
    params = {
        "SAMLRequest": encoded_request,
    }

    if self.relay_state:
        params["RelayState"] = self.relay_state

    if self.provider.signing_kp and self.provider.sign_logout_request:
        sig_alg = self.provider.signature_algorithm
        params["SigAlg"] = sig_alg

        # Build the string to sign
        query_string = self._build_signable_query_string(params)

        signature = self._sign_query_string(query_string)
        params["Signature"] = base64.b64encode(signature).decode()

    # Some SP's use query params on their sls endpoint
    separator = "&" if "?" in self.destination else "?"
    # Encode with `quote` (space -> %20), not urlencode's default `quote_plus`
    # (space -> +). The signed string is built with `quote`, and receivers
    # verify the signature against the query string as transmitted, so both
    # must use the same encoding.
    return f"{self.destination}{separator}{urlencode(params, quote_via=quote)}"
Build complete logout URL for redirect binding with signature if needed