authentik.providers.saml.processors.assertion

SAML Assertion generator

  1"""SAML Assertion generator"""
  2
  3from datetime import datetime
  4from hashlib import sha256
  5from types import GeneratorType
  6
  7import xmlsec
  8from django.http import HttpRequest
  9from django.urls import reverse
 10from django.utils.timezone import now
 11from lxml import etree  # nosec
 12from lxml.etree import Element, SubElement, _Element  # nosec
 13from structlog.stdlib import get_logger
 14
 15from authentik.common.saml.constants import (
 16    DIGEST_ALGORITHM_TRANSLATION_MAP,
 17    NS_MAP,
 18    NS_SAML_ASSERTION,
 19    NS_SAML_PROTOCOL,
 20    SAML_NAME_ID_FORMAT_EMAIL,
 21    SAML_NAME_ID_FORMAT_PERSISTENT,
 22    SAML_NAME_ID_FORMAT_TRANSIENT,
 23    SAML_NAME_ID_FORMAT_UNSPECIFIED,
 24    SAML_NAME_ID_FORMAT_WINDOWS,
 25    SAML_NAME_ID_FORMAT_X509,
 26    SIGN_ALGORITHM_TRANSFORM_MAP,
 27)
 28from authentik.core.expression.exceptions import PropertyMappingExpressionException
 29from authentik.events.models import Event, EventAction
 30from authentik.events.signals import get_login_event
 31from authentik.lib.utils.time import timedelta_from_string
 32from authentik.lib.xml import remove_xml_newlines
 33from authentik.providers.saml.models import SAMLPropertyMapping, SAMLProvider
 34from authentik.providers.saml.processors.authn_request_parser import AuthNRequest
 35from authentik.providers.saml.utils import get_random_id
 36from authentik.providers.saml.utils.time import get_time_string
 37from authentik.sources.ldap.auth import LDAP_DISTINGUISHED_NAME
 38from authentik.sources.saml.exceptions import (
 39    InvalidEncryption,
 40    InvalidSignature,
 41    UnsupportedNameIDFormat,
 42)
 43from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
 44
# Module-level structlog logger; used below to warn when a property mapping fails to evaluate.
LOGGER = get_logger()
 46
 47
class AssertionProcessor:
    """Generate a SAML Response from an AuthNRequest

    ``build_response`` is the entry point that returns the serialized Response XML;
    signing and assertion encryption are applied there according to the provider.
    """

    # Inputs handed to __init__.
    provider: SAMLProvider
    http_request: HttpRequest
    auth_n_request: AuthNRequest

    # Generated once in __init__ and reused for the Response and the Assertion.
    _issue_instant: str
    _assertion_id: str
    _response_id: str

    # Pre-formatted timestamps computed in __init__.
    _auth_instant: str
    _valid_not_before: str
    _session_not_on_or_after: str
    _valid_not_on_or_after: str

    # Values recorded on the instance while the assertion is being built.
    session_index: str  # set by get_assertion_auth_n_statement (SHA-256 hex of the session key)
    name_id: str  # set by get_name_id
    name_id_format: str  # set by get_name_id
    issuer: str  # set by get_issuer
    session_not_on_or_after_datetime: datetime  # set in __init__
 69
 70    def __init__(self, provider: SAMLProvider, request: HttpRequest, auth_n_request: AuthNRequest):
 71        self.provider = provider
 72        self.http_request = request
 73        self.auth_n_request = auth_n_request
 74
 75        self._issue_instant = get_time_string()
 76        self._assertion_id = get_random_id()
 77        self._response_id = get_random_id()
 78
 79        _login_event = get_login_event(self.http_request)
 80        _login_time = now()
 81        if _login_event:
 82            _login_time = _login_event.created
 83        self._auth_instant = get_time_string(_login_time)
 84        self._valid_not_before = get_time_string(
 85            timedelta_from_string(self.provider.assertion_valid_not_before)
 86        )
 87        self.session_not_on_or_after_datetime = now() + timedelta_from_string(
 88            self.provider.session_valid_not_on_or_after
 89        )
 90        self._session_not_on_or_after = get_time_string(self.session_not_on_or_after_datetime)
 91        self._valid_not_on_or_after = get_time_string(
 92            timedelta_from_string(self.provider.assertion_valid_not_on_or_after)
 93        )
 94
 95    def get_attributes(self) -> Element:
 96        """Get AttributeStatement Element with Attributes from Property Mappings."""
 97        # https://commons.lbl.gov/display/IDMgmt/Attribute+Definitions
 98        attribute_statement = Element(f"{{{NS_SAML_ASSERTION}}}AttributeStatement")
 99        user = self.http_request.user
100        for mapping in SAMLPropertyMapping.objects.filter(provider=self.provider).order_by(
101            "saml_name"
102        ):
103            try:
104                mapping: SAMLPropertyMapping
105                value = mapping.evaluate(
106                    user=user,
107                    request=self.http_request,
108                    provider=self.provider,
109                )
110                if value is None:
111                    continue
112
113                attribute = Element(f"{{{NS_SAML_ASSERTION}}}Attribute")
114                if mapping.friendly_name and mapping.friendly_name != "":
115                    attribute.attrib["FriendlyName"] = mapping.friendly_name
116                attribute.attrib["Name"] = mapping.saml_name
117
118                if not isinstance(value, list | GeneratorType):
119                    value = [value]
120
121                for value_item in value:
122                    attribute_value = SubElement(
123                        attribute, f"{{{NS_SAML_ASSERTION}}}AttributeValue"
124                    )
125                    str_value = str(value_item) if not isinstance(value_item, str) else value_item
126                    attribute_value.text = str_value
127
128                attribute_statement.append(attribute)
129
130            except (PropertyMappingExpressionException, ValueError) as exc:
131                # Value error can be raised when assigning invalid data to an attribute
132                Event.new(
133                    EventAction.CONFIGURATION_ERROR,
134                    message=f"Failed to evaluate property-mapping: '{mapping.name}'",
135                    provider=self.provider,
136                    mapping=mapping,
137                ).from_http(self.http_request)
138                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
139                continue
140        return attribute_statement
141
142    def _get_issuer_value(self) -> str:
143        """Get issuer value, with fallback to generated URL if empty"""
144        # If user has set an override issuer, use it
145        if self.provider.issuer_override:
146            return self.provider.issuer_override
147
148        return self.http_request.build_absolute_uri(
149            reverse(
150                "authentik_providers_saml:metadata-download",
151                kwargs={"application_slug": self.provider.application.slug},
152            )
153        )
154
155    def get_issuer(self) -> Element:
156        """Get Issuer Element"""
157        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer", nsmap=NS_MAP)
158        self.issuer = self._get_issuer_value()
159        issuer.text = self.issuer
160        return issuer
161
162    def get_assertion_auth_n_statement(self) -> Element:
163        """Generate AuthnStatement with AuthnContext and ContextClassRef Elements."""
164        auth_n_statement = Element(f"{{{NS_SAML_ASSERTION}}}AuthnStatement")
165        auth_n_statement.attrib["AuthnInstant"] = self._auth_instant
166        self.session_index = sha256(
167            self.http_request.session.session_key.encode("ascii")
168        ).hexdigest()
169        auth_n_statement.attrib["SessionIndex"] = self.session_index
170        auth_n_statement.attrib["SessionNotOnOrAfter"] = self._session_not_on_or_after
171
172        auth_n_context = SubElement(auth_n_statement, f"{{{NS_SAML_ASSERTION}}}AuthnContext")
173        auth_n_context_class_ref = SubElement(
174            auth_n_context, f"{{{NS_SAML_ASSERTION}}}AuthnContextClassRef"
175        )
176        auth_n_context_class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified"
177        event = get_login_event(self.http_request)
178        if event:
179            method = event.context.get(PLAN_CONTEXT_METHOD, "")
180            method_args = event.context.get(PLAN_CONTEXT_METHOD_ARGS, {})
181            if method == "password":
182                auth_n_context_class_ref.text = (
183                    "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport"
184                )
185            if "mfa_devices" in method_args:
186                auth_n_context_class_ref.text = (
187                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileTwoFactorContract"
188                )
189            if method in ["auth_mfa", "auth_webauthn_pwl"]:
190                auth_n_context_class_ref.text = (
191                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract"
192                )
193        if self.provider.authn_context_class_ref_mapping:
194            try:
195                value = self.provider.authn_context_class_ref_mapping.evaluate(
196                    user=self.http_request.user,
197                    request=self.http_request,
198                    provider=self.provider,
199                )
200                if value is not None:
201                    auth_n_context_class_ref.text = str(value)
202                return auth_n_statement
203            except PropertyMappingExpressionException as exc:
204                Event.new(
205                    EventAction.CONFIGURATION_ERROR,
206                    message=(
207                        "Failed to evaluate property-mapping: "
208                        f"'{self.provider.authn_context_class_ref_mapping.name}'"
209                    ),
210                    provider=self.provider,
211                    mapping=self.provider.authn_context_class_ref_mapping,
212                ).from_http(self.http_request)
213                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
214                return auth_n_statement
215        return auth_n_statement
216
217    def get_assertion_conditions(self) -> Element:
218        """Generate Conditions with AudienceRestriction and Audience Elements."""
219        conditions = Element(f"{{{NS_SAML_ASSERTION}}}Conditions")
220        conditions.attrib["NotBefore"] = self._valid_not_before
221        conditions.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
222        if self.provider.audience != "":
223            audience_restriction = SubElement(
224                conditions, f"{{{NS_SAML_ASSERTION}}}AudienceRestriction"
225            )
226            audience = SubElement(audience_restriction, f"{{{NS_SAML_ASSERTION}}}Audience")
227            audience.text = self.provider.audience
228        return conditions
229
230    def get_name_id(self) -> Element:
231        """Get NameID Element"""
232        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
233        # For requests that don't specify a NameIDPolicy, check if we
234        # can fall back to the provider default
235        if (
236            self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_UNSPECIFIED
237            and self.provider.default_name_id_policy != SAML_NAME_ID_FORMAT_UNSPECIFIED
238        ):
239            self.auth_n_request.name_id_policy = self.provider.default_name_id_policy
240        name_id.attrib["Format"] = self.auth_n_request.name_id_policy
241        self.name_id_format = self.auth_n_request.name_id_policy
242        # persistent is used as a fallback, so always generate it
243        persistent = self.http_request.user.uid
244        name_id.text = persistent
245        self.name_id = persistent
246        # If name_id_mapping is set, we override the value, regardless of what the SP asks for
247        if self.provider.name_id_mapping:
248            try:
249                value = self.provider.name_id_mapping.evaluate(
250                    user=self.http_request.user,
251                    request=self.http_request,
252                    provider=self.provider,
253                )
254                if value is not None:
255                    name_id.text = str(value)
256                    self.name_id = str(value)
257                return name_id
258            except PropertyMappingExpressionException as exc:
259                Event.new(
260                    EventAction.CONFIGURATION_ERROR,
261                    message=(
262                        "Failed to evaluate property-mapping: "
263                        f"'{self.provider.name_id_mapping.name}'",
264                    ),
265                    provider=self.provider,
266                    mapping=self.provider.name_id_mapping,
267                ).from_http(self.http_request)
268                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
269                return name_id
270        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_EMAIL:
271            name_id.text = self.http_request.user.email
272            self.name_id = self.http_request.user.email
273            return name_id
274        if self.auth_n_request.name_id_policy in [
275            SAML_NAME_ID_FORMAT_PERSISTENT,
276            SAML_NAME_ID_FORMAT_UNSPECIFIED,
277        ]:
278            name_id.text = persistent
279            self.name_id = persistent
280            return name_id
281        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_X509:
282            # This attribute is statically set by the LDAP source
283            name_id.text = self.http_request.user.attributes.get(
284                LDAP_DISTINGUISHED_NAME, persistent
285            )
286            self.name_id = name_id.text
287            return name_id
288        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_WINDOWS:
289            # This attribute is statically set by the LDAP source
290            name_id.text = self.http_request.user.attributes.get("upn", persistent)
291            self.name_id = name_id.text
292            return name_id
293        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_TRANSIENT:
294            # Use the hash of the user's session, which changes every session
295            session_key: str = self.http_request.session.session_key
296            name_id.text = sha256(session_key.encode()).hexdigest()
297            self.name_id = name_id.text
298            return name_id
299        raise UnsupportedNameIDFormat(
300            "Assertion contains NameID with unsupported "
301            f"format {self.auth_n_request.name_id_policy}."
302        )
303
304    def get_assertion_subject(self) -> Element:
305        """Generate Subject Element with NameID and SubjectConfirmation Objects"""
306        subject = Element(f"{{{NS_SAML_ASSERTION}}}Subject")
307        subject.append(self.get_name_id())
308
309        subject_confirmation = SubElement(subject, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmation")
310        subject_confirmation.attrib["Method"] = "urn:oasis:names:tc:SAML:2.0:cm:bearer"
311
312        subject_confirmation_data = SubElement(
313            subject_confirmation, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmationData"
314        )
315        if self.auth_n_request.id:
316            subject_confirmation_data.attrib["InResponseTo"] = self.auth_n_request.id
317        subject_confirmation_data.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
318        subject_confirmation_data.attrib["Recipient"] = self.provider.acs_url
319        return subject
320
321    def get_assertion(self) -> Element:
322        """Generate Main Assertion Element"""
323        assertion = Element(f"{{{NS_SAML_ASSERTION}}}Assertion", nsmap=NS_MAP)
324        assertion.attrib["Version"] = "2.0"
325        assertion.attrib["ID"] = self._assertion_id
326        assertion.attrib["IssueInstant"] = self._issue_instant
327        assertion.append(self.get_issuer())
328
329        if self.provider.signing_kp and self.provider.sign_assertion:
330            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
331                self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
332            )
333            signature = xmlsec.template.create(
334                assertion,
335                xmlsec.constants.TransformExclC14N,
336                sign_algorithm_transform,
337                ns=xmlsec.constants.DSigNs,
338            )
339            assertion.append(signature)
340
341        assertion.append(self.get_assertion_subject())
342        assertion.append(self.get_assertion_conditions())
343        assertion.append(self.get_assertion_auth_n_statement())
344
345        assertion.append(self.get_attributes())
346        return assertion
347
348    def get_response(self) -> Element:
349        """Generate Root response element"""
350        response = Element(f"{{{NS_SAML_PROTOCOL}}}Response", nsmap=NS_MAP)
351        response.attrib["Version"] = "2.0"
352        response.attrib["IssueInstant"] = self._issue_instant
353        response.attrib["Destination"] = self.provider.acs_url
354        response.attrib["ID"] = self._response_id
355        if self.auth_n_request.id:
356            response.attrib["InResponseTo"] = self.auth_n_request.id
357
358        response.append(self.get_issuer())
359
360        if self.provider.signing_kp and self.provider.sign_response:
361            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
362                self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
363            )
364            signature = xmlsec.template.create(
365                response,
366                xmlsec.constants.TransformExclC14N,
367                sign_algorithm_transform,
368                ns=xmlsec.constants.DSigNs,
369            )
370            response.append(signature)
371
372        status = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
373        status_code = SubElement(status, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
374        status_code.attrib["Value"] = "urn:oasis:names:tc:SAML:2.0:status:Success"
375
376        response.append(self.get_assertion())
377        return response
378
379    def _sign(self, element: _Element):
380        """Sign an XML element based on the providers' configured signing settings"""
381        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
382            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
383        )
384        xmlsec.tree.add_ids(element, ["ID"])
385        signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature)
386        ref = xmlsec.template.add_reference(
387            signature_node,
388            digest_algorithm_transform,
389            uri="#" + element.attrib["ID"],
390        )
391        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
392        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
393        key_info = xmlsec.template.ensure_key_info(signature_node)
394        xmlsec.template.add_x509_data(key_info)
395
396        ctx = xmlsec.SignatureContext()
397
398        key = xmlsec.Key.from_memory(
399            self.provider.signing_kp.key_data,
400            xmlsec.constants.KeyDataFormatPem,
401            None,
402        )
403        key.load_cert_from_memory(
404            self.provider.signing_kp.certificate_data,
405            xmlsec.constants.KeyDataFormatCertPem,
406        )
407        ctx.key = key
408        try:
409            ctx.sign(remove_xml_newlines(element, signature_node))
410        except xmlsec.Error as exc:
411            raise InvalidSignature() from exc
412
413    def _encrypt(self, element: _Element, parent: _Element):
414        """Encrypt SAMLResponse EncryptedAssertion Element"""
415        # Create a standalone copy so namespace declarations are included in the encrypted content
416        element_xml = etree.tostring(element)
417        standalone_element = etree.fromstring(element_xml)
418
419        # Remove the original element from the tree since we're replacing it with encrypted version
420        parent.remove(element)
421
422        manager = xmlsec.KeysManager()
423        key = xmlsec.Key.from_memory(
424            self.provider.encryption_kp.certificate_data,
425            xmlsec.constants.KeyDataFormatCertPem,
426        )
427
428        manager.add_key(key)
429        encryption_context = xmlsec.EncryptionContext(manager)
430        encryption_context.key = xmlsec.Key.generate(
431            xmlsec.constants.KeyDataAes, 128, xmlsec.constants.KeyDataTypeSession
432        )
433
434        container = SubElement(parent, f"{{{NS_SAML_ASSERTION}}}EncryptedAssertion")
435        enc_data = xmlsec.template.encrypted_data_create(
436            container, xmlsec.Transform.AES128, type=xmlsec.EncryptionType.ELEMENT, ns="xenc"
437        )
438        xmlsec.template.encrypted_data_ensure_cipher_value(enc_data)
439        key_info = xmlsec.template.encrypted_data_ensure_key_info(enc_data, ns="ds")
440        enc_key = xmlsec.template.add_encrypted_key(key_info, xmlsec.Transform.RSA_OAEP)
441        xmlsec.template.encrypted_data_ensure_cipher_value(enc_key)
442
443        try:
444            enc_data = encryption_context.encrypt_xml(enc_data, standalone_element)
445        except xmlsec.Error as exc:
446            raise InvalidEncryption() from exc
447
448        container.append(enc_data)
449
450    def build_response(self) -> str:
451        """Build string XML Response and sign if signing is enabled."""
452        root_response = self.get_response()
453        # Sign assertion first (before encryption)
454        if self.provider.signing_kp and self.provider.sign_assertion:
455            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
456            self._sign(assertion)
457        # Encrypt assertion (this replaces Assertion with EncryptedAssertion)
458        if self.provider.encryption_kp:
459            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
460            self._encrypt(assertion, root_response)
461        # Sign response AFTER encryption so signature covers the encrypted content
462        if self.provider.signing_kp and self.provider.sign_response:
463            response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0]
464            self._sign(response)
465        return etree.tostring(root_response).decode("utf-8")  # nosec
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class AssertionProcessor:
class AssertionProcessor:
    """Generate a SAML Response from an AuthNRequest

    ``build_response`` is the entry point that returns the serialized Response XML;
    signing and assertion encryption are applied there according to the provider.
    """

    # Inputs handed to __init__.
    provider: SAMLProvider
    http_request: HttpRequest
    auth_n_request: AuthNRequest

    # Generated once in __init__ and reused for the Response and the Assertion.
    _issue_instant: str
    _assertion_id: str
    _response_id: str

    # Pre-formatted timestamps computed in __init__.
    _auth_instant: str
    _valid_not_before: str
    _session_not_on_or_after: str
    _valid_not_on_or_after: str

    # Values recorded on the instance while the assertion is being built.
    session_index: str  # set by get_assertion_auth_n_statement (SHA-256 hex of the session key)
    name_id: str  # set by get_name_id
    name_id_format: str  # set by get_name_id
    issuer: str  # set by get_issuer
    session_not_on_or_after_datetime: datetime  # set in __init__
 70
 71    def __init__(self, provider: SAMLProvider, request: HttpRequest, auth_n_request: AuthNRequest):
 72        self.provider = provider
 73        self.http_request = request
 74        self.auth_n_request = auth_n_request
 75
 76        self._issue_instant = get_time_string()
 77        self._assertion_id = get_random_id()
 78        self._response_id = get_random_id()
 79
 80        _login_event = get_login_event(self.http_request)
 81        _login_time = now()
 82        if _login_event:
 83            _login_time = _login_event.created
 84        self._auth_instant = get_time_string(_login_time)
 85        self._valid_not_before = get_time_string(
 86            timedelta_from_string(self.provider.assertion_valid_not_before)
 87        )
 88        self.session_not_on_or_after_datetime = now() + timedelta_from_string(
 89            self.provider.session_valid_not_on_or_after
 90        )
 91        self._session_not_on_or_after = get_time_string(self.session_not_on_or_after_datetime)
 92        self._valid_not_on_or_after = get_time_string(
 93            timedelta_from_string(self.provider.assertion_valid_not_on_or_after)
 94        )
 95
 96    def get_attributes(self) -> Element:
 97        """Get AttributeStatement Element with Attributes from Property Mappings."""
 98        # https://commons.lbl.gov/display/IDMgmt/Attribute+Definitions
 99        attribute_statement = Element(f"{{{NS_SAML_ASSERTION}}}AttributeStatement")
100        user = self.http_request.user
101        for mapping in SAMLPropertyMapping.objects.filter(provider=self.provider).order_by(
102            "saml_name"
103        ):
104            try:
105                mapping: SAMLPropertyMapping
106                value = mapping.evaluate(
107                    user=user,
108                    request=self.http_request,
109                    provider=self.provider,
110                )
111                if value is None:
112                    continue
113
114                attribute = Element(f"{{{NS_SAML_ASSERTION}}}Attribute")
115                if mapping.friendly_name and mapping.friendly_name != "":
116                    attribute.attrib["FriendlyName"] = mapping.friendly_name
117                attribute.attrib["Name"] = mapping.saml_name
118
119                if not isinstance(value, list | GeneratorType):
120                    value = [value]
121
122                for value_item in value:
123                    attribute_value = SubElement(
124                        attribute, f"{{{NS_SAML_ASSERTION}}}AttributeValue"
125                    )
126                    str_value = str(value_item) if not isinstance(value_item, str) else value_item
127                    attribute_value.text = str_value
128
129                attribute_statement.append(attribute)
130
131            except (PropertyMappingExpressionException, ValueError) as exc:
132                # Value error can be raised when assigning invalid data to an attribute
133                Event.new(
134                    EventAction.CONFIGURATION_ERROR,
135                    message=f"Failed to evaluate property-mapping: '{mapping.name}'",
136                    provider=self.provider,
137                    mapping=mapping,
138                ).from_http(self.http_request)
139                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
140                continue
141        return attribute_statement
142
143    def _get_issuer_value(self) -> str:
144        """Get issuer value, with fallback to generated URL if empty"""
145        # If user has set an override issuer, use it
146        if self.provider.issuer_override:
147            return self.provider.issuer_override
148
149        return self.http_request.build_absolute_uri(
150            reverse(
151                "authentik_providers_saml:metadata-download",
152                kwargs={"application_slug": self.provider.application.slug},
153            )
154        )
155
156    def get_issuer(self) -> Element:
157        """Get Issuer Element"""
158        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer", nsmap=NS_MAP)
159        self.issuer = self._get_issuer_value()
160        issuer.text = self.issuer
161        return issuer
162
163    def get_assertion_auth_n_statement(self) -> Element:
164        """Generate AuthnStatement with AuthnContext and ContextClassRef Elements."""
165        auth_n_statement = Element(f"{{{NS_SAML_ASSERTION}}}AuthnStatement")
166        auth_n_statement.attrib["AuthnInstant"] = self._auth_instant
167        self.session_index = sha256(
168            self.http_request.session.session_key.encode("ascii")
169        ).hexdigest()
170        auth_n_statement.attrib["SessionIndex"] = self.session_index
171        auth_n_statement.attrib["SessionNotOnOrAfter"] = self._session_not_on_or_after
172
173        auth_n_context = SubElement(auth_n_statement, f"{{{NS_SAML_ASSERTION}}}AuthnContext")
174        auth_n_context_class_ref = SubElement(
175            auth_n_context, f"{{{NS_SAML_ASSERTION}}}AuthnContextClassRef"
176        )
177        auth_n_context_class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified"
178        event = get_login_event(self.http_request)
179        if event:
180            method = event.context.get(PLAN_CONTEXT_METHOD, "")
181            method_args = event.context.get(PLAN_CONTEXT_METHOD_ARGS, {})
182            if method == "password":
183                auth_n_context_class_ref.text = (
184                    "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport"
185                )
186            if "mfa_devices" in method_args:
187                auth_n_context_class_ref.text = (
188                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileTwoFactorContract"
189                )
190            if method in ["auth_mfa", "auth_webauthn_pwl"]:
191                auth_n_context_class_ref.text = (
192                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract"
193                )
194        if self.provider.authn_context_class_ref_mapping:
195            try:
196                value = self.provider.authn_context_class_ref_mapping.evaluate(
197                    user=self.http_request.user,
198                    request=self.http_request,
199                    provider=self.provider,
200                )
201                if value is not None:
202                    auth_n_context_class_ref.text = str(value)
203                return auth_n_statement
204            except PropertyMappingExpressionException as exc:
205                Event.new(
206                    EventAction.CONFIGURATION_ERROR,
207                    message=(
208                        "Failed to evaluate property-mapping: "
209                        f"'{self.provider.authn_context_class_ref_mapping.name}'"
210                    ),
211                    provider=self.provider,
212                    mapping=self.provider.authn_context_class_ref_mapping,
213                ).from_http(self.http_request)
214                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
215                return auth_n_statement
216        return auth_n_statement
217
218    def get_assertion_conditions(self) -> Element:
219        """Generate Conditions with AudienceRestriction and Audience Elements."""
220        conditions = Element(f"{{{NS_SAML_ASSERTION}}}Conditions")
221        conditions.attrib["NotBefore"] = self._valid_not_before
222        conditions.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
223        if self.provider.audience != "":
224            audience_restriction = SubElement(
225                conditions, f"{{{NS_SAML_ASSERTION}}}AudienceRestriction"
226            )
227            audience = SubElement(audience_restriction, f"{{{NS_SAML_ASSERTION}}}Audience")
228            audience.text = self.provider.audience
229        return conditions
230
231    def get_name_id(self) -> Element:
232        """Get NameID Element"""
233        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
234        # For requests that don't specify a NameIDPolicy, check if we
235        # can fall back to the provider default
236        if (
237            self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_UNSPECIFIED
238            and self.provider.default_name_id_policy != SAML_NAME_ID_FORMAT_UNSPECIFIED
239        ):
240            self.auth_n_request.name_id_policy = self.provider.default_name_id_policy
241        name_id.attrib["Format"] = self.auth_n_request.name_id_policy
242        self.name_id_format = self.auth_n_request.name_id_policy
243        # persistent is used as a fallback, so always generate it
244        persistent = self.http_request.user.uid
245        name_id.text = persistent
246        self.name_id = persistent
247        # If name_id_mapping is set, we override the value, regardless of what the SP asks for
248        if self.provider.name_id_mapping:
249            try:
250                value = self.provider.name_id_mapping.evaluate(
251                    user=self.http_request.user,
252                    request=self.http_request,
253                    provider=self.provider,
254                )
255                if value is not None:
256                    name_id.text = str(value)
257                    self.name_id = str(value)
258                return name_id
259            except PropertyMappingExpressionException as exc:
260                Event.new(
261                    EventAction.CONFIGURATION_ERROR,
262                    message=(
263                        "Failed to evaluate property-mapping: "
264                        f"'{self.provider.name_id_mapping.name}'",
265                    ),
266                    provider=self.provider,
267                    mapping=self.provider.name_id_mapping,
268                ).from_http(self.http_request)
269                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
270                return name_id
271        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_EMAIL:
272            name_id.text = self.http_request.user.email
273            self.name_id = self.http_request.user.email
274            return name_id
275        if self.auth_n_request.name_id_policy in [
276            SAML_NAME_ID_FORMAT_PERSISTENT,
277            SAML_NAME_ID_FORMAT_UNSPECIFIED,
278        ]:
279            name_id.text = persistent
280            self.name_id = persistent
281            return name_id
282        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_X509:
283            # This attribute is statically set by the LDAP source
284            name_id.text = self.http_request.user.attributes.get(
285                LDAP_DISTINGUISHED_NAME, persistent
286            )
287            self.name_id = name_id.text
288            return name_id
289        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_WINDOWS:
290            # This attribute is statically set by the LDAP source
291            name_id.text = self.http_request.user.attributes.get("upn", persistent)
292            self.name_id = name_id.text
293            return name_id
294        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_TRANSIENT:
295            # Use the hash of the user's session, which changes every session
296            session_key: str = self.http_request.session.session_key
297            name_id.text = sha256(session_key.encode()).hexdigest()
298            self.name_id = name_id.text
299            return name_id
300        raise UnsupportedNameIDFormat(
301            "Assertion contains NameID with unsupported "
302            f"format {self.auth_n_request.name_id_policy}."
303        )
304
305    def get_assertion_subject(self) -> Element:
306        """Generate Subject Element with NameID and SubjectConfirmation Objects"""
307        subject = Element(f"{{{NS_SAML_ASSERTION}}}Subject")
308        subject.append(self.get_name_id())
309
310        subject_confirmation = SubElement(subject, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmation")
311        subject_confirmation.attrib["Method"] = "urn:oasis:names:tc:SAML:2.0:cm:bearer"
312
313        subject_confirmation_data = SubElement(
314            subject_confirmation, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmationData"
315        )
316        if self.auth_n_request.id:
317            subject_confirmation_data.attrib["InResponseTo"] = self.auth_n_request.id
318        subject_confirmation_data.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
319        subject_confirmation_data.attrib["Recipient"] = self.provider.acs_url
320        return subject
321
322    def get_assertion(self) -> Element:
323        """Generate Main Assertion Element"""
324        assertion = Element(f"{{{NS_SAML_ASSERTION}}}Assertion", nsmap=NS_MAP)
325        assertion.attrib["Version"] = "2.0"
326        assertion.attrib["ID"] = self._assertion_id
327        assertion.attrib["IssueInstant"] = self._issue_instant
328        assertion.append(self.get_issuer())
329
330        if self.provider.signing_kp and self.provider.sign_assertion:
331            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
332                self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
333            )
334            signature = xmlsec.template.create(
335                assertion,
336                xmlsec.constants.TransformExclC14N,
337                sign_algorithm_transform,
338                ns=xmlsec.constants.DSigNs,
339            )
340            assertion.append(signature)
341
342        assertion.append(self.get_assertion_subject())
343        assertion.append(self.get_assertion_conditions())
344        assertion.append(self.get_assertion_auth_n_statement())
345
346        assertion.append(self.get_attributes())
347        return assertion
348
349    def get_response(self) -> Element:
350        """Generate Root response element"""
351        response = Element(f"{{{NS_SAML_PROTOCOL}}}Response", nsmap=NS_MAP)
352        response.attrib["Version"] = "2.0"
353        response.attrib["IssueInstant"] = self._issue_instant
354        response.attrib["Destination"] = self.provider.acs_url
355        response.attrib["ID"] = self._response_id
356        if self.auth_n_request.id:
357            response.attrib["InResponseTo"] = self.auth_n_request.id
358
359        response.append(self.get_issuer())
360
361        if self.provider.signing_kp and self.provider.sign_response:
362            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
363                self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
364            )
365            signature = xmlsec.template.create(
366                response,
367                xmlsec.constants.TransformExclC14N,
368                sign_algorithm_transform,
369                ns=xmlsec.constants.DSigNs,
370            )
371            response.append(signature)
372
373        status = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
374        status_code = SubElement(status, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
375        status_code.attrib["Value"] = "urn:oasis:names:tc:SAML:2.0:status:Success"
376
377        response.append(self.get_assertion())
378        return response
379
380    def _sign(self, element: _Element):
381        """Sign an XML element based on the providers' configured signing settings"""
382        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
383            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
384        )
385        xmlsec.tree.add_ids(element, ["ID"])
386        signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature)
387        ref = xmlsec.template.add_reference(
388            signature_node,
389            digest_algorithm_transform,
390            uri="#" + element.attrib["ID"],
391        )
392        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
393        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
394        key_info = xmlsec.template.ensure_key_info(signature_node)
395        xmlsec.template.add_x509_data(key_info)
396
397        ctx = xmlsec.SignatureContext()
398
399        key = xmlsec.Key.from_memory(
400            self.provider.signing_kp.key_data,
401            xmlsec.constants.KeyDataFormatPem,
402            None,
403        )
404        key.load_cert_from_memory(
405            self.provider.signing_kp.certificate_data,
406            xmlsec.constants.KeyDataFormatCertPem,
407        )
408        ctx.key = key
409        try:
410            ctx.sign(remove_xml_newlines(element, signature_node))
411        except xmlsec.Error as exc:
412            raise InvalidSignature() from exc
413
414    def _encrypt(self, element: _Element, parent: _Element):
415        """Encrypt SAMLResponse EncryptedAssertion Element"""
416        # Create a standalone copy so namespace declarations are included in the encrypted content
417        element_xml = etree.tostring(element)
418        standalone_element = etree.fromstring(element_xml)
419
420        # Remove the original element from the tree since we're replacing it with encrypted version
421        parent.remove(element)
422
423        manager = xmlsec.KeysManager()
424        key = xmlsec.Key.from_memory(
425            self.provider.encryption_kp.certificate_data,
426            xmlsec.constants.KeyDataFormatCertPem,
427        )
428
429        manager.add_key(key)
430        encryption_context = xmlsec.EncryptionContext(manager)
431        encryption_context.key = xmlsec.Key.generate(
432            xmlsec.constants.KeyDataAes, 128, xmlsec.constants.KeyDataTypeSession
433        )
434
435        container = SubElement(parent, f"{{{NS_SAML_ASSERTION}}}EncryptedAssertion")
436        enc_data = xmlsec.template.encrypted_data_create(
437            container, xmlsec.Transform.AES128, type=xmlsec.EncryptionType.ELEMENT, ns="xenc"
438        )
439        xmlsec.template.encrypted_data_ensure_cipher_value(enc_data)
440        key_info = xmlsec.template.encrypted_data_ensure_key_info(enc_data, ns="ds")
441        enc_key = xmlsec.template.add_encrypted_key(key_info, xmlsec.Transform.RSA_OAEP)
442        xmlsec.template.encrypted_data_ensure_cipher_value(enc_key)
443
444        try:
445            enc_data = encryption_context.encrypt_xml(enc_data, standalone_element)
446        except xmlsec.Error as exc:
447            raise InvalidEncryption() from exc
448
449        container.append(enc_data)
450
451    def build_response(self) -> str:
452        """Build string XML Response and sign if signing is enabled."""
453        root_response = self.get_response()
454        # Sign assertion first (before encryption)
455        if self.provider.signing_kp and self.provider.sign_assertion:
456            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
457            self._sign(assertion)
458        # Encrypt assertion (this replaces Assertion with EncryptedAssertion)
459        if self.provider.encryption_kp:
460            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
461            self._encrypt(assertion, root_response)
462        # Sign response AFTER encryption so signature covers the encrypted content
463        if self.provider.signing_kp and self.provider.sign_response:
464            response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0]
465            self._sign(response)
466        return etree.tostring(root_response).decode("utf-8")  # nosec

Generate a SAML Response from an AuthNRequest

AssertionProcessor( provider: authentik.providers.saml.models.SAMLProvider, request: django.http.request.HttpRequest, auth_n_request: authentik.providers.saml.processors.authn_request_parser.AuthNRequest)
71    def __init__(self, provider: SAMLProvider, request: HttpRequest, auth_n_request: AuthNRequest):
72        self.provider = provider
73        self.http_request = request
74        self.auth_n_request = auth_n_request
75
76        self._issue_instant = get_time_string()
77        self._assertion_id = get_random_id()
78        self._response_id = get_random_id()
79
80        _login_event = get_login_event(self.http_request)
81        _login_time = now()
82        if _login_event:
83            _login_time = _login_event.created
84        self._auth_instant = get_time_string(_login_time)
85        self._valid_not_before = get_time_string(
86            timedelta_from_string(self.provider.assertion_valid_not_before)
87        )
88        self.session_not_on_or_after_datetime = now() + timedelta_from_string(
89            self.provider.session_valid_not_on_or_after
90        )
91        self._session_not_on_or_after = get_time_string(self.session_not_on_or_after_datetime)
92        self._valid_not_on_or_after = get_time_string(
93            timedelta_from_string(self.provider.assertion_valid_not_on_or_after)
94        )
http_request: django.http.request.HttpRequest
session_index: str
name_id: str
name_id_format: str
issuer: str
session_not_on_or_after_datetime: datetime.datetime
def get_attributes(self) -> lxml.etree.Element:
 96    def get_attributes(self) -> Element:
 97        """Get AttributeStatement Element with Attributes from Property Mappings."""
 98        # https://commons.lbl.gov/display/IDMgmt/Attribute+Definitions
 99        attribute_statement = Element(f"{{{NS_SAML_ASSERTION}}}AttributeStatement")
100        user = self.http_request.user
101        for mapping in SAMLPropertyMapping.objects.filter(provider=self.provider).order_by(
102            "saml_name"
103        ):
104            try:
105                mapping: SAMLPropertyMapping
106                value = mapping.evaluate(
107                    user=user,
108                    request=self.http_request,
109                    provider=self.provider,
110                )
111                if value is None:
112                    continue
113
114                attribute = Element(f"{{{NS_SAML_ASSERTION}}}Attribute")
115                if mapping.friendly_name and mapping.friendly_name != "":
116                    attribute.attrib["FriendlyName"] = mapping.friendly_name
117                attribute.attrib["Name"] = mapping.saml_name
118
119                if not isinstance(value, list | GeneratorType):
120                    value = [value]
121
122                for value_item in value:
123                    attribute_value = SubElement(
124                        attribute, f"{{{NS_SAML_ASSERTION}}}AttributeValue"
125                    )
126                    str_value = str(value_item) if not isinstance(value_item, str) else value_item
127                    attribute_value.text = str_value
128
129                attribute_statement.append(attribute)
130
131            except (PropertyMappingExpressionException, ValueError) as exc:
132                # Value error can be raised when assigning invalid data to an attribute
133                Event.new(
134                    EventAction.CONFIGURATION_ERROR,
135                    message=f"Failed to evaluate property-mapping: '{mapping.name}'",
136                    provider=self.provider,
137                    mapping=mapping,
138                ).from_http(self.http_request)
139                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
140                continue
141        return attribute_statement

Get AttributeStatement Element with Attributes from Property Mappings.

def get_issuer(self) -> lxml.etree.Element:
156    def get_issuer(self) -> Element:
157        """Get Issuer Element"""
158        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer", nsmap=NS_MAP)
159        self.issuer = self._get_issuer_value()
160        issuer.text = self.issuer
161        return issuer

Get Issuer Element

def get_assertion_auth_n_statement(self) -> lxml.etree.Element:
163    def get_assertion_auth_n_statement(self) -> Element:
164        """Generate AuthnStatement with AuthnContext and ContextClassRef Elements."""
165        auth_n_statement = Element(f"{{{NS_SAML_ASSERTION}}}AuthnStatement")
166        auth_n_statement.attrib["AuthnInstant"] = self._auth_instant
167        self.session_index = sha256(
168            self.http_request.session.session_key.encode("ascii")
169        ).hexdigest()
170        auth_n_statement.attrib["SessionIndex"] = self.session_index
171        auth_n_statement.attrib["SessionNotOnOrAfter"] = self._session_not_on_or_after
172
173        auth_n_context = SubElement(auth_n_statement, f"{{{NS_SAML_ASSERTION}}}AuthnContext")
174        auth_n_context_class_ref = SubElement(
175            auth_n_context, f"{{{NS_SAML_ASSERTION}}}AuthnContextClassRef"
176        )
177        auth_n_context_class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified"
178        event = get_login_event(self.http_request)
179        if event:
180            method = event.context.get(PLAN_CONTEXT_METHOD, "")
181            method_args = event.context.get(PLAN_CONTEXT_METHOD_ARGS, {})
182            if method == "password":
183                auth_n_context_class_ref.text = (
184                    "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport"
185                )
186            if "mfa_devices" in method_args:
187                auth_n_context_class_ref.text = (
188                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileTwoFactorContract"
189                )
190            if method in ["auth_mfa", "auth_webauthn_pwl"]:
191                auth_n_context_class_ref.text = (
192                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract"
193                )
194        if self.provider.authn_context_class_ref_mapping:
195            try:
196                value = self.provider.authn_context_class_ref_mapping.evaluate(
197                    user=self.http_request.user,
198                    request=self.http_request,
199                    provider=self.provider,
200                )
201                if value is not None:
202                    auth_n_context_class_ref.text = str(value)
203                return auth_n_statement
204            except PropertyMappingExpressionException as exc:
205                Event.new(
206                    EventAction.CONFIGURATION_ERROR,
207                    message=(
208                        "Failed to evaluate property-mapping: "
209                        f"'{self.provider.authn_context_class_ref_mapping.name}'"
210                    ),
211                    provider=self.provider,
212                    mapping=self.provider.authn_context_class_ref_mapping,
213                ).from_http(self.http_request)
214                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
215                return auth_n_statement
216        return auth_n_statement

Generate AuthnStatement with AuthnContext and ContextClassRef Elements.

def get_assertion_conditions(self) -> lxml.etree.Element:
218    def get_assertion_conditions(self) -> Element:
219        """Generate Conditions with AudienceRestriction and Audience Elements."""
220        conditions = Element(f"{{{NS_SAML_ASSERTION}}}Conditions")
221        conditions.attrib["NotBefore"] = self._valid_not_before
222        conditions.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
223        if self.provider.audience != "":
224            audience_restriction = SubElement(
225                conditions, f"{{{NS_SAML_ASSERTION}}}AudienceRestriction"
226            )
227            audience = SubElement(audience_restriction, f"{{{NS_SAML_ASSERTION}}}Audience")
228            audience.text = self.provider.audience
229        return conditions

Generate Conditions with AudienceRestriction and Audience Elements.

def get_name_id(self) -> lxml.etree.Element:
231    def get_name_id(self) -> Element:
232        """Get NameID Element"""
233        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
234        # For requests that don't specify a NameIDPolicy, check if we
235        # can fall back to the provider default
236        if (
237            self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_UNSPECIFIED
238            and self.provider.default_name_id_policy != SAML_NAME_ID_FORMAT_UNSPECIFIED
239        ):
240            self.auth_n_request.name_id_policy = self.provider.default_name_id_policy
241        name_id.attrib["Format"] = self.auth_n_request.name_id_policy
242        self.name_id_format = self.auth_n_request.name_id_policy
243        # persistent is used as a fallback, so always generate it
244        persistent = self.http_request.user.uid
245        name_id.text = persistent
246        self.name_id = persistent
247        # If name_id_mapping is set, we override the value, regardless of what the SP asks for
248        if self.provider.name_id_mapping:
249            try:
250                value = self.provider.name_id_mapping.evaluate(
251                    user=self.http_request.user,
252                    request=self.http_request,
253                    provider=self.provider,
254                )
255                if value is not None:
256                    name_id.text = str(value)
257                    self.name_id = str(value)
258                return name_id
259            except PropertyMappingExpressionException as exc:
260                Event.new(
261                    EventAction.CONFIGURATION_ERROR,
262                    message=(
263                        "Failed to evaluate property-mapping: "
264                        f"'{self.provider.name_id_mapping.name}'",
265                    ),
266                    provider=self.provider,
267                    mapping=self.provider.name_id_mapping,
268                ).from_http(self.http_request)
269                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
270                return name_id
271        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_EMAIL:
272            name_id.text = self.http_request.user.email
273            self.name_id = self.http_request.user.email
274            return name_id
275        if self.auth_n_request.name_id_policy in [
276            SAML_NAME_ID_FORMAT_PERSISTENT,
277            SAML_NAME_ID_FORMAT_UNSPECIFIED,
278        ]:
279            name_id.text = persistent
280            self.name_id = persistent
281            return name_id
282        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_X509:
283            # This attribute is statically set by the LDAP source
284            name_id.text = self.http_request.user.attributes.get(
285                LDAP_DISTINGUISHED_NAME, persistent
286            )
287            self.name_id = name_id.text
288            return name_id
289        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_WINDOWS:
290            # This attribute is statically set by the LDAP source
291            name_id.text = self.http_request.user.attributes.get("upn", persistent)
292            self.name_id = name_id.text
293            return name_id
294        if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_TRANSIENT:
295            # Use the hash of the user's session, which changes every session
296            session_key: str = self.http_request.session.session_key
297            name_id.text = sha256(session_key.encode()).hexdigest()
298            self.name_id = name_id.text
299            return name_id
300        raise UnsupportedNameIDFormat(
301            "Assertion contains NameID with unsupported "
302            f"format {self.auth_n_request.name_id_policy}."
303        )

Get NameID Element

def get_assertion_subject(self) -> lxml.etree.Element:
305    def get_assertion_subject(self) -> Element:
306        """Generate Subject Element with NameID and SubjectConfirmation Objects"""
307        subject = Element(f"{{{NS_SAML_ASSERTION}}}Subject")
308        subject.append(self.get_name_id())
309
310        subject_confirmation = SubElement(subject, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmation")
311        subject_confirmation.attrib["Method"] = "urn:oasis:names:tc:SAML:2.0:cm:bearer"
312
313        subject_confirmation_data = SubElement(
314            subject_confirmation, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmationData"
315        )
316        if self.auth_n_request.id:
317            subject_confirmation_data.attrib["InResponseTo"] = self.auth_n_request.id
318        subject_confirmation_data.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
319        subject_confirmation_data.attrib["Recipient"] = self.provider.acs_url
320        return subject

Generate Subject Element with NameID and SubjectConfirmation Objects

def get_assertion(self) -> lxml.etree.Element:
322    def get_assertion(self) -> Element:
323        """Generate Main Assertion Element"""
324        assertion = Element(f"{{{NS_SAML_ASSERTION}}}Assertion", nsmap=NS_MAP)
325        assertion.attrib["Version"] = "2.0"
326        assertion.attrib["ID"] = self._assertion_id
327        assertion.attrib["IssueInstant"] = self._issue_instant
328        assertion.append(self.get_issuer())
329
330        if self.provider.signing_kp and self.provider.sign_assertion:
331            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
332                self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
333            )
334            signature = xmlsec.template.create(
335                assertion,
336                xmlsec.constants.TransformExclC14N,
337                sign_algorithm_transform,
338                ns=xmlsec.constants.DSigNs,
339            )
340            assertion.append(signature)
341
342        assertion.append(self.get_assertion_subject())
343        assertion.append(self.get_assertion_conditions())
344        assertion.append(self.get_assertion_auth_n_statement())
345
346        assertion.append(self.get_attributes())
347        return assertion

Generate Main Assertion Element

def get_response(self) -> lxml.etree.Element:
349    def get_response(self) -> Element:
350        """Generate Root response element"""
351        response = Element(f"{{{NS_SAML_PROTOCOL}}}Response", nsmap=NS_MAP)
352        response.attrib["Version"] = "2.0"
353        response.attrib["IssueInstant"] = self._issue_instant
354        response.attrib["Destination"] = self.provider.acs_url
355        response.attrib["ID"] = self._response_id
356        if self.auth_n_request.id:
357            response.attrib["InResponseTo"] = self.auth_n_request.id
358
359        response.append(self.get_issuer())
360
361        if self.provider.signing_kp and self.provider.sign_response:
362            sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
363                self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
364            )
365            signature = xmlsec.template.create(
366                response,
367                xmlsec.constants.TransformExclC14N,
368                sign_algorithm_transform,
369                ns=xmlsec.constants.DSigNs,
370            )
371            response.append(signature)
372
373        status = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
374        status_code = SubElement(status, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
375        status_code.attrib["Value"] = "urn:oasis:names:tc:SAML:2.0:status:Success"
376
377        response.append(self.get_assertion())
378        return response

Generate Root response element

def build_response(self) -> str:
451    def build_response(self) -> str:
452        """Build string XML Response and sign if signing is enabled."""
453        root_response = self.get_response()
454        # Sign assertion first (before encryption)
455        if self.provider.signing_kp and self.provider.sign_assertion:
456            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
457            self._sign(assertion)
458        # Encrypt assertion (this replaces Assertion with EncryptedAssertion)
459        if self.provider.encryption_kp:
460            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
461            self._encrypt(assertion, root_response)
462        # Sign response AFTER encryption so signature covers the encrypted content
463        if self.provider.signing_kp and self.provider.sign_response:
464            response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0]
465            self._sign(response)
466        return etree.tostring(root_response).decode("utf-8")  # nosec

Build string XML Response and sign if signing is enabled.