authentik.providers.saml.processors.authn_request_parser
SAML AuthNRequest Parser and dataclass
"""SAML AuthNRequest Parser and dataclass"""

from base64 import b64decode
from dataclasses import dataclass
from urllib.parse import quote_plus
from xml.etree.ElementTree import ParseError  # nosec

import xmlsec
from defusedxml import ElementTree
from structlog.stdlib import get_logger

from authentik.common.saml.constants import (
    DSA_SHA1,
    NS_MAP,
    NS_SAML_PROTOCOL,
    RSA_SHA1,
    RSA_SHA256,
    RSA_SHA384,
    RSA_SHA512,
    SAML_NAME_ID_FORMAT_UNSPECIFIED,
)
from authentik.lib.xml import lxml_from_string
from authentik.providers.saml.exceptions import CannotHandleAssertion
from authentik.providers.saml.models import SAMLProvider
from authentik.providers.saml.utils.encoding import decode_base64_and_inflate
from authentik.sources.saml.models import SAMLNameIDPolicy

ERROR_CANNOT_DECODE_REQUEST = "Cannot decode SAML request."
ERROR_SIGNATURE_REQUIRED_BUT_ABSENT = (
    "Verification Certificate configured, but request is not signed."
)
ERROR_FAILED_TO_VERIFY = "Failed to verify signature"


@dataclass(slots=True)
class AuthNRequest:
    """AuthNRequest Dataclass"""

    # Value of the request's `ID` attribute; stays None for IdP-initiated requests
    id: str | None = None

    # RelayState handed in alongside the request (or the provider default)
    relay_state: str | None = None

    # `Format` of the request's NameIDPolicy element, if one was sent
    name_id_policy: str = SAML_NAME_ID_FORMAT_UNSPECIFIED


class AuthNRequestParser:
    """AuthNRequest Parser

    Signatures are only checked when the provider has a verification keypair
    (`verification_kp`) configured; without one, requests are parsed unverified.
    """

    provider: SAMLProvider

    def __init__(self, provider: SAMLProvider):
        self.provider = provider
        self.logger = get_logger().bind(provider=self.provider)

    def _parse_xml(self, decoded_xml: str | bytes, relay_state: str | None) -> AuthNRequest:
        """Build an AuthNRequest from already-decoded AuthnRequest XML.

        Raises:
            CannotHandleAssertion: the requested ACS URL differs from the provider's
                ACS URL, or the request carries no `ID` attribute.
            ParseError: propagated from the XML parser; callers translate it.
        """
        root = ElementTree.fromstring(decoded_xml)

        # http://docs.oasis-open.org/security/saml/v2.0/saml-core-2.0-os.pdf
        # `AssertionConsumerServiceURL` can be omitted, and we should fallback to the
        # default ACS URL
        provider_acs_url = self.provider.acs_url
        request_acs_url = root.attrib.get("AssertionConsumerServiceURL", provider_acs_url)

        if provider_acs_url.lower() != request_acs_url.lower():
            msg = (
                f"ACS URL of {request_acs_url} doesn't match Provider "
                f"ACS URL of {self.provider.acs_url}."
            )
            self.logger.warning(msg)
            raise CannotHandleAssertion(msg)

        # A missing ID used to surface as a bare KeyError; reject it explicitly
        request_id = root.attrib.get("ID")
        if request_id is None:
            msg = "AuthnRequest is missing the required ID attribute."
            self.logger.warning(msg)
            raise CannotHandleAssertion(msg)

        auth_n_request = AuthNRequest(id=request_id, relay_state=relay_state)

        # Check if AuthnRequest has a NameID Policy object (first one wins)
        name_id_policy = root.find(f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy")
        if name_id_policy is not None:
            auth_n_request.name_id_policy = name_id_policy.attrib.get(
                "Format", SAML_NAME_ID_FORMAT_UNSPECIFIED
            )

        return auth_n_request

    def parse(self, saml_request: str, relay_state: str | None = None) -> AuthNRequest:
        """Validate and parse raw request with enveloped signature.

        Args:
            saml_request: base64-encoded AuthnRequest XML.
            relay_state: RelayState to attach to the parsed request.

        Raises:
            CannotHandleAssertion: the request cannot be decoded or parsed, a
                signature is required but absent, or verification fails.
        """
        try:
            decoded_xml = b64decode(saml_request.encode())
        except ValueError:
            # ValueError is the common base of binascii.Error (bad base64 padding)
            # and the Unicode errors, so malformed input of either kind is rejected
            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None

        verifier = self.provider.verification_kp
        if verifier:
            root = lxml_from_string(decoded_xml)
            xmlsec.tree.add_ids(root, ["ID"])
            signature_nodes = root.xpath("/samlp:AuthnRequest/ds:Signature", namespaces=NS_MAP)
            # Verifier configured but the request carries no signature -> reject
            if not signature_nodes:
                raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)

            try:
                ctx = xmlsec.SignatureContext()
                ctx.key = xmlsec.Key.from_memory(
                    verifier.certificate_data,
                    xmlsec.constants.KeyDataFormatCertPem,
                    None,
                )
                ctx.verify(signature_nodes[0])
            except xmlsec.Error as exc:
                raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc

        try:
            return self._parse_xml(decoded_xml, relay_state)
        except ParseError as exc:
            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from exc

    def parse_detached(
        self,
        saml_request: str,
        relay_state: str | None,
        signature: str | None = None,
        sig_alg: str | None = None,
    ) -> AuthNRequest:
        """Validate and parse raw request with detached signature

        Args:
            saml_request: encoded SAMLRequest value, as received.
            relay_state: RelayState value, or None when it was not sent.
            signature: base64-encoded detached signature.
            sig_alg: signature algorithm identifier sent with the request.

        Raises:
            CannotHandleAssertion: the request cannot be decoded or parsed, a
                signature is required but absent, or verification fails.
        """
        try:
            decoded_xml = decode_base64_and_inflate(saml_request)
        except ValueError:
            # ValueError covers UnicodeDecodeError as well as binascii.Error
            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None

        verifier = self.provider.verification_kp
        if verifier:
            if not (signature and sig_alg):
                raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)

            # A signature that is not valid base64 is a verification failure,
            # not an unhandled binascii.Error
            try:
                raw_signature = b64decode(signature)
            except ValueError as exc:
                raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc

            # Rebuild the signed string: SAMLRequest, optional RelayState, SigAlg
            querystring = f"SAMLRequest={quote_plus(saml_request)}&"
            if relay_state is not None:
                querystring += f"RelayState={quote_plus(relay_state)}&"
            querystring += f"SigAlg={quote_plus(sig_alg)}"

            sign_algorithm_transform_map = {
                DSA_SHA1: xmlsec.constants.TransformDsaSha1,
                RSA_SHA1: xmlsec.constants.TransformRsaSha1,
                RSA_SHA256: xmlsec.constants.TransformRsaSha256,
                RSA_SHA384: xmlsec.constants.TransformRsaSha384,
                RSA_SHA512: xmlsec.constants.TransformRsaSha512,
            }
            # NOTE(review): an unrecognised SigAlg silently falls back to RSA-SHA1;
            # consider rejecting unknown algorithms instead.
            sign_algorithm_transform = sign_algorithm_transform_map.get(
                sig_alg, xmlsec.constants.TransformRsaSha1
            )

            try:
                dsig_ctx = xmlsec.SignatureContext()
                dsig_ctx.key = xmlsec.Key.from_memory(
                    verifier.certificate_data, xmlsec.constants.KeyDataFormatCertPem, None
                )
                dsig_ctx.verify_binary(
                    querystring.encode("utf-8"),
                    sign_algorithm_transform,
                    raw_signature,
                )
            except xmlsec.Error as exc:
                raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc

        try:
            return self._parse_xml(decoded_xml, relay_state)
        except ParseError as exc:
            # Malformed XML is a decoding problem, whether or not a signature was checked
            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from exc

    def idp_initiated(self) -> AuthNRequest:
        """Create IdP Initiated AuthNRequest"""
        request = AuthNRequest(relay_state=None)
        if self.provider.default_relay_state != "":
            request.relay_state = self.provider.default_relay_state
        if self.provider.default_name_id_policy != SAMLNameIDPolicy.UNSPECIFIED:
            request.name_id_policy = self.provider.default_name_id_policy
        return request
ERROR_CANNOT_DECODE_REQUEST =
'Cannot decode SAML request.'
ERROR_SIGNATURE_REQUIRED_BUT_ABSENT =
'Verification Certificate configured, but request is not signed.'
ERROR_FAILED_TO_VERIFY =
'Failed to verify signature'
@dataclass(slots=True)
class
AuthNRequest:
@dataclass(slots=True)
class AuthNRequest:
    """Parsed representation of a SAML AuthnRequest.

    Every field has a default, so an instance can be created without any
    incoming request (as done for IdP-initiated logins).
    """

    # `ID` attribute of the request; None when no request was received
    id: str | None = None

    # RelayState accompanying the request, if any
    relay_state: str | None = None

    # `Format` of the request's NameIDPolicy element; unspecified by default
    name_id_policy: str = SAML_NAME_ID_FORMAT_UNSPECIFIED
AuthNRequest Dataclass
class
AuthNRequestParser:
class AuthNRequestParser:
    """AuthNRequest Parser

    Signatures are only checked when the provider has a verification keypair
    (`verification_kp`) configured; without one, requests are parsed unverified.
    """

    provider: SAMLProvider

    def __init__(self, provider: SAMLProvider):
        self.provider = provider
        self.logger = get_logger().bind(provider=self.provider)

    def _parse_xml(self, decoded_xml: str | bytes, relay_state: str | None) -> AuthNRequest:
        """Build an AuthNRequest from already-decoded AuthnRequest XML.

        Raises:
            CannotHandleAssertion: the requested ACS URL differs from the provider's
                ACS URL, or the request carries no `ID` attribute.
            ParseError: propagated from the XML parser; callers translate it.
        """
        root = ElementTree.fromstring(decoded_xml)

        # http://docs.oasis-open.org/security/saml/v2.0/saml-core-2.0-os.pdf
        # `AssertionConsumerServiceURL` can be omitted, and we should fallback to the
        # default ACS URL
        provider_acs_url = self.provider.acs_url
        request_acs_url = root.attrib.get("AssertionConsumerServiceURL", provider_acs_url)

        if provider_acs_url.lower() != request_acs_url.lower():
            msg = (
                f"ACS URL of {request_acs_url} doesn't match Provider "
                f"ACS URL of {self.provider.acs_url}."
            )
            self.logger.warning(msg)
            raise CannotHandleAssertion(msg)

        # A missing ID used to surface as a bare KeyError; reject it explicitly
        request_id = root.attrib.get("ID")
        if request_id is None:
            msg = "AuthnRequest is missing the required ID attribute."
            self.logger.warning(msg)
            raise CannotHandleAssertion(msg)

        auth_n_request = AuthNRequest(id=request_id, relay_state=relay_state)

        # Check if AuthnRequest has a NameID Policy object (first one wins)
        name_id_policy = root.find(f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy")
        if name_id_policy is not None:
            auth_n_request.name_id_policy = name_id_policy.attrib.get(
                "Format", SAML_NAME_ID_FORMAT_UNSPECIFIED
            )

        return auth_n_request

    def parse(self, saml_request: str, relay_state: str | None = None) -> AuthNRequest:
        """Validate and parse raw request with enveloped signature.

        Args:
            saml_request: base64-encoded AuthnRequest XML.
            relay_state: RelayState to attach to the parsed request.

        Raises:
            CannotHandleAssertion: the request cannot be decoded or parsed, a
                signature is required but absent, or verification fails.
        """
        try:
            decoded_xml = b64decode(saml_request.encode())
        except ValueError:
            # ValueError is the common base of binascii.Error (bad base64 padding)
            # and the Unicode errors, so malformed input of either kind is rejected
            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None

        verifier = self.provider.verification_kp
        if verifier:
            root = lxml_from_string(decoded_xml)
            xmlsec.tree.add_ids(root, ["ID"])
            signature_nodes = root.xpath("/samlp:AuthnRequest/ds:Signature", namespaces=NS_MAP)
            # Verifier configured but the request carries no signature -> reject
            if not signature_nodes:
                raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)

            try:
                ctx = xmlsec.SignatureContext()
                ctx.key = xmlsec.Key.from_memory(
                    verifier.certificate_data,
                    xmlsec.constants.KeyDataFormatCertPem,
                    None,
                )
                ctx.verify(signature_nodes[0])
            except xmlsec.Error as exc:
                raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc

        try:
            return self._parse_xml(decoded_xml, relay_state)
        except ParseError as exc:
            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from exc

    def parse_detached(
        self,
        saml_request: str,
        relay_state: str | None,
        signature: str | None = None,
        sig_alg: str | None = None,
    ) -> AuthNRequest:
        """Validate and parse raw request with detached signature

        Args:
            saml_request: encoded SAMLRequest value, as received.
            relay_state: RelayState value, or None when it was not sent.
            signature: base64-encoded detached signature.
            sig_alg: signature algorithm identifier sent with the request.

        Raises:
            CannotHandleAssertion: the request cannot be decoded or parsed, a
                signature is required but absent, or verification fails.
        """
        try:
            decoded_xml = decode_base64_and_inflate(saml_request)
        except ValueError:
            # ValueError covers UnicodeDecodeError as well as binascii.Error
            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None

        verifier = self.provider.verification_kp
        if verifier:
            if not (signature and sig_alg):
                raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)

            # A signature that is not valid base64 is a verification failure,
            # not an unhandled binascii.Error
            try:
                raw_signature = b64decode(signature)
            except ValueError as exc:
                raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc

            # Rebuild the signed string: SAMLRequest, optional RelayState, SigAlg
            querystring = f"SAMLRequest={quote_plus(saml_request)}&"
            if relay_state is not None:
                querystring += f"RelayState={quote_plus(relay_state)}&"
            querystring += f"SigAlg={quote_plus(sig_alg)}"

            sign_algorithm_transform_map = {
                DSA_SHA1: xmlsec.constants.TransformDsaSha1,
                RSA_SHA1: xmlsec.constants.TransformRsaSha1,
                RSA_SHA256: xmlsec.constants.TransformRsaSha256,
                RSA_SHA384: xmlsec.constants.TransformRsaSha384,
                RSA_SHA512: xmlsec.constants.TransformRsaSha512,
            }
            # NOTE(review): an unrecognised SigAlg silently falls back to RSA-SHA1;
            # consider rejecting unknown algorithms instead.
            sign_algorithm_transform = sign_algorithm_transform_map.get(
                sig_alg, xmlsec.constants.TransformRsaSha1
            )

            try:
                dsig_ctx = xmlsec.SignatureContext()
                dsig_ctx.key = xmlsec.Key.from_memory(
                    verifier.certificate_data, xmlsec.constants.KeyDataFormatCertPem, None
                )
                dsig_ctx.verify_binary(
                    querystring.encode("utf-8"),
                    sign_algorithm_transform,
                    raw_signature,
                )
            except xmlsec.Error as exc:
                raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc

        try:
            return self._parse_xml(decoded_xml, relay_state)
        except ParseError as exc:
            # Malformed XML is a decoding problem, whether or not a signature was checked
            raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from exc

    def idp_initiated(self) -> AuthNRequest:
        """Create IdP Initiated AuthNRequest"""
        request = AuthNRequest(relay_state=None)
        if self.provider.default_relay_state != "":
            request.relay_state = self.provider.default_relay_state
        if self.provider.default_name_id_policy != SAMLNameIDPolicy.UNSPECIFIED:
            request.name_id_policy = self.provider.default_name_id_policy
        return request
AuthNRequest Parser
AuthNRequestParser(provider: authentik.providers.saml.models.SAMLProvider)
def parse(self, saml_request: str, relay_state: str | None = None) -> AuthNRequest:
    """Validate and parse raw request with enveloped signature.

    Args:
        saml_request: base64-encoded AuthnRequest XML.
        relay_state: RelayState to attach to the parsed request.

    Raises:
        CannotHandleAssertion: the request cannot be decoded or parsed, a
            signature is required but absent, or verification fails.
    """
    try:
        decoded_xml = b64decode(saml_request.encode())
    except ValueError:
        # ValueError is the common base of binascii.Error (bad base64 padding)
        # and the Unicode errors, so malformed input of either kind is rejected
        raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None

    verifier = self.provider.verification_kp
    if verifier:
        root = lxml_from_string(decoded_xml)
        xmlsec.tree.add_ids(root, ["ID"])
        signature_nodes = root.xpath("/samlp:AuthnRequest/ds:Signature", namespaces=NS_MAP)
        # Verifier configured but the request carries no signature -> reject
        if not signature_nodes:
            raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)

        try:
            ctx = xmlsec.SignatureContext()
            ctx.key = xmlsec.Key.from_memory(
                verifier.certificate_data,
                xmlsec.constants.KeyDataFormatCertPem,
                None,
            )
            ctx.verify(signature_nodes[0])
        except xmlsec.Error as exc:
            raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc

    try:
        return self._parse_xml(decoded_xml, relay_state)
    except ParseError as exc:
        raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from exc
Validate and parse raw request with enveloped signature.
def
parse_detached( self, saml_request: str, relay_state: str | None, signature: str | None = None, sig_alg: str | None = None) -> AuthNRequest:
def parse_detached(
    self,
    saml_request: str,
    relay_state: str | None,
    signature: str | None = None,
    sig_alg: str | None = None,
) -> AuthNRequest:
    """Validate and parse raw request with detached signature

    Args:
        saml_request: encoded SAMLRequest value, as received.
        relay_state: RelayState value, or None when it was not sent.
        signature: base64-encoded detached signature.
        sig_alg: signature algorithm identifier sent with the request.

    Raises:
        CannotHandleAssertion: the request cannot be decoded or parsed, a
            signature is required but absent, or verification fails.
    """
    try:
        decoded_xml = decode_base64_and_inflate(saml_request)
    except ValueError:
        # ValueError covers UnicodeDecodeError as well as binascii.Error
        raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from None

    verifier = self.provider.verification_kp
    if verifier:
        if not (signature and sig_alg):
            raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)

        # A signature that is not valid base64 is a verification failure,
        # not an unhandled binascii.Error
        try:
            raw_signature = b64decode(signature)
        except ValueError as exc:
            raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc

        # Rebuild the signed string: SAMLRequest, optional RelayState, SigAlg
        querystring = f"SAMLRequest={quote_plus(saml_request)}&"
        if relay_state is not None:
            querystring += f"RelayState={quote_plus(relay_state)}&"
        querystring += f"SigAlg={quote_plus(sig_alg)}"

        sign_algorithm_transform_map = {
            DSA_SHA1: xmlsec.constants.TransformDsaSha1,
            RSA_SHA1: xmlsec.constants.TransformRsaSha1,
            RSA_SHA256: xmlsec.constants.TransformRsaSha256,
            RSA_SHA384: xmlsec.constants.TransformRsaSha384,
            RSA_SHA512: xmlsec.constants.TransformRsaSha512,
        }
        # NOTE(review): an unrecognised SigAlg silently falls back to RSA-SHA1;
        # consider rejecting unknown algorithms instead.
        sign_algorithm_transform = sign_algorithm_transform_map.get(
            sig_alg, xmlsec.constants.TransformRsaSha1
        )

        try:
            dsig_ctx = xmlsec.SignatureContext()
            dsig_ctx.key = xmlsec.Key.from_memory(
                verifier.certificate_data, xmlsec.constants.KeyDataFormatCertPem, None
            )
            dsig_ctx.verify_binary(
                querystring.encode("utf-8"),
                sign_algorithm_transform,
                raw_signature,
            )
        except xmlsec.Error as exc:
            raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc

    try:
        return self._parse_xml(decoded_xml, relay_state)
    except ParseError as exc:
        # Malformed XML is a decoding problem, whether or not a signature was checked
        raise CannotHandleAssertion(ERROR_CANNOT_DECODE_REQUEST) from exc
Validate and parse raw request with detached signature
def idp_initiated(self) -> AuthNRequest:
    """Build the AuthNRequest used for an IdP-initiated login.

    Starts from an empty request and applies the provider's default relay
    state and default NameID policy when they are set to a non-default value.
    """
    provider = self.provider
    auth_n_request = AuthNRequest(relay_state=None)

    # An empty string means no default relay state is configured
    default_relay_state = provider.default_relay_state
    if default_relay_state != "":
        auth_n_request.relay_state = default_relay_state

    # Keep the dataclass default unless the provider names a specific policy
    default_policy = provider.default_name_id_policy
    if default_policy != SAMLNameIDPolicy.UNSPECIFIED:
        auth_n_request.name_id_policy = default_policy

    return auth_n_request
Create IdP Initiated AuthNRequest