authentik.providers.saml.processors.logout_request

SAML LogoutRequest Processor

  1"""SAML LogoutRequest Processor"""
  2
  3import base64
  4from urllib.parse import quote, urlencode
  5
  6import xmlsec
  7from lxml import etree  # nosec
  8from lxml.etree import Element, _Element
  9
 10from authentik.common.saml.constants import (
 11    DEFAULT_ISSUER,
 12    DIGEST_ALGORITHM_TRANSLATION_MAP,
 13    NS_MAP,
 14    NS_SAML_ASSERTION,
 15    NS_SAML_PROTOCOL,
 16    SAML_NAME_ID_FORMAT_EMAIL,
 17    SIGN_ALGORITHM_TRANSFORM_MAP,
 18)
 19from authentik.core.models import User
 20from authentik.lib.xml import remove_xml_newlines
 21from authentik.providers.saml.models import SAMLProvider
 22from authentik.providers.saml.utils import get_random_id
 23from authentik.providers.saml.utils.encoding import deflate_and_base64_encode
 24from authentik.providers.saml.utils.time import get_time_string
 25
 26
 27class LogoutRequestProcessor:
 28    """Generate SAML LogoutRequest messages"""
 29
 30    provider: SAMLProvider
 31    user: User | None
 32    destination: str
 33    name_id: str | None
 34    name_id_format: str
 35    session_index: str | None
 36    relay_state: str | None
 37    issuer: str | None
 38
 39    _issue_instant: str
 40    _request_id: str
 41
 42    def __init__(  # noqa: PLR0913
 43        self,
 44        provider: SAMLProvider,
 45        user: User | None,
 46        destination: str,
 47        name_id: str | None = None,
 48        name_id_format: str = SAML_NAME_ID_FORMAT_EMAIL,
 49        session_index: str | None = None,
 50        relay_state: str | None = None,
 51        issuer: str | None = None,
 52    ):
 53        self.provider = provider
 54        self.user = user
 55        self.destination = destination
 56        self.name_id = name_id or (user.email if user else None)
 57        self.name_id_format = name_id_format
 58        self.session_index = session_index
 59        self.relay_state = relay_state
 60        self.issuer = issuer
 61
 62        self._issue_instant = get_time_string()
 63        self._request_id = get_random_id()
 64
 65    def _get_issuer_value(self) -> str:
 66        """Get issuer value from session, with fallback to provider"""
 67        if self.issuer:
 68            return self.issuer
 69        if self.provider.issuer_override:
 70            return self.provider.issuer_override
 71        return DEFAULT_ISSUER
 72
 73    def get_issuer(self) -> Element:
 74        """Get Issuer element"""
 75        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
 76        issuer.text = self._get_issuer_value()
 77        return issuer
 78
 79    def get_name_id(self) -> Element:
 80        """Get NameID element"""
 81        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
 82        name_id.attrib["Format"] = self.name_id_format
 83        name_id.text = self.name_id
 84        return name_id
 85
 86    def build(self) -> Element:
 87        """Build a SAML LogoutRequest as etree Element"""
 88        logout_request = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutRequest", nsmap=NS_MAP)
 89        logout_request.attrib["ID"] = self._request_id
 90        logout_request.attrib["Version"] = "2.0"
 91        logout_request.attrib["IssueInstant"] = self._issue_instant
 92        logout_request.attrib["Destination"] = self.destination
 93
 94        logout_request.append(self.get_issuer())
 95        logout_request.append(self.get_name_id())
 96
 97        if self.session_index:
 98            session_index_element = Element(f"{{{NS_SAML_PROTOCOL}}}SessionIndex")
 99            session_index_element.text = self.session_index
100            logout_request.append(session_index_element)
101
102        return logout_request
103
104    def encode_post(self) -> str:
105        """Encode LogoutRequest for POST binding"""
106        logout_request = self.build()
107        if self.provider.signing_kp and self.provider.sign_logout_request:
108            self._sign_logout_request(logout_request)
109        return base64.b64encode(etree.tostring(logout_request)).decode()
110
111    def encode_redirect(self) -> str:
112        """Encode LogoutRequest for Redirect binding"""
113        logout_request = self.build()
114        # Note: For redirect binding, signatures are added as query parameters, not in XML
115        # Ensure proper XML serialization with encoding declaration
116        xml_str = etree.tostring(logout_request, encoding="UTF-8", xml_declaration=True)
117        return deflate_and_base64_encode(xml_str.decode("UTF-8"))
118
119    def get_redirect_url(self) -> str:
120        """Build complete logout URL for redirect binding with signature if needed"""
121        encoded_request = self.encode_redirect()
122        params = {
123            "SAMLRequest": encoded_request,
124        }
125
126        if self.relay_state:
127            params["RelayState"] = self.relay_state
128
129        if self.provider.signing_kp and self.provider.sign_logout_request:
130            sig_alg = self.provider.signature_algorithm
131            params["SigAlg"] = sig_alg
132
133            # Build the string to sign
134            query_string = self._build_signable_query_string(params)
135
136            signature = self._sign_query_string(query_string)
137            params["Signature"] = base64.b64encode(signature).decode()
138
139        # Some SP's use query params on their sls endpoint
140        separator = "&" if "?" in self.destination else "?"
141        return f"{self.destination}{separator}{urlencode(params)}"
142
143    def get_post_form_data(self) -> dict:
144        """Get form data for POST binding"""
145        return {
146            "SAMLRequest": self.encode_post(),
147            "RelayState": self.relay_state or "",
148        }
149
150    def _sign_logout_request(self, logout_request: _Element):
151        """Sign the LogoutRequest element"""
152        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
153            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
154        )
155        signature = xmlsec.template.create(
156            logout_request,
157            xmlsec.constants.TransformExclC14N,
158            signature_algorithm_transform,
159            ns=xmlsec.constants.DSigNs,
160        )
161
162        issuer = logout_request.find(f"{{{NS_SAML_ASSERTION}}}Issuer")
163        if issuer is not None:
164            issuer.addnext(signature)
165        else:
166            logout_request.insert(0, signature)
167
168        self._sign(logout_request)
169
170    def _sign(self, element: _Element):
171        """Sign an XML element based on the providers' configured signing settings"""
172        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
173            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
174        )
175        xmlsec.tree.add_ids(element, ["ID"])
176        signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature)
177        ref = xmlsec.template.add_reference(
178            signature_node,
179            digest_algorithm_transform,
180            uri="#" + element.attrib["ID"],
181        )
182        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
183        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
184        key_info = xmlsec.template.ensure_key_info(signature_node)
185        xmlsec.template.add_x509_data(key_info)
186
187        ctx = xmlsec.SignatureContext()
188
189        key = xmlsec.Key.from_memory(
190            self.provider.signing_kp.key_data,
191            xmlsec.constants.KeyDataFormatPem,
192            None,
193        )
194        key.load_cert_from_memory(
195            self.provider.signing_kp.certificate_data,
196            xmlsec.constants.KeyDataFormatCertPem,
197        )
198        ctx.key = key
199        ctx.sign(remove_xml_newlines(element, signature_node))
200
201    def _build_signable_query_string(self, params: dict) -> str:
202        """Build query string for signing (order matters per SAML spec)"""
203        # SAML spec requires specific order: SAMLRequest, RelayState, SigAlg
204        # Values must be URL-encoded individually before concatenation
205        ordered = []
206        if "SAMLRequest" in params:
207            ordered.append(f"SAMLRequest={quote(params['SAMLRequest'], safe='')}")
208        if "RelayState" in params:
209            ordered.append(f"RelayState={quote(params['RelayState'], safe='')}")
210        if "SigAlg" in params:
211            ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}")
212        return "&".join(ordered)
213
214    def _sign_query_string(self, query_string: str) -> bytes:
215        """Sign the query string for redirect binding"""
216        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
217            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256
218        )
219
220        key = xmlsec.Key.from_memory(
221            self.provider.signing_kp.key_data,
222            xmlsec.constants.KeyDataFormatPem,
223            None,
224        )
225
226        ctx = xmlsec.SignatureContext()
227        ctx.key = key
228
229        return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)
class LogoutRequestProcessor:
    """Generate SAML LogoutRequest messages.

    The request ID and IssueInstant are generated once in ``__init__``, so every
    ``build``/``encode_*`` call on one instance describes the same request.
    """

    provider: SAMLProvider
    user: User | None
    # URL the request is sent to; also written to the ``Destination`` attribute.
    destination: str
    # NameID text; falls back to the user's email when not given explicitly.
    name_id: str | None
    name_id_format: str
    session_index: str | None
    relay_state: str | None
    # Explicit issuer; takes precedence over the provider's ``issuer_override``.
    issuer: str | None

    _issue_instant: str
    _request_id: str

    def __init__(  # noqa: PLR0913
        self,
        provider: SAMLProvider,
        user: User | None,
        destination: str,
        name_id: str | None = None,
        name_id_format: str = SAML_NAME_ID_FORMAT_EMAIL,
        session_index: str | None = None,
        relay_state: str | None = None,
        issuer: str | None = None,
    ):
        """Store the request parameters and generate the request ID and timestamp.

        If ``name_id`` is empty or ``None``, the user's email is used instead;
        without a user the NameID stays ``None``.
        """
        self.provider = provider
        self.user = user
        self.destination = destination
        self.name_id = name_id or (user.email if user else None)
        self.name_id_format = name_id_format
        self.session_index = session_index
        self.relay_state = relay_state
        self.issuer = issuer

        self._issue_instant = get_time_string()
        self._request_id = get_random_id()

    def _get_issuer_value(self) -> str:
        """Return the issuer string to use.

        Precedence: the explicit ``issuer`` given to the constructor, then the
        provider's ``issuer_override``, then ``DEFAULT_ISSUER``.
        """
        if self.issuer:
            return self.issuer
        if self.provider.issuer_override:
            return self.provider.issuer_override
        return DEFAULT_ISSUER

    def get_issuer(self) -> Element:
        """Get Issuer element"""
        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
        issuer.text = self._get_issuer_value()
        return issuer

    def get_name_id(self) -> Element:
        """Get NameID element, with ``Format`` set to ``name_id_format``"""
        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
        name_id.attrib["Format"] = self.name_id_format
        name_id.text = self.name_id
        return name_id

    def build(self) -> Element:
        """Build a SAML LogoutRequest as etree Element.

        The element carries ID, Version, IssueInstant and Destination attributes,
        followed by Issuer, NameID and (only when ``session_index`` is set) a
        SessionIndex child. The returned element is unsigned.
        """
        logout_request = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutRequest", nsmap=NS_MAP)
        logout_request.attrib["ID"] = self._request_id
        logout_request.attrib["Version"] = "2.0"
        logout_request.attrib["IssueInstant"] = self._issue_instant
        logout_request.attrib["Destination"] = self.destination

        logout_request.append(self.get_issuer())
        logout_request.append(self.get_name_id())

        if self.session_index:
            session_index_element = Element(f"{{{NS_SAML_PROTOCOL}}}SessionIndex")
            session_index_element.text = self.session_index
            logout_request.append(session_index_element)

        return logout_request

    def encode_post(self) -> str:
        """Encode LogoutRequest for POST binding.

        The XML is signed in place when the provider has a signing keypair and
        ``sign_logout_request`` enabled, then serialized and base64-encoded.
        """
        logout_request = self.build()
        if self.provider.signing_kp and self.provider.sign_logout_request:
            self._sign_logout_request(logout_request)
        return base64.b64encode(etree.tostring(logout_request)).decode()

    def encode_redirect(self) -> str:
        """Encode LogoutRequest for Redirect binding.

        Returns the unsigned XML (with XML declaration) passed through
        ``deflate_and_base64_encode``.
        """
        logout_request = self.build()
        # Note: For redirect binding, signatures are added as query parameters, not in XML
        # Ensure proper XML serialization with encoding declaration
        xml_str = etree.tostring(logout_request, encoding="UTF-8", xml_declaration=True)
        return deflate_and_base64_encode(xml_str.decode("UTF-8"))

    def get_redirect_url(self) -> str:
        """Build complete logout URL for redirect binding with signature if needed.

        Query parameters are SAMLRequest, RelayState (only when set) and, when the
        provider signs logout requests, SigAlg and Signature. They are appended to
        ``destination`` with ``&`` if it already contains a query string.
        """
        encoded_request = self.encode_redirect()
        params = {
            "SAMLRequest": encoded_request,
        }

        if self.relay_state:
            params["RelayState"] = self.relay_state

        if self.provider.signing_kp and self.provider.sign_logout_request:
            sig_alg = self.provider.signature_algorithm
            params["SigAlg"] = sig_alg

            # Build the string to sign
            query_string = self._build_signable_query_string(params)

            signature = self._sign_query_string(query_string)
            params["Signature"] = base64.b64encode(signature).decode()

        # Some SP's use query params on their sls endpoint
        separator = "&" if "?" in self.destination else "?"
        # Encode with quote() instead of urlencode's default quote_plus() so the
        # transmitted values are byte-identical to the ones that were signed;
        # quote_plus() would send a space as "+" while "%20" was signed.
        return f"{self.destination}{separator}{urlencode(params, quote_via=quote)}"

    def get_post_form_data(self) -> dict:
        """Get form data for POST binding.

        ``RelayState`` is always present and is an empty string when unset.
        """
        return {
            "SAMLRequest": self.encode_post(),
            "RelayState": self.relay_state or "",
        }

    def _sign_logout_request(self, logout_request: _Element):
        """Sign the LogoutRequest element in place.

        A signature template is inserted directly after the Issuer element (or as
        the first child when there is no Issuer) and then filled in by ``_sign``.
        """
        # NOTE(review): the fallback here is RSA-SHA1 while _sign_query_string falls
        # back to RSA-SHA256 -- confirm whether the difference is intentional.
        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
        )
        signature = xmlsec.template.create(
            logout_request,
            xmlsec.constants.TransformExclC14N,
            signature_algorithm_transform,
            ns=xmlsec.constants.DSigNs,
        )

        issuer = logout_request.find(f"{{{NS_SAML_ASSERTION}}}Issuer")
        if issuer is not None:
            issuer.addnext(signature)
        else:
            logout_request.insert(0, signature)

        self._sign(logout_request)

    def _sign(self, element: _Element):
        """Sign an XML element based on the providers' configured signing settings.

        ``element`` must already contain a signature template node and an ``ID``
        attribute; the reference added here points at that ID.
        """
        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
        )
        # Register "ID" as an ID attribute so the "#<id>" reference can be resolved
        xmlsec.tree.add_ids(element, ["ID"])
        signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature)
        ref = xmlsec.template.add_reference(
            signature_node,
            digest_algorithm_transform,
            uri="#" + element.attrib["ID"],
        )
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
        key_info = xmlsec.template.ensure_key_info(signature_node)
        xmlsec.template.add_x509_data(key_info)

        ctx = xmlsec.SignatureContext()

        key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,
            xmlsec.constants.KeyDataFormatPem,
            None,
        )
        # Attach the certificate to the key for the X509Data template added above
        key.load_cert_from_memory(
            self.provider.signing_kp.certificate_data,
            xmlsec.constants.KeyDataFormatCertPem,
        )
        ctx.key = key
        ctx.sign(remove_xml_newlines(element, signature_node))

    def _build_signable_query_string(self, params: dict) -> str:
        """Build query string for signing (order matters per SAML spec).

        Only SAMLRequest, RelayState and SigAlg are included, in that order, and
        only when present in ``params``; any other key is ignored.
        """
        # SAML spec requires specific order: SAMLRequest, RelayState, SigAlg
        # Values must be URL-encoded individually before concatenation
        ordered = []
        if "SAMLRequest" in params:
            ordered.append(f"SAMLRequest={quote(params['SAMLRequest'], safe='')}")
        if "RelayState" in params:
            ordered.append(f"RelayState={quote(params['RelayState'], safe='')}")
        if "SigAlg" in params:
            ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}")
        return "&".join(ordered)

    def _sign_query_string(self, query_string: str) -> bytes:
        """Sign the query string for redirect binding.

        Returns the raw signature bytes over the UTF-8 encoded ``query_string``,
        produced with the provider's signing key.
        """
        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256
        )

        key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,
            xmlsec.constants.KeyDataFormatPem,
            None,
        )

        ctx = xmlsec.SignatureContext()
        ctx.key = key

        return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)

Generate SAML LogoutRequest messages

LogoutRequestProcessor( provider: authentik.providers.saml.models.SAMLProvider, user: authentik.core.models.User | None, destination: str, name_id: str | None = None, name_id_format: str = 'urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress', session_index: str | None = None, relay_state: str | None = None, issuer: str | None = None)
43    def __init__(  # noqa: PLR0913
44        self,
45        provider: SAMLProvider,
46        user: User | None,
47        destination: str,
48        name_id: str | None = None,
49        name_id_format: str = SAML_NAME_ID_FORMAT_EMAIL,
50        session_index: str | None = None,
51        relay_state: str | None = None,
52        issuer: str | None = None,
53    ):
54        self.provider = provider
55        self.user = user
56        self.destination = destination
57        self.name_id = name_id or (user.email if user else None)
58        self.name_id_format = name_id_format
59        self.session_index = session_index
60        self.relay_state = relay_state
61        self.issuer = issuer
62
63        self._issue_instant = get_time_string()
64        self._request_id = get_random_id()
destination: str
name_id: str | None
name_id_format: str
session_index: str | None
relay_state: str | None
issuer: str | None
def get_issuer(self) -> lxml.etree.Element:
74    def get_issuer(self) -> Element:
75        """Get Issuer element"""
76        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
77        issuer.text = self._get_issuer_value()
78        return issuer

Get Issuer element

def get_name_id(self) -> lxml.etree.Element:
80    def get_name_id(self) -> Element:
81        """Get NameID element"""
82        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
83        name_id.attrib["Format"] = self.name_id_format
84        name_id.text = self.name_id
85        return name_id

Get NameID element

def build(self) -> lxml.etree.Element:
 87    def build(self) -> Element:
 88        """Build a SAML LogoutRequest as etree Element"""
 89        logout_request = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutRequest", nsmap=NS_MAP)
 90        logout_request.attrib["ID"] = self._request_id
 91        logout_request.attrib["Version"] = "2.0"
 92        logout_request.attrib["IssueInstant"] = self._issue_instant
 93        logout_request.attrib["Destination"] = self.destination
 94
 95        logout_request.append(self.get_issuer())
 96        logout_request.append(self.get_name_id())
 97
 98        if self.session_index:
 99            session_index_element = Element(f"{{{NS_SAML_PROTOCOL}}}SessionIndex")
100            session_index_element.text = self.session_index
101            logout_request.append(session_index_element)
102
103        return logout_request

Build a SAML LogoutRequest as etree Element

def encode_post(self) -> str:
105    def encode_post(self) -> str:
106        """Encode LogoutRequest for POST binding"""
107        logout_request = self.build()
108        if self.provider.signing_kp and self.provider.sign_logout_request:
109            self._sign_logout_request(logout_request)
110        return base64.b64encode(etree.tostring(logout_request)).decode()

Encode LogoutRequest for POST binding

def encode_redirect(self) -> str:
112    def encode_redirect(self) -> str:
113        """Encode LogoutRequest for Redirect binding"""
114        logout_request = self.build()
115        # Note: For redirect binding, signatures are added as query parameters, not in XML
116        # Ensure proper XML serialization with encoding declaration
117        xml_str = etree.tostring(logout_request, encoding="UTF-8", xml_declaration=True)
118        return deflate_and_base64_encode(xml_str.decode("UTF-8"))

Encode LogoutRequest for Redirect binding

def get_redirect_url(self) -> str:
120    def get_redirect_url(self) -> str:
121        """Build complete logout URL for redirect binding with signature if needed"""
122        encoded_request = self.encode_redirect()
123        params = {
124            "SAMLRequest": encoded_request,
125        }
126
127        if self.relay_state:
128            params["RelayState"] = self.relay_state
129
130        if self.provider.signing_kp and self.provider.sign_logout_request:
131            sig_alg = self.provider.signature_algorithm
132            params["SigAlg"] = sig_alg
133
134            # Build the string to sign
135            query_string = self._build_signable_query_string(params)
136
137            signature = self._sign_query_string(query_string)
138            params["Signature"] = base64.b64encode(signature).decode()
139
140        # Some SP's use query params on their sls endpoint
141        separator = "&" if "?" in self.destination else "?"
142        return f"{self.destination}{separator}{urlencode(params)}"

Build complete logout URL for redirect binding with signature if needed

def get_post_form_data(self) -> dict:
144    def get_post_form_data(self) -> dict:
145        """Get form data for POST binding"""
146        return {
147            "SAMLRequest": self.encode_post(),
148            "RelayState": self.relay_state or "",
149        }

Get form data for POST binding