authentik.providers.saml.processors.authn_request_parser

SAML AuthNRequest Parser and dataclass

  1"""SAML AuthNRequest Parser and dataclass"""
  2
  3from base64 import b64decode
  4from dataclasses import dataclass
  5from urllib.parse import quote_plus
  6from xml.etree.ElementTree import ParseError  # nosec
  7
  8import xmlsec
  9from defusedxml import ElementTree
 10from structlog.stdlib import get_logger
 11
 12from authentik.common.saml.constants import (
 13    DSA_SHA1,
 14    NS_MAP,
 15    NS_SAML_PROTOCOL,
 16    RSA_SHA1,
 17    RSA_SHA256,
 18    RSA_SHA384,
 19    RSA_SHA512,
 20    SAML_NAME_ID_FORMAT_UNSPECIFIED,
 21)
 22from authentik.lib.xml import lxml_from_string
 23from authentik.providers.saml.exceptions import CannotHandleAssertion
 24from authentik.providers.saml.models import SAMLProvider
 25from authentik.providers.saml.utils.encoding import decode_base64_and_inflate
 26from authentik.sources.saml.models import SAMLNameIDPolicy
 27
 28ERROR_CANNOT_DECODE_REQUEST = "Cannot decode SAML request."
 29ERROR_SIGNATURE_REQUIRED_BUT_ABSENT = (
 30    "Verification Certificate configured, but request is not signed."
 31)
 32ERROR_FAILED_TO_VERIFY = "Failed to verify signature"
 33
 34
 35@dataclass(slots=True)
 36class AuthNRequest:
 37    """AuthNRequest Dataclass"""
 38
 39    id: str | None = None
 40
 41    relay_state: str | None = None
 42
 43    name_id_policy: str = SAML_NAME_ID_FORMAT_UNSPECIFIED
 44
 45
 46class AuthNRequestParser:
 47    """AuthNRequest Parser"""
 48
 49    provider: SAMLProvider
 50
 51    def __init__(self, provider: SAMLProvider):
 52        self.provider = provider
 53        self.logger = get_logger().bind(provider=self.provider)
 54
 55    def _parse_xml(self, decoded_xml: str | bytes, relay_state: str | None) -> AuthNRequest:
 56        root = ElementTree.fromstring(decoded_xml)
 57
 58        # http://docs.oasis-open.org/security/saml/v2.0/saml-core-2.0-os.pdf
 59        # `AssertionConsumerServiceURL` can be omitted, and we should fallback to the
 60        # default ACS URL
 61        if "AssertionConsumerServiceURL" not in root.attrib:
 62            request_acs_url = self.provider.acs_url.lower()
 63        else:
 64            request_acs_url = root.attrib["AssertionConsumerServiceURL"]
 65
 66        if self.provider.acs_url.lower() != request_acs_url.lower():
 67            msg = (
 68                f"ACS URL of {request_acs_url} doesn't match Provider "
 69                f"ACS URL of {self.provider.acs_url}."
 70            )
 71            self.logger.warning(msg)
 72            raise CannotHandleAssertion(msg)
 73
 74        auth_n_request = AuthNRequest(id=root.attrib["ID"], relay_state=relay_state)
 75
 76        # Check if AuthnRequest has a NameID Policy object
 77        name_id_policies = root.findall(f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy")
 78        if len(name_id_policies) > 0:
 79            name_id_policy = name_id_policies[0]
 80            auth_n_request.name_id_policy = name_id_policy.attrib.get(
 81                "Format", SAML_NAME_ID_FORMAT_UNSPECIFIED
 82            )
 83
 84        return auth_n_request
 85
 86    def parse(self, saml_request: str, relay_state: str | None = None) -> AuthNRequest:
 87        """Validate and parse raw request with enveloped signautre."""
 88        try:
 89            decoded_xml = b64decode(saml_request.encode())
 90        except UnicodeDecodeError:
 91            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
 92
 93        verifier = self.provider.verification_kp
 94        if not verifier:
 95            return self._parse_xml(decoded_xml, relay_state)
 96
 97        root = lxml_from_string(decoded_xml)
 98        xmlsec.tree.add_ids(root, ["ID"])
 99        signature_nodes = root.xpath("/samlp:AuthnRequest/ds:Signature", namespaces=NS_MAP)
100        # No signatures, no verifier configured -> decode xml directly
101        if len(signature_nodes) < 1:
102            raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
103
104        signature_node = signature_nodes[0]
105
106        if signature_node is not None:
107            try:
108                ctx = xmlsec.SignatureContext()
109                key = xmlsec.Key.from_memory(
110                    verifier.certificate_data,
111                    xmlsec.constants.KeyDataFormatCertPem,
112                    None,
113                )
114                ctx.key = key
115                ctx.verify(signature_node)
116            except xmlsec.Error as exc:
117                raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
118
119        return self._parse_xml(decoded_xml, relay_state)
120
121    def parse_detached(
122        self,
123        saml_request: str,
124        relay_state: str | None,
125        signature: str | None = None,
126        sig_alg: str | None = None,
127    ) -> AuthNRequest:
128        """Validate and parse raw request with detached signature"""
129        try:
130            decoded_xml = decode_base64_and_inflate(saml_request)
131        except UnicodeDecodeError:
132            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
133
134        verifier = self.provider.verification_kp
135        if not verifier:
136            return self._parse_xml(decoded_xml, relay_state)
137
138        if verifier and not (signature and sig_alg):
139            raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
140
141        if signature and sig_alg:
142            querystring = f"SAMLRequest={quote_plus(saml_request)}&"
143            if relay_state is not None:
144                querystring += f"RelayState={quote_plus(relay_state)}&"
145            querystring += f"SigAlg={quote_plus(sig_alg)}"
146
147            dsig_ctx = xmlsec.SignatureContext()
148            key = xmlsec.Key.from_memory(
149                verifier.certificate_data, xmlsec.constants.KeyDataFormatCertPem, None
150            )
151            dsig_ctx.key = key
152
153            sign_algorithm_transform_map = {
154                DSA_SHA1: xmlsec.constants.TransformDsaSha1,
155                RSA_SHA1: xmlsec.constants.TransformRsaSha1,
156                RSA_SHA256: xmlsec.constants.TransformRsaSha256,
157                RSA_SHA384: xmlsec.constants.TransformRsaSha384,
158                RSA_SHA512: xmlsec.constants.TransformRsaSha512,
159            }
160            sign_algorithm_transform = sign_algorithm_transform_map.get(
161                sig_alg, xmlsec.constants.TransformRsaSha1
162            )
163
164            try:
165                dsig_ctx.verify_binary(
166                    querystring.encode("utf-8"),
167                    sign_algorithm_transform,
168                    b64decode(signature),
169                )
170            except xmlsec.Error as exc:
171                raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
172        try:
173            return self._parse_xml(decoded_xml, relay_state)
174        except ParseError as exc:
175            raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
176
177    def idp_initiated(self) -> AuthNRequest:
178        """Create IdP Initiated AuthNRequest"""
179        request = AuthNRequest(relay_state=None)
180        if self.provider.default_relay_state != "":
181            request.relay_state = self.provider.default_relay_state
182        if self.provider.default_name_id_policy != SAMLNameIDPolicy.UNSPECIFIED:
183            request.name_id_policy = self.provider.default_name_id_policy
184        return request
ERROR_CANNOT_DECODE_REQUEST = 'Cannot decode SAML request.'
ERROR_SIGNATURE_REQUIRED_BUT_ABSENT = 'Verification Certificate configured, but request is not signed.'
ERROR_FAILED_TO_VERIFY = 'Failed to verify signature'
@dataclass(slots=True)
class AuthNRequest:
36@dataclass(slots=True)
37class AuthNRequest:
38    """AuthNRequest Dataclass"""
39
40    id: str | None = None
41
42    relay_state: str | None = None
43
44    name_id_policy: str = SAML_NAME_ID_FORMAT_UNSPECIFIED

AuthNRequest Dataclass

AuthNRequest( id: str | None = None, relay_state: str | None = None, name_id_policy: str = 'urn:oasis:names:tc:SAML:1.1:nameid-format:unspecified')
id: str | None
relay_state: str | None
name_id_policy: str
class AuthNRequestParser:
 47class AuthNRequestParser:
 48    """AuthNRequest Parser"""
 49
 50    provider: SAMLProvider
 51
 52    def __init__(self, provider: SAMLProvider):
 53        self.provider = provider
 54        self.logger = get_logger().bind(provider=self.provider)
 55
 56    def _parse_xml(self, decoded_xml: str | bytes, relay_state: str | None) -> AuthNRequest:
 57        root = ElementTree.fromstring(decoded_xml)
 58
 59        # http://docs.oasis-open.org/security/saml/v2.0/saml-core-2.0-os.pdf
 60        # `AssertionConsumerServiceURL` can be omitted, and we should fallback to the
 61        # default ACS URL
 62        if "AssertionConsumerServiceURL" not in root.attrib:
 63            request_acs_url = self.provider.acs_url.lower()
 64        else:
 65            request_acs_url = root.attrib["AssertionConsumerServiceURL"]
 66
 67        if self.provider.acs_url.lower() != request_acs_url.lower():
 68            msg = (
 69                f"ACS URL of {request_acs_url} doesn't match Provider "
 70                f"ACS URL of {self.provider.acs_url}."
 71            )
 72            self.logger.warning(msg)
 73            raise CannotHandleAssertion(msg)
 74
 75        auth_n_request = AuthNRequest(id=root.attrib["ID"], relay_state=relay_state)
 76
 77        # Check if AuthnRequest has a NameID Policy object
 78        name_id_policies = root.findall(f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy")
 79        if len(name_id_policies) > 0:
 80            name_id_policy = name_id_policies[0]
 81            auth_n_request.name_id_policy = name_id_policy.attrib.get(
 82                "Format", SAML_NAME_ID_FORMAT_UNSPECIFIED
 83            )
 84
 85        return auth_n_request
 86
 87    def parse(self, saml_request: str, relay_state: str | None = None) -> AuthNRequest:
 88        """Validate and parse raw request with enveloped signautre."""
 89        try:
 90            decoded_xml = b64decode(saml_request.encode())
 91        except UnicodeDecodeError:
 92            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
 93
 94        verifier = self.provider.verification_kp
 95        if not verifier:
 96            return self._parse_xml(decoded_xml, relay_state)
 97
 98        root = lxml_from_string(decoded_xml)
 99        xmlsec.tree.add_ids(root, ["ID"])
100        signature_nodes = root.xpath("/samlp:AuthnRequest/ds:Signature", namespaces=NS_MAP)
101        # No signatures, no verifier configured -> decode xml directly
102        if len(signature_nodes) < 1:
103            raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
104
105        signature_node = signature_nodes[0]
106
107        if signature_node is not None:
108            try:
109                ctx = xmlsec.SignatureContext()
110                key = xmlsec.Key.from_memory(
111                    verifier.certificate_data,
112                    xmlsec.constants.KeyDataFormatCertPem,
113                    None,
114                )
115                ctx.key = key
116                ctx.verify(signature_node)
117            except xmlsec.Error as exc:
118                raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
119
120        return self._parse_xml(decoded_xml, relay_state)
121
122    def parse_detached(
123        self,
124        saml_request: str,
125        relay_state: str | None,
126        signature: str | None = None,
127        sig_alg: str | None = None,
128    ) -> AuthNRequest:
129        """Validate and parse raw request with detached signature"""
130        try:
131            decoded_xml = decode_base64_and_inflate(saml_request)
132        except UnicodeDecodeError:
133            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
134
135        verifier = self.provider.verification_kp
136        if not verifier:
137            return self._parse_xml(decoded_xml, relay_state)
138
139        if verifier and not (signature and sig_alg):
140            raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
141
142        if signature and sig_alg:
143            querystring = f"SAMLRequest={quote_plus(saml_request)}&"
144            if relay_state is not None:
145                querystring += f"RelayState={quote_plus(relay_state)}&"
146            querystring += f"SigAlg={quote_plus(sig_alg)}"
147
148            dsig_ctx = xmlsec.SignatureContext()
149            key = xmlsec.Key.from_memory(
150                verifier.certificate_data, xmlsec.constants.KeyDataFormatCertPem, None
151            )
152            dsig_ctx.key = key
153
154            sign_algorithm_transform_map = {
155                DSA_SHA1: xmlsec.constants.TransformDsaSha1,
156                RSA_SHA1: xmlsec.constants.TransformRsaSha1,
157                RSA_SHA256: xmlsec.constants.TransformRsaSha256,
158                RSA_SHA384: xmlsec.constants.TransformRsaSha384,
159                RSA_SHA512: xmlsec.constants.TransformRsaSha512,
160            }
161            sign_algorithm_transform = sign_algorithm_transform_map.get(
162                sig_alg, xmlsec.constants.TransformRsaSha1
163            )
164
165            try:
166                dsig_ctx.verify_binary(
167                    querystring.encode("utf-8"),
168                    sign_algorithm_transform,
169                    b64decode(signature),
170                )
171            except xmlsec.Error as exc:
172                raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
173        try:
174            return self._parse_xml(decoded_xml, relay_state)
175        except ParseError as exc:
176            raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
177
178    def idp_initiated(self) -> AuthNRequest:
179        """Create IdP Initiated AuthNRequest"""
180        request = AuthNRequest(relay_state=None)
181        if self.provider.default_relay_state != "":
182            request.relay_state = self.provider.default_relay_state
183        if self.provider.default_name_id_policy != SAMLNameIDPolicy.UNSPECIFIED:
184            request.name_id_policy = self.provider.default_name_id_policy
185        return request

AuthNRequest Parser

AuthNRequestParser(provider: authentik.providers.saml.models.SAMLProvider)
52    def __init__(self, provider: SAMLProvider):
53        self.provider = provider
54        self.logger = get_logger().bind(provider=self.provider)
logger
def parse( self, saml_request: str, relay_state: str | None = None) -> AuthNRequest:
 87    def parse(self, saml_request: str, relay_state: str | None = None) -> AuthNRequest:
 88        """Validate and parse raw request with enveloped signautre."""
 89        try:
 90            decoded_xml = b64decode(saml_request.encode())
 91        except UnicodeDecodeError:
 92            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
 93
 94        verifier = self.provider.verification_kp
 95        if not verifier:
 96            return self._parse_xml(decoded_xml, relay_state)
 97
 98        root = lxml_from_string(decoded_xml)
 99        xmlsec.tree.add_ids(root, ["ID"])
100        signature_nodes = root.xpath("/samlp:AuthnRequest/ds:Signature", namespaces=NS_MAP)
101        # No signatures, no verifier configured -> decode xml directly
102        if len(signature_nodes) < 1:
103            raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
104
105        signature_node = signature_nodes[0]
106
107        if signature_node is not None:
108            try:
109                ctx = xmlsec.SignatureContext()
110                key = xmlsec.Key.from_memory(
111                    verifier.certificate_data,
112                    xmlsec.constants.KeyDataFormatCertPem,
113                    None,
114                )
115                ctx.key = key
116                ctx.verify(signature_node)
117            except xmlsec.Error as exc:
118                raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
119
120        return self._parse_xml(decoded_xml, relay_state)

Validate and parse raw request with enveloped signautre.

def parse_detached( self, saml_request: str, relay_state: str | None, signature: str | None = None, sig_alg: str | None = None) -> AuthNRequest:
122    def parse_detached(
123        self,
124        saml_request: str,
125        relay_state: str | None,
126        signature: str | None = None,
127        sig_alg: str | None = None,
128    ) -> AuthNRequest:
129        """Validate and parse raw request with detached signature"""
130        try:
131            decoded_xml = decode_base64_and_inflate(saml_request)
132        except UnicodeDecodeError:
133            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None
134
135        verifier = self.provider.verification_kp
136        if not verifier:
137            return self._parse_xml(decoded_xml, relay_state)
138
139        if verifier and not (signature and sig_alg):
140            raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
141
142        if signature and sig_alg:
143            querystring = f"SAMLRequest={quote_plus(saml_request)}&"
144            if relay_state is not None:
145                querystring += f"RelayState={quote_plus(relay_state)}&"
146            querystring += f"SigAlg={quote_plus(sig_alg)}"
147
148            dsig_ctx = xmlsec.SignatureContext()
149            key = xmlsec.Key.from_memory(
150                verifier.certificate_data, xmlsec.constants.KeyDataFormatCertPem, None
151            )
152            dsig_ctx.key = key
153
154            sign_algorithm_transform_map = {
155                DSA_SHA1: xmlsec.constants.TransformDsaSha1,
156                RSA_SHA1: xmlsec.constants.TransformRsaSha1,
157                RSA_SHA256: xmlsec.constants.TransformRsaSha256,
158                RSA_SHA384: xmlsec.constants.TransformRsaSha384,
159                RSA_SHA512: xmlsec.constants.TransformRsaSha512,
160            }
161            sign_algorithm_transform = sign_algorithm_transform_map.get(
162                sig_alg, xmlsec.constants.TransformRsaSha1
163            )
164
165            try:
166                dsig_ctx.verify_binary(
167                    querystring.encode("utf-8"),
168                    sign_algorithm_transform,
169                    b64decode(signature),
170                )
171            except xmlsec.Error as exc:
172                raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
173        try:
174            return self._parse_xml(decoded_xml, relay_state)
175        except ParseError as exc:
176            raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc

Validate and parse raw request with detached signature

def idp_initiated( self) -> AuthNRequest:
178    def idp_initiated(self) -> AuthNRequest:
179        """Create IdP Initiated AuthNRequest"""
180        request = AuthNRequest(relay_state=None)
181        if self.provider.default_relay_state != "":
182            request.relay_state = self.provider.default_relay_state
183        if self.provider.default_name_id_policy != SAMLNameIDPolicy.UNSPECIFIED:
184            request.name_id_policy = self.provider.default_name_id_policy
185        return request

Create IdP Initiated AuthNRequest