authentik.providers.saml.processors.assertion
SAML Assertion generator
1"""SAML Assertion generator""" 2 3from datetime import datetime 4from hashlib import sha256 5from types import GeneratorType 6 7import xmlsec 8from django.http import HttpRequest 9from django.utils.timezone import now 10from lxml import etree # nosec 11from lxml.etree import Element, SubElement, _Element # nosec 12from structlog.stdlib import get_logger 13 14from authentik.common.saml.constants import ( 15 DIGEST_ALGORITHM_TRANSLATION_MAP, 16 NS_MAP, 17 NS_SAML_ASSERTION, 18 NS_SAML_PROTOCOL, 19 SAML_NAME_ID_FORMAT_EMAIL, 20 SAML_NAME_ID_FORMAT_PERSISTENT, 21 SAML_NAME_ID_FORMAT_TRANSIENT, 22 SAML_NAME_ID_FORMAT_UNSPECIFIED, 23 SAML_NAME_ID_FORMAT_WINDOWS, 24 SAML_NAME_ID_FORMAT_X509, 25 SIGN_ALGORITHM_TRANSFORM_MAP, 26) 27from authentik.core.expression.exceptions import PropertyMappingExpressionException 28from authentik.events.models import Event, EventAction 29from authentik.events.signals import get_login_event 30from authentik.lib.utils.time import timedelta_from_string 31from authentik.lib.xml import remove_xml_newlines 32from authentik.providers.saml.models import SAMLPropertyMapping, SAMLProvider 33from authentik.providers.saml.processors.authn_request_parser import AuthNRequest 34from authentik.providers.saml.utils import get_random_id 35from authentik.providers.saml.utils.time import get_time_string 36from authentik.sources.ldap.auth import LDAP_DISTINGUISHED_NAME 37from authentik.sources.saml.exceptions import ( 38 InvalidEncryption, 39 InvalidSignature, 40 UnsupportedNameIDFormat, 41) 42from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS 43 44LOGGER = get_logger() 45 46 47class AssertionProcessor: 48 """Generate a SAML Response from an AuthNRequest""" 49 50 provider: SAMLProvider 51 http_request: HttpRequest 52 auth_n_request: AuthNRequest 53 54 _issue_instant: str 55 _assertion_id: str 56 _response_id: str 57 58 _auth_instant: str 59 _valid_not_before: str 60 _session_not_on_or_after: str 61 _valid_not_on_or_after: str 62 63 session_index: str 64 name_id: str 65 name_id_format: str 66 session_not_on_or_after_datetime: datetime 67 68 def __init__(self, provider: SAMLProvider, request: HttpRequest, auth_n_request: AuthNRequest): 69 self.provider = provider 70 self.http_request = request 71 self.auth_n_request = auth_n_request 72 73 self._issue_instant = get_time_string() 74 self._assertion_id = get_random_id() 75 self._response_id = get_random_id() 76 77 _login_event = get_login_event(self.http_request) 78 _login_time = now() 79 if _login_event: 80 _login_time = _login_event.created 81 self._auth_instant = get_time_string(_login_time) 82 self._valid_not_before = get_time_string( 83 timedelta_from_string(self.provider.assertion_valid_not_before) 84 ) 85 self.session_not_on_or_after_datetime = now() + timedelta_from_string( 86 self.provider.session_valid_not_on_or_after 87 ) 88 self._session_not_on_or_after = get_time_string(self.session_not_on_or_after_datetime) 89 self._valid_not_on_or_after = get_time_string( 90 timedelta_from_string(self.provider.assertion_valid_not_on_or_after) 91 ) 92 93 def get_attributes(self) -> Element: 94 """Get AttributeStatement Element with Attributes from Property Mappings.""" 95 # https://commons.lbl.gov/display/IDMgmt/Attribute+Definitions 96 attribute_statement = Element(f"{{{NS_SAML_ASSERTION}}}AttributeStatement") 97 user = self.http_request.user 98 for mapping in SAMLPropertyMapping.objects.filter(provider=self.provider).order_by( 99 "saml_name" 100 ): 101 try: 102 mapping: SAMLPropertyMapping 103 value = mapping.evaluate( 104 user=user, 105 request=self.http_request, 106 provider=self.provider, 107 ) 108 if value is None: 109 continue 110 111 attribute = Element(f"{{{NS_SAML_ASSERTION}}}Attribute") 112 if mapping.friendly_name and mapping.friendly_name != "": 113 attribute.attrib["FriendlyName"] = mapping.friendly_name 114 attribute.attrib["Name"] = mapping.saml_name 115 116 if not isinstance(value, list | GeneratorType): 117 value = [value] 118 119 for value_item in value: 120 attribute_value = SubElement( 121 attribute, f"{{{NS_SAML_ASSERTION}}}AttributeValue" 122 ) 123 str_value = str(value_item) if not isinstance(value_item, str) else value_item 124 attribute_value.text = str_value 125 126 attribute_statement.append(attribute) 127 128 except (PropertyMappingExpressionException, ValueError) as exc: 129 # Value error can be raised when assigning invalid data to an attribute 130 Event.new( 131 EventAction.CONFIGURATION_ERROR, 132 message=f"Failed to evaluate property-mapping: '{mapping.name}'", 133 provider=self.provider, 134 mapping=mapping, 135 ).from_http(self.http_request) 136 LOGGER.warning("Failed to evaluate property mapping", exc=exc) 137 continue 138 return attribute_statement 139 140 def get_issuer(self) -> Element: 141 """Get Issuer Element""" 142 issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer", nsmap=NS_MAP) 143 issuer.text = self.provider.issuer 144 return issuer 145 146 def get_assertion_auth_n_statement(self) -> Element: 147 """Generate AuthnStatement with AuthnContext and ContextClassRef Elements.""" 148 auth_n_statement = Element(f"{{{NS_SAML_ASSERTION}}}AuthnStatement") 149 auth_n_statement.attrib["AuthnInstant"] = self._auth_instant 150 self.session_index = sha256( 151 self.http_request.session.session_key.encode("ascii") 152 ).hexdigest() 153 auth_n_statement.attrib["SessionIndex"] = self.session_index 154 auth_n_statement.attrib["SessionNotOnOrAfter"] = self._session_not_on_or_after 155 156 auth_n_context = SubElement(auth_n_statement, f"{{{NS_SAML_ASSERTION}}}AuthnContext") 157 auth_n_context_class_ref = SubElement( 158 auth_n_context, f"{{{NS_SAML_ASSERTION}}}AuthnContextClassRef" 159 ) 160 auth_n_context_class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified" 161 event = get_login_event(self.http_request) 162 if event: 163 method = event.context.get(PLAN_CONTEXT_METHOD, "") 164 method_args = event.context.get(PLAN_CONTEXT_METHOD_ARGS, {}) 165 if method == "password": 166 auth_n_context_class_ref.text = ( 167 "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport" 168 ) 169 if "mfa_devices" in method_args: 170 auth_n_context_class_ref.text = ( 171 "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileTwoFactorContract" 172 ) 173 if method in ["auth_mfa", "auth_webauthn_pwl"]: 174 auth_n_context_class_ref.text = ( 175 "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract" 176 ) 177 if self.provider.authn_context_class_ref_mapping: 178 try: 179 value = self.provider.authn_context_class_ref_mapping.evaluate( 180 user=self.http_request.user, 181 request=self.http_request, 182 provider=self.provider, 183 ) 184 if value is not None: 185 auth_n_context_class_ref.text = str(value) 186 return auth_n_statement 187 except PropertyMappingExpressionException as exc: 188 Event.new( 189 EventAction.CONFIGURATION_ERROR, 190 message=( 191 "Failed to evaluate property-mapping: " 192 f"'{self.provider.authn_context_class_ref_mapping.name}'" 193 ), 194 provider=self.provider, 195 mapping=self.provider.authn_context_class_ref_mapping, 196 ).from_http(self.http_request) 197 LOGGER.warning("Failed to evaluate property mapping", exc=exc) 198 return auth_n_statement 199 return auth_n_statement 200 201 def get_assertion_conditions(self) -> Element: 202 """Generate Conditions with AudienceRestriction and Audience Elements.""" 203 conditions = Element(f"{{{NS_SAML_ASSERTION}}}Conditions") 204 conditions.attrib["NotBefore"] = self._valid_not_before 205 conditions.attrib["NotOnOrAfter"] = self._valid_not_on_or_after 206 if self.provider.audience != "": 207 audience_restriction = SubElement( 208 conditions, f"{{{NS_SAML_ASSERTION}}}AudienceRestriction" 209 ) 210 audience = SubElement(audience_restriction, f"{{{NS_SAML_ASSERTION}}}Audience") 211 audience.text = self.provider.audience 212 return conditions 213 214 def get_name_id(self) -> Element: 215 """Get NameID Element""" 216 name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID") 217 # For requests that don't specify a NameIDPolicy, check if we 218 # can fall back to the provider default 219 if ( 220 self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_UNSPECIFIED 221 and self.provider.default_name_id_policy != SAML_NAME_ID_FORMAT_UNSPECIFIED 222 ): 223 self.auth_n_request.name_id_policy = self.provider.default_name_id_policy 224 name_id.attrib["Format"] = self.auth_n_request.name_id_policy 225 self.name_id_format = self.auth_n_request.name_id_policy 226 # persistent is used as a fallback, so always generate it 227 persistent = self.http_request.user.uid 228 name_id.text = persistent 229 self.name_id = persistent 230 # If name_id_mapping is set, we override the value, regardless of what the SP asks for 231 if self.provider.name_id_mapping: 232 try: 233 value = self.provider.name_id_mapping.evaluate( 234 user=self.http_request.user, 235 request=self.http_request, 236 provider=self.provider, 237 ) 238 if value is not None: 239 name_id.text = str(value) 240 self.name_id = str(value) 241 return name_id 242 except PropertyMappingExpressionException as exc: 243 Event.new( 244 EventAction.CONFIGURATION_ERROR, 245 message=( 246 "Failed to evaluate property-mapping: " 247 f"'{self.provider.name_id_mapping.name}'", 248 ), 249 provider=self.provider, 250 mapping=self.provider.name_id_mapping, 251 ).from_http(self.http_request) 252 LOGGER.warning("Failed to evaluate property mapping", exc=exc) 253 return name_id 254 if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_EMAIL: 255 name_id.text = self.http_request.user.email 256 self.name_id = self.http_request.user.email 257 return name_id 258 if self.auth_n_request.name_id_policy in [ 259 SAML_NAME_ID_FORMAT_PERSISTENT, 260 SAML_NAME_ID_FORMAT_UNSPECIFIED, 261 ]: 262 name_id.text = persistent 263 self.name_id = persistent 264 return name_id 265 if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_X509: 266 # This attribute is statically set by the LDAP source 267 name_id.text = self.http_request.user.attributes.get( 268 LDAP_DISTINGUISHED_NAME, persistent 269 ) 270 self.name_id = name_id.text 271 return name_id 272 if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_WINDOWS: 273 # This attribute is statically set by the LDAP source 274 name_id.text = self.http_request.user.attributes.get("upn", persistent) 275 self.name_id = name_id.text 276 return name_id 277 if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_TRANSIENT: 278 # Use the hash of the user's session, which changes every session 279 session_key: str = self.http_request.session.session_key 280 name_id.text = sha256(session_key.encode()).hexdigest() 281 self.name_id = name_id.text 282 return name_id 283 raise UnsupportedNameIDFormat( 284 "Assertion contains NameID with unsupported " 285 f"format {self.auth_n_request.name_id_policy}." 286 ) 287 288 def get_assertion_subject(self) -> Element: 289 """Generate Subject Element with NameID and SubjectConfirmation Objects""" 290 subject = Element(f"{{{NS_SAML_ASSERTION}}}Subject") 291 subject.append(self.get_name_id()) 292 293 subject_confirmation = SubElement(subject, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmation") 294 subject_confirmation.attrib["Method"] = "urn:oasis:names:tc:SAML:2.0:cm:bearer" 295 296 subject_confirmation_data = SubElement( 297 subject_confirmation, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmationData" 298 ) 299 if self.auth_n_request.id: 300 subject_confirmation_data.attrib["InResponseTo"] = self.auth_n_request.id 301 subject_confirmation_data.attrib["NotOnOrAfter"] = self._valid_not_on_or_after 302 subject_confirmation_data.attrib["Recipient"] = self.provider.acs_url 303 return subject 304 305 def get_assertion(self) -> Element: 306 """Generate Main Assertion Element""" 307 assertion = Element(f"{{{NS_SAML_ASSERTION}}}Assertion", nsmap=NS_MAP) 308 assertion.attrib["Version"] = "2.0" 309 assertion.attrib["ID"] = self._assertion_id 310 assertion.attrib["IssueInstant"] = self._issue_instant 311 assertion.append(self.get_issuer()) 312 313 if self.provider.signing_kp and self.provider.sign_assertion: 314 sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get( 315 self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1 316 ) 317 signature = xmlsec.template.create( 318 assertion, 319 xmlsec.constants.TransformExclC14N, 320 sign_algorithm_transform, 321 ns=xmlsec.constants.DSigNs, 322 ) 323 assertion.append(signature) 324 325 assertion.append(self.get_assertion_subject()) 326 assertion.append(self.get_assertion_conditions()) 327 assertion.append(self.get_assertion_auth_n_statement()) 328 329 assertion.append(self.get_attributes()) 330 return assertion 331 332 def get_response(self) -> Element: 333 """Generate Root response element""" 334 response = Element(f"{{{NS_SAML_PROTOCOL}}}Response", nsmap=NS_MAP) 335 response.attrib["Version"] = "2.0" 336 response.attrib["IssueInstant"] = self._issue_instant 337 response.attrib["Destination"] = self.provider.acs_url 338 response.attrib["ID"] = self._response_id 339 if self.auth_n_request.id: 340 response.attrib["InResponseTo"] = self.auth_n_request.id 341 342 response.append(self.get_issuer()) 343 344 if self.provider.signing_kp and self.provider.sign_response: 345 sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get( 346 self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1 347 ) 348 signature = xmlsec.template.create( 349 response, 350 xmlsec.constants.TransformExclC14N, 351 sign_algorithm_transform, 352 ns=xmlsec.constants.DSigNs, 353 ) 354 response.append(signature) 355 356 status = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status") 357 status_code = SubElement(status, f"{{{NS_SAML_PROTOCOL}}}StatusCode") 358 status_code.attrib["Value"] = "urn:oasis:names:tc:SAML:2.0:status:Success" 359 360 response.append(self.get_assertion()) 361 return response 362 363 def _sign(self, element: _Element): 364 """Sign an XML element based on the providers' configured signing settings""" 365 digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get( 366 self.provider.digest_algorithm, xmlsec.constants.TransformSha1 367 ) 368 xmlsec.tree.add_ids(element, ["ID"]) 369 signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature) 370 ref = xmlsec.template.add_reference( 371 signature_node, 372 digest_algorithm_transform, 373 uri="#" + element.attrib["ID"], 374 ) 375 xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped) 376 xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N) 377 key_info = xmlsec.template.ensure_key_info(signature_node) 378 xmlsec.template.add_x509_data(key_info) 379 380 ctx = xmlsec.SignatureContext() 381 382 key = xmlsec.Key.from_memory( 383 self.provider.signing_kp.key_data, 384 xmlsec.constants.KeyDataFormatPem, 385 None, 386 ) 387 key.load_cert_from_memory( 388 self.provider.signing_kp.certificate_data, 389 xmlsec.constants.KeyDataFormatCertPem, 390 ) 391 ctx.key = key 392 try: 393 ctx.sign(remove_xml_newlines(element, signature_node)) 394 except xmlsec.Error as exc: 395 raise InvalidSignature() from exc 396 397 def _encrypt(self, element: _Element, parent: _Element): 398 """Encrypt SAMLResponse EncryptedAssertion Element""" 399 # Create a standalone copy so namespace declarations are included in the encrypted content 400 element_xml = etree.tostring(element) 401 standalone_element = etree.fromstring(element_xml) 402 403 # Remove the original element from the tree since we're replacing it with encrypted version 404 parent.remove(element) 405 406 manager = xmlsec.KeysManager() 407 key = xmlsec.Key.from_memory( 408 self.provider.encryption_kp.certificate_data, 409 xmlsec.constants.KeyDataFormatCertPem, 410 ) 411 412 manager.add_key(key) 413 encryption_context = xmlsec.EncryptionContext(manager) 414 encryption_context.key = xmlsec.Key.generate( 415 xmlsec.constants.KeyDataAes, 128, xmlsec.constants.KeyDataTypeSession 416 ) 417 418 container = SubElement(parent, f"{{{NS_SAML_ASSERTION}}}EncryptedAssertion") 419 enc_data = xmlsec.template.encrypted_data_create( 420 container, xmlsec.Transform.AES128, type=xmlsec.EncryptionType.ELEMENT, ns="xenc" 421 ) 422 xmlsec.template.encrypted_data_ensure_cipher_value(enc_data) 423 key_info = xmlsec.template.encrypted_data_ensure_key_info(enc_data, ns="ds") 424 enc_key = xmlsec.template.add_encrypted_key(key_info, xmlsec.Transform.RSA_OAEP) 425 xmlsec.template.encrypted_data_ensure_cipher_value(enc_key) 426 427 try: 428 enc_data = encryption_context.encrypt_xml(enc_data, standalone_element) 429 except xmlsec.Error as exc: 430 raise InvalidEncryption() from exc 431 432 container.append(enc_data) 433 434 def build_response(self) -> str: 435 """Build string XML Response and sign if signing is enabled.""" 436 root_response = self.get_response() 437 # Sign assertion first (before encryption) 438 if self.provider.signing_kp and self.provider.sign_assertion: 439 assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0] 440 self._sign(assertion) 441 # Encrypt assertion (this replaces Assertion with EncryptedAssertion) 442 if self.provider.encryption_kp: 443 assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0] 444 self._encrypt(assertion, root_response) 445 # Sign response AFTER encryption so signature covers the encrypted content 446 if self.provider.signing_kp and self.provider.sign_response: 447 response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0] 448 self._sign(response) 449 return etree.tostring(root_response).decode("utf-8") # nosec
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class
AssertionProcessor:
48class AssertionProcessor: 49 """Generate a SAML Response from an AuthNRequest""" 50 51 provider: SAMLProvider 52 http_request: HttpRequest 53 auth_n_request: AuthNRequest 54 55 _issue_instant: str 56 _assertion_id: str 57 _response_id: str 58 59 _auth_instant: str 60 _valid_not_before: str 61 _session_not_on_or_after: str 62 _valid_not_on_or_after: str 63 64 session_index: str 65 name_id: str 66 name_id_format: str 67 session_not_on_or_after_datetime: datetime 68 69 def __init__(self, provider: SAMLProvider, request: HttpRequest, auth_n_request: AuthNRequest): 70 self.provider = provider 71 self.http_request = request 72 self.auth_n_request = auth_n_request 73 74 self._issue_instant = get_time_string() 75 self._assertion_id = get_random_id() 76 self._response_id = get_random_id() 77 78 _login_event = get_login_event(self.http_request) 79 _login_time = now() 80 if _login_event: 81 _login_time = _login_event.created 82 self._auth_instant = get_time_string(_login_time) 83 self._valid_not_before = get_time_string( 84 timedelta_from_string(self.provider.assertion_valid_not_before) 85 ) 86 self.session_not_on_or_after_datetime = now() + timedelta_from_string( 87 self.provider.session_valid_not_on_or_after 88 ) 89 self._session_not_on_or_after = get_time_string(self.session_not_on_or_after_datetime) 90 self._valid_not_on_or_after = get_time_string( 91 timedelta_from_string(self.provider.assertion_valid_not_on_or_after) 92 ) 93 94 def get_attributes(self) -> Element: 95 """Get AttributeStatement Element with Attributes from Property Mappings.""" 96 # https://commons.lbl.gov/display/IDMgmt/Attribute+Definitions 97 attribute_statement = Element(f"{{{NS_SAML_ASSERTION}}}AttributeStatement") 98 user = self.http_request.user 99 for mapping in SAMLPropertyMapping.objects.filter(provider=self.provider).order_by( 100 "saml_name" 101 ): 102 try: 103 mapping: SAMLPropertyMapping 104 value = mapping.evaluate( 105 user=user, 106 request=self.http_request, 107 provider=self.provider, 108 ) 109 if value is None: 110 continue 111 112 attribute = Element(f"{{{NS_SAML_ASSERTION}}}Attribute") 113 if mapping.friendly_name and mapping.friendly_name != "": 114 attribute.attrib["FriendlyName"] = mapping.friendly_name 115 attribute.attrib["Name"] = mapping.saml_name 116 117 if not isinstance(value, list | GeneratorType): 118 value = [value] 119 120 for value_item in value: 121 attribute_value = SubElement( 122 attribute, f"{{{NS_SAML_ASSERTION}}}AttributeValue" 123 ) 124 str_value = str(value_item) if not isinstance(value_item, str) else value_item 125 attribute_value.text = str_value 126 127 attribute_statement.append(attribute) 128 129 except (PropertyMappingExpressionException, ValueError) as exc: 130 # Value error can be raised when assigning invalid data to an attribute 131 Event.new( 132 EventAction.CONFIGURATION_ERROR, 133 message=f"Failed to evaluate property-mapping: '{mapping.name}'", 134 provider=self.provider, 135 mapping=mapping, 136 ).from_http(self.http_request) 137 LOGGER.warning("Failed to evaluate property mapping", exc=exc) 138 continue 139 return attribute_statement 140 141 def get_issuer(self) -> Element: 142 """Get Issuer Element""" 143 issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer", nsmap=NS_MAP) 144 issuer.text = self.provider.issuer 145 return issuer 146 147 def get_assertion_auth_n_statement(self) -> Element: 148 """Generate AuthnStatement with AuthnContext and ContextClassRef Elements.""" 149 auth_n_statement = Element(f"{{{NS_SAML_ASSERTION}}}AuthnStatement") 150 auth_n_statement.attrib["AuthnInstant"] = self._auth_instant 151 self.session_index = sha256( 152 self.http_request.session.session_key.encode("ascii") 153 ).hexdigest() 154 auth_n_statement.attrib["SessionIndex"] = self.session_index 155 auth_n_statement.attrib["SessionNotOnOrAfter"] = self._session_not_on_or_after 156 157 auth_n_context = SubElement(auth_n_statement, f"{{{NS_SAML_ASSERTION}}}AuthnContext") 158 auth_n_context_class_ref = SubElement( 159 auth_n_context, f"{{{NS_SAML_ASSERTION}}}AuthnContextClassRef" 160 ) 161 auth_n_context_class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified" 162 event = get_login_event(self.http_request) 163 if event: 164 method = event.context.get(PLAN_CONTEXT_METHOD, "") 165 method_args = event.context.get(PLAN_CONTEXT_METHOD_ARGS, {}) 166 if method == "password": 167 auth_n_context_class_ref.text = ( 168 "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport" 169 ) 170 if "mfa_devices" in method_args: 171 auth_n_context_class_ref.text = ( 172 "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileTwoFactorContract" 173 ) 174 if method in ["auth_mfa", "auth_webauthn_pwl"]: 175 auth_n_context_class_ref.text = ( 176 "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract" 177 ) 178 if self.provider.authn_context_class_ref_mapping: 179 try: 180 value = self.provider.authn_context_class_ref_mapping.evaluate( 181 user=self.http_request.user, 182 request=self.http_request, 183 provider=self.provider, 184 ) 185 if value is not None: 186 auth_n_context_class_ref.text = str(value) 187 return auth_n_statement 188 except PropertyMappingExpressionException as exc: 189 Event.new( 190 EventAction.CONFIGURATION_ERROR, 191 message=( 192 "Failed to evaluate property-mapping: " 193 f"'{self.provider.authn_context_class_ref_mapping.name}'" 194 ), 195 provider=self.provider, 196 mapping=self.provider.authn_context_class_ref_mapping, 197 ).from_http(self.http_request) 198 LOGGER.warning("Failed to evaluate property mapping", exc=exc) 199 return auth_n_statement 200 return auth_n_statement 201 202 def get_assertion_conditions(self) -> Element: 203 """Generate Conditions with AudienceRestriction and Audience Elements.""" 204 conditions = Element(f"{{{NS_SAML_ASSERTION}}}Conditions") 205 conditions.attrib["NotBefore"] = self._valid_not_before 206 conditions.attrib["NotOnOrAfter"] = self._valid_not_on_or_after 207 if self.provider.audience != "": 208 audience_restriction = SubElement( 209 conditions, f"{{{NS_SAML_ASSERTION}}}AudienceRestriction" 210 ) 211 audience = SubElement(audience_restriction, f"{{{NS_SAML_ASSERTION}}}Audience") 212 audience.text = self.provider.audience 213 return conditions 214 215 def get_name_id(self) -> Element: 216 """Get NameID Element""" 217 name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID") 218 # For requests that don't specify a NameIDPolicy, check if we 219 # can fall back to the provider default 220 if ( 221 self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_UNSPECIFIED 222 and self.provider.default_name_id_policy != SAML_NAME_ID_FORMAT_UNSPECIFIED 223 ): 224 self.auth_n_request.name_id_policy = self.provider.default_name_id_policy 225 name_id.attrib["Format"] = self.auth_n_request.name_id_policy 226 self.name_id_format = self.auth_n_request.name_id_policy 227 # persistent is used as a fallback, so always generate it 228 persistent = self.http_request.user.uid 229 name_id.text = persistent 230 self.name_id = persistent 231 # If name_id_mapping is set, we override the value, regardless of what the SP asks for 232 if self.provider.name_id_mapping: 233 try: 234 value = self.provider.name_id_mapping.evaluate( 235 user=self.http_request.user, 236 request=self.http_request, 237 provider=self.provider, 238 ) 239 if value is not None: 240 name_id.text = str(value) 241 self.name_id = str(value) 242 return name_id 243 except PropertyMappingExpressionException as exc: 244 Event.new( 245 EventAction.CONFIGURATION_ERROR, 246 message=( 247 "Failed to evaluate property-mapping: " 248 f"'{self.provider.name_id_mapping.name}'", 249 ), 250 provider=self.provider, 251 mapping=self.provider.name_id_mapping, 252 ).from_http(self.http_request) 253 LOGGER.warning("Failed to evaluate property mapping", exc=exc) 254 return name_id 255 if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_EMAIL: 256 name_id.text = self.http_request.user.email 257 self.name_id = self.http_request.user.email 258 return name_id 259 if self.auth_n_request.name_id_policy in [ 260 SAML_NAME_ID_FORMAT_PERSISTENT, 261 SAML_NAME_ID_FORMAT_UNSPECIFIED, 262 ]: 263 name_id.text = persistent 264 self.name_id = persistent 265 return name_id 266 if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_X509: 267 # This attribute is statically set by the LDAP source 268 name_id.text = self.http_request.user.attributes.get( 269 LDAP_DISTINGUISHED_NAME, persistent 270 ) 271 self.name_id = name_id.text 272 return name_id 273 if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_WINDOWS: 274 # This attribute is statically set by the LDAP source 275 name_id.text = self.http_request.user.attributes.get("upn", persistent) 276 self.name_id = name_id.text 277 return name_id 278 if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_TRANSIENT: 279 # Use the hash of the user's session, which changes every session 280 session_key: str = self.http_request.session.session_key 281 name_id.text = sha256(session_key.encode()).hexdigest() 282 self.name_id = name_id.text 283 return name_id 284 raise UnsupportedNameIDFormat( 285 "Assertion contains NameID with unsupported " 286 f"format {self.auth_n_request.name_id_policy}." 287 ) 288 289 def get_assertion_subject(self) -> Element: 290 """Generate Subject Element with NameID and SubjectConfirmation Objects""" 291 subject = Element(f"{{{NS_SAML_ASSERTION}}}Subject") 292 subject.append(self.get_name_id()) 293 294 subject_confirmation = SubElement(subject, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmation") 295 subject_confirmation.attrib["Method"] = "urn:oasis:names:tc:SAML:2.0:cm:bearer" 296 297 subject_confirmation_data = SubElement( 298 subject_confirmation, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmationData" 299 ) 300 if self.auth_n_request.id: 301 subject_confirmation_data.attrib["InResponseTo"] = self.auth_n_request.id 302 subject_confirmation_data.attrib["NotOnOrAfter"] = self._valid_not_on_or_after 303 subject_confirmation_data.attrib["Recipient"] = self.provider.acs_url 304 return subject 305 306 def get_assertion(self) -> Element: 307 """Generate Main Assertion Element""" 308 assertion = Element(f"{{{NS_SAML_ASSERTION}}}Assertion", nsmap=NS_MAP) 309 assertion.attrib["Version"] = "2.0" 310 assertion.attrib["ID"] = self._assertion_id 311 assertion.attrib["IssueInstant"] = self._issue_instant 312 assertion.append(self.get_issuer()) 313 314 if self.provider.signing_kp and self.provider.sign_assertion: 315 sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get( 316 self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1 317 ) 318 signature = xmlsec.template.create( 319 assertion, 320 xmlsec.constants.TransformExclC14N, 321 sign_algorithm_transform, 322 ns=xmlsec.constants.DSigNs, 323 ) 324 assertion.append(signature) 325 326 assertion.append(self.get_assertion_subject()) 327 assertion.append(self.get_assertion_conditions()) 328 assertion.append(self.get_assertion_auth_n_statement()) 329 330 assertion.append(self.get_attributes()) 331 return assertion 332 333 def get_response(self) -> Element: 334 """Generate Root response element""" 335 response = Element(f"{{{NS_SAML_PROTOCOL}}}Response", nsmap=NS_MAP) 336 response.attrib["Version"] = "2.0" 337 response.attrib["IssueInstant"] = self._issue_instant 338 response.attrib["Destination"] = self.provider.acs_url 339 response.attrib["ID"] = self._response_id 340 if self.auth_n_request.id: 341 response.attrib["InResponseTo"] = self.auth_n_request.id 342 343 response.append(self.get_issuer()) 344 345 if self.provider.signing_kp and self.provider.sign_response: 346 sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get( 347 self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1 348 ) 349 signature = xmlsec.template.create( 350 response, 351 xmlsec.constants.TransformExclC14N, 352 sign_algorithm_transform, 353 ns=xmlsec.constants.DSigNs, 354 ) 355 response.append(signature) 356 357 status = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status") 358 status_code = SubElement(status, f"{{{NS_SAML_PROTOCOL}}}StatusCode") 359 status_code.attrib["Value"] = "urn:oasis:names:tc:SAML:2.0:status:Success" 360 361 response.append(self.get_assertion()) 362 return response 363 364 def _sign(self, element: _Element): 365 """Sign an XML element based on the providers' configured signing settings""" 366 digest_algorithm_transform = DIGEST_ALGORITHM_TRANSLATION_MAP.get( 367 self.provider.digest_algorithm, xmlsec.constants.TransformSha1 368 ) 369 xmlsec.tree.add_ids(element, ["ID"]) 370 signature_node = xmlsec.tree.find_node(element, xmlsec.constants.NodeSignature) 371 ref = xmlsec.template.add_reference( 372 signature_node, 373 digest_algorithm_transform, 374 uri="#" + element.attrib["ID"], 375 ) 376 xmlsec.template.add_transform(ref, xmlsec.constants.TransformEnveloped) 377 xmlsec.template.add_transform(ref, xmlsec.constants.TransformExclC14N) 378 key_info = xmlsec.template.ensure_key_info(signature_node) 379 xmlsec.template.add_x509_data(key_info) 380 381 ctx = xmlsec.SignatureContext() 382 383 key = xmlsec.Key.from_memory( 384 self.provider.signing_kp.key_data, 385 xmlsec.constants.KeyDataFormatPem, 386 None, 387 ) 388 key.load_cert_from_memory( 389 self.provider.signing_kp.certificate_data, 390 xmlsec.constants.KeyDataFormatCertPem, 391 ) 392 ctx.key = key 393 try: 394 ctx.sign(remove_xml_newlines(element, signature_node)) 395 except xmlsec.Error as exc: 396 raise InvalidSignature() from exc 397 398 def _encrypt(self, element: _Element, parent: _Element): 399 """Encrypt SAMLResponse EncryptedAssertion Element""" 400 # Create a standalone copy so namespace declarations are included in the encrypted content 401 element_xml = etree.tostring(element) 402 standalone_element = etree.fromstring(element_xml) 403 404 # Remove the original element from the tree since we're replacing it with encrypted version 405 parent.remove(element) 406 407 manager = xmlsec.KeysManager() 408 key = xmlsec.Key.from_memory( 409 self.provider.encryption_kp.certificate_data, 410 xmlsec.constants.KeyDataFormatCertPem, 411 ) 412 413 manager.add_key(key) 414 encryption_context = xmlsec.EncryptionContext(manager) 415 encryption_context.key = xmlsec.Key.generate( 416 xmlsec.constants.KeyDataAes, 128, xmlsec.constants.KeyDataTypeSession 417 ) 418 419 container = SubElement(parent, f"{{{NS_SAML_ASSERTION}}}EncryptedAssertion") 420 enc_data = xmlsec.template.encrypted_data_create( 421 container, xmlsec.Transform.AES128, type=xmlsec.EncryptionType.ELEMENT, ns="xenc" 422 ) 423 xmlsec.template.encrypted_data_ensure_cipher_value(enc_data) 424 key_info = xmlsec.template.encrypted_data_ensure_key_info(enc_data, ns="ds") 425 enc_key = xmlsec.template.add_encrypted_key(key_info, xmlsec.Transform.RSA_OAEP) 426 xmlsec.template.encrypted_data_ensure_cipher_value(enc_key) 427 428 try: 429 enc_data = encryption_context.encrypt_xml(enc_data, standalone_element) 430 except xmlsec.Error as exc: 431 raise InvalidEncryption() from exc 432 433 container.append(enc_data) 434 435 def build_response(self) -> str: 436 """Build string XML Response and sign if signing is enabled.""" 437 root_response = self.get_response() 438 # Sign assertion first (before encryption) 439 if self.provider.signing_kp and self.provider.sign_assertion: 440 assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0] 441 self._sign(assertion) 442 # Encrypt assertion (this replaces Assertion with EncryptedAssertion) 443 if self.provider.encryption_kp: 444 assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0] 445 self._encrypt(assertion, root_response) 446 # Sign response AFTER encryption so signature covers the encrypted content 447 if self.provider.signing_kp and self.provider.sign_response: 448 response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0] 449 self._sign(response) 450 return etree.tostring(root_response).decode("utf-8") # nosec
Generate a SAML Response from an AuthNRequest
AssertionProcessor( provider: authentik.providers.saml.models.SAMLProvider, request: django.http.request.HttpRequest, auth_n_request: authentik.providers.saml.processors.authn_request_parser.AuthNRequest)
69 def __init__(self, provider: SAMLProvider, request: HttpRequest, auth_n_request: AuthNRequest): 70 self.provider = provider 71 self.http_request = request 72 self.auth_n_request = auth_n_request 73 74 self._issue_instant = get_time_string() 75 self._assertion_id = get_random_id() 76 self._response_id = get_random_id() 77 78 _login_event = get_login_event(self.http_request) 79 _login_time = now() 80 if _login_event: 81 _login_time = _login_event.created 82 self._auth_instant = get_time_string(_login_time) 83 self._valid_not_before = get_time_string( 84 timedelta_from_string(self.provider.assertion_valid_not_before) 85 ) 86 self.session_not_on_or_after_datetime = now() + timedelta_from_string( 87 self.provider.session_valid_not_on_or_after 88 ) 89 self._session_not_on_or_after = get_time_string(self.session_not_on_or_after_datetime) 90 self._valid_not_on_or_after = get_time_string( 91 timedelta_from_string(self.provider.assertion_valid_not_on_or_after) 92 )
def
get_attributes(self) -> lxml.etree.Element:
94 def get_attributes(self) -> Element: 95 """Get AttributeStatement Element with Attributes from Property Mappings.""" 96 # https://commons.lbl.gov/display/IDMgmt/Attribute+Definitions 97 attribute_statement = Element(f"{{{NS_SAML_ASSERTION}}}AttributeStatement") 98 user = self.http_request.user 99 for mapping in SAMLPropertyMapping.objects.filter(provider=self.provider).order_by( 100 "saml_name" 101 ): 102 try: 103 mapping: SAMLPropertyMapping 104 value = mapping.evaluate( 105 user=user, 106 request=self.http_request, 107 provider=self.provider, 108 ) 109 if value is None: 110 continue 111 112 attribute = Element(f"{{{NS_SAML_ASSERTION}}}Attribute") 113 if mapping.friendly_name and mapping.friendly_name != "": 114 attribute.attrib["FriendlyName"] = mapping.friendly_name 115 attribute.attrib["Name"] = mapping.saml_name 116 117 if not isinstance(value, list | GeneratorType): 118 value = [value] 119 120 for value_item in value: 121 attribute_value = SubElement( 122 attribute, f"{{{NS_SAML_ASSERTION}}}AttributeValue" 123 ) 124 str_value = str(value_item) if not isinstance(value_item, str) else value_item 125 attribute_value.text = str_value 126 127 attribute_statement.append(attribute) 128 129 except (PropertyMappingExpressionException, ValueError) as exc: 130 # Value error can be raised when assigning invalid data to an attribute 131 Event.new( 132 EventAction.CONFIGURATION_ERROR, 133 message=f"Failed to evaluate property-mapping: '{mapping.name}'", 134 provider=self.provider, 135 mapping=mapping, 136 ).from_http(self.http_request) 137 LOGGER.warning("Failed to evaluate property mapping", exc=exc) 138 continue 139 return attribute_statement
Get AttributeStatement Element with Attributes from Property Mappings.
def
get_issuer(self) -> lxml.etree.Element:
141 def get_issuer(self) -> Element: 142 """Get Issuer Element""" 143 issuer = Element(f"{{{NS_SAML_ASSERTION}}}Issuer", nsmap=NS_MAP) 144 issuer.text = self.provider.issuer 145 return issuer
Get Issuer Element
def
get_assertion_auth_n_statement(self) -> lxml.etree.Element:
147 def get_assertion_auth_n_statement(self) -> Element: 148 """Generate AuthnStatement with AuthnContext and ContextClassRef Elements.""" 149 auth_n_statement = Element(f"{{{NS_SAML_ASSERTION}}}AuthnStatement") 150 auth_n_statement.attrib["AuthnInstant"] = self._auth_instant 151 self.session_index = sha256( 152 self.http_request.session.session_key.encode("ascii") 153 ).hexdigest() 154 auth_n_statement.attrib["SessionIndex"] = self.session_index 155 auth_n_statement.attrib["SessionNotOnOrAfter"] = self._session_not_on_or_after 156 157 auth_n_context = SubElement(auth_n_statement, f"{{{NS_SAML_ASSERTION}}}AuthnContext") 158 auth_n_context_class_ref = SubElement( 159 auth_n_context, f"{{{NS_SAML_ASSERTION}}}AuthnContextClassRef" 160 ) 161 auth_n_context_class_ref.text = "urn:oasis:names:tc:SAML:2.0:ac:classes:unspecified" 162 event = get_login_event(self.http_request) 163 if event: 164 method = event.context.get(PLAN_CONTEXT_METHOD, "") 165 method_args = event.context.get(PLAN_CONTEXT_METHOD_ARGS, {}) 166 if method == "password": 167 auth_n_context_class_ref.text = ( 168 "urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport" 169 ) 170 if "mfa_devices" in method_args: 171 auth_n_context_class_ref.text = ( 172 "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileTwoFactorContract" 173 ) 174 if method in ["auth_mfa", "auth_webauthn_pwl"]: 175 auth_n_context_class_ref.text = ( 176 "urn:oasis:names:tc:SAML:2.0:ac:classes:MobileOneFactorContract" 177 ) 178 if self.provider.authn_context_class_ref_mapping: 179 try: 180 value = self.provider.authn_context_class_ref_mapping.evaluate( 181 user=self.http_request.user, 182 request=self.http_request, 183 provider=self.provider, 184 ) 185 if value is not None: 186 auth_n_context_class_ref.text = str(value) 187 return auth_n_statement 188 except PropertyMappingExpressionException as exc: 189 Event.new( 190 EventAction.CONFIGURATION_ERROR, 191 message=( 192 "Failed to evaluate property-mapping: " 193 f"'{self.provider.authn_context_class_ref_mapping.name}'" 194 ), 195 provider=self.provider, 196 mapping=self.provider.authn_context_class_ref_mapping, 197 ).from_http(self.http_request) 198 LOGGER.warning("Failed to evaluate property mapping", exc=exc) 199 return auth_n_statement 200 return auth_n_statement
Generate AuthnStatement with AuthnContext and ContextClassRef Elements.
def
get_assertion_conditions(self) -> lxml.etree.Element:
202 def get_assertion_conditions(self) -> Element: 203 """Generate Conditions with AudienceRestriction and Audience Elements.""" 204 conditions = Element(f"{{{NS_SAML_ASSERTION}}}Conditions") 205 conditions.attrib["NotBefore"] = self._valid_not_before 206 conditions.attrib["NotOnOrAfter"] = self._valid_not_on_or_after 207 if self.provider.audience != "": 208 audience_restriction = SubElement( 209 conditions, f"{{{NS_SAML_ASSERTION}}}AudienceRestriction" 210 ) 211 audience = SubElement(audience_restriction, f"{{{NS_SAML_ASSERTION}}}Audience") 212 audience.text = self.provider.audience 213 return conditions
Generate Conditions with AudienceRestriction and Audience Elements.
def
get_name_id(self) -> lxml.etree.Element:
215 def get_name_id(self) -> Element: 216 """Get NameID Element""" 217 name_id = Element(f"{{{NS_SAML_ASSERTION}}}NameID") 218 # For requests that don't specify a NameIDPolicy, check if we 219 # can fall back to the provider default 220 if ( 221 self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_UNSPECIFIED 222 and self.provider.default_name_id_policy != SAML_NAME_ID_FORMAT_UNSPECIFIED 223 ): 224 self.auth_n_request.name_id_policy = self.provider.default_name_id_policy 225 name_id.attrib["Format"] = self.auth_n_request.name_id_policy 226 self.name_id_format = self.auth_n_request.name_id_policy 227 # persistent is used as a fallback, so always generate it 228 persistent = self.http_request.user.uid 229 name_id.text = persistent 230 self.name_id = persistent 231 # If name_id_mapping is set, we override the value, regardless of what the SP asks for 232 if self.provider.name_id_mapping: 233 try: 234 value = self.provider.name_id_mapping.evaluate( 235 user=self.http_request.user, 236 request=self.http_request, 237 provider=self.provider, 238 ) 239 if value is not None: 240 name_id.text = str(value) 241 self.name_id = str(value) 242 return name_id 243 except PropertyMappingExpressionException as exc: 244 Event.new( 245 EventAction.CONFIGURATION_ERROR, 246 message=( 247 "Failed to evaluate property-mapping: " 248 f"'{self.provider.name_id_mapping.name}'", 249 ), 250 provider=self.provider, 251 mapping=self.provider.name_id_mapping, 252 ).from_http(self.http_request) 253 LOGGER.warning("Failed to evaluate property mapping", exc=exc) 254 return name_id 255 if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_EMAIL: 256 name_id.text = self.http_request.user.email 257 self.name_id = self.http_request.user.email 258 return name_id 259 if self.auth_n_request.name_id_policy in [ 260 SAML_NAME_ID_FORMAT_PERSISTENT, 261 SAML_NAME_ID_FORMAT_UNSPECIFIED, 262 ]: 263 name_id.text = persistent 264 self.name_id = persistent 265 return name_id 266 if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_X509: 267 # This attribute is statically set by the LDAP source 268 name_id.text = self.http_request.user.attributes.get( 269 LDAP_DISTINGUISHED_NAME, persistent 270 ) 271 self.name_id = name_id.text 272 return name_id 273 if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_WINDOWS: 274 # This attribute is statically set by the LDAP source 275 name_id.text = self.http_request.user.attributes.get("upn", persistent) 276 self.name_id = name_id.text 277 return name_id 278 if self.auth_n_request.name_id_policy == SAML_NAME_ID_FORMAT_TRANSIENT: 279 # Use the hash of the user's session, which changes every session 280 session_key: str = self.http_request.session.session_key 281 name_id.text = sha256(session_key.encode()).hexdigest() 282 self.name_id = name_id.text 283 return name_id 284 raise UnsupportedNameIDFormat( 285 "Assertion contains NameID with unsupported " 286 f"format {self.auth_n_request.name_id_policy}." 287 )
Get NameID Element
def
get_assertion_subject(self) -> lxml.etree.Element:
289 def get_assertion_subject(self) -> Element: 290 """Generate Subject Element with NameID and SubjectConfirmation Objects""" 291 subject = Element(f"{{{NS_SAML_ASSERTION}}}Subject") 292 subject.append(self.get_name_id()) 293 294 subject_confirmation = SubElement(subject, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmation") 295 subject_confirmation.attrib["Method"] = "urn:oasis:names:tc:SAML:2.0:cm:bearer" 296 297 subject_confirmation_data = SubElement( 298 subject_confirmation, f"{{{NS_SAML_ASSERTION}}}SubjectConfirmationData" 299 ) 300 if self.auth_n_request.id: 301 subject_confirmation_data.attrib["InResponseTo"] = self.auth_n_request.id 302 subject_confirmation_data.attrib["NotOnOrAfter"] = self._valid_not_on_or_after 303 subject_confirmation_data.attrib["Recipient"] = self.provider.acs_url 304 return subject
Generate Subject Element with NameID and SubjectConfirmation Objects
def
get_assertion(self) -> lxml.etree.Element:
306 def get_assertion(self) -> Element: 307 """Generate Main Assertion Element""" 308 assertion = Element(f"{{{NS_SAML_ASSERTION}}}Assertion", nsmap=NS_MAP) 309 assertion.attrib["Version"] = "2.0" 310 assertion.attrib["ID"] = self._assertion_id 311 assertion.attrib["IssueInstant"] = self._issue_instant 312 assertion.append(self.get_issuer()) 313 314 if self.provider.signing_kp and self.provider.sign_assertion: 315 sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get( 316 self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1 317 ) 318 signature = xmlsec.template.create( 319 assertion, 320 xmlsec.constants.TransformExclC14N, 321 sign_algorithm_transform, 322 ns=xmlsec.constants.DSigNs, 323 ) 324 assertion.append(signature) 325 326 assertion.append(self.get_assertion_subject()) 327 assertion.append(self.get_assertion_conditions()) 328 assertion.append(self.get_assertion_auth_n_statement()) 329 330 assertion.append(self.get_attributes()) 331 return assertion
Generate Main Assertion Element
def
get_response(self) -> lxml.etree.Element:
333 def get_response(self) -> Element: 334 """Generate Root response element""" 335 response = Element(f"{{{NS_SAML_PROTOCOL}}}Response", nsmap=NS_MAP) 336 response.attrib["Version"] = "2.0" 337 response.attrib["IssueInstant"] = self._issue_instant 338 response.attrib["Destination"] = self.provider.acs_url 339 response.attrib["ID"] = self._response_id 340 if self.auth_n_request.id: 341 response.attrib["InResponseTo"] = self.auth_n_request.id 342 343 response.append(self.get_issuer()) 344 345 if self.provider.signing_kp and self.provider.sign_response: 346 sign_algorithm_transform = SIGN_ALGORITHM_TRANSFORM_MAP.get( 347 self.provider.signature_algorithm, xmlsec.constants.TransformRsaSha1 348 ) 349 signature = xmlsec.template.create( 350 response, 351 xmlsec.constants.TransformExclC14N, 352 sign_algorithm_transform, 353 ns=xmlsec.constants.DSigNs, 354 ) 355 response.append(signature) 356 357 status = SubElement(response, f"{{{NS_SAML_PROTOCOL}}}Status") 358 status_code = SubElement(status, f"{{{NS_SAML_PROTOCOL}}}StatusCode") 359 status_code.attrib["Value"] = "urn:oasis:names:tc:SAML:2.0:status:Success" 360 361 response.append(self.get_assertion()) 362 return response
Generate Root response element
def
build_response(self) -> str:
435 def build_response(self) -> str: 436 """Build string XML Response and sign if signing is enabled.""" 437 root_response = self.get_response() 438 # Sign assertion first (before encryption) 439 if self.provider.signing_kp and self.provider.sign_assertion: 440 assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0] 441 self._sign(assertion) 442 # Encrypt assertion (this replaces Assertion with EncryptedAssertion) 443 if self.provider.encryption_kp: 444 assertion = root_response.xpath("//saml:Assertion", namespaces=NS_MAP)[0] 445 self._encrypt(assertion, root_response) 446 # Sign response AFTER encryption so signature covers the encrypted content 447 if self.provider.signing_kp and self.provider.sign_response: 448 response = root_response.xpath("//samlp:Response", namespaces=NS_MAP)[0] 449 self._sign(response) 450 return etree.tostring(root_response).decode("utf-8") # nosec
Build string XML Response and sign if signing is enabled.