authentik.sources.saml.processors.request

SAML AuthnRequest Processor

  1"""SAML AuthnRequest Processor"""
  2
  3from base64 import b64encode
  4from urllib.parse import quote_plus
  5
  6import xmlsec
  7from django.http import HttpRequest
  8from lxml import etree  # nosec
  9from lxml.etree import Element  # nosec
 10
 11from authentik.common.saml.constants import (
 12    DIGEST_ALGORITHM_TRANSLATION_MAP,
 13    NS_MAP,
 14    NS_SAML_ASSERTION,
 15    NS_SAML_PROTOCOL,
 16    SAML_BINDING_POST,
 17    SIGN_ALGORITHM_TRANSFORM_MAP,
 18)
 19from authentik.lib.xml import remove_xml_newlines
 20from authentik.providers.saml.utils import get_random_id
 21from authentik.providers.saml.utils.encoding import deflate_and_base64_encode
 22from authentik.providers.saml.utils.time import get_time_string
 23from authentik.sources.saml.models import SAMLBindingTypes, SAMLSource
 24
 25SESSION_KEY_REQUEST_ID = "authentik/sources/saml/request_id"
 26
 27
 28class RequestProcessor:
 29    """SAML AuthnRequest Processor"""
 30
 31    source: SAMLSource
 32    http_request: HttpRequest
 33
 34    relay_state: str
 35
 36    request_id: str
 37    issue_instant: str
 38
 39    def __init__(self, source: SAMLSource, request: HttpRequest, relay_state: str):
 40        self.source = source
 41        self.http_request = request
 42        self.relay_state = relay_state
 43        self.request_id = get_random_id()
 44        self.http_request.session[SESSION_KEY_REQUEST_ID] = self.request_id
 45        self.issue_instant = get_time_string()
 46
 47    def get_issuer(self) -> Element:
 48        """Get Issuer Element"""
 49        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
 50        issuer.text = self.source.get_issuer(self.http_request)
 51        return issuer
 52
 53    def get_name_id_policy(self) -> Element:
 54        """Get NameID Policy Element"""
 55        name_id_policy = Element(f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy")
 56        name_id_policy.attrib["Format"] = self.source.name_id_policy
 57        return name_id_policy
 58
 59    def get_auth_n(self) -> Element:
 60        """Get full AuthnRequest"""
 61        auth_n_request = Element(f"{{{NS_SAML_PROTOCOL}}}AuthnRequest", nsmap=NS_MAP)
 62        auth_n_request.attrib["AssertionConsumerServiceURL"] = self.source.build_full_url(
 63            self.http_request
 64        )
 65        auth_n_request.attrib["Destination"] = self.source.sso_url
 66        auth_n_request.attrib["ID"] = self.request_id
 67        auth_n_request.attrib["IssueInstant"] = self.issue_instant
 68        auth_n_request.attrib["ProtocolBinding"] = SAML_BINDING_POST
 69        auth_n_request.attrib["Version"] = "2.0"
 70        if self.source.force_authn:
 71            auth_n_request.attrib["ForceAuthn"] = "true"
 72        # Create issuer object
 73        auth_n_request.append(self.get_issuer())
 74
 75        if self.source.signing_kp and self.source.binding_type != SAMLBindingTypes.REDIRECT:
 76            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
 77                self.source.signature_algorithm, xmlsec.constants.TransformRsaSha1
 78            )
 79            signature = xmlsec.template.create(
 80                auth_n_request,
 81                xmlsec.constants.TransformExclC14N,
 82                sign_algorithm_transform,
 83                ns=xmlsec.constants.DSigNs,
 84            )
 85            auth_n_request.append(signature)
 86
 87        # Create NameID Policy Object
 88        auth_n_request.append(self.get_name_id_policy())
 89        return auth_n_request
 90
 91    def build_auth_n(self) -> str:
 92        """Get Signed string representation of AuthN Request
 93        (used for POST Bindings)"""
 94        auth_n_request = self.get_auth_n()
 95
 96        if self.source.signing_kp and self.source.binding_type != SAMLBindingTypes.REDIRECT:
 97            xmlsec.tree.add_ids(auth_n_request, ["ID"])
 98
 99            ctx = xmlsec.SignatureContext()
100
101            key = xmlsec.Key.from_memory(
102                self.source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
103            )
104            key.load_cert_from_memory(
105                self.source.signing_kp.certificate_data,
106                xmlsec.constants.KeyDataFormatCertPem,
107            )
108            ctx.key = key
109
110            digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
111                self.source.digest_algorithm, xmlsec.constants.TransformSha1
112            )
113
114            signature_node = xmlsec.tree.find_node(auth_n_request, xmlsec.constants.NodeSignature)
115
116            ref = xmlsec.template.add_reference(
117                signature_node,
118                digest_algorithm_transform,
119                uri="#" + auth_n_request.attrib["ID"],
120            )
121            xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
122            xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
123            key_info = xmlsec.template.ensure_key_info(signature_node)
124            xmlsec.template.add_x509_data(key_info)
125
126            ctx.sign(remove_xml_newlines(auth_n_request, signature_node))
127
128        return etree.tostring(auth_n_request).decode()
129
130    def build_auth_n_detached(self) -> dict[str, str]:
131        """Get Dict AuthN Request for Redirect bindings, with detached
132        Signature. See https://docs.oasis-open.org/security/saml/v2.0/saml-bindings-2.0-os.pdf"""
133        auth_n_request = self.get_auth_n()
134
135        saml_request = deflate_and_base64_encode(etree.tostring(auth_n_request).decode())
136
137        response_dict = {
138            "SAMLRequest": saml_request,
139        }
140
141        if self.relay_state != "":
142            response_dict["RelayState"] = self.relay_state
143
144        if self.source.signing_kp:
145            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
146                self.source.signature_algorithm, xmlsec.constants.TransformRsaSha1
147            )
148
149            # Create the full querystring in the correct order to be signed
150            querystring = f"SAMLRequest={quote_plus(saml_request)}&"
151            if "RelayState" in response_dict:
152                querystring += f"RelayState={quote_plus(response_dict['RelayState'])}&"
153            querystring += f"SigAlg={quote_plus(self.source.signature_algorithm)}"
154
155            ctx = xmlsec.SignatureContext()
156
157            key = xmlsec.Key.from_memory(
158                self.source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
159            )
160            key.load_cert_from_memory(
161                self.source.signing_kp.certificate_data,
162                xmlsec.constants.KeyDataFormatPem,
163            )
164            ctx.key = key
165
166            signature = ctx.sign_binary(querystring.encode("utf-8"), sign_algorithm_transform)
167            response_dict["Signature"] = b64encode(signature).decode()
168            response_dict["SigAlg"] = self.source.signature_algorithm
169
170        return response_dict
SESSION_KEY_REQUEST_ID = 'authentik/sources/saml/request_id'
class RequestProcessor:
 29class RequestProcessor:
 30    """SAML AuthnRequest Processor"""
 31
 32    source: SAMLSource
 33    http_request: HttpRequest
 34
 35    relay_state: str
 36
 37    request_id: str
 38    issue_instant: str
 39
 40    def __init__(self, source: SAMLSource, request: HttpRequest, relay_state: str):
 41        self.source = source
 42        self.http_request = request
 43        self.relay_state = relay_state
 44        self.request_id = get_random_id()
 45        self.http_request.session[SESSION_KEY_REQUEST_ID] = self.request_id
 46        self.issue_instant = get_time_string()
 47
 48    def get_issuer(self) -> Element:
 49        """Get Issuer Element"""
 50        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
 51        issuer.text = self.source.get_issuer(self.http_request)
 52        return issuer
 53
 54    def get_name_id_policy(self) -> Element:
 55        """Get NameID Policy Element"""
 56        name_id_policy = Element(f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy")
 57        name_id_policy.attrib["Format"] = self.source.name_id_policy
 58        return name_id_policy
 59
 60    def get_auth_n(self) -> Element:
 61        """Get full AuthnRequest"""
 62        auth_n_request = Element(f"{{{NS_SAML_PROTOCOL}}}AuthnRequest", nsmap=NS_MAP)
 63        auth_n_request.attrib["AssertionConsumerServiceURL"] = self.source.build_full_url(
 64            self.http_request
 65        )
 66        auth_n_request.attrib["Destination"] = self.source.sso_url
 67        auth_n_request.attrib["ID"] = self.request_id
 68        auth_n_request.attrib["IssueInstant"] = self.issue_instant
 69        auth_n_request.attrib["ProtocolBinding"] = SAML_BINDING_POST
 70        auth_n_request.attrib["Version"] = "2.0"
 71        if self.source.force_authn:
 72            auth_n_request.attrib["ForceAuthn"] = "true"
 73        # Create issuer object
 74        auth_n_request.append(self.get_issuer())
 75
 76        if self.source.signing_kp and self.source.binding_type != SAMLBindingTypes.REDIRECT:
 77            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
 78                self.source.signature_algorithm, xmlsec.constants.TransformRsaSha1
 79            )
 80            signature = xmlsec.template.create(
 81                auth_n_request,
 82                xmlsec.constants.TransformExclC14N,
 83                sign_algorithm_transform,
 84                ns=xmlsec.constants.DSigNs,
 85            )
 86            auth_n_request.append(signature)
 87
 88        # Create NameID Policy Object
 89        auth_n_request.append(self.get_name_id_policy())
 90        return auth_n_request
 91
 92    def build_auth_n(self) -> str:
 93        """Get Signed string representation of AuthN Request
 94        (used for POST Bindings)"""
 95        auth_n_request = self.get_auth_n()
 96
 97        if self.source.signing_kp and self.source.binding_type != SAMLBindingTypes.REDIRECT:
 98            xmlsec.tree.add_ids(auth_n_request, ["ID"])
 99
100            ctx = xmlsec.SignatureContext()
101
102            key = xmlsec.Key.from_memory(
103                self.source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
104            )
105            key.load_cert_from_memory(
106                self.source.signing_kp.certificate_data,
107                xmlsec.constants.KeyDataFormatCertPem,
108            )
109            ctx.key = key
110
111            digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
112                self.source.digest_algorithm, xmlsec.constants.TransformSha1
113            )
114
115            signature_node = xmlsec.tree.find_node(auth_n_request, xmlsec.constants.NodeSignature)
116
117            ref = xmlsec.template.add_reference(
118                signature_node,
119                digest_algorithm_transform,
120                uri="#" + auth_n_request.attrib["ID"],
121            )
122            xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
123            xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
124            key_info = xmlsec.template.ensure_key_info(signature_node)
125            xmlsec.template.add_x509_data(key_info)
126
127            ctx.sign(remove_xml_newlines(auth_n_request, signature_node))
128
129        return etree.tostring(auth_n_request).decode()
130
131    def build_auth_n_detached(self) -> dict[str, str]:
132        """Get Dict AuthN Request for Redirect bindings, with detached
133        Signature. See https://docs.oasis-open.org/security/saml/v2.0/saml-bindings-2.0-os.pdf"""
134        auth_n_request = self.get_auth_n()
135
136        saml_request = deflate_and_base64_encode(etree.tostring(auth_n_request).decode())
137
138        response_dict = {
139            "SAMLRequest": saml_request,
140        }
141
142        if self.relay_state != "":
143            response_dict["RelayState"] = self.relay_state
144
145        if self.source.signing_kp:
146            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
147                self.source.signature_algorithm, xmlsec.constants.TransformRsaSha1
148            )
149
150            # Create the full querystring in the correct order to be signed
151            querystring = f"SAMLRequest={quote_plus(saml_request)}&"
152            if "RelayState" in response_dict:
153                querystring += f"RelayState={quote_plus(response_dict['RelayState'])}&"
154            querystring += f"SigAlg={quote_plus(self.source.signature_algorithm)}"
155
156            ctx = xmlsec.SignatureContext()
157
158            key = xmlsec.Key.from_memory(
159                self.source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
160            )
161            key.load_cert_from_memory(
162                self.source.signing_kp.certificate_data,
163                xmlsec.constants.KeyDataFormatPem,
164            )
165            ctx.key = key
166
167            signature = ctx.sign_binary(querystring.encode("utf-8"), sign_algorithm_transform)
168            response_dict["Signature"] = b64encode(signature).decode()
169            response_dict["SigAlg"] = self.source.signature_algorithm
170
171        return response_dict

SAML AuthnRequest Processor

RequestProcessor( source: authentik.sources.saml.models.SAMLSource, request: django.http.request.HttpRequest, relay_state: str)
40    def __init__(self, source: SAMLSource, request: HttpRequest, relay_state: str):
41        self.source = source
42        self.http_request = request
43        self.relay_state = relay_state
44        self.request_id = get_random_id()
45        self.http_request.session[SESSION_KEY_REQUEST_ID] = self.request_id
46        self.issue_instant = get_time_string()
http_request: django.http.request.HttpRequest
relay_state: str
request_id: str
issue_instant: str
def get_issuer(self) -> lxml.etree.Element:
48    def get_issuer(self) -> Element:
49        """Get Issuer Element"""
50        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer")
51        issuer.text = self.source.get_issuer(self.http_request)
52        return issuer

Get Issuer Element

def get_name_id_policy(self) -> lxml.etree.Element:
54    def get_name_id_policy(self) -> Element:
55        """Get NameID Policy Element"""
56        name_id_policy = Element(f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy")
57        name_id_policy.attrib["Format"] = self.source.name_id_policy
58        return name_id_policy

Get NameID Policy Element

def get_auth_n(self) -> lxml.etree.Element:
60    def get_auth_n(self) -> Element:
61        """Get full AuthnRequest"""
62        auth_n_request = Element(f"{{{NS_SAML_PROTOCOL}}}AuthnRequest", nsmap=NS_MAP)
63        auth_n_request.attrib["AssertionConsumerServiceURL"] = self.source.build_full_url(
64            self.http_request
65        )
66        auth_n_request.attrib["Destination"] = self.source.sso_url
67        auth_n_request.attrib["ID"] = self.request_id
68        auth_n_request.attrib["IssueInstant"] = self.issue_instant
69        auth_n_request.attrib["ProtocolBinding"] = SAML_BINDING_POST
70        auth_n_request.attrib["Version"] = "2.0"
71        if self.source.force_authn:
72            auth_n_request.attrib["ForceAuthn"] = "true"
73        # Create issuer object
74        auth_n_request.append(self.get_issuer())
75
76        if self.source.signing_kp and self.source.binding_type != SAMLBindingTypes.REDIRECT:
77            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
78                self.source.signature_algorithm, xmlsec.constants.TransformRsaSha1
79            )
80            signature = xmlsec.template.create(
81                auth_n_request,
82                xmlsec.constants.TransformExclC14N,
83                sign_algorithm_transform,
84                ns=xmlsec.constants.DSigNs,
85            )
86            auth_n_request.append(signature)
87
88        # Create NameID Policy Object
89        auth_n_request.append(self.get_name_id_policy())
90        return auth_n_request

Get full AuthnRequest

def build_auth_n(self) -> str:
 92    def build_auth_n(self) -> str:
 93        """Get Signed string representation of AuthN Request
 94        (used for POST Bindings)"""
 95        auth_n_request = self.get_auth_n()
 96
 97        if self.source.signing_kp and self.source.binding_type != SAMLBindingTypes.REDIRECT:
 98            xmlsec.tree.add_ids(auth_n_request, ["ID"])
 99
100            ctx = xmlsec.SignatureContext()
101
102            key = xmlsec.Key.from_memory(
103                self.source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
104            )
105            key.load_cert_from_memory(
106                self.source.signing_kp.certificate_data,
107                xmlsec.constants.KeyDataFormatCertPem,
108            )
109            ctx.key = key
110
111            digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
112                self.source.digest_algorithm, xmlsec.constants.TransformSha1
113            )
114
115            signature_node = xmlsec.tree.find_node(auth_n_request, xmlsec.constants.NodeSignature)
116
117            ref = xmlsec.template.add_reference(
118                signature_node,
119                digest_algorithm_transform,
120                uri="#" + auth_n_request.attrib["ID"],
121            )
122            xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
123            xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
124            key_info = xmlsec.template.ensure_key_info(signature_node)
125            xmlsec.template.add_x509_data(key_info)
126
127            ctx.sign(remove_xml_newlines(auth_n_request, signature_node))
128
129        return etree.tostring(auth_n_request).decode()

Get Signed string representation of AuthN Request (used for POST Bindings)

def build_auth_n_detached(self) -> dict[str, str]:
131    def build_auth_n_detached(self) -> dict[str, str]:
132        """Get Dict AuthN Request for Redirect bindings, with detached
133        Signature. See https://docs.oasis-open.org/security/saml/v2.0/saml-bindings-2.0-os.pdf"""
134        auth_n_request = self.get_auth_n()
135
136        saml_request = deflate_and_base64_encode(etree.tostring(auth_n_request).decode())
137
138        response_dict = {
139            "SAMLRequest": saml_request,
140        }
141
142        if self.relay_state != "":
143            response_dict["RelayState"] = self.relay_state
144
145        if self.source.signing_kp:
146            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
147                self.source.signature_algorithm, xmlsec.constants.TransformRsaSha1
148            )
149
150            # Create the full querystring in the correct order to be signed
151            querystring = f"SAMLRequest={quote_plus(saml_request)}&"
152            if "RelayState" in response_dict:
153                querystring += f"RelayState={quote_plus(response_dict['RelayState'])}&"
154            querystring += f"SigAlg={quote_plus(self.source.signature_algorithm)}"
155
156            ctx = xmlsec.SignatureContext()
157
158            key = xmlsec.Key.from_memory(
159                self.source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
160            )
161            key.load_cert_from_memory(
162                self.source.signing_kp.certificate_data,
163                xmlsec.constants.KeyDataFormatPem,
164            )
165            ctx.key = key
166
167            signature = ctx.sign_binary(querystring.encode("utf-8"), sign_algorithm_transform)
168            response_dict["Signature"] = b64encode(signature).decode()
169            response_dict["SigAlg"] = self.source.signature_algorithm
170
171        return response_dict

Get Dict AuthN Request for Redirect bindings, with detached Signature. See https://docs.oasis-open.org/security/saml/v2.0/saml-bindings-2.0-os.pdf