authentik.providers.saml.processors.assertion

SAML Assertion generator

  1"""SAML Assertion generator"""
  2
  3from datetime import datetime
  4from hashlib import sha256
  5from types import GeneratorType
  6
  7import xmlsec
  8from django.http import HttpRequest
  9from django.utils.timezone import now
 10from lxml import etree  # nosec
 11from lxml.etree import Element, SubElement, _Element  # nosec
 12from structlog.stdlib import get_logger
 13
 14from authentik.common.saml.constants import (
 15    DIGEST_ALGORITHM_TRANSLATION_MAP,
 16    NS_MAP,
 17    NS_SAML_ASSERTION,
 18    NS_SAML_PROTOCOL,
 19    SAML_NAME_ID_FORMAT_EMAIL,
 20    SAML_NAME_ID_FORMAT_PERSISTENT,
 21    SAML_NAME_ID_FORMAT_TRANSIENT,
 22    SAML_NAME_ID_FORMAT_UNSPECIFIED,
 23    SAML_NAME_ID_FORMAT_WINDOWS,
 24    SAML_NAME_ID_FORMAT_X509,
 25    SIGN_ALGORITHM_TRANSFORM_MAP,
 26)
 27from authentik.core.expression.exceptions import PropertyMappingExpressionException
 28from authentik.events.models import Event, EventAction
 29from authentik.events.signals import get_login_event
 30from authentik.lib.utils.time import timedelta_from_string
 31from authentik.lib.xml import remove_xml_newlines
 32from authentik.providers.saml.models import SAMLPropertyMapping, SAMLProvider
 33from authentik.providers.saml.processors.authn_request_parser import AuthNRequest
 34from authentik.providers.saml.utils import get_random_id
 35from authentik.providers.saml.utils.time import get_time_string
 36from authentik.sources.ldap.auth import LDAP_DISTINGUISHED_NAME
 37from authentik.sources.saml.exceptions import (
 38    InvalidEncryption,
 39    InvalidSignature,
 40    UnsupportedNameIDFormat,
 41)
 42from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
 43
 44LOGGER = get_logger()
 45
 46
 47class AssertionProcessor:
 48    """Generate a SAML Response from an AuthNRequest"""
 49
 50    provider: SAMLProvider
 51    http_request: HttpRequest
 52    auth_n_request: AuthNRequest
 53
 54    _issue_instant: str
 55    _assertion_id: str
 56    _response_id: str
 57
 58    _auth_instant: str
 59    _valid_not_before: str
 60    _session_not_on_or_after: str
 61    _valid_not_on_or_after: str
 62
 63    session_index: str
 64    name_id: str
 65    name_id_format: str
 66    session_not_on_or_after_datetime: datetime
 67
 68    def __init__(self, provider: SAMLProvider, request: HttpRequest, auth_n_request: AuthNRequest):
 69        self.provider = provider
 70        self.http_request = request
 71        self.auth_n_request = auth_n_request
 72
 73        self._issue_instant = get_time_string()
 74        self._assertion_id = get_random_id()
 75        self._response_id = get_random_id()
 76
 77        _login_event = get_login_event(self.http_request)
 78        _login_time = now()
 79        if _login_event:
 80            _login_time = _login_event.created
 81        self._auth_instant = get_time_string(_login_time)
 82        self._valid_not_before = get_time_string(
 83            timedelta_from_string(self.provider.assertion_valid_not_before)
 84        )
 85        self.session_not_on_or_after_datetime = now() + timedelta_from_string(
 86            self.provider.session_valid_not_on_or_after
 87        )
 88        self._session_not_on_or_after = get_time_string(self.session_not_on_or_after_datetime)
 89        self._valid_not_on_or_after = get_time_string(
 90            timedelta_from_string(self.provider.assertion_valid_not_on_or_after)
 91        )
 92
 93    def get_attributes(self) -> Element:
 94        """Get AttributeStatement Element with Attributes from Property Mappings."""
 95        # https://commons.lbl.gov/display/IDMgmt/Attribute+Definitions
 96        attribute_statement = Element(f"{{{NS_SAML_ASSERTION}}}AttributeStatement")
 97        user = self.http_request.user
 98        for mapping in SAMLPropertyMapping.objects.filter(provider=self.provider).order_by(
 99            "saml_name"
100        ):
101            try:
102                mapping: SAMLPropertyMapping
103                value = mapping.evaluate(
104                    user=user,
105                    request=self.http_request,
106                    provider=self.provider,
107                )
108                if value is None:
109                    continue
110
111                attribute = Element(f"{{{NS_SAML_ASSERTION}}}Attribute")
112                if mapping.friendly_name and mapping.friendly_name != "":
113                    attribute.attrib["FriendlyName"] = mapping.friendly_name
114                attribute.attrib["Name"] = mapping.saml_name
115
116                if not isinstance(value, list | GeneratorType):
117                    value = [value]
118
119                for value_item in value:
120                    attribute_value = SubElement(
121                        attribute, f"{{{NS_SAML_ASSERTION}}}AttributeValue"
122                    )
123                    str_value = str(value_item) if not isinstance(value_item, str) else value_item
124                    attribute_value.text = str_value
125
126                attribute_statement.append(attribute)
127
128            except (PropertyMappingExpressionException, ValueError) as exc:
129                # Value error can be raised when assigning invalid data to an attribute
130                Event.new(
131                    EventAction.CONFIGURATION_ERROR,
132                    message=f"Failed to evaluate property-mapping: '{mapping.name}'",
133                    provider=self.provider,
134                    mapping=mapping,
135                ).from_http(self.http_request)
136                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
137                continue
138        return attribute_statement
139
140    def get_issuer(self) -> Element:
141        """Get Issuer Element"""
142        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer", nsmap=NS_MAP)
143        issuer.text = self.provider.issuer
144        return issuer
145
146    def get_assertion_auth_n_statement(self) -> Element:
147        """Generate AuthnStatement with AuthnContext and ContextClassRef Elements."""
148        auth_n_statement = Element(f"{{{NS_SAML_ASSERTION}}}AuthnStatement")
149        auth_n_statement.attrib["AuthnInstant"] = self._auth_instant
150        self.session_index = sha256(
151            self.http_request.session.session_key.encode("ascii")
152        ).hexdigest()
153        auth_n_statement.attrib["SessionIndex"] = self.session_index
154        auth_n_statement.attrib["SessionNotOnOrAfter"] = self._session_not_on_or_after
155
156        auth_n_context = SubElement(auth_n_statement, f"{{{NS_SAML_ASSERTION}}}AuthnContext")
157        auth_n_context_class_ref = SubElement(
158            auth_n_context, f"{{{NS_SAML_ASSERTION}}}AuthnContextClassRef"
159        )
160        auth_n_context_class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified"
161        event = get_login_event(self.http_request)
162        if event:
163            method = event.context.get(PLAN_CONTEXT_METHOD, "")
164            method_args = event.context.get(PLAN_CONTEXT_METHOD_ARGS, {})
165            if method == "password":
166                auth_n_context_class_ref.text = (
167                    "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport"
168                )
169            if "mfa_devices" in method_args:
170                auth_n_context_class_ref.text = (
171                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileTwoFactorContract"
172                )
173            if method in ["auth_mfa", "auth_webauthn_pwl"]:
174                auth_n_context_class_ref.text = (
175                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract"
176                )
177        if self.provider.authn_context_class_ref_mapping:
178            try:
179                value = self.provider.authn_context_class_ref_mapping.evaluate(
180                    user=self.http_request.user,
181                    request=self.http_request,
182                    provider=self.provider,
183                )
184                if value is not None:
185                    auth_n_context_class_ref.text = str(value)
186                return auth_n_statement
187            except PropertyMappingExpressionException as exc:
188                Event.new(
189                    EventAction.CONFIGURATION_ERROR,
190                    message=(
191                        "Failed to evaluate property-mapping: "
192                        f"'{self.provider.authn_context_class_ref_mapping.name}'"
193                    ),
194                    provider=self.provider,
195                    mapping=self.provider.authn_context_class_ref_mapping,
196                ).from_http(self.http_request)
197                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
198                return auth_n_statement
199        return auth_n_statement
200
201    def get_assertion_conditions(self) -> Element:
202        """Generate Conditions with AudienceRestriction and Audience Elements."""
203        conditions = Element(f"{{{NS_SAML_ASSERTION}}}Conditions")
204        conditions.attrib["NotBefore"] = self._valid_not_before
205        conditions.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
206        if self.provider.audience != "":
207            audience_restriction = SubElement(
208                conditions, f"{{{NS_SAML_ASSERTION}}}AudienceRestriction"
209            )
210            audience = SubElement(audience_restriction, f"{{{NS_SAML_ASSERTION}}}Audience")
211            audience.text = self.provider.audience
212        return conditions
213
214    def get_name_id(self) -> Element:
215        """Get NameID Element"""
216        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
217        # For requests that don't specify a NameIDPolicy, check if we
218        # can fall back to the provider default
219        if (
220            self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_UNSPECIFIED
221            and self.provider.default_name_id_policy != SAML_NAME_ID_FORMAT_UNSPECIFIED
222        ):
223            self.auth_n_request.name_id_policy = self.provider.default_name_id_policy
224        name_id.attrib["Format"] = self.auth_n_request.name_id_policy
225        self.name_id_format = self.auth_n_request.name_id_policy
226        # persistent is used as a fallback, so always generate it
227        persistent = self.http_request.user.uid
228        name_id.text = persistent
229        self.name_id = persistent
230        # If name_id_mapping is set, we override the value, regardless of what the SP asks for
231        if self.provider.name_id_mapping:
232            try:
233                value = self.provider.name_id_mapping.evaluate(
234                    user=self.http_request.user,
235                    request=self.http_request,
236                    provider=self.provider,
237                )
238                if value is not None:
239                    name_id.text = str(value)
240                    self.name_id = str(value)
241                return name_id
242            except PropertyMappingExpressionException as exc:
243                Event.new(
244                    EventAction.CONFIGURATION_ERROR,
245                    message=(
246                        "Failed to evaluate property-mapping: "
247                        f"'{self.provider.name_id_mapping.name}'",
248                    ),
249                    provider=self.provider,
250                    mapping=self.provider.name_id_mapping,
251                ).from_http(self.http_request)
252                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
253                return name_id
254        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_EMAIL:
255            name_id.text = self.http_request.user.email
256            self.name_id = self.http_request.user.email
257            return name_id
258        if self.auth_n_request.name_id_policy in [
259            SAML_NAME_ID_FORMAT_PERSISTENT,
260            SAML_NAME_ID_FORMAT_UNSPECIFIED,
261        ]:
262            name_id.text = persistent
263            self.name_id = persistent
264            return name_id
265        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_X509:
266            # This attribute is statically set by the LDAP source
267            name_id.text = self.http_request.user.attributes.get(
268                LDAP_DISTINGUISHED_NAME, persistent
269            )
270            self.name_id = name_id.text
271            return name_id
272        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_WINDOWS:
273            # This attribute is statically set by the LDAP source
274            name_id.text = self.http_request.user.attributes.get("upn", persistent)
275            self.name_id = name_id.text
276            return name_id
277        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_TRANSIENT:
278            # Use the hash of the user's session, which changes every session
279            session_key: str = self.http_request.session.session_key
280            name_id.text = sha256(session_key.encode()).hexdigest()
281            self.name_id = name_id.text
282            return name_id
283        raise UnsupportedNameIDFormat(
284            "Assertion contains NameID with unsupported "
285            f"format {self.auth_n_request.name_id_policy}."
286        )
287
288    def get_assertion_subject(self) -> Element:
289        """Generate Subject Element with NameID and SubjectConfirmation Objects"""
290        subject = Element(f"{{{NS_SAML_ASSERTION}}}Subject")
291        subject.append(self.get_name_id())
292
293        subject_confirmation = SubElement(subject, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmation")
294        subject_confirmation.attrib["Method"] = "urn:oasis:names:tc:SAML:2.0:cm:bearer"
295
296        subject_confirmation_data = SubElement(
297            subject_confirmation, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmationData"
298        )
299        if self.auth_n_request.id:
300            subject_confirmation_data.attrib["InResponseTo"] = self.auth_n_request.id
301        subject_confirmation_data.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
302        subject_confirmation_data.attrib["Recipient"] = self.provider.acs_url
303        return subject
304
305    def get_assertion(self) -> Element:
306        """Generate Main Assertion Element"""
307        assertion = Element(f"{{{NS_SAML_ASSERTION}}}Assertion", nsmap=NS_MAP)
308        assertion.attrib["Version"] = "2.0"
309        assertion.attrib["ID"] = self._assertion_id
310        assertion.attrib["IssueInstant"] = self._issue_instant
311        assertion.append(self.get_issuer())
312
313        if self.provider.signing_kp and self.provider.sign_assertion:
314            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
315                self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
316            )
317            signature = xmlsec.template.create(
318                assertion,
319                xmlsec.constants.TransformExclC14N,
320                sign_algorithm_transform,
321                ns=xmlsec.constants.DSigNs,
322            )
323            assertion.append(signature)
324
325        assertion.append(self.get_assertion_subject())
326        assertion.append(self.get_assertion_conditions())
327        assertion.append(self.get_assertion_auth_n_statement())
328
329        assertion.append(self.get_attributes())
330        return assertion
331
332    def get_response(self) -> Element:
333        """Generate Root response element"""
334        response = Element(f"{{{NS_SAML_PROTOCOL}}}Response", nsmap=NS_MAP)
335        response.attrib["Version"] = "2.0"
336        response.attrib["IssueInstant"] = self._issue_instant
337        response.attrib["Destination"] = self.provider.acs_url
338        response.attrib["ID"] = self._response_id
339        if self.auth_n_request.id:
340            response.attrib["InResponseTo"] = self.auth_n_request.id
341
342        response.append(self.get_issuer())
343
344        if self.provider.signing_kp and self.provider.sign_response:
345            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
346                self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
347            )
348            signature = xmlsec.template.create(
349                response,
350                xmlsec.constants.TransformExclC14N,
351                sign_algorithm_transform,
352                ns=xmlsec.constants.DSigNs,
353            )
354            response.append(signature)
355
356        status = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
357        status_code = SubElement(status, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
358        status_code.attrib["Value"] = "urn:oasis:names:tc:SAML:2.0:status:Success"
359
360        response.append(self.get_assertion())
361        return response
362
363    def _sign(self, element: _Element):
364        """Sign an XML element based on the providers' configured signing settings"""
365        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
366            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
367        )
368        xmlsec.tree.add_ids(element, ["ID"])
369        signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature)
370        ref = xmlsec.template.add_reference(
371            signature_node,
372            digest_algorithm_transform,
373            uri="#" + element.attrib["ID"],
374        )
375        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
376        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
377        key_info = xmlsec.template.ensure_key_info(signature_node)
378        xmlsec.template.add_x509_data(key_info)
379
380        ctx = xmlsec.SignatureContext()
381
382        key = xmlsec.Key.from_memory(
383            self.provider.signing_kp.key_data,
384            xmlsec.constants.KeyDataFormatPem,
385            None,
386        )
387        key.load_cert_from_memory(
388            self.provider.signing_kp.certificate_data,
389            xmlsec.constants.KeyDataFormatCertPem,
390        )
391        ctx.key = key
392        try:
393            ctx.sign(remove_xml_newlines(element, signature_node))
394        except xmlsec.Error as exc:
395            raise InvalidSignature() from exc
396
397    def _encrypt(self, element: _Element, parent: _Element):
398        """Encrypt SAMLResponse EncryptedAssertion Element"""
399        # Create a standalone copy so namespace declarations are included in the encrypted content
400        element_xml = etree.tostring(element)
401        standalone_element = etree.fromstring(element_xml)
402
403        # Remove the original element from the tree since we're replacing it with encrypted version
404        parent.remove(element)
405
406        manager = xmlsec.KeysManager()
407        key = xmlsec.Key.from_memory(
408            self.provider.encryption_kp.certificate_data,
409            xmlsec.constants.KeyDataFormatCertPem,
410        )
411
412        manager.add_key(key)
413        encryption_context = xmlsec.EncryptionContext(manager)
414        encryption_context.key = xmlsec.Key.generate(
415            xmlsec.constants.KeyDataAes, 128, xmlsec.constants.KeyDataTypeSession
416        )
417
418        container = SubElement(parent, f"{{{NS_SAML_ASSERTION}}}EncryptedAssertion")
419        enc_data = xmlsec.template.encrypted_data_create(
420            container, xmlsec.Transform.AES128, type=xmlsec.EncryptionType.ELEMENT, ns="xenc"
421        )
422        xmlsec.template.encrypted_data_ensure_cipher_value(enc_data)
423        key_info = xmlsec.template.encrypted_data_ensure_key_info(enc_data, ns="ds")
424        enc_key = xmlsec.template.add_encrypted_key(key_info, xmlsec.Transform.RSA_OAEP)
425        xmlsec.template.encrypted_data_ensure_cipher_value(enc_key)
426
427        try:
428            enc_data = encryption_context.encrypt_xml(enc_data, standalone_element)
429        except xmlsec.Error as exc:
430            raise InvalidEncryption() from exc
431
432        container.append(enc_data)
433
434    def build_response(self) -> str:
435        """Build string XML Response and sign if signing is enabled."""
436        root_response = self.get_response()
437        # Sign assertion first (before encryption)
438        if self.provider.signing_kp and self.provider.sign_assertion:
439            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
440            self._sign(assertion)
441        # Encrypt assertion (this replaces Assertion with EncryptedAssertion)
442        if self.provider.encryption_kp:
443            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
444            self._encrypt(assertion, root_response)
445        # Sign response AFTER encryption so signature covers the encrypted content
446        if self.provider.signing_kp and self.provider.sign_response:
447            response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0]
448            self._sign(response)
449        return etree.tostring(root_response).decode("utf-8")  # nosec
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class AssertionProcessor:
 48class AssertionProcessor:
 49    """Generate a SAML Response from an AuthNRequest"""
 50
 51    provider: SAMLProvider
 52    http_request: HttpRequest
 53    auth_n_request: AuthNRequest
 54
 55    _issue_instant: str
 56    _assertion_id: str
 57    _response_id: str
 58
 59    _auth_instant: str
 60    _valid_not_before: str
 61    _session_not_on_or_after: str
 62    _valid_not_on_or_after: str
 63
 64    session_index: str
 65    name_id: str
 66    name_id_format: str
 67    session_not_on_or_after_datetime: datetime
 68
 69    def __init__(self, provider: SAMLProvider, request: HttpRequest, auth_n_request: AuthNRequest):
 70        self.provider = provider
 71        self.http_request = request
 72        self.auth_n_request = auth_n_request
 73
 74        self._issue_instant = get_time_string()
 75        self._assertion_id = get_random_id()
 76        self._response_id = get_random_id()
 77
 78        _login_event = get_login_event(self.http_request)
 79        _login_time = now()
 80        if _login_event:
 81            _login_time = _login_event.created
 82        self._auth_instant = get_time_string(_login_time)
 83        self._valid_not_before = get_time_string(
 84            timedelta_from_string(self.provider.assertion_valid_not_before)
 85        )
 86        self.session_not_on_or_after_datetime = now() + timedelta_from_string(
 87            self.provider.session_valid_not_on_or_after
 88        )
 89        self._session_not_on_or_after = get_time_string(self.session_not_on_or_after_datetime)
 90        self._valid_not_on_or_after = get_time_string(
 91            timedelta_from_string(self.provider.assertion_valid_not_on_or_after)
 92        )
 93
 94    def get_attributes(self) -> Element:
 95        """Get AttributeStatement Element with Attributes from Property Mappings."""
 96        # https://commons.lbl.gov/display/IDMgmt/Attribute+Definitions
 97        attribute_statement = Element(f"{{{NS_SAML_ASSERTION}}}AttributeStatement")
 98        user = self.http_request.user
 99        for mapping in SAMLPropertyMapping.objects.filter(provider=self.provider).order_by(
100            "saml_name"
101        ):
102            try:
103                mapping: SAMLPropertyMapping
104                value = mapping.evaluate(
105                    user=user,
106                    request=self.http_request,
107                    provider=self.provider,
108                )
109                if value is None:
110                    continue
111
112                attribute = Element(f"{{{NS_SAML_ASSERTION}}}Attribute")
113                if mapping.friendly_name and mapping.friendly_name != "":
114                    attribute.attrib["FriendlyName"] = mapping.friendly_name
115                attribute.attrib["Name"] = mapping.saml_name
116
117                if not isinstance(value, list | GeneratorType):
118                    value = [value]
119
120                for value_item in value:
121                    attribute_value = SubElement(
122                        attribute, f"{{{NS_SAML_ASSERTION}}}AttributeValue"
123                    )
124                    str_value = str(value_item) if not isinstance(value_item, str) else value_item
125                    attribute_value.text = str_value
126
127                attribute_statement.append(attribute)
128
129            except (PropertyMappingExpressionException, ValueError) as exc:
130                # Value error can be raised when assigning invalid data to an attribute
131                Event.new(
132                    EventAction.CONFIGURATION_ERROR,
133                    message=f"Failed to evaluate property-mapping: '{mapping.name}'",
134                    provider=self.provider,
135                    mapping=mapping,
136                ).from_http(self.http_request)
137                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
138                continue
139        return attribute_statement
140
141    def get_issuer(self) -> Element:
142        """Get Issuer Element"""
143        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer", nsmap=NS_MAP)
144        issuer.text = self.provider.issuer
145        return issuer
146
147    def get_assertion_auth_n_statement(self) -> Element:
148        """Generate AuthnStatement with AuthnContext and ContextClassRef Elements."""
149        auth_n_statement = Element(f"{{{NS_SAML_ASSERTION}}}AuthnStatement")
150        auth_n_statement.attrib["AuthnInstant"] = self._auth_instant
151        self.session_index = sha256(
152            self.http_request.session.session_key.encode("ascii")
153        ).hexdigest()
154        auth_n_statement.attrib["SessionIndex"] = self.session_index
155        auth_n_statement.attrib["SessionNotOnOrAfter"] = self._session_not_on_or_after
156
157        auth_n_context = SubElement(auth_n_statement, f"{{{NS_SAML_ASSERTION}}}AuthnContext")
158        auth_n_context_class_ref = SubElement(
159            auth_n_context, f"{{{NS_SAML_ASSERTION}}}AuthnContextClassRef"
160        )
161        auth_n_context_class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified"
162        event = get_login_event(self.http_request)
163        if event:
164            method = event.context.get(PLAN_CONTEXT_METHOD, "")
165            method_args = event.context.get(PLAN_CONTEXT_METHOD_ARGS, {})
166            if method == "password":
167                auth_n_context_class_ref.text = (
168                    "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport"
169                )
170            if "mfa_devices" in method_args:
171                auth_n_context_class_ref.text = (
172                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileTwoFactorContract"
173                )
174            if method in ["auth_mfa", "auth_webauthn_pwl"]:
175                auth_n_context_class_ref.text = (
176                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract"
177                )
178        if self.provider.authn_context_class_ref_mapping:
179            try:
180                value = self.provider.authn_context_class_ref_mapping.evaluate(
181                    user=self.http_request.user,
182                    request=self.http_request,
183                    provider=self.provider,
184                )
185                if value is not None:
186                    auth_n_context_class_ref.text = str(value)
187                return auth_n_statement
188            except PropertyMappingExpressionException as exc:
189                Event.new(
190                    EventAction.CONFIGURATION_ERROR,
191                    message=(
192                        "Failed to evaluate property-mapping: "
193                        f"'{self.provider.authn_context_class_ref_mapping.name}'"
194                    ),
195                    provider=self.provider,
196                    mapping=self.provider.authn_context_class_ref_mapping,
197                ).from_http(self.http_request)
198                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
199                return auth_n_statement
200        return auth_n_statement
201
202    def get_assertion_conditions(self) -> Element:
203        """Generate Conditions with AudienceRestriction and Audience Elements."""
204        conditions = Element(f"{{{NS_SAML_ASSERTION}}}Conditions")
205        conditions.attrib["NotBefore"] = self._valid_not_before
206        conditions.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
207        if self.provider.audience != "":
208            audience_restriction = SubElement(
209                conditions, f"{{{NS_SAML_ASSERTION}}}AudienceRestriction"
210            )
211            audience = SubElement(audience_restriction, f"{{{NS_SAML_ASSERTION}}}Audience")
212            audience.text = self.provider.audience
213        return conditions
214
215    def get_name_id(self) -> Element:
216        """Get NameID Element"""
217        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
218        # For requests that don't specify a NameIDPolicy, check if we
219        # can fall back to the provider default
220        if (
221            self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_UNSPECIFIED
222            and self.provider.default_name_id_policy != SAML_NAME_ID_FORMAT_UNSPECIFIED
223        ):
224            self.auth_n_request.name_id_policy = self.provider.default_name_id_policy
225        name_id.attrib["Format"] = self.auth_n_request.name_id_policy
226        self.name_id_format = self.auth_n_request.name_id_policy
227        # persistent is used as a fallback, so always generate it
228        persistent = self.http_request.user.uid
229        name_id.text = persistent
230        self.name_id = persistent
231        # If name_id_mapping is set, we override the value, regardless of what the SP asks for
232        if self.provider.name_id_mapping:
233            try:
234                value = self.provider.name_id_mapping.evaluate(
235                    user=self.http_request.user,
236                    request=self.http_request,
237                    provider=self.provider,
238                )
239                if value is not None:
240                    name_id.text = str(value)
241                    self.name_id = str(value)
242                return name_id
243            except PropertyMappingExpressionException as exc:
244                Event.new(
245                    EventAction.CONFIGURATION_ERROR,
246                    message=(
247                        "Failed to evaluate property-mapping: "
248                        f"'{self.provider.name_id_mapping.name}'",
249                    ),
250                    provider=self.provider,
251                    mapping=self.provider.name_id_mapping,
252                ).from_http(self.http_request)
253                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
254                return name_id
255        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_EMAIL:
256            name_id.text = self.http_request.user.email
257            self.name_id = self.http_request.user.email
258            return name_id
259        if self.auth_n_request.name_id_policy in [
260            SAML_NAME_ID_FORMAT_PERSISTENT,
261            SAML_NAME_ID_FORMAT_UNSPECIFIED,
262        ]:
263            name_id.text = persistent
264            self.name_id = persistent
265            return name_id
266        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_X509:
267            # This attribute is statically set by the LDAP source
268            name_id.text = self.http_request.user.attributes.get(
269                LDAP_DISTINGUISHED_NAME, persistent
270            )
271            self.name_id = name_id.text
272            return name_id
273        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_WINDOWS:
274            # This attribute is statically set by the LDAP source
275            name_id.text = self.http_request.user.attributes.get("upn", persistent)
276            self.name_id = name_id.text
277            return name_id
278        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_TRANSIENT:
279            # Use the hash of the user's session, which changes every session
280            session_key: str = self.http_request.session.session_key
281            name_id.text = sha256(session_key.encode()).hexdigest()
282            self.name_id = name_id.text
283            return name_id
284        raise UnsupportedNameIDFormat(
285            "Assertion contains NameID with unsupported "
286            f"format {self.auth_n_request.name_id_policy}."
287        )
288
289    def get_assertion_subject(self) -> Element:
290        """Generate Subject Element with NameID and SubjectConfirmation Objects"""
291        subject = Element(f"{{{NS_SAML_ASSERTION}}}Subject")
292        subject.append(self.get_name_id())
293
294        subject_confirmation = SubElement(subject, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmation")
295        subject_confirmation.attrib["Method"] = "urn:oasis:names:tc:SAML:2.0:cm:bearer"
296
297        subject_confirmation_data = SubElement(
298            subject_confirmation, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmationData"
299        )
300        if self.auth_n_request.id:
301            subject_confirmation_data.attrib["InResponseTo"] = self.auth_n_request.id
302        subject_confirmation_data.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
303        subject_confirmation_data.attrib["Recipient"] = self.provider.acs_url
304        return subject
305
306    def get_assertion(self) -> Element:
307        """Generate Main Assertion Element"""
308        assertion = Element(f"{{{NS_SAML_ASSERTION}}}Assertion", nsmap=NS_MAP)
309        assertion.attrib["Version"] = "2.0"
310        assertion.attrib["ID"] = self._assertion_id
311        assertion.attrib["IssueInstant"] = self._issue_instant
312        assertion.append(self.get_issuer())
313
314        if self.provider.signing_kp and self.provider.sign_assertion:
315            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
316                self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
317            )
318            signature = xmlsec.template.create(
319                assertion,
320                xmlsec.constants.TransformExclC14N,
321                sign_algorithm_transform,
322                ns=xmlsec.constants.DSigNs,
323            )
324            assertion.append(signature)
325
326        assertion.append(self.get_assertion_subject())
327        assertion.append(self.get_assertion_conditions())
328        assertion.append(self.get_assertion_auth_n_statement())
329
330        assertion.append(self.get_attributes())
331        return assertion
332
333    def get_response(self) -> Element:
334        """Generate Root response element"""
335        response = Element(f"{{{NS_SAML_PROTOCOL}}}Response", nsmap=NS_MAP)
336        response.attrib["Version"] = "2.0"
337        response.attrib["IssueInstant"] = self._issue_instant
338        response.attrib["Destination"] = self.provider.acs_url
339        response.attrib["ID"] = self._response_id
340        if self.auth_n_request.id:
341            response.attrib["InResponseTo"] = self.auth_n_request.id
342
343        response.append(self.get_issuer())
344
345        if self.provider.signing_kp and self.provider.sign_response:
346            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
347                self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
348            )
349            signature = xmlsec.template.create(
350                response,
351                xmlsec.constants.TransformExclC14N,
352                sign_algorithm_transform,
353                ns=xmlsec.constants.DSigNs,
354            )
355            response.append(signature)
356
357        status = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
358        status_code = SubElement(status, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
359        status_code.attrib["Value"] = "urn:oasis:names:tc:SAML:2.0:status:Success"
360
361        response.append(self.get_assertion())
362        return response
363
364    def _sign(self, element: _Element):
365        """Sign an XML element based on the providers' configured signing settings"""
366        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
367            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
368        )
369        xmlsec.tree.add_ids(element, ["ID"])
370        signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature)
371        ref = xmlsec.template.add_reference(
372            signature_node,
373            digest_algorithm_transform,
374            uri="#" + element.attrib["ID"],
375        )
376        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
377        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
378        key_info = xmlsec.template.ensure_key_info(signature_node)
379        xmlsec.template.add_x509_data(key_info)
380
381        ctx = xmlsec.SignatureContext()
382
383        key = xmlsec.Key.from_memory(
384            self.provider.signing_kp.key_data,
385            xmlsec.constants.KeyDataFormatPem,
386            None,
387        )
388        key.load_cert_from_memory(
389            self.provider.signing_kp.certificate_data,
390            xmlsec.constants.KeyDataFormatCertPem,
391        )
392        ctx.key = key
393        try:
394            ctx.sign(remove_xml_newlines(element, signature_node))
395        except xmlsec.Error as exc:
396            raise InvalidSignature() from exc
397
398    def _encrypt(self, element: _Element, parent: _Element):
399        """Encrypt SAMLResponse EncryptedAssertion Element"""
400        # Create a standalone copy so namespace declarations are included in the encrypted content
401        element_xml = etree.tostring(element)
402        standalone_element = etree.fromstring(element_xml)
403
404        # Remove the original element from the tree since we're replacing it with encrypted version
405        parent.remove(element)
406
407        manager = xmlsec.KeysManager()
408        key = xmlsec.Key.from_memory(
409            self.provider.encryption_kp.certificate_data,
410            xmlsec.constants.KeyDataFormatCertPem,
411        )
412
413        manager.add_key(key)
414        encryption_context = xmlsec.EncryptionContext(manager)
415        encryption_context.key = xmlsec.Key.generate(
416            xmlsec.constants.KeyDataAes, 128, xmlsec.constants.KeyDataTypeSession
417        )
418
419        container = SubElement(parent, f"{{{NS_SAML_ASSERTION}}}EncryptedAssertion")
420        enc_data = xmlsec.template.encrypted_data_create(
421            container, xmlsec.Transform.AES128, type=xmlsec.EncryptionType.ELEMENT, ns="xenc"
422        )
423        xmlsec.template.encrypted_data_ensure_cipher_value(enc_data)
424        key_info = xmlsec.template.encrypted_data_ensure_key_info(enc_data, ns="ds")
425        enc_key = xmlsec.template.add_encrypted_key(key_info, xmlsec.Transform.RSA_OAEP)
426        xmlsec.template.encrypted_data_ensure_cipher_value(enc_key)
427
428        try:
429            enc_data = encryption_context.encrypt_xml(enc_data, standalone_element)
430        except xmlsec.Error as exc:
431            raise InvalidEncryption() from exc
432
433        container.append(enc_data)
434
435    def build_response(self) -> str:
436        """Build string XML Response and sign if signing is enabled."""
437        root_response = self.get_response()
438        # Sign assertion first (before encryption)
439        if self.provider.signing_kp and self.provider.sign_assertion:
440            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
441            self._sign(assertion)
442        # Encrypt assertion (this replaces Assertion with EncryptedAssertion)
443        if self.provider.encryption_kp:
444            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
445            self._encrypt(assertion, root_response)
446        # Sign response AFTER encryption so signature covers the encrypted content
447        if self.provider.signing_kp and self.provider.sign_response:
448            response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0]
449            self._sign(response)
450        return etree.tostring(root_response).decode("utf-8")  # nosec

Generate a SAML Response from an AuthNRequest

AssertionProcessor( provider: authentik.providers.saml.models.SAMLProvider, request: django.http.request.HttpRequest, auth_n_request: authentik.providers.saml.processors.authn_request_parser.AuthNRequest)
69    def __init__(self, provider: SAMLProvider, request: HttpRequest, auth_n_request: AuthNRequest):
70        self.provider = provider
71        self.http_request = request
72        self.auth_n_request = auth_n_request
73
74        self._issue_instant = get_time_string()
75        self._assertion_id = get_random_id()
76        self._response_id = get_random_id()
77
78        _login_event = get_login_event(self.http_request)
79        _login_time = now()
80        if _login_event:
81            _login_time = _login_event.created
82        self._auth_instant = get_time_string(_login_time)
83        self._valid_not_before = get_time_string(
84            timedelta_from_string(self.provider.assertion_valid_not_before)
85        )
86        self.session_not_on_or_after_datetime = now() + timedelta_from_string(
87            self.provider.session_valid_not_on_or_after
88        )
89        self._session_not_on_or_after = get_time_string(self.session_not_on_or_after_datetime)
90        self._valid_not_on_or_after = get_time_string(
91            timedelta_from_string(self.provider.assertion_valid_not_on_or_after)
92        )
http_request: django.http.request.HttpRequest
session_index: str
name_id: str
name_id_format: str
session_not_on_or_after_datetime: datetime.datetime
def get_attributes(self) -> lxml.etree.Element:
 94    def get_attributes(self) -> Element:
 95        """Get AttributeStatement Element with Attributes from Property Mappings."""
 96        # https://commons.lbl.gov/display/IDMgmt/Attribute+Definitions
 97        attribute_statement = Element(f"{{{NS_SAML_ASSERTION}}}AttributeStatement")
 98        user = self.http_request.user
 99        for mapping in SAMLPropertyMapping.objects.filter(provider=self.provider).order_by(
100            "saml_name"
101        ):
102            try:
103                mapping: SAMLPropertyMapping
104                value = mapping.evaluate(
105                    user=user,
106                    request=self.http_request,
107                    provider=self.provider,
108                )
109                if value is None:
110                    continue
111
112                attribute = Element(f"{{{NS_SAML_ASSERTION}}}Attribute")
113                if mapping.friendly_name and mapping.friendly_name != "":
114                    attribute.attrib["FriendlyName"] = mapping.friendly_name
115                attribute.attrib["Name"] = mapping.saml_name
116
117                if not isinstance(value, list | GeneratorType):
118                    value = [value]
119
120                for value_item in value:
121                    attribute_value = SubElement(
122                        attribute, f"{{{NS_SAML_ASSERTION}}}AttributeValue"
123                    )
124                    str_value = str(value_item) if not isinstance(value_item, str) else value_item
125                    attribute_value.text = str_value
126
127                attribute_statement.append(attribute)
128
129            except (PropertyMappingExpressionException, ValueError) as exc:
130                # Value error can be raised when assigning invalid data to an attribute
131                Event.new(
132                    EventAction.CONFIGURATION_ERROR,
133                    message=f"Failed to evaluate property-mapping: '{mapping.name}'",
134                    provider=self.provider,
135                    mapping=mapping,
136                ).from_http(self.http_request)
137                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
138                continue
139        return attribute_statement

Get AttributeStatement Element with Attributes from Property Mappings.

def get_issuer(self) -> lxml.etree.Element:
141    def get_issuer(self) -> Element:
142        """Get Issuer Element"""
143        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer", nsmap=NS_MAP)
144        issuer.text = self.provider.issuer
145        return issuer

Get Issuer Element

def get_assertion_auth_n_statement(self) -> lxml.etree.Element:
147    def get_assertion_auth_n_statement(self) -> Element:
148        """Generate AuthnStatement with AuthnContext and ContextClassRef Elements."""
149        auth_n_statement = Element(f"{{{NS_SAML_ASSERTION}}}AuthnStatement")
150        auth_n_statement.attrib["AuthnInstant"] = self._auth_instant
151        self.session_index = sha256(
152            self.http_request.session.session_key.encode("ascii")
153        ).hexdigest()
154        auth_n_statement.attrib["SessionIndex"] = self.session_index
155        auth_n_statement.attrib["SessionNotOnOrAfter"] = self._session_not_on_or_after
156
157        auth_n_context = SubElement(auth_n_statement, f"{{{NS_SAML_ASSERTION}}}AuthnContext")
158        auth_n_context_class_ref = SubElement(
159            auth_n_context, f"{{{NS_SAML_ASSERTION}}}AuthnContextClassRef"
160        )
161        auth_n_context_class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified"
162        event = get_login_event(self.http_request)
163        if event:
164            method = event.context.get(PLAN_CONTEXT_METHOD, "")
165            method_args = event.context.get(PLAN_CONTEXT_METHOD_ARGS, {})
166            if method == "password":
167                auth_n_context_class_ref.text = (
168                    "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport"
169                )
170            if "mfa_devices" in method_args:
171                auth_n_context_class_ref.text = (
172                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileTwoFactorContract"
173                )
174            if method in ["auth_mfa", "auth_webauthn_pwl"]:
175                auth_n_context_class_ref.text = (
176                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract"
177                )
178        if self.provider.authn_context_class_ref_mapping:
179            try:
180                value = self.provider.authn_context_class_ref_mapping.evaluate(
181                    user=self.http_request.user,
182                    request=self.http_request,
183                    provider=self.provider,
184                )
185                if value is not None:
186                    auth_n_context_class_ref.text = str(value)
187                return auth_n_statement
188            except PropertyMappingExpressionException as exc:
189                Event.new(
190                    EventAction.CONFIGURATION_ERROR,
191                    message=(
192                        "Failed to evaluate property-mapping: "
193                        f"'{self.provider.authn_context_class_ref_mapping.name}'"
194                    ),
195                    provider=self.provider,
196                    mapping=self.provider.authn_context_class_ref_mapping,
197                ).from_http(self.http_request)
198                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
199                return auth_n_statement
200        return auth_n_statement

Generate AuthnStatement with AuthnContext and ContextClassRef Elements.

def get_assertion_conditions(self) -> lxml.etree.Element:
202    def get_assertion_conditions(self) -> Element:
203        """Generate Conditions with AudienceRestriction and Audience Elements."""
204        conditions = Element(f"{{{NS_SAML_ASSERTION}}}Conditions")
205        conditions.attrib["NotBefore"] = self._valid_not_before
206        conditions.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
207        if self.provider.audience != "":
208            audience_restriction = SubElement(
209                conditions, f"{{{NS_SAML_ASSERTION}}}AudienceRestriction"
210            )
211            audience = SubElement(audience_restriction, f"{{{NS_SAML_ASSERTION}}}Audience")
212            audience.text = self.provider.audience
213        return conditions

Generate Conditions with AudienceRestriction and Audience Elements.

def get_name_id(self) -> lxml.etree.Element:
215    def get_name_id(self) -> Element:
216        """Get NameID Element"""
217        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
218        # For requests that don't specify a NameIDPolicy, check if we
219        # can fall back to the provider default
220        if (
221            self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_UNSPECIFIED
222            and self.provider.default_name_id_policy != SAML_NAME_ID_FORMAT_UNSPECIFIED
223        ):
224            self.auth_n_request.name_id_policy = self.provider.default_name_id_policy
225        name_id.attrib["Format"] = self.auth_n_request.name_id_policy
226        self.name_id_format = self.auth_n_request.name_id_policy
227        # persistent is used as a fallback, so always generate it
228        persistent = self.http_request.user.uid
229        name_id.text = persistent
230        self.name_id = persistent
231        # If name_id_mapping is set, we override the value, regardless of what the SP asks for
232        if self.provider.name_id_mapping:
233            try:
234                value = self.provider.name_id_mapping.evaluate(
235                    user=self.http_request.user,
236                    request=self.http_request,
237                    provider=self.provider,
238                )
239                if value is not None:
240                    name_id.text = str(value)
241                    self.name_id = str(value)
242                return name_id
243            except PropertyMappingExpressionException as exc:
244                Event.new(
245                    EventAction.CONFIGURATION_ERROR,
246                    message=(
247                        "Failed to evaluate property-mapping: "
248                        f"'{self.provider.name_id_mapping.name}'",
249                    ),
250                    provider=self.provider,
251                    mapping=self.provider.name_id_mapping,
252                ).from_http(self.http_request)
253                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
254                return name_id
255        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_EMAIL:
256            name_id.text = self.http_request.user.email
257            self.name_id = self.http_request.user.email
258            return name_id
259        if self.auth_n_request.name_id_policy in [
260            SAML_NAME_ID_FORMAT_PERSISTENT,
261            SAML_NAME_ID_FORMAT_UNSPECIFIED,
262        ]:
263            name_id.text = persistent
264            self.name_id = persistent
265            return name_id
266        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_X509:
267            # This attribute is statically set by the LDAP source
268            name_id.text = self.http_request.user.attributes.get(
269                LDAP_DISTINGUISHED_NAME, persistent
270            )
271            self.name_id = name_id.text
272            return name_id
273        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_WINDOWS:
274            # This attribute is statically set by the LDAP source
275            name_id.text = self.http_request.user.attributes.get("upn", persistent)
276            self.name_id = name_id.text
277            return name_id
278        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_TRANSIENT:
279            # Use the hash of the user's session, which changes every session
280            session_key: str = self.http_request.session.session_key
281            name_id.text = sha256(session_key.encode()).hexdigest()
282            self.name_id = name_id.text
283            return name_id
284        raise UnsupportedNameIDFormat(
285            "Assertion contains NameID with unsupported "
286            f"format {self.auth_n_request.name_id_policy}."
287        )

Get NameID Element

def get_assertion_subject(self) -> lxml.etree.Element:
289    def get_assertion_subject(self) -> Element:
290        """Generate Subject Element with NameID and SubjectConfirmation Objects"""
291        subject = Element(f"{{{NS_SAML_ASSERTION}}}Subject")
292        subject.append(self.get_name_id())
293
294        subject_confirmation = SubElement(subject, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmation")
295        subject_confirmation.attrib["Method"] = "urn:oasis:names:tc:SAML:2.0:cm:bearer"
296
297        subject_confirmation_data = SubElement(
298            subject_confirmation, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmationData"
299        )
300        if self.auth_n_request.id:
301            subject_confirmation_data.attrib["InResponseTo"] = self.auth_n_request.id
302        subject_confirmation_data.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
303        subject_confirmation_data.attrib["Recipient"] = self.provider.acs_url
304        return subject

Generate Subject Element with NameID and SubjectConfirmation Objects

def get_assertion(self) -> lxml.etree.Element:
306    def get_assertion(self) -> Element:
307        """Generate Main Assertion Element"""
308        assertion = Element(f"{{{NS_SAML_ASSERTION}}}Assertion", nsmap=NS_MAP)
309        assertion.attrib["Version"] = "2.0"
310        assertion.attrib["ID"] = self._assertion_id
311        assertion.attrib["IssueInstant"] = self._issue_instant
312        assertion.append(self.get_issuer())
313
314        if self.provider.signing_kp and self.provider.sign_assertion:
315            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
316                self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
317            )
318            signature = xmlsec.template.create(
319                assertion,
320                xmlsec.constants.TransformExclC14N,
321                sign_algorithm_transform,
322                ns=xmlsec.constants.DSigNs,
323            )
324            assertion.append(signature)
325
326        assertion.append(self.get_assertion_subject())
327        assertion.append(self.get_assertion_conditions())
328        assertion.append(self.get_assertion_auth_n_statement())
329
330        assertion.append(self.get_attributes())
331        return assertion

Generate Main Assertion Element

def get_response(self) -> lxml.etree.Element:
333    def get_response(self) -> Element:
334        """Generate Root response element"""
335        response = Element(f"{{{NS_SAML_PROTOCOL}}}Response", nsmap=NS_MAP)
336        response.attrib["Version"] = "2.0"
337        response.attrib["IssueInstant"] = self._issue_instant
338        response.attrib["Destination"] = self.provider.acs_url
339        response.attrib["ID"] = self._response_id
340        if self.auth_n_request.id:
341            response.attrib["InResponseTo"] = self.auth_n_request.id
342
343        response.append(self.get_issuer())
344
345        if self.provider.signing_kp and self.provider.sign_response:
346            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
347                self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
348            )
349            signature = xmlsec.template.create(
350                response,
351                xmlsec.constants.TransformExclC14N,
352                sign_algorithm_transform,
353                ns=xmlsec.constants.DSigNs,
354            )
355            response.append(signature)
356
357        status = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
358        status_code = SubElement(status, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
359        status_code.attrib["Value"] = "urn:oasis:names:tc:SAML:2.0:status:Success"
360
361        response.append(self.get_assertion())
362        return response

Generate Root response element

def build_response(self) -> str:
435    def build_response(self) -> str:
436        """Build string XML Response and sign if signing is enabled."""
437        root_response = self.get_response()
438        # Sign assertion first (before encryption)
439        if self.provider.signing_kp and self.provider.sign_assertion:
440            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
441            self._sign(assertion)
442        # Encrypt assertion (this replaces Assertion with EncryptedAssertion)
443        if self.provider.encryption_kp:
444            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
445            self._encrypt(assertion, root_response)
446        # Sign response AFTER encryption so signature covers the encrypted content
447        if self.provider.signing_kp and self.provider.sign_response:
448            response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0]
449            self._sign(response)
450        return etree.tostring(root_response).decode("utf-8")  # nosec

Build string XML Response and sign if signing is enabled.