authentik.providers.saml.processors.logout_response_processor

LogoutResponse processor

  1"""LogoutResponse processor"""
  2
  3import base64
  4from urllib.parse import quote, urlencode
  5
  6import xmlsec
  7from lxml import etree
  8from lxml.etree import Element, SubElement
  9
 10from authentik.common.saml.constants import (
 11    DEFAULT_ISSUER,
 12    DIGEST_ALGORITHM_TRANSLATION_MAP,
 13    NS_MAP,
 14    NS_SAML_ASSERTION,
 15    NS_SAML_PROTOCOL,
 16    SIGN_ALGORITHM_TRANSFORM_MAP,
 17)
 18from authentik.providers.saml.models import SAMLProvider
 19from authentik.providers.saml.processors.logout_request_parser import LogoutRequest
 20from authentik.providers.saml.utils import get_random_id
 21from authentik.providers.saml.utils.encoding import deflate_and_base64_encode
 22from authentik.providers.saml.utils.time import get_time_string
 23
 24
 25class LogoutResponseProcessor:
 26    """Generate a SAML LogoutResponse"""
 27
 28    provider: SAMLProvider
 29    logout_request: LogoutRequest
 30    destination: str | None
 31    relay_state: str | None
 32    issuer: str | None
 33    _issue_instant: str
 34    _response_id: str
 35
 36    def __init__(  # noqa: PLR0913
 37        self,
 38        provider: SAMLProvider,
 39        logout_request: LogoutRequest,
 40        destination: str | None = None,
 41        relay_state: str | None = None,
 42        issuer: str | None = None,
 43    ):
 44        self.provider = provider
 45        self.logout_request = logout_request
 46        self.destination = destination
 47        self.relay_state = relay_state or (logout_request.relay_state if logout_request else None)
 48        self.issuer = issuer
 49        self._issue_instant = get_time_string()
 50        self._response_id = get_random_id()
 51
 52    def _get_issuer_value(self) -> str:
 53        """Get issuer value from session, with fallback to provider"""
 54        if self.issuer:
 55            return self.issuer
 56        if self.provider.issuer_override:
 57            return self.provider.issuer_override
 58        return DEFAULT_ISSUER
 59
 60    def get_issuer(self) -> Element:
 61        """Get Issuer element"""
 62        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
 63        issuer.text = self._get_issuer_value()
 64        return issuer
 65
 66    def build(self, status: str = "Success") -> Element:
 67        """Build a SAML LogoutResponse as etree Element"""
 68        response = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutResponse", nsmap=NS_MAP)
 69        response.attrib["Version"] = "2.0"
 70        response.attrib["IssueInstant"] = self._issue_instant
 71        response.attrib["ID"] = self._response_id
 72
 73        if self.destination:
 74            response.attrib["Destination"] = self.destination
 75
 76        if self.logout_request and self.logout_request.id:
 77            response.attrib["InResponseTo"] = self.logout_request.id
 78
 79        response.append(self.get_issuer())
 80
 81        # Add Status element
 82        status_element = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
 83        status_code = SubElement(status_element, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
 84        status_code.attrib["Value"] = f"urn:oasis:names:tc:SAML:2.0:status:{status}"
 85
 86        return response
 87
 88    def build_response(self, status: str = "Success") -> str:
 89        """Build and sign LogoutResponse, return as XML string (not encoded)"""
 90        response = self.build(status)
 91        if self.provider.signing_kp and self.provider.sign_logout_response:
 92            self._add_signature(response)
 93            self._sign_response(response)
 94        return etree.tostring(response).decode()
 95
 96    def encode_post(self, status: str = "Success") -> str:
 97        """Encode LogoutResponse for POST binding"""
 98        response = self.build(status)
 99        if self.provider.signing_kp and self.provider.sign_logout_response:
100            self._add_signature(response)
101            self._sign_response(response)
102        return base64.b64encode(etree.tostring(response)).decode()
103
104    def encode_redirect(self, status: str = "Success") -> str:
105        """Encode LogoutResponse for Redirect binding"""
106        response = self.build(status)
107        # Note: For redirect binding, signatures are added as query parameters, not in XML
108        xml_str = etree.tostring(response, encoding="UTF-8", xml_declaration=True)
109        return deflate_and_base64_encode(xml_str.decode("UTF-8"))
110
111    def get_redirect_url(self, status: str = "Success") -> str:
112        """Build complete logout response URL for redirect binding with signature if needed"""
113        encoded_response = self.encode_redirect(status)
114        params = {
115            "SAMLResponse": encoded_response,
116        }
117
118        if self.relay_state:
119            params["RelayState"] = self.relay_state
120
121        if self.provider.signing_kp and self.provider.sign_logout_response:
122            sig_alg = self.provider.signature_algorithm
123            params["SigAlg"] = sig_alg
124
125            # Build the string to sign
126            query_string = self._build_signable_query_string(params)
127
128            signature = self._sign_query_string(query_string)
129            params["Signature"] = base64.b64encode(signature).decode()
130
131        # Some SP's use query params on their sls endpoint
132        if not self.destination:
133            raise ValueError("destination is required for redirect URL")
134
135        separator = "&" if "?" in self.destination else "?"
136        return f"{self.destination}{separator}{urlencode(params)}"
137
138    def _add_signature(self, element: Element):
139        """Add signature placeholder to element"""
140        sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
141            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
142        )
143        signature = xmlsec.template.create(
144            element,
145            xmlsec.constants.TransformExclC14N,
146            sign_algorithm_transform,
147            ns=xmlsec.constants.DSigNs,
148        )
149        element.insert(1, signature)  # Insert after Issuer
150
151    def _sign_response(self, response: Element):
152        """Sign the response element"""
153        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
154            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
155        )
156
157        xmlsec.tree.add_ids(response, ["ID"])
158        signature_node = xmlsec.tree.find_node(response, xmlsec.constants.NodeSignature)
159
160        ref = xmlsec.template.add_reference(
161            signature_node,
162            digest_algorithm_transform,
163            uri="#" + response.attrib["ID"],
164        )
165        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
166        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
167        key_info = xmlsec.template.ensure_key_info(signature_node)
168        xmlsec.template.add_x509_data(key_info)
169
170        ctx = xmlsec.SignatureContext()
171        ctx.key = xmlsec.Key.from_memory(
172            self.provider.signing_kp.key_data,  # Use key_data for the private key
173            xmlsec.constants.KeyDataFormatPem,
174        )
175        ctx.key.load_cert_from_memory(
176            self.provider.signing_kp.certificate_data, xmlsec.constants.KeyDataFormatPem
177        )
178        ctx.sign(signature_node)
179
180    def _build_signable_query_string(self, params: dict) -> str:
181        """Build query string for signing (order matters per SAML spec)"""
182        # SAML spec requires specific order: SAMLResponse, RelayState, SigAlg
183        # Values must be URL-encoded individually before concatenation
184        ordered = []
185        if "SAMLResponse" in params:
186            ordered.append(f"SAMLResponse={quote(params['SAMLResponse'], safe='')}")
187        if "RelayState" in params:
188            ordered.append(f"RelayState={quote(params['RelayState'], safe='')}")
189        if "SigAlg" in params:
190            ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}")
191        return "&".join(ordered)
192
193    def _sign_query_string(self, query_string: str) -> bytes:
194        """Sign the query string for redirect binding"""
195        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
196            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256
197        )
198
199        key = xmlsec.Key.from_memory(
200            self.provider.signing_kp.key_data,
201            xmlsec.constants.KeyDataFormatPem,
202            None,
203        )
204
205        ctx = xmlsec.SignatureContext()
206        ctx.key = key
207
208        return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)
class LogoutResponseProcessor:
 26class LogoutResponseProcessor:
 27    """Generate a SAML LogoutResponse"""
 28
 29    provider: SAMLProvider
 30    logout_request: LogoutRequest
 31    destination: str | None
 32    relay_state: str | None
 33    issuer: str | None
 34    _issue_instant: str
 35    _response_id: str
 36
 37    def __init__(  # noqa: PLR0913
 38        self,
 39        provider: SAMLProvider,
 40        logout_request: LogoutRequest,
 41        destination: str | None = None,
 42        relay_state: str | None = None,
 43        issuer: str | None = None,
 44    ):
 45        self.provider = provider
 46        self.logout_request = logout_request
 47        self.destination = destination
 48        self.relay_state = relay_state or (logout_request.relay_state if logout_request else None)
 49        self.issuer = issuer
 50        self._issue_instant = get_time_string()
 51        self._response_id = get_random_id()
 52
 53    def _get_issuer_value(self) -> str:
 54        """Get issuer value from session, with fallback to provider"""
 55        if self.issuer:
 56            return self.issuer
 57        if self.provider.issuer_override:
 58            return self.provider.issuer_override
 59        return DEFAULT_ISSUER
 60
 61    def get_issuer(self) -> Element:
 62        """Get Issuer element"""
 63        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
 64        issuer.text = self._get_issuer_value()
 65        return issuer
 66
 67    def build(self, status: str = "Success") -> Element:
 68        """Build a SAML LogoutResponse as etree Element"""
 69        response = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutResponse", nsmap=NS_MAP)
 70        response.attrib["Version"] = "2.0"
 71        response.attrib["IssueInstant"] = self._issue_instant
 72        response.attrib["ID"] = self._response_id
 73
 74        if self.destination:
 75            response.attrib["Destination"] = self.destination
 76
 77        if self.logout_request and self.logout_request.id:
 78            response.attrib["InResponseTo"] = self.logout_request.id
 79
 80        response.append(self.get_issuer())
 81
 82        # Add Status element
 83        status_element = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
 84        status_code = SubElement(status_element, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
 85        status_code.attrib["Value"] = f"urn:oasis:names:tc:SAML:2.0:status:{status}"
 86
 87        return response
 88
 89    def build_response(self, status: str = "Success") -> str:
 90        """Build and sign LogoutResponse, return as XML string (not encoded)"""
 91        response = self.build(status)
 92        if self.provider.signing_kp and self.provider.sign_logout_response:
 93            self._add_signature(response)
 94            self._sign_response(response)
 95        return etree.tostring(response).decode()
 96
 97    def encode_post(self, status: str = "Success") -> str:
 98        """Encode LogoutResponse for POST binding"""
 99        response = self.build(status)
100        if self.provider.signing_kp and self.provider.sign_logout_response:
101            self._add_signature(response)
102            self._sign_response(response)
103        return base64.b64encode(etree.tostring(response)).decode()
104
105    def encode_redirect(self, status: str = "Success") -> str:
106        """Encode LogoutResponse for Redirect binding"""
107        response = self.build(status)
108        # Note: For redirect binding, signatures are added as query parameters, not in XML
109        xml_str = etree.tostring(response, encoding="UTF-8", xml_declaration=True)
110        return deflate_and_base64_encode(xml_str.decode("UTF-8"))
111
112    def get_redirect_url(self, status: str = "Success") -> str:
113        """Build complete logout response URL for redirect binding with signature if needed"""
114        encoded_response = self.encode_redirect(status)
115        params = {
116            "SAMLResponse": encoded_response,
117        }
118
119        if self.relay_state:
120            params["RelayState"] = self.relay_state
121
122        if self.provider.signing_kp and self.provider.sign_logout_response:
123            sig_alg = self.provider.signature_algorithm
124            params["SigAlg"] = sig_alg
125
126            # Build the string to sign
127            query_string = self._build_signable_query_string(params)
128
129            signature = self._sign_query_string(query_string)
130            params["Signature"] = base64.b64encode(signature).decode()
131
132        # Some SP's use query params on their sls endpoint
133        if not self.destination:
134            raise ValueError("destination is required for redirect URL")
135
136        separator = "&" if "?" in self.destination else "?"
137        return f"{self.destination}{separator}{urlencode(params)}"
138
139    def _add_signature(self, element: Element):
140        """Add signature placeholder to element"""
141        sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
142            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
143        )
144        signature = xmlsec.template.create(
145            element,
146            xmlsec.constants.TransformExclC14N,
147            sign_algorithm_transform,
148            ns=xmlsec.constants.DSigNs,
149        )
150        element.insert(1, signature)  # Insert after Issuer
151
152    def _sign_response(self, response: Element):
153        """Sign the response element"""
154        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
155            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
156        )
157
158        xmlsec.tree.add_ids(response, ["ID"])
159        signature_node = xmlsec.tree.find_node(response, xmlsec.constants.NodeSignature)
160
161        ref = xmlsec.template.add_reference(
162            signature_node,
163            digest_algorithm_transform,
164            uri="#" + response.attrib["ID"],
165        )
166        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
167        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
168        key_info = xmlsec.template.ensure_key_info(signature_node)
169        xmlsec.template.add_x509_data(key_info)
170
171        ctx = xmlsec.SignatureContext()
172        ctx.key = xmlsec.Key.from_memory(
173            self.provider.signing_kp.key_data,  # Use key_data for the private key
174            xmlsec.constants.KeyDataFormatPem,
175        )
176        ctx.key.load_cert_from_memory(
177            self.provider.signing_kp.certificate_data, xmlsec.constants.KeyDataFormatPem
178        )
179        ctx.sign(signature_node)
180
181    def _build_signable_query_string(self, params: dict) -> str:
182        """Build query string for signing (order matters per SAML spec)"""
183        # SAML spec requires specific order: SAMLResponse, RelayState, SigAlg
184        # Values must be URL-encoded individually before concatenation
185        ordered = []
186        if "SAMLResponse" in params:
187            ordered.append(f"SAMLResponse={quote(params['SAMLResponse'], safe='')}")
188        if "RelayState" in params:
189            ordered.append(f"RelayState={quote(params['RelayState'], safe='')}")
190        if "SigAlg" in params:
191            ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}")
192        return "&".join(ordered)
193
194    def _sign_query_string(self, query_string: str) -> bytes:
195        """Sign the query string for redirect binding"""
196        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
197            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256
198        )
199
200        key = xmlsec.Key.from_memory(
201            self.provider.signing_kp.key_data,
202            xmlsec.constants.KeyDataFormatPem,
203            None,
204        )
205
206        ctx = xmlsec.SignatureContext()
207        ctx.key = key
208
209        return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)

Generate a SAML LogoutResponse

LogoutResponseProcessor( provider: authentik.providers.saml.models.SAMLProvider, logout_request: authentik.providers.saml.processors.logout_request_parser.LogoutRequest, destination: str | None = None, relay_state: str | None = None, issuer: str | None = None)
37    def __init__(  # noqa: PLR0913
38        self,
39        provider: SAMLProvider,
40        logout_request: LogoutRequest,
41        destination: str | None = None,
42        relay_state: str | None = None,
43        issuer: str | None = None,
44    ):
45        self.provider = provider
46        self.logout_request = logout_request
47        self.destination = destination
48        self.relay_state = relay_state or (logout_request.relay_state if logout_request else None)
49        self.issuer = issuer
50        self._issue_instant = get_time_string()
51        self._response_id = get_random_id()
destination: str | None
relay_state: str | None
issuer: str | None
def get_issuer(self) -> lxml.etree.Element:
61    def get_issuer(self) -> Element:
62        """Get Issuer element"""
63        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
64        issuer.text = self._get_issuer_value()
65        return issuer

Get Issuer element

def build(self, status: str = 'Success') -> lxml.etree.Element:
67    def build(self, status: str = "Success") -> Element:
68        """Build a SAML LogoutResponse as etree Element"""
69        response = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutResponse", nsmap=NS_MAP)
70        response.attrib["Version"] = "2.0"
71        response.attrib["IssueInstant"] = self._issue_instant
72        response.attrib["ID"] = self._response_id
73
74        if self.destination:
75            response.attrib["Destination"] = self.destination
76
77        if self.logout_request and self.logout_request.id:
78            response.attrib["InResponseTo"] = self.logout_request.id
79
80        response.append(self.get_issuer())
81
82        # Add Status element
83        status_element = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
84        status_code = SubElement(status_element, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
85        status_code.attrib["Value"] = f"urn:oasis:names:tc:SAML:2.0:status:{status}"
86
87        return response

Build a SAML LogoutResponse as etree Element

def build_response(self, status: str = 'Success') -> str:
89    def build_response(self, status: str = "Success") -> str:
90        """Build and sign LogoutResponse, return as XML string (not encoded)"""
91        response = self.build(status)
92        if self.provider.signing_kp and self.provider.sign_logout_response:
93            self._add_signature(response)
94            self._sign_response(response)
95        return etree.tostring(response).decode()

Build and sign LogoutResponse, return as XML string (not encoded)

def encode_post(self, status: str = 'Success') -> str:
 97    def encode_post(self, status: str = "Success") -> str:
 98        """Encode LogoutResponse for POST binding"""
 99        response = self.build(status)
100        if self.provider.signing_kp and self.provider.sign_logout_response:
101            self._add_signature(response)
102            self._sign_response(response)
103        return base64.b64encode(etree.tostring(response)).decode()

Encode LogoutResponse for POST binding

def encode_redirect(self, status: str = 'Success') -> str:
105    def encode_redirect(self, status: str = "Success") -> str:
106        """Encode LogoutResponse for Redirect binding"""
107        response = self.build(status)
108        # Note: For redirect binding, signatures are added as query parameters, not in XML
109        xml_str = etree.tostring(response, encoding="UTF-8", xml_declaration=True)
110        return deflate_and_base64_encode(xml_str.decode("UTF-8"))

Encode LogoutResponse for Redirect binding

def get_redirect_url(self, status: str = 'Success') -> str:
112    def get_redirect_url(self, status: str = "Success") -> str:
113        """Build complete logout response URL for redirect binding with signature if needed"""
114        encoded_response = self.encode_redirect(status)
115        params = {
116            "SAMLResponse": encoded_response,
117        }
118
119        if self.relay_state:
120            params["RelayState"] = self.relay_state
121
122        if self.provider.signing_kp and self.provider.sign_logout_response:
123            sig_alg = self.provider.signature_algorithm
124            params["SigAlg"] = sig_alg
125
126            # Build the string to sign
127            query_string = self._build_signable_query_string(params)
128
129            signature = self._sign_query_string(query_string)
130            params["Signature"] = base64.b64encode(signature).decode()
131
132        # Some SP's use query params on their sls endpoint
133        if not self.destination:
134            raise ValueError("destination is required for redirect URL")
135
136        separator = "&" if "?" in self.destination else "?"
137        return f"{self.destination}{separator}{urlencode(params)}"

Build complete logout response URL for redirect binding with signature if needed