authentik.providers.saml.processors.assertion
SAML Assertion generator
"""SAML Assertion generator"""

from datetime import datetime
from hashlib import sha256
from types import GeneratorType

import xmlsec
from django.http import HttpRequest
from django.urls import reverse
from django.utils.timezone import now
from lxml import etree  # nosec
from lxml.etree import Element, SubElement, _Element  # nosec
from structlog.stdlib import get_logger

from authentik.common.saml.constants import (
    DIGEST_ALGORITHM_TRANSLATION_MAP,
    NS_MAP,
    NS_SAML_ASSERTION,
    NS_SAML_PROTOCOL,
    SAML_NAME_ID_FORMAT_EMAIL,
    SAML_NAME_ID_FORMAT_PERSISTENT,
    SAML_NAME_ID_FORMAT_TRANSIENT,
    SAML_NAME_ID_FORMAT_UNSPECIFIED,
    SAML_NAME_ID_FORMAT_WINDOWS,
    SAML_NAME_ID_FORMAT_X509,
    SIGN_ALGORITHM_TRANSFORM_MAP,
)
from authentik.core.expression.exceptions import PropertyMappingExpressionException
from authentik.events.models import Event, EventAction
from authentik.events.signals import get_login_event
from authentik.lib.utils.time import timedelta_from_string
from authentik.lib.xml import remove_xml_newlines
from authentik.providers.saml.models import SAMLPropertyMapping, SAMLProvider
from authentik.providers.saml.processors.authn_request_parser import AuthNRequest
from authentik.providers.saml.utils import get_random_id
from authentik.providers.saml.utils.time import get_time_string
from authentik.sources.ldap.auth import LDAP_DISTINGUISHED_NAME
from authentik.sources.saml.exceptions import (
    InvalidEncryption,
    InvalidSignature,
    UnsupportedNameIDFormat,
)
from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS

LOGGER = get_logger()


class AssertionProcessor:
    """Generate a SAML Response from an AuthNRequest

    The processor is bound to one provider, one HTTP request and one parsed
    AuthNRequest. IDs and all timestamps are fixed in ``__init__`` so that every
    element generated afterwards carries the same values. ``build_response`` is
    the entry point that assembles, signs and optionally encrypts the response.

    Several ``get_*`` methods record the value they emitted on the instance
    (``issuer``, ``session_index``, ``name_id``, ``name_id_format``); those
    attributes are only set once the corresponding method has run.
    """

    provider: SAMLProvider
    http_request: HttpRequest
    auth_n_request: AuthNRequest

    _issue_instant: str
    _assertion_id: str
    _response_id: str

    _auth_instant: str
    _valid_not_before: str
    _session_not_on_or_after: str
    _valid_not_on_or_after: str

    session_index: str
    name_id: str
    name_id_format: str
    issuer: str
    session_not_on_or_after_datetime: datetime

    def __init__(self, provider: SAMLProvider, request: HttpRequest, auth_n_request: AuthNRequest):
        """Bind the processor to its inputs and precompute IDs and timestamps."""
        self.provider = provider
        self.http_request = request
        self.auth_n_request = auth_n_request

        self._issue_instant = get_time_string()
        self._assertion_id = get_random_id()
        self._response_id = get_random_id()

        # AuthnInstant is the creation time of the login event when one is found
        # for this request, otherwise the current time.
        _login_event = get_login_event(self.http_request)
        _login_time = now()
        if _login_event:
            _login_time = _login_event.created
        self._auth_instant = get_time_string(_login_time)
        self._valid_not_before = get_time_string(
            timedelta_from_string(self.provider.assertion_valid_not_before)
        )
        # Kept as a datetime as well as a string so it can be read without re-parsing.
        self.session_not_on_or_after_datetime = now() + timedelta_from_string(
            self.provider.session_valid_not_on_or_after
        )
        self._session_not_on_or_after = get_time_string(self.session_not_on_or_after_datetime)
        self._valid_not_on_or_after = get_time_string(
            timedelta_from_string(self.provider.assertion_valid_not_on_or_after)
        )

    def _report_mapping_failure(self, mapping, exc: Exception) -> None:
        """Record a configuration-error event and log a warning for a failed mapping.

        ``mapping`` is the property mapping that failed; its ``name`` is embedded
        in the event message. ``exc`` is the exception that was caught.
        """
        Event.new(
            EventAction.CONFIGURATION_ERROR,
            message=f"Failed to evaluate property-mapping: '{mapping.name}'",
            provider=self.provider,
            mapping=mapping,
        ).from_http(self.http_request)
        LOGGER.warning("Failed to evaluate property mapping", exc=exc)

    def _append_signature_template(self, element: _Element) -> None:
        """Append an empty signature template to ``element``.

        The signature algorithm comes from the provider, falling back to RSA-SHA1
        when the configured value is not in ``SIGN_ALGORITHM_TRANSFORM_MAP``.
        The template is filled in later by ``_sign``.
        """
        sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
        )
        signature = xmlsec.template.create(
            element,
            xmlsec.constants.TransformExclC14N,
            sign_algorithm_transform,
            ns=xmlsec.constants.DSigNs,
        )
        element.append(signature)

    def get_attributes(self) -> Element:
        """Get AttributeStatement Element with Attributes from Property Mappings.

        Every mapping attached to the provider is evaluated, ordered by
        ``saml_name``. Mappings that evaluate to ``None`` are skipped; mappings
        that fail are reported and skipped, so one broken mapping does not
        prevent the others from being emitted.
        """
        # https://commons.lbl.gov/display/IDMgmt/Attribute+Definitions
        attribute_statement = Element(f"{{{NS_SAML_ASSERTION}}}AttributeStatement")
        user = self.http_request.user
        for mapping in SAMLPropertyMapping.objects.filter(provider=self.provider).order_by(
            "saml_name"
        ):
            try:
                mapping: SAMLPropertyMapping
                value = mapping.evaluate(
                    user=user,
                    request=self.http_request,
                    provider=self.provider,
                )
                if value is None:
                    continue

                attribute = Element(f"{{{NS_SAML_ASSERTION}}}Attribute")
                if mapping.friendly_name:
                    attribute.attrib["FriendlyName"] = mapping.friendly_name
                attribute.attrib["Name"] = mapping.saml_name

                # A scalar result becomes a single AttributeValue
                if not isinstance(value, list | GeneratorType):
                    value = [value]

                for value_item in value:
                    attribute_value = SubElement(
                        attribute, f"{{{NS_SAML_ASSERTION}}}AttributeValue"
                    )
                    attribute_value.text = (
                        value_item if isinstance(value_item, str) else str(value_item)
                    )

                # Only appended once fully built, so a failure part-way through
                # leaves no partial Attribute in the statement
                attribute_statement.append(attribute)

            except (PropertyMappingExpressionException, ValueError) as exc:
                # Value error can be raised when assigning invalid data to an attribute
                self._report_mapping_failure(mapping, exc)
                continue
        return attribute_statement

    def _get_issuer_value(self) -> str:
        """Get issuer value, with fallback to generated URL if empty"""
        # If user has set an override issuer, use it
        if self.provider.issuer_override:
            return self.provider.issuer_override

        return self.http_request.build_absolute_uri(
            reverse(
                "authentik_providers_saml:metadata-download",
                kwargs={"application_slug": self.provider.application.slug},
            )
        )

    def get_issuer(self) -> Element:
        """Get Issuer Element and remember its value in ``self.issuer``"""
        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer", nsmap=NS_MAP)
        self.issuer = self._get_issuer_value()
        issuer.text = self.issuer
        return issuer

    def get_assertion_auth_n_statement(self) -> Element:
        """Generate AuthnStatement with AuthnContext and ContextClassRef Elements.

        The session index is the SHA-256 hex digest of the session key and is
        stored in ``self.session_index``. The context class defaults to
        ``unspecified``, is refined from the login event when one is found, and
        is finally overridden by the provider's AuthnContextClassRef mapping if
        that mapping is set and evaluates to a non-``None`` value.
        """
        auth_n_statement = Element(f"{{{NS_SAML_ASSERTION}}}AuthnStatement")
        auth_n_statement.attrib["AuthnInstant"] = self._auth_instant
        self.session_index = sha256(
            self.http_request.session.session_key.encode("ascii")
        ).hexdigest()
        auth_n_statement.attrib["SessionIndex"] = self.session_index
        auth_n_statement.attrib["SessionNotOnOrAfter"] = self._session_not_on_or_after

        auth_n_context = SubElement(auth_n_statement, f"{{{NS_SAML_ASSERTION}}}AuthnContext")
        auth_n_context_class_ref = SubElement(
            auth_n_context, f"{{{NS_SAML_ASSERTION}}}AuthnContextClassRef"
        )
        auth_n_context_class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified"
        event = get_login_event(self.http_request)
        if event:
            method = event.context.get(PLAN_CONTEXT_METHOD, "")
            method_args = event.context.get(PLAN_CONTEXT_METHOD_ARGS, {})
            # These checks are independent and applied in order; a later match
            # overwrites an earlier one.
            if method == "password":
                auth_n_context_class_ref.text = (
                    "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport"
                )
            if "mfa_devices" in method_args:
                auth_n_context_class_ref.text = (
                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileTwoFactorContract"
                )
            if method in ["auth_mfa", "auth_webauthn_pwl"]:
                auth_n_context_class_ref.text = (
                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract"
                )
        mapping = self.provider.authn_context_class_ref_mapping
        if mapping:
            try:
                value = mapping.evaluate(
                    user=self.http_request.user,
                    request=self.http_request,
                    provider=self.provider,
                )
            except PropertyMappingExpressionException as exc:
                # Keep the class ref derived above when the mapping fails
                self._report_mapping_failure(mapping, exc)
            else:
                if value is not None:
                    auth_n_context_class_ref.text = str(value)
        return auth_n_statement

    def get_assertion_conditions(self) -> Element:
        """Generate Conditions with AudienceRestriction and Audience Elements.

        The AudienceRestriction is left out when the provider's audience is an
        empty string.
        """
        conditions = Element(f"{{{NS_SAML_ASSERTION}}}Conditions")
        conditions.attrib["NotBefore"] = self._valid_not_before
        conditions.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
        if self.provider.audience != "":
            audience_restriction = SubElement(
                conditions, f"{{{NS_SAML_ASSERTION}}}AudienceRestriction"
            )
            audience = SubElement(audience_restriction, f"{{{NS_SAML_ASSERTION}}}Audience")
            audience.text = self.provider.audience
        return conditions

    def get_name_id(self) -> Element:
        """Get NameID Element

        Stores the emitted value in ``self.name_id`` and the format in
        ``self.name_id_format``. Note that ``self.auth_n_request.name_id_policy``
        is overwritten with the provider default when the request left it
        unspecified and the provider default is not unspecified.

        Raises:
            UnsupportedNameIDFormat: no NameID mapping is configured and the
                effective format is not one of the formats handled here.
        """
        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
        # For requests that don't specify a NameIDPolicy, check if we
        # can fall back to the provider default
        if (
            self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_UNSPECIFIED
            and self.provider.default_name_id_policy != SAML_NAME_ID_FORMAT_UNSPECIFIED
        ):
            self.auth_n_request.name_id_policy = self.provider.default_name_id_policy
        policy = self.auth_n_request.name_id_policy
        name_id.attrib["Format"] = policy
        self.name_id_format = policy
        # persistent is used as a fallback, so always generate it
        persistent = self.http_request.user.uid
        name_id.text = persistent
        self.name_id = persistent
        # If name_id_mapping is set, we override the value, regardless of what the SP asks for
        mapping = self.provider.name_id_mapping
        if mapping:
            try:
                value = mapping.evaluate(
                    user=self.http_request.user,
                    request=self.http_request,
                    provider=self.provider,
                )
            except PropertyMappingExpressionException as exc:
                # Falls through to returning the persistent ID set above
                self._report_mapping_failure(mapping, exc)
            else:
                if value is not None:
                    self.name_id = str(value)
                    name_id.text = self.name_id
            return name_id
        if policy == SAML_NAME_ID_FORMAT_EMAIL:
            name_id.text = self.http_request.user.email
            self.name_id = self.http_request.user.email
            return name_id
        if policy in [
            SAML_NAME_ID_FORMAT_PERSISTENT,
            SAML_NAME_ID_FORMAT_UNSPECIFIED,
        ]:
            name_id.text = persistent
            self.name_id = persistent
            return name_id
        if policy == SAML_NAME_ID_FORMAT_X509:
            # This attribute is statically set by the LDAP source
            name_id.text = self.http_request.user.attributes.get(
                LDAP_DISTINGUISHED_NAME, persistent
            )
            self.name_id = name_id.text
            return name_id
        if policy == SAML_NAME_ID_FORMAT_WINDOWS:
            # This attribute is statically set by the LDAP source
            name_id.text = self.http_request.user.attributes.get("upn", persistent)
            self.name_id = name_id.text
            return name_id
        if policy == SAML_NAME_ID_FORMAT_TRANSIENT:
            # Use the hash of the user's session, which changes every session
            session_key: str = self.http_request.session.session_key
            name_id.text = sha256(session_key.encode()).hexdigest()
            self.name_id = name_id.text
            return name_id
        raise UnsupportedNameIDFormat(
            f"Assertion contains NameID with unsupported format {policy}."
        )

    def get_assertion_subject(self) -> Element:
        """Generate Subject Element with NameID and SubjectConfirmation Objects"""
        subject = Element(f"{{{NS_SAML_ASSERTION}}}Subject")
        subject.append(self.get_name_id())

        subject_confirmation = SubElement(subject, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmation")
        subject_confirmation.attrib["Method"] = "urn:oasis:names:tc:SAML:2.0:cm:bearer"

        subject_confirmation_data = SubElement(
            subject_confirmation, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmationData"
        )
        # InResponseTo is only set when the AuthNRequest carries an ID
        if self.auth_n_request.id:
            subject_confirmation_data.attrib["InResponseTo"] = self.auth_n_request.id
        subject_confirmation_data.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
        subject_confirmation_data.attrib["Recipient"] = self.provider.acs_url
        return subject

    def get_assertion(self) -> Element:
        """Generate Main Assertion Element

        Children are appended in this order: Issuer, signature template (only if
        assertion signing is enabled), Subject, Conditions, AuthnStatement,
        AttributeStatement.
        """
        assertion = Element(f"{{{NS_SAML_ASSERTION}}}Assertion", nsmap=NS_MAP)
        assertion.attrib["Version"] = "2.0"
        assertion.attrib["ID"] = self._assertion_id
        assertion.attrib["IssueInstant"] = self._issue_instant
        assertion.append(self.get_issuer())

        if self.provider.signing_kp and self.provider.sign_assertion:
            self._append_signature_template(assertion)

        assertion.append(self.get_assertion_subject())
        assertion.append(self.get_assertion_conditions())
        assertion.append(self.get_assertion_auth_n_statement())

        assertion.append(self.get_attributes())
        return assertion

    def get_response(self) -> Element:
        """Generate Root response element

        Children are appended in this order: Issuer, signature template (only if
        response signing is enabled), Status, Assertion. Nothing is signed or
        encrypted here; see ``build_response``.
        """
        response = Element(f"{{{NS_SAML_PROTOCOL}}}Response", nsmap=NS_MAP)
        response.attrib["Version"] = "2.0"
        response.attrib["IssueInstant"] = self._issue_instant
        response.attrib["Destination"] = self.provider.acs_url
        response.attrib["ID"] = self._response_id
        if self.auth_n_request.id:
            response.attrib["InResponseTo"] = self.auth_n_request.id

        response.append(self.get_issuer())

        if self.provider.signing_kp and self.provider.sign_response:
            self._append_signature_template(response)

        status = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
        status_code = SubElement(status, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
        status_code.attrib["Value"] = "urn:oasis:names:tc:SAML:2.0:status:Success"

        response.append(self.get_assertion())
        return response

    def _sign(self, element: _Element):
        """Sign an XML element based on the providers' configured signing settings

        ``element`` must already contain a signature template and carry an ``ID``
        attribute, which the signature reference points at. The digest algorithm
        falls back to SHA1 when the configured value is not in
        ``DIGEST_ALGORITHM_TRANSLATION_MAP``.

        Raises:
            InvalidSignature: xmlsec failed to sign the element.
        """
        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
        )
        xmlsec.tree.add_ids(element, ["ID"])
        signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature)
        ref = xmlsec.template.add_reference(
            signature_node,
            digest_algorithm_transform,
            uri="#" + element.attrib["ID"],
        )
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
        key_info = xmlsec.template.ensure_key_info(signature_node)
        xmlsec.template.add_x509_data(key_info)

        ctx = xmlsec.SignatureContext()

        key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,
            xmlsec.constants.KeyDataFormatPem,
            None,
        )
        key.load_cert_from_memory(
            self.provider.signing_kp.certificate_data,
            xmlsec.constants.KeyDataFormatCertPem,
        )
        ctx.key = key
        try:
            ctx.sign(remove_xml_newlines(element, signature_node))
        except xmlsec.Error as exc:
            raise InvalidSignature() from exc

    def _encrypt(self, element: _Element, parent: _Element):
        """Replace ``element`` in ``parent`` with an EncryptedAssertion element.

        The content is encrypted with a freshly generated AES-128 session key;
        the encrypted-key template uses RSA-OAEP and the key manager is loaded
        with the provider's encryption certificate.

        Raises:
            InvalidEncryption: xmlsec failed to encrypt the element.
        """
        # Create a standalone copy so namespace declarations are included in the encrypted content
        element_xml = etree.tostring(element)
        standalone_element = etree.fromstring(element_xml)

        # Remove the original element from the tree since we're replacing it with encrypted version
        parent.remove(element)

        manager = xmlsec.KeysManager()
        key = xmlsec.Key.from_memory(
            self.provider.encryption_kp.certificate_data,
            xmlsec.constants.KeyDataFormatCertPem,
        )

        manager.add_key(key)
        encryption_context = xmlsec.EncryptionContext(manager)
        encryption_context.key = xmlsec.Key.generate(
            xmlsec.constants.KeyDataAes, 128, xmlsec.constants.KeyDataTypeSession
        )

        container = SubElement(parent, f"{{{NS_SAML_ASSERTION}}}EncryptedAssertion")
        enc_data = xmlsec.template.encrypted_data_create(
            container, xmlsec.Transform.AES128, type=xmlsec.EncryptionType.ELEMENT, ns="xenc"
        )
        xmlsec.template.encrypted_data_ensure_cipher_value(enc_data)
        key_info = xmlsec.template.encrypted_data_ensure_key_info(enc_data, ns="ds")
        enc_key = xmlsec.template.add_encrypted_key(key_info, xmlsec.Transform.RSA_OAEP)
        xmlsec.template.encrypted_data_ensure_cipher_value(enc_key)

        try:
            enc_data = encryption_context.encrypt_xml(enc_data, standalone_element)
        except xmlsec.Error as exc:
            raise InvalidEncryption() from exc

        container.append(enc_data)

    def build_response(self) -> str:
        """Build string XML Response and sign if signing is enabled.

        Returns the serialized response decoded as UTF-8. ``InvalidSignature``
        and ``InvalidEncryption`` raised by the signing/encryption steps
        propagate to the caller.
        """
        root_response = self.get_response()
        # Sign assertion first (before encryption)
        if self.provider.signing_kp and self.provider.sign_assertion:
            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
            self._sign(assertion)
        # Encrypt assertion (this replaces Assertion with EncryptedAssertion)
        if self.provider.encryption_kp:
            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
            self._encrypt(assertion, root_response)
        # Sign response AFTER encryption so signature covers the encrypted content
        if self.provider.signing_kp and self.provider.sign_response:
            response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0]
            self._sign(response)
        return etree.tostring(root_response).decode("utf-8")  # nosec
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class
AssertionProcessor:
class AssertionProcessor:
    """Generate a SAML Response from an AuthNRequest

    The processor is bound to one provider, one HTTP request and one parsed
    AuthNRequest. IDs and all timestamps are fixed in ``__init__`` so that every
    element generated afterwards carries the same values. ``build_response`` is
    the entry point that assembles, signs and optionally encrypts the response.

    Several ``get_*`` methods record the value they emitted on the instance
    (``issuer``, ``session_index``, ``name_id``, ``name_id_format``); those
    attributes are only set once the corresponding method has run.
    """

    provider: SAMLProvider
    http_request: HttpRequest
    auth_n_request: AuthNRequest

    _issue_instant: str
    _assertion_id: str
    _response_id: str

    _auth_instant: str
    _valid_not_before: str
    _session_not_on_or_after: str
    _valid_not_on_or_after: str

    session_index: str
    name_id: str
    name_id_format: str
    issuer: str
    session_not_on_or_after_datetime: datetime

    def __init__(self, provider: SAMLProvider, request: HttpRequest, auth_n_request: AuthNRequest):
        """Bind the processor to its inputs and precompute IDs and timestamps."""
        self.provider = provider
        self.http_request = request
        self.auth_n_request = auth_n_request

        self._issue_instant = get_time_string()
        self._assertion_id = get_random_id()
        self._response_id = get_random_id()

        # AuthnInstant is the creation time of the login event when one is found
        # for this request, otherwise the current time.
        _login_event = get_login_event(self.http_request)
        _login_time = now()
        if _login_event:
            _login_time = _login_event.created
        self._auth_instant = get_time_string(_login_time)
        self._valid_not_before = get_time_string(
            timedelta_from_string(self.provider.assertion_valid_not_before)
        )
        # Kept as a datetime as well as a string so it can be read without re-parsing.
        self.session_not_on_or_after_datetime = now() + timedelta_from_string(
            self.provider.session_valid_not_on_or_after
        )
        self._session_not_on_or_after = get_time_string(self.session_not_on_or_after_datetime)
        self._valid_not_on_or_after = get_time_string(
            timedelta_from_string(self.provider.assertion_valid_not_on_or_after)
        )

    def _report_mapping_failure(self, mapping, exc: Exception) -> None:
        """Record a configuration-error event and log a warning for a failed mapping.

        ``mapping`` is the property mapping that failed; its ``name`` is embedded
        in the event message. ``exc`` is the exception that was caught.
        """
        Event.new(
            EventAction.CONFIGURATION_ERROR,
            message=f"Failed to evaluate property-mapping: '{mapping.name}'",
            provider=self.provider,
            mapping=mapping,
        ).from_http(self.http_request)
        LOGGER.warning("Failed to evaluate property mapping", exc=exc)

    def _append_signature_template(self, element: _Element) -> None:
        """Append an empty signature template to ``element``.

        The signature algorithm comes from the provider, falling back to RSA-SHA1
        when the configured value is not in ``SIGN_ALGORITHM_TRANSFORM_MAP``.
        The template is filled in later by ``_sign``.
        """
        sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
        )
        signature = xmlsec.template.create(
            element,
            xmlsec.constants.TransformExclC14N,
            sign_algorithm_transform,
            ns=xmlsec.constants.DSigNs,
        )
        element.append(signature)

    def get_attributes(self) -> Element:
        """Get AttributeStatement Element with Attributes from Property Mappings.

        Every mapping attached to the provider is evaluated, ordered by
        ``saml_name``. Mappings that evaluate to ``None`` are skipped; mappings
        that fail are reported and skipped, so one broken mapping does not
        prevent the others from being emitted.
        """
        # https://commons.lbl.gov/display/IDMgmt/Attribute+Definitions
        attribute_statement = Element(f"{{{NS_SAML_ASSERTION}}}AttributeStatement")
        user = self.http_request.user
        for mapping in SAMLPropertyMapping.objects.filter(provider=self.provider).order_by(
            "saml_name"
        ):
            try:
                mapping: SAMLPropertyMapping
                value = mapping.evaluate(
                    user=user,
                    request=self.http_request,
                    provider=self.provider,
                )
                if value is None:
                    continue

                attribute = Element(f"{{{NS_SAML_ASSERTION}}}Attribute")
                if mapping.friendly_name:
                    attribute.attrib["FriendlyName"] = mapping.friendly_name
                attribute.attrib["Name"] = mapping.saml_name

                # A scalar result becomes a single AttributeValue
                if not isinstance(value, list | GeneratorType):
                    value = [value]

                for value_item in value:
                    attribute_value = SubElement(
                        attribute, f"{{{NS_SAML_ASSERTION}}}AttributeValue"
                    )
                    attribute_value.text = (
                        value_item if isinstance(value_item, str) else str(value_item)
                    )

                # Only appended once fully built, so a failure part-way through
                # leaves no partial Attribute in the statement
                attribute_statement.append(attribute)

            except (PropertyMappingExpressionException, ValueError) as exc:
                # Value error can be raised when assigning invalid data to an attribute
                self._report_mapping_failure(mapping, exc)
                continue
        return attribute_statement

    def _get_issuer_value(self) -> str:
        """Get issuer value, with fallback to generated URL if empty"""
        # If user has set an override issuer, use it
        if self.provider.issuer_override:
            return self.provider.issuer_override

        return self.http_request.build_absolute_uri(
            reverse(
                "authentik_providers_saml:metadata-download",
                kwargs={"application_slug": self.provider.application.slug},
            )
        )

    def get_issuer(self) -> Element:
        """Get Issuer Element and remember its value in ``self.issuer``"""
        issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer", nsmap=NS_MAP)
        self.issuer = self._get_issuer_value()
        issuer.text = self.issuer
        return issuer

    def get_assertion_auth_n_statement(self) -> Element:
        """Generate AuthnStatement with AuthnContext and ContextClassRef Elements.

        The session index is the SHA-256 hex digest of the session key and is
        stored in ``self.session_index``. The context class defaults to
        ``unspecified``, is refined from the login event when one is found, and
        is finally overridden by the provider's AuthnContextClassRef mapping if
        that mapping is set and evaluates to a non-``None`` value.
        """
        auth_n_statement = Element(f"{{{NS_SAML_ASSERTION}}}AuthnStatement")
        auth_n_statement.attrib["AuthnInstant"] = self._auth_instant
        self.session_index = sha256(
            self.http_request.session.session_key.encode("ascii")
        ).hexdigest()
        auth_n_statement.attrib["SessionIndex"] = self.session_index
        auth_n_statement.attrib["SessionNotOnOrAfter"] = self._session_not_on_or_after

        auth_n_context = SubElement(auth_n_statement, f"{{{NS_SAML_ASSERTION}}}AuthnContext")
        auth_n_context_class_ref = SubElement(
            auth_n_context, f"{{{NS_SAML_ASSERTION}}}AuthnContextClassRef"
        )
        auth_n_context_class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified"
        event = get_login_event(self.http_request)
        if event:
            method = event.context.get(PLAN_CONTEXT_METHOD, "")
            method_args = event.context.get(PLAN_CONTEXT_METHOD_ARGS, {})
            # These checks are independent and applied in order; a later match
            # overwrites an earlier one.
            if method == "password":
                auth_n_context_class_ref.text = (
                    "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport"
                )
            if "mfa_devices" in method_args:
                auth_n_context_class_ref.text = (
                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileTwoFactorContract"
                )
            if method in ["auth_mfa", "auth_webauthn_pwl"]:
                auth_n_context_class_ref.text = (
                    "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract"
                )
        mapping = self.provider.authn_context_class_ref_mapping
        if mapping:
            try:
                value = mapping.evaluate(
                    user=self.http_request.user,
                    request=self.http_request,
                    provider=self.provider,
                )
            except PropertyMappingExpressionException as exc:
                # Keep the class ref derived above when the mapping fails
                self._report_mapping_failure(mapping, exc)
            else:
                if value is not None:
                    auth_n_context_class_ref.text = str(value)
        return auth_n_statement

    def get_assertion_conditions(self) -> Element:
        """Generate Conditions with AudienceRestriction and Audience Elements.

        The AudienceRestriction is left out when the provider's audience is an
        empty string.
        """
        conditions = Element(f"{{{NS_SAML_ASSERTION}}}Conditions")
        conditions.attrib["NotBefore"] = self._valid_not_before
        conditions.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
        if self.provider.audience != "":
            audience_restriction = SubElement(
                conditions, f"{{{NS_SAML_ASSERTION}}}AudienceRestriction"
            )
            audience = SubElement(audience_restriction, f"{{{NS_SAML_ASSERTION}}}Audience")
            audience.text = self.provider.audience
        return conditions

    def get_name_id(self) -> Element:
        """Get NameID Element

        Stores the emitted value in ``self.name_id`` and the format in
        ``self.name_id_format``. Note that ``self.auth_n_request.name_id_policy``
        is overwritten with the provider default when the request left it
        unspecified and the provider default is not unspecified.

        Raises:
            UnsupportedNameIDFormat: no NameID mapping is configured and the
                effective format is not one of the formats handled here.
        """
        name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
        # For requests that don't specify a NameIDPolicy, check if we
        # can fall back to the provider default
        if (
            self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_UNSPECIFIED
            and self.provider.default_name_id_policy != SAML_NAME_ID_FORMAT_UNSPECIFIED
        ):
            self.auth_n_request.name_id_policy = self.provider.default_name_id_policy
        policy = self.auth_n_request.name_id_policy
        name_id.attrib["Format"] = policy
        self.name_id_format = policy
        # persistent is used as a fallback, so always generate it
        persistent = self.http_request.user.uid
        name_id.text = persistent
        self.name_id = persistent
        # If name_id_mapping is set, we override the value, regardless of what the SP asks for
        mapping = self.provider.name_id_mapping
        if mapping:
            try:
                value = mapping.evaluate(
                    user=self.http_request.user,
                    request=self.http_request,
                    provider=self.provider,
                )
            except PropertyMappingExpressionException as exc:
                # Falls through to returning the persistent ID set above
                self._report_mapping_failure(mapping, exc)
            else:
                if value is not None:
                    self.name_id = str(value)
                    name_id.text = self.name_id
            return name_id
        if policy == SAML_NAME_ID_FORMAT_EMAIL:
            name_id.text = self.http_request.user.email
            self.name_id = self.http_request.user.email
            return name_id
        if policy in [
            SAML_NAME_ID_FORMAT_PERSISTENT,
            SAML_NAME_ID_FORMAT_UNSPECIFIED,
        ]:
            name_id.text = persistent
            self.name_id = persistent
            return name_id
        if policy == SAML_NAME_ID_FORMAT_X509:
            # This attribute is statically set by the LDAP source
            name_id.text = self.http_request.user.attributes.get(
                LDAP_DISTINGUISHED_NAME, persistent
            )
            self.name_id = name_id.text
            return name_id
        if policy == SAML_NAME_ID_FORMAT_WINDOWS:
            # This attribute is statically set by the LDAP source
            name_id.text = self.http_request.user.attributes.get("upn", persistent)
            self.name_id = name_id.text
            return name_id
        if policy == SAML_NAME_ID_FORMAT_TRANSIENT:
            # Use the hash of the user's session, which changes every session
            session_key: str = self.http_request.session.session_key
            name_id.text = sha256(session_key.encode()).hexdigest()
            self.name_id = name_id.text
            return name_id
        raise UnsupportedNameIDFormat(
            f"Assertion contains NameID with unsupported format {policy}."
        )

    def get_assertion_subject(self) -> Element:
        """Generate Subject Element with NameID and SubjectConfirmation Objects"""
        subject = Element(f"{{{NS_SAML_ASSERTION}}}Subject")
        subject.append(self.get_name_id())

        subject_confirmation = SubElement(subject, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmation")
        subject_confirmation.attrib["Method"] = "urn:oasis:names:tc:SAML:2.0:cm:bearer"

        subject_confirmation_data = SubElement(
            subject_confirmation, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmationData"
        )
        # InResponseTo is only set when the AuthNRequest carries an ID
        if self.auth_n_request.id:
            subject_confirmation_data.attrib["InResponseTo"] = self.auth_n_request.id
        subject_confirmation_data.attrib["NotOnOrAfter"] = self._valid_not_on_or_after
        subject_confirmation_data.attrib["Recipient"] = self.provider.acs_url
        return subject

    def get_assertion(self) -> Element:
        """Generate Main Assertion Element

        Children are appended in this order: Issuer, signature template (only if
        assertion signing is enabled), Subject, Conditions, AuthnStatement,
        AttributeStatement.
        """
        assertion = Element(f"{{{NS_SAML_ASSERTION}}}Assertion", nsmap=NS_MAP)
        assertion.attrib["Version"] = "2.0"
        assertion.attrib["ID"] = self._assertion_id
        assertion.attrib["IssueInstant"] = self._issue_instant
        assertion.append(self.get_issuer())

        if self.provider.signing_kp and self.provider.sign_assertion:
            self._append_signature_template(assertion)

        assertion.append(self.get_assertion_subject())
        assertion.append(self.get_assertion_conditions())
        assertion.append(self.get_assertion_auth_n_statement())

        assertion.append(self.get_attributes())
        return assertion

    def get_response(self) -> Element:
        """Generate Root response element

        Children are appended in this order: Issuer, signature template (only if
        response signing is enabled), Status, Assertion. Nothing is signed or
        encrypted here; see ``build_response``.
        """
        response = Element(f"{{{NS_SAML_PROTOCOL}}}Response", nsmap=NS_MAP)
        response.attrib["Version"] = "2.0"
        response.attrib["IssueInstant"] = self._issue_instant
        response.attrib["Destination"] = self.provider.acs_url
        response.attrib["ID"] = self._response_id
        if self.auth_n_request.id:
            response.attrib["InResponseTo"] = self.auth_n_request.id

        response.append(self.get_issuer())

        if self.provider.signing_kp and self.provider.sign_response:
            self._append_signature_template(response)

        status = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status")
        status_code = SubElement(status, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
        status_code.attrib["Value"] = "urn:oasis:names:tc:SAML:2.0:status:Success"

        response.append(self.get_assertion())
        return response

    def _sign(self, element: _Element):
        """Sign an XML element based on the providers' configured signing settings

        ``element`` must already contain a signature template and carry an ``ID``
        attribute, which the signature reference points at. The digest algorithm
        falls back to SHA1 when the configured value is not in
        ``DIGEST_ALGORITHM_TRANSLATION_MAP``.

        Raises:
            InvalidSignature: xmlsec failed to sign the element.
        """
        digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get(
            self.provider.digest_algorithm, xmlsec.constants.TransformSha1
        )
        xmlsec.tree.add_ids(element, ["ID"])
        signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature)
        ref = xmlsec.template.add_reference(
            signature_node,
            digest_algorithm_transform,
            uri="#" + element.attrib["ID"],
        )
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped)
        xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N)
        key_info = xmlsec.template.ensure_key_info(signature_node)
        xmlsec.template.add_x509_data(key_info)

        ctx = xmlsec.SignatureContext()

        key = xmlsec.Key.from_memory(
            self.provider.signing_kp.key_data,
            xmlsec.constants.KeyDataFormatPem,
            None,
        )
        key.load_cert_from_memory(
            self.provider.signing_kp.certificate_data,
            xmlsec.constants.KeyDataFormatCertPem,
        )
        ctx.key = key
        try:
            ctx.sign(remove_xml_newlines(element, signature_node))
        except xmlsec.Error as exc:
            raise InvalidSignature() from exc

    def _encrypt(self, element: _Element, parent: _Element):
        """Replace ``element`` in ``parent`` with an EncryptedAssertion element.

        The content is encrypted with a freshly generated AES-128 session key;
        the encrypted-key template uses RSA-OAEP and the key manager is loaded
        with the provider's encryption certificate.

        Raises:
            InvalidEncryption: xmlsec failed to encrypt the element.
        """
        # Create a standalone copy so namespace declarations are included in the encrypted content
        element_xml = etree.tostring(element)
        standalone_element = etree.fromstring(element_xml)

        # Remove the original element from the tree since we're replacing it with encrypted version
        parent.remove(element)

        manager = xmlsec.KeysManager()
        key = xmlsec.Key.from_memory(
            self.provider.encryption_kp.certificate_data,
            xmlsec.constants.KeyDataFormatCertPem,
        )

        manager.add_key(key)
        encryption_context = xmlsec.EncryptionContext(manager)
        encryption_context.key = xmlsec.Key.generate(
            xmlsec.constants.KeyDataAes, 128, xmlsec.constants.KeyDataTypeSession
        )

        container = SubElement(parent, f"{{{NS_SAML_ASSERTION}}}EncryptedAssertion")
        enc_data = xmlsec.template.encrypted_data_create(
            container, xmlsec.Transform.AES128, type=xmlsec.EncryptionType.ELEMENT, ns="xenc"
        )
        xmlsec.template.encrypted_data_ensure_cipher_value(enc_data)
        key_info = xmlsec.template.encrypted_data_ensure_key_info(enc_data, ns="ds")
        enc_key = xmlsec.template.add_encrypted_key(key_info, xmlsec.Transform.RSA_OAEP)
        xmlsec.template.encrypted_data_ensure_cipher_value(enc_key)

        try:
            enc_data = encryption_context.encrypt_xml(enc_data, standalone_element)
        except xmlsec.Error as exc:
            raise InvalidEncryption() from exc

        container.append(enc_data)

    def build_response(self) -> str:
        """Build string XML Response and sign if signing is enabled.

        Returns the serialized response decoded as UTF-8. ``InvalidSignature``
        and ``InvalidEncryption`` raised by the signing/encryption steps
        propagate to the caller.
        """
        root_response = self.get_response()
        # Sign assertion first (before encryption)
        if self.provider.signing_kp and self.provider.sign_assertion:
            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
            self._sign(assertion)
        # Encrypt assertion (this replaces Assertion with EncryptedAssertion)
        if self.provider.encryption_kp:
            assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
            self._encrypt(assertion, root_response)
        # Sign response AFTER encryption so signature covers the encrypted content
        if self.provider.signing_kp and self.provider.sign_response:
            response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0]
            self._sign(response)
        return etree.tostring(root_response).decode("utf-8")  # nosec
Generate a SAML Response from an AuthNRequest
AssertionProcessor( provider: authentik.providers.saml.models.SAMLProvider, request: django.http.request.HttpRequest, auth_n_request: authentik.providers.saml.processors.authn_request_parser.AuthNRequest)
def __init__(self, provider: SAMLProvider, request: HttpRequest, auth_n_request: AuthNRequest):
    """Store the request context and pre-compute IDs and timestamps for the response.

    The authentication instant is taken from the login event found for
    ``request``; when there is none, the current time is used instead.
    """
    self.provider = provider
    self.http_request = request
    self.auth_n_request = auth_n_request

    # Issue time plus one random ID each for the Assertion and the Response
    self._issue_instant = get_time_string()
    self._assertion_id = get_random_id()
    self._response_id = get_random_id()

    login_event = get_login_event(request)
    authenticated_at = now()
    if login_event:
        authenticated_at = login_event.created
    self._auth_instant = get_time_string(authenticated_at)

    # The provider stores its validity offsets as strings; they are parsed into
    # timedeltas here (presumably resolved relative to "now" by get_time_string).
    not_before_offset = timedelta_from_string(provider.assertion_valid_not_before)
    self._valid_not_before = get_time_string(not_before_offset)

    session_base = now()
    self.session_not_on_or_after_datetime = session_base + timedelta_from_string(
        provider.session_valid_not_on_or_after
    )
    self._session_not_on_or_after = get_time_string(self.session_not_on_or_after_datetime)

    not_on_or_after_offset = timedelta_from_string(provider.assertion_valid_not_on_or_after)
    self._valid_not_on_or_after = get_time_string(not_on_or_after_offset)
def
get_attributes(self) -> lxml.etree.Element:
def get_attributes(self) -> Element:
    """Build the AttributeStatement element from the provider's property mappings.

    Mappings are evaluated in ``saml_name`` order. A mapping that evaluates to
    ``None`` is left out. A mapping that fails to evaluate (or yields data that
    cannot be assigned to the XML tree) is recorded as a configuration-error
    event, logged, and skipped.
    """
    # https://commons.lbl.gov/display/IDMgmt/Attribute+Definitions
    statement = Element(f"{{{NS_SAML_ASSERTION}}}AttributeStatement")
    requesting_user = self.http_request.user
    mappings = SAMLPropertyMapping.objects.filter(provider=self.provider).order_by("saml_name")
    for mapping in mappings:
        try:
            result = mapping.evaluate(
                user=requesting_user,
                request=self.http_request,
                provider=self.provider,
            )
            if result is None:
                continue

            attribute = Element(f"{{{NS_SAML_ASSERTION}}}Attribute")
            if mapping.friendly_name:
                attribute.set("FriendlyName", mapping.friendly_name)
            attribute.set("Name", mapping.saml_name)

            # A scalar result is emitted as a single AttributeValue
            items = result if isinstance(result, (list, GeneratorType)) else [result]
            for item in items:
                value_node = SubElement(attribute, f"{{{NS_SAML_ASSERTION}}}AttributeValue")
                value_node.text = item if isinstance(item, str) else str(item)

            statement.append(attribute)
        except (PropertyMappingExpressionException, ValueError) as exc:
            # Value error can be raised when assigning invalid data to an attribute
            Event.new(
                EventAction.CONFIGURATION_ERROR,
                message=f"Failed to evaluate property-mapping: '{mapping.name}'",
                provider=self.provider,
                mapping=mapping,
            ).from_http(self.http_request)
            LOGGER.warning("Failed to evaluate property mapping", exc=exc)
    return statement
Get AttributeStatement Element with Attributes from Property Mappings.
def
get_issuer(self) -> lxml.etree.Element:
def get_issuer(self) -> Element:
    """Build the Issuer element and remember the issuer value on ``self.issuer``."""
    element = Element(f"{{{NS_SAML_ASSERTION}}}Issuer", nsmap=NS_MAP)
    issuer_value = self._get_issuer_value()
    self.issuer = issuer_value
    element.text = issuer_value
    return element
Get Issuer Element
def
get_assertion_auth_n_statement(self) -> lxml.etree.Element:
def get_assertion_auth_n_statement(self) -> Element:
    """Build the AuthnStatement element, including AuthnContext/AuthnContextClassRef.

    The class reference starts as ``unspecified`` and is refined from the login
    event's method, if a login event is available. When the provider has an
    ``authn_context_class_ref_mapping`` that evaluates to a non-``None`` value,
    that value replaces whatever was chosen before. A failing mapping is
    reported as a configuration-error event and otherwise ignored.

    Also stores the SHA-256 hex digest of the session key on ``self.session_index``.
    """
    statement = Element(f"{{{NS_SAML_ASSERTION}}}AuthnStatement")
    statement.set("AuthnInstant", self._auth_instant)
    session_key_bytes = self.http_request.session.session_key.encode("ascii")
    self.session_index = sha256(session_key_bytes).hexdigest()
    statement.set("SessionIndex", self.session_index)
    statement.set("SessionNotOnOrAfter", self._session_not_on_or_after)

    context = SubElement(statement, f"{{{NS_SAML_ASSERTION}}}AuthnContext")
    class_ref = SubElement(context, f"{{{NS_SAML_ASSERTION}}}AuthnContextClassRef")
    class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified"

    login_event = get_login_event(self.http_request)
    if login_event:
        method = login_event.context.get(PLAN_CONTEXT_METHOD, "")
        method_args = login_event.context.get(PLAN_CONTEXT_METHOD_ARGS, {})
        # Later checks intentionally override earlier ones
        if method == "password":
            class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport"
        if "mfa_devices" in method_args:
            class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileTwoFactorContract"
        if method in ["auth_mfa", "auth_webauthn_pwl"]:
            class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract"

    if not self.provider.authn_context_class_ref_mapping:
        return statement

    try:
        override = self.provider.authn_context_class_ref_mapping.evaluate(
            user=self.http_request.user,
            request=self.http_request,
            provider=self.provider,
        )
    except PropertyMappingExpressionException as exc:
        Event.new(
            EventAction.CONFIGURATION_ERROR,
            message=(
                "Failed to evaluate property-mapping: "
                f"'{self.provider.authn_context_class_ref_mapping.name}'"
            ),
            provider=self.provider,
            mapping=self.provider.authn_context_class_ref_mapping,
        ).from_http(self.http_request)
        LOGGER.warning("Failed to evaluate property mapping", exc=exc)
    else:
        if override is not None:
            class_ref.text = str(override)
    return statement
Generate AuthnStatement with AuthnContext and ContextClassRef Elements.
def
get_assertion_conditions(self) -> lxml.etree.Element:
def get_assertion_conditions(self) -> Element:
    """Build the Conditions element carrying the assertion's validity window.

    An AudienceRestriction/Audience pair is added only when the provider's
    ``audience`` is not the empty string.
    """
    conditions = Element(f"{{{NS_SAML_ASSERTION}}}Conditions")
    conditions.set("NotBefore", self._valid_not_before)
    conditions.set("NotOnOrAfter", self._valid_not_on_or_after)

    if self.provider.audience == "":
        return conditions

    restriction = SubElement(conditions, f"{{{NS_SAML_ASSERTION}}}AudienceRestriction")
    audience_node = SubElement(restriction, f"{{{NS_SAML_ASSERTION}}}Audience")
    audience_node.text = self.provider.audience
    return conditions
Generate Conditions with AudienceRestriction and Audience Elements.
def
get_name_id(self) -> lxml.etree.Element:
def get_name_id(self) -> Element:
    """Get NameID Element.

    The NameID format is the one requested in the AuthNRequest; when the
    request leaves it unspecified and the provider has a non-unspecified
    default, the provider default is written back onto the request and used.

    The value is chosen as follows:

    * If the provider has a ``name_id_mapping``, its result (when not ``None``)
      is used regardless of the requested format. If the mapping returns
      ``None`` or fails, the user's ``uid`` is used.
    * Otherwise the value depends on the format: email, persistent/unspecified
      (user ``uid``), X509 (LDAP distinguished name attribute), Windows
      (``upn`` attribute) or transient (SHA-256 of the session key). The X509
      and Windows formats fall back to the user's ``uid`` when the attribute
      is missing.

    Side effects: sets ``self.name_id_format`` and ``self.name_id``.

    Raises:
        UnsupportedNameIDFormat: if no mapping is configured and the format is
            none of the ones listed above.
    """
    name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID")
    # For requests that don't specify a NameIDPolicy, check if we
    # can fall back to the provider default
    if (
        self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_UNSPECIFIED
        and self.provider.default_name_id_policy != SAML_NAME_ID_FORMAT_UNSPECIFIED
    ):
        self.auth_n_request.name_id_policy = self.provider.default_name_id_policy
    name_id.attrib["Format"] = self.auth_n_request.name_id_policy
    self.name_id_format = self.auth_n_request.name_id_policy
    # persistent is used as a fallback, so always generate it
    persistent = self.http_request.user.uid
    name_id.text = persistent
    self.name_id = persistent
    # If name_id_mapping is set, we override the value, regardless of what the SP asks for
    if self.provider.name_id_mapping:
        try:
            value = self.provider.name_id_mapping.evaluate(
                user=self.http_request.user,
                request=self.http_request,
                provider=self.provider,
            )
            if value is not None:
                name_id.text = str(value)
                self.name_id = str(value)
            return name_id
        except PropertyMappingExpressionException as exc:
            # The message must be a plain string: a trailing comma inside the
            # parentheses would turn it into a one-element tuple.
            Event.new(
                EventAction.CONFIGURATION_ERROR,
                message=(
                    "Failed to evaluate property-mapping: "
                    f"'{self.provider.name_id_mapping.name}'"
                ),
                provider=self.provider,
                mapping=self.provider.name_id_mapping,
            ).from_http(self.http_request)
            LOGGER.warning("Failed to evaluate property mapping", exc=exc)
            return name_id
    if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_EMAIL:
        name_id.text = self.http_request.user.email
        self.name_id = self.http_request.user.email
        return name_id
    if self.auth_n_request.name_id_policy in [
        SAML_NAME_ID_FORMAT_PERSISTENT,
        SAML_NAME_ID_FORMAT_UNSPECIFIED,
    ]:
        name_id.text = persistent
        self.name_id = persistent
        return name_id
    if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_X509:
        # This attribute is statically set by the LDAP source
        name_id.text = self.http_request.user.attributes.get(
            LDAP_DISTINGUISHED_NAME, persistent
        )
        self.name_id = name_id.text
        return name_id
    if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_WINDOWS:
        # This attribute is statically set by the LDAP source
        name_id.text = self.http_request.user.attributes.get("upn", persistent)
        self.name_id = name_id.text
        return name_id
    if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_TRANSIENT:
        # Use the hash of the user's session, which changes every session
        session_key: str = self.http_request.session.session_key
        name_id.text = sha256(session_key.encode()).hexdigest()
        self.name_id = name_id.text
        return name_id
    raise UnsupportedNameIDFormat(
        "Assertion contains NameID with unsupported "
        f"format {self.auth_n_request.name_id_policy}."
    )
Get NameID Element
def
get_assertion_subject(self) -> lxml.etree.Element:
def get_assertion_subject(self) -> Element:
    """Build the Subject element: a NameID followed by a bearer SubjectConfirmation.

    ``InResponseTo`` is only written when the AuthNRequest carries an ID.
    """
    subject = Element(f"{{{NS_SAML_ASSERTION}}}Subject")
    subject.append(self.get_name_id())

    confirmation = SubElement(subject, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmation")
    confirmation.set("Method", "urn:oasis:names:tc:SAML:2.0:cm:bearer")

    confirmation_data = SubElement(
        confirmation, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmationData"
    )
    request_id = self.auth_n_request.id
    if request_id:
        confirmation_data.set("InResponseTo", request_id)
    confirmation_data.set("NotOnOrAfter", self._valid_not_on_or_after)
    confirmation_data.set("Recipient", self.provider.acs_url)
    return subject
Generate Subject Element with NameID and SubjectConfirmation Objects
def
get_assertion(self) -> lxml.etree.Element:
def get_assertion(self) -> Element:
    """Build the Assertion element with all of its child statements.

    When the provider has a signing keypair and assertion signing is enabled,
    an empty signature template is placed right after the Issuer. The
    signature algorithm falls back to RSA-SHA1 when the configured one is not
    in ``SIGN_ALGORITHM_TRANSFORM_MAP``.
    """
    provider = self.provider
    root = Element(f"{{{NS_SAML_ASSERTION}}}Assertion", nsmap=NS_MAP)
    root.set("Version", "2.0")
    root.set("ID", self._assertion_id)
    root.set("IssueInstant", self._issue_instant)
    root.append(self.get_issuer())

    if provider.signing_kp and provider.sign_assertion:
        algorithm = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
        )
        template = xmlsec.template.create(
            root,
            xmlsec.constants.TransformExclC14N,
            algorithm,
            ns=xmlsec.constants.DSigNs,
        )
        root.append(template)

    # Each child is built and appended in turn, in document order
    child_builders = (
        self.get_assertion_subject,
        self.get_assertion_conditions,
        self.get_assertion_auth_n_statement,
        self.get_attributes,
    )
    for build_child in child_builders:
        root.append(build_child())
    return root
Generate Main Assertion Element
def
get_response(self) -> lxml.etree.Element:
def get_response(self) -> Element:
    """Build the root Response element: Issuer, optional signature template,
    a Success status and the Assertion.

    ``InResponseTo`` is only written when the AuthNRequest carries an ID. The
    signature template is added when the provider has a signing keypair and
    response signing is enabled; the algorithm falls back to RSA-SHA1 when the
    configured one is not in ``SIGN_ALGORITHM_TRANSFORM_MAP``.
    """
    provider = self.provider
    root = Element(f"{{{NS_SAML_PROTOCOL}}}Response", nsmap=NS_MAP)
    root.set("Version", "2.0")
    root.set("IssueInstant", self._issue_instant)
    root.set("Destination", provider.acs_url)
    root.set("ID", self._response_id)
    request_id = self.auth_n_request.id
    if request_id:
        root.set("InResponseTo", request_id)

    root.append(self.get_issuer())

    if provider.signing_kp and provider.sign_response:
        algorithm = SIGN_ALGORITHM_TRANSFORM_MAP.get(
            provider.signature_algorithm, xmlsec.constants.TransformRsaSha1
        )
        template = xmlsec.template.create(
            root,
            xmlsec.constants.TransformExclC14N,
            algorithm,
            ns=xmlsec.constants.DSigNs,
        )
        root.append(template)

    status_node = SubElement(root, f"{{{NS_SAML_PROTOCOL}}}Status")
    code_node = SubElement(status_node, f"{{{NS_SAML_PROTOCOL}}}StatusCode")
    code_node.set("Value", "urn:oasis:names:tc:SAML:2.0:status:Success")

    root.append(self.get_assertion())
    return root
Generate Root response element
def
build_response(self) -> str:
def build_response(self) -> str:
    """Build the Response XML as a string, signing and encrypting as configured.

    Order of operations: sign the Assertion, then encrypt it, then sign the
    Response, so that a Response signature covers the encrypted content.
    """
    tree = self.get_response()
    provider = self.provider

    # Sign assertion first (before encryption)
    if provider.signing_kp and provider.sign_assertion:
        plain_assertion = tree.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
        self._sign(plain_assertion)

    # Encrypt assertion (this replaces Assertion with EncryptedAssertion)
    if provider.encryption_kp:
        to_encrypt = tree.xpath("//saml:Assertion", namespaces=NS_MAP)[0]
        self._encrypt(to_encrypt, tree)

    # Sign response AFTER encryption so signature covers the encrypted content
    if provider.signing_kp and provider.sign_response:
        response_node = tree.xpath("//samlp:Response", namespaces=NS_MAP)[0]
        self._sign(response_node)

    serialized = etree.tostring(tree)  # nosec
    return serialized.decode("utf-8")
Build string XML Response and sign if signing is enabled.