authentik.providers.saml.processors.logout_response_processor

LogoutResponse processor

  1"""LogoutResponse processor"""
  2
  3import base64
  4from urllib.parse import quote, urlencode
  5
  6import xmlsec
  7from lxml import etree
  8from lxml.etree import Element, SubElement
  9
 10from authentik.common.saml.constants import (
 11    DIGEST_ALGORITHM_TRANSLATION_MAP,
 12    NS_MAP,
 13    NS_SAML_ASSERTION,
 14    NS_SAML_PROTOCOL,
 15    SIGN_ALGORITHM_TRANSFORM_MAP,
 16)
 17from authentik.providers.saml.models import SAMLProvider
 18from authentik.providers.saml.processors.logout_request_parser import LogoutRequest
 19from authentik.providers.saml.utils import get_random_id
 20from authentik.providers.saml.utils.encoding import deflate_and_base64_encode
 21from authentik.providers.saml.utils.time import get_time_string
 22
 23
 24class LogoutResponseProcessor:
 25    """Generate a SAML LogoutResponse"""
 26
 27    provider: SAMLProvider
 28    logout_request: LogoutRequest
 29    destination: str | None
 30    relay_state: str | None
 31    _issue_instant: str
 32    _response_id: str
 33
 34    def __init__(
 35        self,
 36        provider: SAMLProvider,
 37        logout_request: LogoutRequest,
 38        destination: str | None = None,
 39        relay_state: str | None = None,
 40    ):
 41        self.provider = provider
 42        self.logout_request = logout_request
 43        self.destination = destination
 44        self.relay_state = relay_state or (logout_request.relay_state if logout_request else None)
 45        self._issue_instant = get_time_string()
 46        self._response_id = get_random_id()
 47
 48    def get_issuer(self) -> Element:
 49        """Get Issuer element"""
 50        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
 51        issuer.text = self.provider.issuer
 52        return issuer
 53
 54    def build(self, status: str = "Success") -> Element:
 55        """Build a SAML LogoutResponse as etree Element"""
 56        response = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutResponse", nsmap=NS_MAP)
 57        response.attrib["Version"] = "2.0"
 58        response.attrib["IssueInstant"] = self._issue_instant
 59        response.attrib["ID"] = self._response_id
 60
 61        if self.destination:
 62            response.attrib["Destination"] = self.destination
 63
 64        if self.logout_request and self.logout_request.id:
 65            response.attrib["InResponseTo"] = self.logout_request.id
 66
 67        response.append(self.get_issuer())
 68
 69        # Add Status element
 70        status_element = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
 71        status_code = SubElement(status_element, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
 72        status_code.attrib["Value"] = f"urn:oasis:names:tc:SAML:2.0:status:{status}"
 73
 74        return response
 75
 76    def build_response(self, status: str = "Success") -> str:
 77        """Build and sign LogoutResponse, return as XML string (not encoded)"""
 78        response = self.build(status)
 79        if self.provider.signing_kp and self.provider.sign_logout_response:
 80            self._add_signature(response)
 81            self._sign_response(response)
 82        return etree.tostring(response).decode()
 83
 84    def encode_post(self, status: str = "Success") -> str:
 85        """Encode LogoutResponse for POST binding"""
 86        response = self.build(status)
 87        if self.provider.signing_kp and self.provider.sign_logout_response:
 88            self._add_signature(response)
 89            self._sign_response(response)
 90        return base64.b64encode(etree.tostring(response)).decode()
 91
 92    def encode_redirect(self, status: str = "Success") -> str:
 93        """Encode LogoutResponse for Redirect binding"""
 94        response = self.build(status)
 95        # Note: For redirect binding, signatures are added as query parameters, not in XML
 96        xml_str = etree.tostring(response, encoding="UTF-8", xml_declaration=True)
 97        return deflate_and_base64_encode(xml_str.decode("UTF-8"))
 98
 99    def get_redirect_url(self, status: str = "Success") -> str:
100        """Build complete logout response URL for redirect binding with signature if needed"""
101        encoded_response = self.encode_redirect(status)
102        params = {
103            "SAMLResponse": encoded_response,
104        }
105
106        if self.relay_state:
107            params["RelayState"] = self.relay_state
108
109        if self.provider.signing_kp and self.provider.sign_logout_response:
110            sig_alg = self.provider.signature_algorithm
111            params["SigAlg"] = sig_alg
112
113            # Build the string to sign
114            query_string = self._build_signable_query_string(params)
115
116            signature = self._sign_query_string(query_string)
117            params["Signature"] = base64.b64encode(signature).decode()
118
119        # Some SP's use query params on their sls endpoint
120        if not self.destination:
121            raise ValueError("destination is required for redirect URL")
122
123        separator = "&" if "?" in self.destination else "?"
124        return f"{self.destination}{separator}{urlencode(params)}"
125
126    def _add_signature(self, element: Element):
127        """Add signature placeholder to element"""
128        sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
129            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
130        )
131        signature = xmlsec.template.create(
132            element,
133            xmlsec.constants.TransformExclC14N,
134            sign_algorithm_transform,
135            ns=xmlsec.constants.DSigNs,
136        )
137        element.insert(1, signature)  # Insert after Issuer
138
139    def _sign_response(self, response: Element):
140        """Sign the response element"""
141        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
142            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
143        )
144
145        xmlsec.tree.add_ids(response, ["ID"])
146        signature_node = xmlsec.tree.find_node(response, xmlsec.constants.NodeSignature)
147
148        ref = xmlsec.template.add_reference(
149            signature_node,
150            digest_algorithm_transform,
151            uri="#" + response.attrib["ID"],
152        )
153        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
154        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
155        key_info = xmlsec.template.ensure_key_info(signature_node)
156        xmlsec.template.add_x509_data(key_info)
157
158        ctx = xmlsec.SignatureContext()
159        ctx.key = xmlsec.Key.from_memory(
160            self.provider.signing_kp.key_data,  # Use key_data for the private key
161            xmlsec.constants.KeyDataFormatPem,
162        )
163        ctx.key.load_cert_from_memory(
164            self.provider.signing_kp.certificate_data, xmlsec.constants.KeyDataFormatPem
165        )
166        ctx.sign(signature_node)
167
168    def _build_signable_query_string(self, params: dict) -> str:
169        """Build query string for signing (order matters per SAML spec)"""
170        # SAML spec requires specific order: SAMLResponse, RelayState, SigAlg
171        # Values must be URL-encoded individually before concatenation
172        ordered = []
173        if "SAMLResponse" in params:
174            ordered.append(f"SAMLResponse={quote(params['SAMLResponse'], safe='')}")
175        if "RelayState" in params:
176            ordered.append(f"RelayState={quote(params['RelayState'], safe='')}")
177        if "SigAlg" in params:
178            ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}")
179        return "&".join(ordered)
180
181    def _sign_query_string(self, query_string: str) -> bytes:
182        """Sign the query string for redirect binding"""
183        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
184            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256
185        )
186
187        key = xmlsec.Key.from_memory(
188            self.provider.signing_kp.key_data,
189            xmlsec.constants.KeyDataFormatPem,
190            None,
191        )
192
193        ctx = xmlsec.SignatureContext()
194        ctx.key = key
195
196        return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)
class LogoutResponseProcessor:
 25class LogoutResponseProcessor:
 26    """Generate a SAML LogoutResponse"""
 27
 28    provider: SAMLProvider
 29    logout_request: LogoutRequest
 30    destination: str | None
 31    relay_state: str | None
 32    _issue_instant: str
 33    _response_id: str
 34
 35    def __init__(
 36        self,
 37        provider: SAMLProvider,
 38        logout_request: LogoutRequest,
 39        destination: str | None = None,
 40        relay_state: str | None = None,
 41    ):
 42        self.provider = provider
 43        self.logout_request = logout_request
 44        self.destination = destination
 45        self.relay_state = relay_state or (logout_request.relay_state if logout_request else None)
 46        self._issue_instant = get_time_string()
 47        self._response_id = get_random_id()
 48
 49    def get_issuer(self) -> Element:
 50        """Get Issuer element"""
 51        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
 52        issuer.text = self.provider.issuer
 53        return issuer
 54
 55    def build(self, status: str = "Success") -> Element:
 56        """Build a SAML LogoutResponse as etree Element"""
 57        response = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutResponse", nsmap=NS_MAP)
 58        response.attrib["Version"] = "2.0"
 59        response.attrib["IssueInstant"] = self._issue_instant
 60        response.attrib["ID"] = self._response_id
 61
 62        if self.destination:
 63            response.attrib["Destination"] = self.destination
 64
 65        if self.logout_request and self.logout_request.id:
 66            response.attrib["InResponseTo"] = self.logout_request.id
 67
 68        response.append(self.get_issuer())
 69
 70        # Add Status element
 71        status_element = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
 72        status_code = SubElement(status_element, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
 73        status_code.attrib["Value"] = f"urn:oasis:names:tc:SAML:2.0:status:{status}"
 74
 75        return response
 76
 77    def build_response(self, status: str = "Success") -> str:
 78        """Build and sign LogoutResponse, return as XML string (not encoded)"""
 79        response = self.build(status)
 80        if self.provider.signing_kp and self.provider.sign_logout_response:
 81            self._add_signature(response)
 82            self._sign_response(response)
 83        return etree.tostring(response).decode()
 84
 85    def encode_post(self, status: str = "Success") -> str:
 86        """Encode LogoutResponse for POST binding"""
 87        response = self.build(status)
 88        if self.provider.signing_kp and self.provider.sign_logout_response:
 89            self._add_signature(response)
 90            self._sign_response(response)
 91        return base64.b64encode(etree.tostring(response)).decode()
 92
 93    def encode_redirect(self, status: str = "Success") -> str:
 94        """Encode LogoutResponse for Redirect binding"""
 95        response = self.build(status)
 96        # Note: For redirect binding, signatures are added as query parameters, not in XML
 97        xml_str = etree.tostring(response, encoding="UTF-8", xml_declaration=True)
 98        return deflate_and_base64_encode(xml_str.decode("UTF-8"))
 99
100    def get_redirect_url(self, status: str = "Success") -> str:
101        """Build complete logout response URL for redirect binding with signature if needed"""
102        encoded_response = self.encode_redirect(status)
103        params = {
104            "SAMLResponse": encoded_response,
105        }
106
107        if self.relay_state:
108            params["RelayState"] = self.relay_state
109
110        if self.provider.signing_kp and self.provider.sign_logout_response:
111            sig_alg = self.provider.signature_algorithm
112            params["SigAlg"] = sig_alg
113
114            # Build the string to sign
115            query_string = self._build_signable_query_string(params)
116
117            signature = self._sign_query_string(query_string)
118            params["Signature"] = base64.b64encode(signature).decode()
119
120        # Some SP's use query params on their sls endpoint
121        if not self.destination:
122            raise ValueError("destination is required for redirect URL")
123
124        separator = "&" if "?" in self.destination else "?"
125        return f"{self.destination}{separator}{urlencode(params)}"
126
127    def _add_signature(self, element: Element):
128        """Add signature placeholder to element"""
129        sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
130            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
131        )
132        signature = xmlsec.template.create(
133            element,
134            xmlsec.constants.TransformExclC14N,
135            sign_algorithm_transform,
136            ns=xmlsec.constants.DSigNs,
137        )
138        element.insert(1, signature)  # Insert after Issuer
139
140    def _sign_response(self, response: Element):
141        """Sign the response element"""
142        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
143            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
144        )
145
146        xmlsec.tree.add_ids(response, ["ID"])
147        signature_node = xmlsec.tree.find_node(response, xmlsec.constants.NodeSignature)
148
149        ref = xmlsec.template.add_reference(
150            signature_node,
151            digest_algorithm_transform,
152            uri="#" + response.attrib["ID"],
153        )
154        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
155        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
156        key_info = xmlsec.template.ensure_key_info(signature_node)
157        xmlsec.template.add_x509_data(key_info)
158
159        ctx = xmlsec.SignatureContext()
160        ctx.key = xmlsec.Key.from_memory(
161            self.provider.signing_kp.key_data,  # Use key_data for the private key
162            xmlsec.constants.KeyDataFormatPem,
163        )
164        ctx.key.load_cert_from_memory(
165            self.provider.signing_kp.certificate_data, xmlsec.constants.KeyDataFormatPem
166        )
167        ctx.sign(signature_node)
168
169    def _build_signable_query_string(self, params: dict) -> str:
170        """Build query string for signing (order matters per SAML spec)"""
171        # SAML spec requires specific order: SAMLResponse, RelayState, SigAlg
172        # Values must be URL-encoded individually before concatenation
173        ordered = []
174        if "SAMLResponse" in params:
175            ordered.append(f"SAMLResponse={quote(params['SAMLResponse'], safe='')}")
176        if "RelayState" in params:
177            ordered.append(f"RelayState={quote(params['RelayState'], safe='')}")
178        if "SigAlg" in params:
179            ordered.append(f"SigAlg={quote(params['SigAlg'], safe='')}")
180        return "&".join(ordered)
181
182    def _sign_query_string(self, query_string: str) -> bytes:
183        """Sign the query string for redirect binding"""
184        signature_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
185            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha256
186        )
187
188        key = xmlsec.Key.from_memory(
189            self.provider.signing_kp.key_data,
190            xmlsec.constants.KeyDataFormatPem,
191            None,
192        )
193
194        ctx = xmlsec.SignatureContext()
195        ctx.key = key
196
197        return ctx.sign_binary(query_string.encode("utf-8"), signature_algorithm_transform)

Generate a SAML LogoutResponse

LogoutResponseProcessor( provider: authentik.providers.saml.models.SAMLProvider, logout_request: authentik.providers.saml.processors.logout_request_parser.LogoutRequest, destination: str | None = None, relay_state: str | None = None)
35    def __init__(
36        self,
37        provider: SAMLProvider,
38        logout_request: LogoutRequest,
39        destination: str | None = None,
40        relay_state: str | None = None,
41    ):
42        self.provider = provider
43        self.logout_request = logout_request
44        self.destination = destination
45        self.relay_state = relay_state or (logout_request.relay_state if logout_request else None)
46        self._issue_instant = get_time_string()
47        self._response_id = get_random_id()
destination: str | None
relay_state: str | None
def get_issuer(self) -> lxml.etree.Element:
49    def get_issuer(self) -> Element:
50        """Get Issuer element"""
51        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
52        issuer.text = self.provider.issuer
53        return issuer

Get Issuer element

def build(self, status: str = 'Success') -> lxml.etree.Element:
55    def build(self, status: str = "Success") -> Element:
56        """Build a SAML LogoutResponse as etree Element"""
57        response = Element(f"{{{NS_SAML_PROTOCOL}}}LogoutResponse", nsmap=NS_MAP)
58        response.attrib["Version"] = "2.0"
59        response.attrib["IssueInstant"] = self._issue_instant
60        response.attrib["ID"] = self._response_id
61
62        if self.destination:
63            response.attrib["Destination"] = self.destination
64
65        if self.logout_request and self.logout_request.id:
66            response.attrib["InResponseTo"] = self.logout_request.id
67
68        response.append(self.get_issuer())
69
70        # Add Status element
71        status_element = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
72        status_code = SubElement(status_element, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
73        status_code.attrib["Value"] = f"urn:oasis:names:tc:SAML:2.0:status:{status}"
74
75        return response

Build a SAML LogoutResponse as etree Element

def build_response(self, status: str = 'Success') -> str:
77    def build_response(self, status: str = "Success") -> str:
78        """Build and sign LogoutResponse, return as XML string (not encoded)"""
79        response = self.build(status)
80        if self.provider.signing_kp and self.provider.sign_logout_response:
81            self._add_signature(response)
82            self._sign_response(response)
83        return etree.tostring(response).decode()

Build and sign LogoutResponse, return as XML string (not encoded)

def encode_post(self, status: str = 'Success') -> str:
85    def encode_post(self, status: str = "Success") -> str:
86        """Encode LogoutResponse for POST binding"""
87        response = self.build(status)
88        if self.provider.signing_kp and self.provider.sign_logout_response:
89            self._add_signature(response)
90            self._sign_response(response)
91        return base64.b64encode(etree.tostring(response)).decode()

Encode LogoutResponse for POST binding

def encode_redirect(self, status: str = 'Success') -> str:
93    def encode_redirect(self, status: str = "Success") -> str:
94        """Encode LogoutResponse for Redirect binding"""
95        response = self.build(status)
96        # Note: For redirect binding, signatures are added as query parameters, not in XML
97        xml_str = etree.tostring(response, encoding="UTF-8", xml_declaration=True)
98        return deflate_and_base64_encode(xml_str.decode("UTF-8"))

Encode LogoutResponse for Redirect binding

def get_redirect_url(self, status: str = 'Success') -> str:
100    def get_redirect_url(self, status: str = "Success") -> str:
101        """Build complete logout response URL for redirect binding with signature if needed"""
102        encoded_response = self.encode_redirect(status)
103        params = {
104            "SAMLResponse": encoded_response,
105        }
106
107        if self.relay_state:
108            params["RelayState"] = self.relay_state
109
110        if self.provider.signing_kp and self.provider.sign_logout_response:
111            sig_alg = self.provider.signature_algorithm
112            params["SigAlg"] = sig_alg
113
114            # Build the string to sign
115            query_string = self._build_signable_query_string(params)
116
117            signature = self._sign_query_string(query_string)
118            params["Signature"] = base64.b64encode(signature).decode()
119
120        # Some SP's use query params on their sls endpoint
121        if not self.destination:
122            raise ValueError("destination is required for redirect URL")
123
124        separator = "&" if "?" in self.destination else "?"
125        return f"{self.destination}{separator}{urlencode(params)}"

Build complete logout response URL for redirect binding with signature if needed