authentik.providers.saml.processors.logout_request

SAML LogoutRequest Processor

  1"""SAML LogoutRequest Processor"""
  2
  3import base64
  4from urllib.parse import quote, urlencode
  5
  6import xmlsec
  7from lxml import etree  # nosec
  8from lxml.etree import Element, _Element
  9
 10from authentik.common.saml.constants import (
 11    DIGEST_ALGORITHM_TRANSLATION_MAP,
 12    NS_MAP,
 13    NS_SAML_ASSERTION,
 14    NS_SAML_PROTOCOL,
 15    SAML_NAME_ID_FORMAT_EMAIL,
 16    SIGN_ALGORITHM_TRANSFORM_MAP,
 17)
 18from authentik.core.models import User
 19from authentik.lib.xml import remove_xml_newlines
 20from authentik.providers.saml.models import SAMLProvider
 21from authentik.providers.saml.utils import get_random_id
 22from authentik.providers.saml.utils.encoding import deflate_and_base64_encode
 23from authentik.providers.saml.utils.time import get_time_string
 24
 25
 26class LogoutRequestProcessor:
 27    """Generate SAML LogoutRequest messages"""
 28
 29    provider: SAMLProvider
 30    user: User | None
 31    destination: str
 32    name_id: str | None
 33    name_id_format: str
 34    session_index: str | None
 35    relay_state: str | None
 36
 37    _issue_instant: str
 38    _request_id: str
 39
 40    def __init__(
 41        self,
 42        provider: SAMLProvider,
 43        user: User | None,
 44        destination: str,
 45        name_id: str | None = None,
 46        name_id_format: str = SAML_NAME_ID_FORMAT_EMAIL,
 47        session_index: str | None = None,
 48        relay_state: str | None = None,
 49    ):
 50        self.provider = provider
 51        self.user = user
 52        self.destination = destination
 53        self.name_id = name_id or (user.email if user else None)
 54        self.name_id_format = name_id_format
 55        self.session_index = session_index
 56        self.relay_state = relay_state
 57
 58        self._issue_instant = get_time_string()
 59        self._request_id = get_random_id()
 60
 61    def get_issuer(self) -> Element:
 62        """Get Issuer element"""
 63        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
 64        issuer.text = self.provider.issuer
 65        return issuer
 66
 67    def get_name_id(self) -> Element:
 68        """Get NameID element"""
 69        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
 70        name_id.attrib["Format"] = self.name_id_format
 71        name_id.text = self.name_id
 72        return name_id
 73
 74    def build(self) -> Element:
 75        """Build a SAML LogoutRequest as etree Element"""
 76        logout_request = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutRequest", nsmap=NS_MAP)
 77        logout_request.attrib["ID"] = self._request_id
 78        logout_request.attrib["Version"] = "2.0"
 79        logout_request.attrib["IssueInstant"] = self._issue_instant
 80        logout_request.attrib["Destination"] = self.destination
 81
 82        logout_request.append(self.get_issuer())
 83        logout_request.append(self.get_name_id())
 84
 85        if self.session_index:
 86            session_index_element = Element(f"{{{NS_SAML_PROTOCOL}}}SessionIndex")
 87            session_index_element.text = self.session_index
 88            logout_request.append(session_index_element)
 89
 90        return logout_request
 91
 92    def encode_post(self) -> str:
 93        """Encode LogoutRequest for POST binding"""
 94        logout_request = self.build()
 95        if self.provider.signing_kp and self.provider.sign_logout_request:
 96            self._sign_logout_request(logout_request)
 97        return base64.b64encode(etree.tostring(logout_request)).decode()
 98
 99    def encode_redirect(self) -> str:
100        """Encode LogoutRequest for Redirect binding"""
101        logout_request = self.build()
102        # Note: For redirect binding, signatures are added as query parameters, not in XML
103        # Ensure proper XML serialization with encoding declaration
104        xml_str = etree.tostring(logout_request, encoding="UTF-8", xml_declaration=True)
105        return deflate_and_base64_encode(xml_str.decode("UTF-8"))
106
107    def get_redirect_url(self) -> str:
108        """Build complete logout URL for redirect binding with signature if needed"""
109        encoded_request = self.encode_redirect()
110        params = {
111            "SAMLRequest": encoded_request,
112        }
113
114        if self.relay_state:
115            params["RelayState"] = self.relay_state
116
117        if self.provider.signing_kp and self.provider.sign_logout_request:
118            sig_alg = self.provider.signature_algorithm
119            params["SigAlg"] = sig_alg
120
121            # Build the string to sign
122            query_string = self._build_signable_query_string(params)
123
124            signature = self._sign_query_string(query_string)
125            params["Signature"] = base64.b64encode(signature).decode()
126
127        # Some SP's use query params on their sls endpoint
128        separator = "&" if "?" in self.destination else "?"
129        return f"{self.destination}{separator}{urlencode(params)}"
130
131    def get_post_form_data(self) -> dict:
132        """Get form data for POST binding"""
133        return {
134            "SAMLRequest": self.encode_post(),
135            "RelayState": self.relay_state or "",
136        }
137
138    def _sign_logout_request(self, logout_request: _Element):
139        """Sign the LogoutRequest element"""
140        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
141            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
142        )
143        signature = xmlsec.template.create(
144            logout_request,
145            xmlsec.constants.TransformExclC14N,
146            signature_algorithm_transform,
147            ns=xmlsec.constants.DSigNs,
148        )
149
150        issuer = logout_request.find(f"{{{NS_SAML_ASSERTION}}}Issuer")
151        if issuer is not None:
152            issuer.addnext(signature)
153        else:
154            logout_request.insert(0, signature)
155
156        self._sign(logout_request)
157
    def _sign(self, element: _Element) -> None:
        """Sign an XML element based on the providers' configured signing settings

        Expects ``element`` to carry an ``ID`` attribute and to already contain a
        signature template node; the signature is filled in in place.
        """
        # Falls back to a SHA1 digest when the provider's digest algorithm is not in the map
        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
        )
        # Register "ID" as an ID attribute for the "#<ID>" reference URI used below
        xmlsec.tree.add_ids(element, ["ID"])
        signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature)
        ref = xmlsec.template.add_reference(
            signature_node,
            digest_algorithm_transform,
            uri="#" + element.attrib["ID"],
        )
        # Transforms are added in this order: enveloped signature, then exclusive C14N
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
        key_info = xmlsec.template.ensure_key_info(signature_node)
        xmlsec.template.add_x509_data(key_info)

        ctx = xmlsec.SignatureContext()

        # Private key and certificate both come from the provider's signing keypair (PEM)
        key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,
            xmlsec.constants.KeyDataFormatPem,
            None,
        )
        key.load_cert_from_memory(
            self.provider.signing_kp.certificate_data,
            xmlsec.constants.KeyDataFormatCertPem,
        )
        ctx.key = key
        # presumably remove_xml_newlines returns the signature node to sign - verify
        # against authentik.lib.xml
        ctx.sign(remove_xml_newlines(element, signature_node))
188
189    def _build_signable_query_string(self, params: dict) -> str:
190        """Build query string for signing (order matters per SAML spec)"""
191        # SAML spec requires specific order: SAMLRequest, RelayState, SigAlg
192        # Values must be URL-encoded individually before concatenation
193        ordered = []
194        if "SAMLRequest" in params:
195            ordered.append(f"SAMLRequest={quote(params['SAMLRequest'], safe='')}")
196        if "RelayState" in params:
197            ordered.append(f"RelayState={quote(params['RelayState'], safe='')}")
198        if "SigAlg" in params:
199            ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}")
200        return "&".join(ordered)
201
202    def _sign_query_string(self, query_string: str) -> bytes:
203        """Sign the query string for redirect binding"""
204        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
205            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256
206        )
207
208        key = xmlsec.Key.from_memory(
209            self.provider.signing_kp.key_data,
210            xmlsec.constants.KeyDataFormatPem,
211            None,
212        )
213
214        ctx = xmlsec.SignatureContext()
215        ctx.key = key
216
217        return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)
class LogoutRequestProcessor:
 27class LogoutRequestProcessor:
 28    """Generate SAML LogoutRequest messages"""
 29
 30    provider: SAMLProvider
 31    user: User | None
 32    destination: str
 33    name_id: str | None
 34    name_id_format: str
 35    session_index: str | None
 36    relay_state: str | None
 37
 38    _issue_instant: str
 39    _request_id: str
 40
 41    def __init__(
 42        self,
 43        provider: SAMLProvider,
 44        user: User | None,
 45        destination: str,
 46        name_id: str | None = None,
 47        name_id_format: str = SAML_NAME_ID_FORMAT_EMAIL,
 48        session_index: str | None = None,
 49        relay_state: str | None = None,
 50    ):
 51        self.provider = provider
 52        self.user = user
 53        self.destination = destination
 54        self.name_id = name_id or (user.email if user else None)
 55        self.name_id_format = name_id_format
 56        self.session_index = session_index
 57        self.relay_state = relay_state
 58
 59        self._issue_instant = get_time_string()
 60        self._request_id = get_random_id()
 61
 62    def get_issuer(self) -> Element:
 63        """Get Issuer element"""
 64        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
 65        issuer.text = self.provider.issuer
 66        return issuer
 67
 68    def get_name_id(self) -> Element:
 69        """Get NameID element"""
 70        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
 71        name_id.attrib["Format"] = self.name_id_format
 72        name_id.text = self.name_id
 73        return name_id
 74
 75    def build(self) -> Element:
 76        """Build a SAML LogoutRequest as etree Element"""
 77        logout_request = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutRequest", nsmap=NS_MAP)
 78        logout_request.attrib["ID"] = self._request_id
 79        logout_request.attrib["Version"] = "2.0"
 80        logout_request.attrib["IssueInstant"] = self._issue_instant
 81        logout_request.attrib["Destination"] = self.destination
 82
 83        logout_request.append(self.get_issuer())
 84        logout_request.append(self.get_name_id())
 85
 86        if self.session_index:
 87            session_index_element = Element(f"{{{NS_SAML_PROTOCOL}}}SessionIndex")
 88            session_index_element.text = self.session_index
 89            logout_request.append(session_index_element)
 90
 91        return logout_request
 92
 93    def encode_post(self) -> str:
 94        """Encode LogoutRequest for POST binding"""
 95        logout_request = self.build()
 96        if self.provider.signing_kp and self.provider.sign_logout_request:
 97            self._sign_logout_request(logout_request)
 98        return base64.b64encode(etree.tostring(logout_request)).decode()
 99
100    def encode_redirect(self) -> str:
101        """Encode LogoutRequest for Redirect binding"""
102        logout_request = self.build()
103        # Note: For redirect binding, signatures are added as query parameters, not in XML
104        # Ensure proper XML serialization with encoding declaration
105        xml_str = etree.tostring(logout_request, encoding="UTF-8", xml_declaration=True)
106        return deflate_and_base64_encode(xml_str.decode("UTF-8"))
107
108    def get_redirect_url(self) -> str:
109        """Build complete logout URL for redirect binding with signature if needed"""
110        encoded_request = self.encode_redirect()
111        params = {
112            "SAMLRequest": encoded_request,
113        }
114
115        if self.relay_state:
116            params["RelayState"] = self.relay_state
117
118        if self.provider.signing_kp and self.provider.sign_logout_request:
119            sig_alg = self.provider.signature_algorithm
120            params["SigAlg"] = sig_alg
121
122            # Build the string to sign
123            query_string = self._build_signable_query_string(params)
124
125            signature = self._sign_query_string(query_string)
126            params["Signature"] = base64.b64encode(signature).decode()
127
128        # Some SP's use query params on their sls endpoint
129        separator = "&" if "?" in self.destination else "?"
130        return f"{self.destination}{separator}{urlencode(params)}"
131
132    def get_post_form_data(self) -> dict:
133        """Get form data for POST binding"""
134        return {
135            "SAMLRequest": self.encode_post(),
136            "RelayState": self.relay_state or "",
137        }
138
139    def _sign_logout_request(self, logout_request: _Element):
140        """Sign the LogoutRequest element"""
141        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
142            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
143        )
144        signature = xmlsec.template.create(
145            logout_request,
146            xmlsec.constants.TransformExclC14N,
147            signature_algorithm_transform,
148            ns=xmlsec.constants.DSigNs,
149        )
150
151        issuer = logout_request.find(f"{{{NS_SAML_ASSERTION}}}Issuer")
152        if issuer is not None:
153            issuer.addnext(signature)
154        else:
155            logout_request.insert(0, signature)
156
157        self._sign(logout_request)
158
    def _sign(self, element: _Element) -> None:
        """Sign an XML element based on the providers' configured signing settings

        Expects ``element`` to carry an ``ID`` attribute and to already contain a
        signature template node; the signature is filled in in place.
        """
        # Falls back to a SHA1 digest when the provider's digest algorithm is not in the map
        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
        )
        # Register "ID" as an ID attribute for the "#<ID>" reference URI used below
        xmlsec.tree.add_ids(element, ["ID"])
        signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature)
        ref = xmlsec.template.add_reference(
            signature_node,
            digest_algorithm_transform,
            uri="#" + element.attrib["ID"],
        )
        # Transforms are added in this order: enveloped signature, then exclusive C14N
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
        key_info = xmlsec.template.ensure_key_info(signature_node)
        xmlsec.template.add_x509_data(key_info)

        ctx = xmlsec.SignatureContext()

        # Private key and certificate both come from the provider's signing keypair (PEM)
        key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,
            xmlsec.constants.KeyDataFormatPem,
            None,
        )
        key.load_cert_from_memory(
            self.provider.signing_kp.certificate_data,
            xmlsec.constants.KeyDataFormatCertPem,
        )
        ctx.key = key
        # presumably remove_xml_newlines returns the signature node to sign - verify
        # against authentik.lib.xml
        ctx.sign(remove_xml_newlines(element, signature_node))
189
190    def _build_signable_query_string(self, params: dict) -> str:
191        """Build query string for signing (order matters per SAML spec)"""
192        # SAML spec requires specific order: SAMLRequest, RelayState, SigAlg
193        # Values must be URL-encoded individually before concatenation
194        ordered = []
195        if "SAMLRequest" in params:
196            ordered.append(f"SAMLRequest={quote(params['SAMLRequest'], safe='')}")
197        if "RelayState" in params:
198            ordered.append(f"RelayState={quote(params['RelayState'], safe='')}")
199        if "SigAlg" in params:
200            ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}")
201        return "&".join(ordered)
202
203    def _sign_query_string(self, query_string: str) -> bytes:
204        """Sign the query string for redirect binding"""
205        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
206            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256
207        )
208
209        key = xmlsec.Key.from_memory(
210            self.provider.signing_kp.key_data,
211            xmlsec.constants.KeyDataFormatPem,
212            None,
213        )
214
215        ctx = xmlsec.SignatureContext()
216        ctx.key = key
217
218        return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)

Generate SAML LogoutRequest messages

LogoutRequestProcessor( provider: authentik.providers.saml.models.SAMLProvider, user: authentik.core.models.User | None, destination: str, name_id: str | None = None, name_id_format: str = 'urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress', session_index: str | None = None, relay_state: str | None = None)
41    def __init__(
42        self,
43        provider: SAMLProvider,
44        user: User | None,
45        destination: str,
46        name_id: str | None = None,
47        name_id_format: str = SAML_NAME_ID_FORMAT_EMAIL,
48        session_index: str | None = None,
49        relay_state: str | None = None,
50    ):
51        self.provider = provider
52        self.user = user
53        self.destination = destination
54        self.name_id = name_id or (user.email if user else None)
55        self.name_id_format = name_id_format
56        self.session_index = session_index
57        self.relay_state = relay_state
58
59        self._issue_instant = get_time_string()
60        self._request_id = get_random_id()
destination: str
name_id: str | None
name_id_format: str
session_index: str | None
relay_state: str | None
def get_issuer(self) -> lxml.etree.Element:
62    def get_issuer(self) -> Element:
63        """Get Issuer element"""
64        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
65        issuer.text = self.provider.issuer
66        return issuer

Get Issuer element

def get_name_id(self) -> lxml.etree.Element:
68    def get_name_id(self) -> Element:
69        """Get NameID element"""
70        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
71        name_id.attrib["Format"] = self.name_id_format
72        name_id.text = self.name_id
73        return name_id

Get NameID element

def build(self) -> lxml.etree.Element:
75    def build(self) -> Element:
76        """Build a SAML LogoutRequest as etree Element"""
77        logout_request = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutRequest", nsmap=NS_MAP)
78        logout_request.attrib["ID"] = self._request_id
79        logout_request.attrib["Version"] = "2.0"
80        logout_request.attrib["IssueInstant"] = self._issue_instant
81        logout_request.attrib["Destination"] = self.destination
82
83        logout_request.append(self.get_issuer())
84        logout_request.append(self.get_name_id())
85
86        if self.session_index:
87            session_index_element = Element(f"{{{NS_SAML_PROTOCOL}}}SessionIndex")
88            session_index_element.text = self.session_index
89            logout_request.append(session_index_element)
90
91        return logout_request

Build a SAML LogoutRequest as etree Element

def encode_post(self) -> str:
93    def encode_post(self) -> str:
94        """Encode LogoutRequest for POST binding"""
95        logout_request = self.build()
96        if self.provider.signing_kp and self.provider.sign_logout_request:
97            self._sign_logout_request(logout_request)
98        return base64.b64encode(etree.tostring(logout_request)).decode()

Encode LogoutRequest for POST binding

def encode_redirect(self) -> str:
100    def encode_redirect(self) -> str:
101        """Encode LogoutRequest for Redirect binding"""
102        logout_request = self.build()
103        # Note: For redirect binding, signatures are added as query parameters, not in XML
104        # Ensure proper XML serialization with encoding declaration
105        xml_str = etree.tostring(logout_request, encoding="UTF-8", xml_declaration=True)
106        return deflate_and_base64_encode(xml_str.decode("UTF-8"))

Encode LogoutRequest for Redirect binding

def get_redirect_url(self) -> str:
108    def get_redirect_url(self) -> str:
109        """Build complete logout URL for redirect binding with signature if needed"""
110        encoded_request = self.encode_redirect()
111        params = {
112            "SAMLRequest": encoded_request,
113        }
114
115        if self.relay_state:
116            params["RelayState"] = self.relay_state
117
118        if self.provider.signing_kp and self.provider.sign_logout_request:
119            sig_alg = self.provider.signature_algorithm
120            params["SigAlg"] = sig_alg
121
122            # Build the string to sign
123            query_string = self._build_signable_query_string(params)
124
125            signature = self._sign_query_string(query_string)
126            params["Signature"] = base64.b64encode(signature).decode()
127
128        # Some SP's use query params on their sls endpoint
129        separator = "&" if "?" in self.destination else "?"
130        return f"{self.destination}{separator}{urlencode(params)}"

Build complete logout URL for redirect binding with signature if needed

def get_post_form_data(self) -> dict:
132    def get_post_form_data(self) -> dict:
133        """Get form data for POST binding"""
134        return {
135            "SAMLRequest": self.encode_post(),
136            "RelayState": self.relay_state or "",
137        }

Get form data for POST binding