authentik.sources.saml.processors.request
SAML AuthnRequest Processor
"""SAML AuthnRequest Processor"""

from base64 import b64encode
from urllib.parse import quote_plus

import xmlsec
from django.http import HttpRequest
from lxml import etree  # nosec
from lxml.etree import Element  # nosec

from authentik.common.saml.constants import (
    DIGEST_ALGORITHM_TRANSLATION_MAP,
    NS_MAP,
    NS_SAML_ASSERTION,
    NS_SAML_PROTOCOL,
    SAML_BINDING_POST,
    SIGN_ALGORITHM_TRANSFORM_MAP,
)
from authentik.lib.xml import remove_xml_newlines
from authentik.providers.saml.utils import get_random_id
from authentik.providers.saml.utils.encoding import deflate_and_base64_encode
from authentik.providers.saml.utils.time import get_time_string
from authentik.sources.saml.models import SAMLBindingTypes, SAMLSource

# Session key under which the ID of the most recently built AuthnRequest is stored.
SESSION_KEY_REQUEST_ID = "authentik/sources/saml/request_id"


class RequestProcessor:
    """SAML AuthnRequest Processor

    Builds the AuthnRequest for a ``SAMLSource``, either as an XML string
    (signed in-document when a signing keypair is configured and the binding
    is not Redirect) or as a dict of parameters with a detached signature.
    """

    source: SAMLSource
    http_request: HttpRequest

    relay_state: str

    request_id: str
    issue_instant: str

    def __init__(self, source: SAMLSource, request: HttpRequest, relay_state: str):
        """Store the inputs, generate a request ID and record it in the session."""
        self.source = source
        self.http_request = request
        self.relay_state = relay_state
        self.request_id = get_random_id()
        # Persist the ID in the session so it can be looked up later in the flow.
        self.http_request.session[SESSION_KEY_REQUEST_ID] = self.request_id
        self.issue_instant = get_time_string()

    def get_issuer(self) -> Element:
        """Get Issuer Element"""
        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
        issuer.text = self.source.get_issuer(self.http_request)
        return issuer

    def get_name_id_policy(self) -> Element:
        """Get NameID Policy Element"""
        name_id_policy = Element(f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy")
        name_id_policy.attrib["Format"] = self.source.name_id_policy
        return name_id_policy

    def get_auth_n(self) -> Element:
        """Get full AuthnRequest

        The returned element contains the Issuer, an unsigned signature
        template (only when a signing keypair is set and the binding is not
        Redirect) and the NameIDPolicy, in that order.
        """
        auth_n_request = Element(f"{{{NS_SAML_PROTOCOL}}}AuthnRequest", nsmap=NS_MAP)
        auth_n_request.attrib["AssertionConsumerServiceURL"] = self.source.build_full_url(
            self.http_request
        )
        auth_n_request.attrib["Destination"] = self.source.sso_url
        auth_n_request.attrib["ID"] = self.request_id
        auth_n_request.attrib["IssueInstant"] = self.issue_instant
        auth_n_request.attrib["ProtocolBinding"] = SAML_BINDING_POST
        auth_n_request.attrib["Version"] = "2.0"
        if self.source.force_authn:
            auth_n_request.attrib["ForceAuthn"] = "true"
        # Create issuer object
        auth_n_request.append(self.get_issuer())

        if self.source.signing_kp and self.source.binding_type != SAMLBindingTypes.REDIRECT:
            # Unknown algorithm identifiers fall back to RSA-SHA1.
            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
                self.source.signature_algorithm, xmlsec.constants.TransformRsaSha1
            )
            signature = xmlsec.template.create(
                auth_n_request,
                xmlsec.constants.TransformExclC14N,
                sign_algorithm_transform,
                ns=xmlsec.constants.DSigNs,
            )
            auth_n_request.append(signature)

        # Create NameID Policy Object
        auth_n_request.append(self.get_name_id_policy())
        return auth_n_request

    def build_auth_n(self) -> str:
        """Get Signed string representation of AuthN Request
        (used for POST Bindings)"""
        auth_n_request = self.get_auth_n()

        if self.source.signing_kp and self.source.binding_type != SAMLBindingTypes.REDIRECT:
            # Register "ID" as an ID attribute so the "#<id>" reference resolves.
            xmlsec.tree.add_ids(auth_n_request, ["ID"])

            ctx = xmlsec.SignatureContext()

            key = xmlsec.Key.from_memory(
                self.source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
            )
            key.load_cert_from_memory(
                self.source.signing_kp.certificate_data,
                xmlsec.constants.KeyDataFormatCertPem,
            )
            ctx.key = key

            # Unknown digest identifiers fall back to SHA1.
            digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
                self.source.digest_algorithm, xmlsec.constants.TransformSha1
            )

            signature_node = xmlsec.tree.find_node(auth_n_request, xmlsec.constants.NodeSignature)

            ref = xmlsec.template.add_reference(
                signature_node,
                digest_algorithm_transform,
                uri="#" + auth_n_request.attrib["ID"],
            )
            xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
            xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
            key_info = xmlsec.template.ensure_key_info(signature_node)
            xmlsec.template.add_x509_data(key_info)

            # NOTE(review): remove_xml_newlines is defined elsewhere; presumably it
            # returns the signature node to sign - confirm against authentik.lib.xml.
            ctx.sign(remove_xml_newlines(auth_n_request, signature_node))

        return etree.tostring(auth_n_request).decode()

    def build_auth_n_detached(self) -> dict[str, str]:
        """Get Dict AuthN Request for Redirect bindings, with detached
        Signature. See https://docs.oasis-open.org/security/saml/v2.0/saml-bindings-2.0-os.pdf

        Returns a dict that always contains ``SAMLRequest``; ``RelayState`` is
        added when a relay state is set, and ``Signature``/``SigAlg`` are added
        when the source has a signing keypair.
        """
        auth_n_request = self.get_auth_n()

        saml_request = deflate_and_base64_encode(etree.tostring(auth_n_request).decode())

        response_dict = {
            "SAMLRequest": saml_request,
        }

        # Truthiness check: an empty *or missing* (None) relay state is omitted,
        # so quote_plus() below is never handed a non-string.
        if self.relay_state:
            response_dict["RelayState"] = self.relay_state

        if self.source.signing_kp:
            # NOTE(review): an identifier missing from the map is signed with
            # RSA-SHA1 while SigAlg still carries the configured value - confirm
            # the configured values are always present in the map.
            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
                self.source.signature_algorithm, xmlsec.constants.TransformRsaSha1
            )

            # Create the full querystring in the correct order to be signed
            querystring = f"SAMLRequest={quote_plus(saml_request)}&"
            if "RelayState" in response_dict:
                querystring += f"RelayState={quote_plus(response_dict['RelayState'])}&"
            querystring += f"SigAlg={quote_plus(self.source.signature_algorithm)}"

            ctx = xmlsec.SignatureContext()

            key = xmlsec.Key.from_memory(
                self.source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
            )
            # Certificates use the dedicated certificate format constant,
            # matching build_auth_n().
            key.load_cert_from_memory(
                self.source.signing_kp.certificate_data,
                xmlsec.constants.KeyDataFormatCertPem,
            )
            ctx.key = key

            signature = ctx.sign_binary(querystring.encode("utf-8"), sign_algorithm_transform)
            response_dict["Signature"] = b64encode(signature).decode()
            response_dict["SigAlg"] = self.source.signature_algorithm

        return response_dict
SESSION_KEY_REQUEST_ID =
'authentik/sources/saml/request_id'
class
RequestProcessor:
class RequestProcessor:
    """SAML AuthnRequest Processor

    Builds the AuthnRequest for a ``SAMLSource``, either as an XML string
    (signed in-document when a signing keypair is configured and the binding
    is not Redirect) or as a dict of parameters with a detached signature.
    """

    source: SAMLSource
    http_request: HttpRequest

    relay_state: str

    request_id: str
    issue_instant: str

    def __init__(self, source: SAMLSource, request: HttpRequest, relay_state: str):
        """Store the inputs, generate a request ID and record it in the session."""
        self.source = source
        self.http_request = request
        self.relay_state = relay_state
        self.request_id = get_random_id()
        # Persist the ID in the session so it can be looked up later in the flow.
        self.http_request.session[SESSION_KEY_REQUEST_ID] = self.request_id
        self.issue_instant = get_time_string()

    def get_issuer(self) -> Element:
        """Get Issuer Element"""
        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
        issuer.text = self.source.get_issuer(self.http_request)
        return issuer

    def get_name_id_policy(self) -> Element:
        """Get NameID Policy Element"""
        name_id_policy = Element(f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy")
        name_id_policy.attrib["Format"] = self.source.name_id_policy
        return name_id_policy

    def get_auth_n(self) -> Element:
        """Get full AuthnRequest

        The returned element contains the Issuer, an unsigned signature
        template (only when a signing keypair is set and the binding is not
        Redirect) and the NameIDPolicy, in that order.
        """
        auth_n_request = Element(f"{{{NS_SAML_PROTOCOL}}}AuthnRequest", nsmap=NS_MAP)
        auth_n_request.attrib["AssertionConsumerServiceURL"] = self.source.build_full_url(
            self.http_request
        )
        auth_n_request.attrib["Destination"] = self.source.sso_url
        auth_n_request.attrib["ID"] = self.request_id
        auth_n_request.attrib["IssueInstant"] = self.issue_instant
        auth_n_request.attrib["ProtocolBinding"] = SAML_BINDING_POST
        auth_n_request.attrib["Version"] = "2.0"
        if self.source.force_authn:
            auth_n_request.attrib["ForceAuthn"] = "true"
        # Create issuer object
        auth_n_request.append(self.get_issuer())

        if self.source.signing_kp and self.source.binding_type != SAMLBindingTypes.REDIRECT:
            # Unknown algorithm identifiers fall back to RSA-SHA1.
            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
                self.source.signature_algorithm, xmlsec.constants.TransformRsaSha1
            )
            signature = xmlsec.template.create(
                auth_n_request,
                xmlsec.constants.TransformExclC14N,
                sign_algorithm_transform,
                ns=xmlsec.constants.DSigNs,
            )
            auth_n_request.append(signature)

        # Create NameID Policy Object
        auth_n_request.append(self.get_name_id_policy())
        return auth_n_request

    def build_auth_n(self) -> str:
        """Get Signed string representation of AuthN Request
        (used for POST Bindings)"""
        auth_n_request = self.get_auth_n()

        if self.source.signing_kp and self.source.binding_type != SAMLBindingTypes.REDIRECT:
            # Register "ID" as an ID attribute so the "#<id>" reference resolves.
            xmlsec.tree.add_ids(auth_n_request, ["ID"])

            ctx = xmlsec.SignatureContext()

            key = xmlsec.Key.from_memory(
                self.source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
            )
            key.load_cert_from_memory(
                self.source.signing_kp.certificate_data,
                xmlsec.constants.KeyDataFormatCertPem,
            )
            ctx.key = key

            # Unknown digest identifiers fall back to SHA1.
            digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
                self.source.digest_algorithm, xmlsec.constants.TransformSha1
            )

            signature_node = xmlsec.tree.find_node(auth_n_request, xmlsec.constants.NodeSignature)

            ref = xmlsec.template.add_reference(
                signature_node,
                digest_algorithm_transform,
                uri="#" + auth_n_request.attrib["ID"],
            )
            xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
            xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
            key_info = xmlsec.template.ensure_key_info(signature_node)
            xmlsec.template.add_x509_data(key_info)

            # NOTE(review): remove_xml_newlines is defined elsewhere; presumably it
            # returns the signature node to sign - confirm against authentik.lib.xml.
            ctx.sign(remove_xml_newlines(auth_n_request, signature_node))

        return etree.tostring(auth_n_request).decode()

    def build_auth_n_detached(self) -> dict[str, str]:
        """Get Dict AuthN Request for Redirect bindings, with detached
        Signature. See https://docs.oasis-open.org/security/saml/v2.0/saml-bindings-2.0-os.pdf

        Returns a dict that always contains ``SAMLRequest``; ``RelayState`` is
        added when a relay state is set, and ``Signature``/``SigAlg`` are added
        when the source has a signing keypair.
        """
        auth_n_request = self.get_auth_n()

        saml_request = deflate_and_base64_encode(etree.tostring(auth_n_request).decode())

        response_dict = {
            "SAMLRequest": saml_request,
        }

        # Truthiness check: an empty *or missing* (None) relay state is omitted,
        # so quote_plus() below is never handed a non-string.
        if self.relay_state:
            response_dict["RelayState"] = self.relay_state

        if self.source.signing_kp:
            # NOTE(review): an identifier missing from the map is signed with
            # RSA-SHA1 while SigAlg still carries the configured value - confirm
            # the configured values are always present in the map.
            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
                self.source.signature_algorithm, xmlsec.constants.TransformRsaSha1
            )

            # Create the full querystring in the correct order to be signed
            querystring = f"SAMLRequest={quote_plus(saml_request)}&"
            if "RelayState" in response_dict:
                querystring += f"RelayState={quote_plus(response_dict['RelayState'])}&"
            querystring += f"SigAlg={quote_plus(self.source.signature_algorithm)}"

            ctx = xmlsec.SignatureContext()

            key = xmlsec.Key.from_memory(
                self.source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
            )
            # Certificates use the dedicated certificate format constant,
            # matching build_auth_n().
            key.load_cert_from_memory(
                self.source.signing_kp.certificate_data,
                xmlsec.constants.KeyDataFormatCertPem,
            )
            ctx.key = key

            signature = ctx.sign_binary(querystring.encode("utf-8"), sign_algorithm_transform)
            response_dict["Signature"] = b64encode(signature).decode()
            response_dict["SigAlg"] = self.source.signature_algorithm

        return response_dict
SAML AuthnRequest Processor
RequestProcessor( source: authentik.sources.saml.models.SAMLSource, request: django.http.request.HttpRequest, relay_state: str)
def __init__(self, source: SAMLSource, request: HttpRequest, relay_state: str):
    """Bind the processor to a source and HTTP request and mint a request ID.

    The generated ID is kept on the instance and also written to the request's
    session under ``SESSION_KEY_REQUEST_ID``; the issue instant is captured
    last.
    """
    self.source, self.http_request, self.relay_state = source, request, relay_state

    new_request_id = get_random_id()
    self.request_id = new_request_id
    request.session[SESSION_KEY_REQUEST_ID] = new_request_id

    self.issue_instant = get_time_string()
def
get_issuer(self) -> lxml.etree.Element:
def get_issuer(self) -> Element:
    """Build the ``Issuer`` element (assertion namespace) for this request.

    Its text is whatever the source's ``get_issuer`` returns for the current
    HTTP request.
    """
    issuer_tag = f"{{{NS_SAML_ASSERTION}}}Issuer"
    node = Element(issuer_tag)
    node.text = self.source.get_issuer(self.http_request)
    return node
Get Issuer Element
def
get_name_id_policy(self) -> lxml.etree.Element:
def get_name_id_policy(self) -> Element:
    """Build the ``NameIDPolicy`` element (protocol namespace).

    The ``Format`` attribute is taken from the source's ``name_id_policy``.
    """
    policy_tag = f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy"
    node = Element(policy_tag)
    node.set("Format", self.source.name_id_policy)
    return node
Get NameID Policy Element
def
get_auth_n(self) -> lxml.etree.Element:
def get_auth_n(self) -> Element:
    """Assemble the ``AuthnRequest`` element tree.

    Children are appended in this order: Issuer, an unsigned signature
    template (only if the source has a signing keypair and does not use the
    Redirect binding), then NameIDPolicy.
    """
    source = self.source
    root = Element(f"{{{NS_SAML_PROTOCOL}}}AuthnRequest", nsmap=NS_MAP)

    # Attribute insertion order is kept identical to preserve serialization.
    attributes = (
        ("AssertionConsumerServiceURL", source.build_full_url(self.http_request)),
        ("Destination", source.sso_url),
        ("ID", self.request_id),
        ("IssueInstant", self.issue_instant),
        ("ProtocolBinding", SAML_BINDING_POST),
        ("Version", "2.0"),
    )
    for attr_name, attr_value in attributes:
        root.set(attr_name, attr_value)
    if source.force_authn:
        root.set("ForceAuthn", "true")

    root.append(self.get_issuer())

    if source.signing_kp and source.binding_type != SAMLBindingTypes.REDIRECT:
        # Unknown algorithm identifiers fall back to RSA-SHA1.
        signing_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            source.signature_algorithm, xmlsec.constants.TransformRsaSha1
        )
        root.append(
            xmlsec.template.create(
                root,
                xmlsec.constants.TransformExclC14N,
                signing_transform,
                ns=xmlsec.constants.DSigNs,
            )
        )

    root.append(self.get_name_id_policy())
    return root
Get full AuthnRequest
def
build_auth_n(self) -> str:
def build_auth_n(self) -> str:
    """Serialize the AuthnRequest to a string, signing it in-document if needed
    (used for POST Bindings).

    Signing happens only when the source has a signing keypair and its binding
    type is not Redirect; otherwise the unsigned request is serialized.
    """
    root = self.get_auth_n()
    source = self.source

    # Guard clause: nothing to sign, serialize as-is.
    if not source.signing_kp or source.binding_type == SAMLBindingTypes.REDIRECT:
        return etree.tostring(root).decode()

    # Register "ID" as an ID attribute so the "#<id>" reference resolves.
    xmlsec.tree.add_ids(root, ["ID"])

    sign_ctx = xmlsec.SignatureContext()
    signing_key = xmlsec.Key.from_memory(
        source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
    )
    signing_key.load_cert_from_memory(
        source.signing_kp.certificate_data,
        xmlsec.constants.KeyDataFormatCertPem,
    )
    sign_ctx.key = signing_key

    # Unknown digest identifiers fall back to SHA1.
    digest_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
        source.digest_algorithm, xmlsec.constants.TransformSha1
    )

    sig_node = xmlsec.tree.find_node(root, xmlsec.constants.NodeSignature)
    reference = xmlsec.template.add_reference(
        sig_node,
        digest_transform,
        uri=f"#{root.attrib['ID']}",
    )
    for transform in (xmlsec.constants.TransformEnveloped, xmlsec.constants.TransformExclC14N):
        xmlsec.template.add_transform(reference, transform)
    xmlsec.template.add_x509_data(xmlsec.template.ensure_key_info(sig_node))

    sign_ctx.sign(remove_xml_newlines(root, sig_node))

    return etree.tostring(root).decode()
Get Signed string representation of AuthN Request (used for POST Bindings)
def
build_auth_n_detached(self) -> dict[str, str]:
def build_auth_n_detached(self) -> dict[str, str]:
    """Get Dict AuthN Request for Redirect bindings, with detached
    Signature. See https://docs.oasis-open.org/security/saml/v2.0/saml-bindings-2.0-os.pdf

    Returns a dict that always contains ``SAMLRequest`` (the deflated,
    base64-encoded request); ``RelayState`` is added when a relay state is
    set, and ``Signature``/``SigAlg`` are added when the source has a signing
    keypair.
    """
    auth_n_request = self.get_auth_n()

    saml_request = deflate_and_base64_encode(etree.tostring(auth_n_request).decode())

    response_dict = {
        "SAMLRequest": saml_request,
    }

    # Truthiness check: an empty *or missing* (None) relay state is omitted,
    # so quote_plus() below is never handed a non-string.
    if self.relay_state:
        response_dict["RelayState"] = self.relay_state

    if self.source.signing_kp:
        # NOTE(review): an identifier missing from the map is signed with
        # RSA-SHA1 while SigAlg still carries the configured value - confirm
        # the configured values are always present in the map.
        sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.source.signature_algorithm, xmlsec.constants.TransformRsaSha1
        )

        # Create the full querystring in the correct order to be signed
        querystring = f"SAMLRequest={quote_plus(saml_request)}&"
        if "RelayState" in response_dict:
            querystring += f"RelayState={quote_plus(response_dict['RelayState'])}&"
        querystring += f"SigAlg={quote_plus(self.source.signature_algorithm)}"

        ctx = xmlsec.SignatureContext()

        key = xmlsec.Key.from_memory(
            self.source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
        )
        # Certificates use the dedicated certificate format constant,
        # matching build_auth_n().
        key.load_cert_from_memory(
            self.source.signing_kp.certificate_data,
            xmlsec.constants.KeyDataFormatCertPem,
        )
        ctx.key = key

        signature = ctx.sign_binary(querystring.encode("utf-8"), sign_algorithm_transform)
        response_dict["Signature"] = b64encode(signature).decode()
        response_dict["SigAlg"] = self.source.signature_algorithm

    return response_dict
Get Dict AuthN Request for Redirect bindings, with detached Signature. See https://docs.oasis-open.org/security/saml/v2.0/saml-bindings-2.0-os.pdf