authentik.sources.saml.processors.response
authentik saml source processor
1"""authentik saml source processor""" 2 3from base64 import b64decode 4from datetime import UTC, datetime 5from time import mktime 6from typing import TYPE_CHECKING 7 8import xmlsec 9from defusedxml.lxml import fromstring 10from django.core.cache import cache 11from django.core.exceptions import SuspiciousOperation 12from django.http import HttpRequest 13from django.utils.timezone import now 14from lxml import etree # nosec 15from lxml.etree import _Element # nosec 16from structlog.stdlib import get_logger 17 18from authentik.common.saml.constants import ( 19 NS_MAP, 20 NS_SAML_ASSERTION, 21 NS_SAML_PROTOCOL, 22 SAML_NAME_ID_FORMAT_EMAIL, 23 SAML_NAME_ID_FORMAT_PERSISTENT, 24 SAML_NAME_ID_FORMAT_TRANSIENT, 25 SAML_NAME_ID_FORMAT_WINDOWS, 26 SAML_NAME_ID_FORMAT_X509, 27 SAML_STATUS_SUCCESS, 28) 29from authentik.core.models import ( 30 USER_ATTRIBUTE_DELETE_ON_LOGOUT, 31 USER_ATTRIBUTE_EXPIRES, 32 USER_ATTRIBUTE_GENERATED, 33 USER_ATTRIBUTE_SOURCES, 34 USERNAME_MAX_LENGTH, 35 User, 36) 37from authentik.core.sources.flow_manager import SourceFlowManager 38from authentik.lib.utils.time import timedelta_from_string 39from authentik.sources.saml.exceptions import ( 40 InvalidEncryption, 41 InvalidSignature, 42 MismatchedRequestID, 43 MissingSAMLResponse, 44 SAMLException, 45 UnsupportedNameIDFormat, 46) 47from authentik.sources.saml.models import ( 48 GroupSAMLSourceConnection, 49 SAMLSource, 50 UserSAMLSourceConnection, 51) 52from authentik.sources.saml.processors.request import SESSION_KEY_REQUEST_ID 53 54LOGGER = get_logger() 55if TYPE_CHECKING: 56 from xml.etree.ElementTree import Element # nosec 57 58CACHE_SEEN_REQUEST_ID = "authentik_saml_seen_ids_%s" 59 60 61class ResponseProcessor: 62 """SAML Response Processor""" 63 64 _source: SAMLSource 65 66 _root: _Element 67 _root_xml: bytes 68 69 _http_request: HttpRequest 70 71 _assertion: _Element | None = None 72 73 def __init__(self, source: SAMLSource, request: HttpRequest): 74 self._source = source 75 self._http_request = request 76 77 def parse(self): 78 """Check if `request` contains SAML Response data, parse and validate it.""" 79 # First off, check if we have any SAML Data at all. 80 raw_response = self._http_request.POST.get("SAMLResponse", None) 81 if not raw_response: 82 raise MissingSAMLResponse("Request does not contain 'SAMLResponse'") 83 # Check if response is compressed, b64 decode it 84 self._root_xml = b64decode(raw_response.encode()) 85 self._root = fromstring(self._root_xml) 86 87 # Verify response signature BEFORE decryption (signature covers encrypted content) 88 if self._source.verification_kp and self._source.signed_response: 89 self._verify_response_signature() 90 91 if self._source.encryption_kp: 92 self._decrypt_response() 93 94 # Verify assertion signature AFTER decryption (signature is inside encrypted content) 95 if self._source.verification_kp and self._source.signed_assertion: 96 self._verify_assertion_signature() 97 98 self._verify_request_id() 99 self._verify_status() 100 self._verify_conditions() 101 102 def _decrypt_response(self): 103 """Decrypt SAMLResponse EncryptedAssertion Element""" 104 manager = xmlsec.KeysManager() 105 key = xmlsec.Key.from_memory( 106 self._source.encryption_kp.key_data, 107 xmlsec.constants.KeyDataFormatPem, 108 ) 109 110 manager.add_key(key) 111 encryption_context = xmlsec.EncryptionContext(manager) 112 113 encrypted_assertion = self._root.find(f".//{{{NS_SAML_ASSERTION}}}EncryptedAssertion") 114 if encrypted_assertion is None: 115 raise InvalidEncryption() 116 encrypted_data = xmlsec.tree.find_child( 117 encrypted_assertion, "EncryptedData", xmlsec.constants.EncNs 118 ) 119 try: 120 decrypted_assertion = encryption_context.decrypt(encrypted_data) 121 except xmlsec.Error as exc: 122 raise InvalidEncryption() from exc 123 124 index_of = self._root.index(encrypted_assertion) 125 self._root.remove(encrypted_assertion) 126 self._root.insert( 127 index_of, 128 decrypted_assertion, 129 ) 130 self._assertion = decrypted_assertion 131 132 def _verify_conditions(self): 133 conditions = self.get_assertion().find(f"{{{NS_SAML_ASSERTION}}}Conditions") 134 if conditions is None: 135 return 136 _now = now() 137 before = conditions.attrib.get("NotBefore") 138 if before: 139 if datetime.fromisoformat(before).replace(tzinfo=UTC) > _now: 140 raise SAMLException("Assertion is not valid yet or expired.") 141 on_or_after = conditions.attrib.get("NotOnOrAfter") 142 if on_or_after: 143 if datetime.fromisoformat(on_or_after).replace(tzinfo=UTC) < _now: 144 raise SAMLException("Assertion is not valid yet or expired.") 145 146 def _verify_signature(self, signature_node: _Element, target: _Element): 147 """Verify a single signature node against the given target element.""" 148 target_id = target.attrib.get("ID") 149 if not target_id: 150 raise InvalidSignature("Signed element is missing an ID attribute.") 151 refs = signature_node.xpath("./ds:SignedInfo/ds:Reference", namespaces=NS_MAP) 152 if len(refs) != 1: 153 raise InvalidSignature("Signature must contain exactly one Reference.") 154 ref_uri = refs[0].get("URI", "") 155 if ref_uri not in ("", f"#{target_id}"): 156 raise InvalidSignature( 157 "Signature Reference URI does not match the signed element's ID." 158 ) 159 160 xmlsec.tree.add_ids(target, ["ID"]) 161 162 ctx = xmlsec.SignatureContext() 163 key = xmlsec.Key.from_memory( 164 self._source.verification_kp.certificate_data, 165 xmlsec.constants.KeyDataFormatCertPem, 166 ) 167 ctx.key = key 168 169 ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509]) 170 try: 171 ctx.verify(signature_node) 172 except xmlsec.Error as exc: 173 raise InvalidSignature( 174 "The signature of the SAML object is either missing or invalid." 175 ) from exc 176 LOGGER.debug("Successfully verified signature") 177 178 def _verify_response_signature(self): 179 """Verify SAML Response's Signature (before decryption)""" 180 signature_nodes = self._root.xpath("/samlp:Response/ds:Signature", namespaces=NS_MAP) 181 182 if len(signature_nodes) != 1: 183 raise InvalidSignature("Expected exactly one Signature in the Response element.") 184 185 self._verify_signature(signature_nodes[0], self._root) 186 187 def _verify_assertion_signature(self): 188 """Verify SAML Assertion's Signature (after decryption)""" 189 signature_nodes = self._root.xpath( 190 "/samlp:Response/saml:Assertion/ds:Signature", namespaces=NS_MAP 191 ) 192 if len(signature_nodes) != 1: 193 raise InvalidSignature("Expected exactly one signed Assertion in the Response.") 194 signature_node = signature_nodes[0] 195 assertion = signature_node.getparent() 196 197 self._verify_signature(signature_node, assertion) 198 self._assertion = assertion 199 200 def _verify_request_id(self): 201 if self._source.allow_idp_initiated: 202 # If IdP-initiated SSO flows are enabled, we want to cache the Response ID 203 # somewhat mitigate replay attacks 204 seen_ids = cache.get(CACHE_SEEN_REQUEST_ID % self._source.pk, []) 205 if self._root.attrib["ID"] in seen_ids: 206 raise SuspiciousOperation("Replay attack detected") 207 seen_ids.append(self._root.attrib["ID"]) 208 cache.set(CACHE_SEEN_REQUEST_ID % self._source.pk, seen_ids) 209 return 210 if ( 211 SESSION_KEY_REQUEST_ID not in self._http_request.session 212 or "InResponseTo" not in self._root.attrib 213 ): 214 raise MismatchedRequestID( 215 "Missing InResponseTo and IdP-initiated Logins are not allowed" 216 ) 217 if self._http_request.session[SESSION_KEY_REQUEST_ID] != self._root.attrib["InResponseTo"]: 218 raise MismatchedRequestID("Mismatched request ID") 219 220 def _verify_status(self): 221 """Check for SAML Status elements""" 222 status = self._root.find(f"{{{NS_SAML_PROTOCOL}}}Status") 223 if status is None: 224 return 225 status_code = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusCode") 226 message = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusMessage") 227 message_text = message.text if message is not None else None 228 detail = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusDetail") 229 detail_text = etree.tostring(detail, encoding="unicode") if detail is not None else None 230 if status_code.attrib.get("Value") != SAML_STATUS_SUCCESS: 231 if detail_text and message_text: 232 raise ValueError(f"{message_text}: {detail_text}") 233 raise ValueError( 234 detail_text or message_text or f"SAML Status: {status_code.attrib.get('Value')}" 235 ) 236 if message_text or detail_text: 237 LOGGER.debug("SAML Status message", message=message_text, detail=detail_text) 238 239 def _handle_name_id_transient(self) -> SourceFlowManager: 240 """Handle a NameID with the Format of Transient. This is a bit more complex than other 241 formats, as we need to create a temporary User that is used in the session. This 242 user has an attribute that refers to our Source for cleanup. The user is also deleted 243 on logout and periodically.""" 244 # Create a temporary User 245 name_id_el, name_id = self._get_name_id() 246 # trim username to ensure it is max 150 chars 247 username = f"ak-{name_id[: USERNAME_MAX_LENGTH - 14]}-transient" 248 expiry = mktime( 249 (now() + timedelta_from_string(self._source.temporary_user_delete_after)).timetuple() 250 ) 251 user: User = User.objects.create( 252 username=username, 253 attributes={ 254 USER_ATTRIBUTE_GENERATED: True, 255 USER_ATTRIBUTE_SOURCES: [ 256 self._source.name, 257 ], 258 USER_ATTRIBUTE_DELETE_ON_LOGOUT: True, 259 USER_ATTRIBUTE_EXPIRES: expiry, 260 }, 261 path=self._source.get_user_path(), 262 ) 263 LOGGER.debug("Created temporary user for NameID Transient", username=name_id) 264 user.set_unusable_password() 265 user.save() 266 UserSAMLSourceConnection.objects.create(source=self._source, user=user, identifier=name_id) 267 return SAMLSourceFlowManager( 268 source=self._source, 269 request=self._http_request, 270 identifier=str(name_id), 271 user_info={ 272 "root": self._root, 273 "assertion": self.get_assertion(), 274 "name_id": name_id_el, 275 }, 276 policy_context={}, 277 ) 278 279 def get_assertion(self) -> Element | None: 280 """Get assertion element, if we have a signed assertion""" 281 if self._assertion is not None: 282 return self._assertion 283 return self._root.find(f"{{{NS_SAML_ASSERTION}}}Assertion") 284 285 def _get_name_id(self) -> tuple[Element, str]: 286 """Get NameID Element""" 287 assertion = self.get_assertion() 288 if assertion is None: 289 raise ValueError("Assertion element not found") 290 subject = assertion.find(f"{{{NS_SAML_ASSERTION}}}Subject") 291 if subject is None: 292 raise ValueError("Subject element not found") 293 name_id = subject.find(f"{{{NS_SAML_ASSERTION}}}NameID") 294 if name_id is None: 295 raise ValueError("NameID element not found") 296 return name_id, "".join(name_id.itertext()) 297 298 def _get_name_id_filter(self) -> dict[str, str]: 299 """Returns the subject's NameID as a Filter for the `User`""" 300 name_id_el, name_id = self._get_name_id() 301 if not name_id: 302 raise UnsupportedNameIDFormat("Subject's NameID is empty.") 303 _format = name_id_el.attrib["Format"] 304 if _format == SAML_NAME_ID_FORMAT_EMAIL: 305 return {"email": name_id} 306 if _format == SAML_NAME_ID_FORMAT_PERSISTENT: 307 return {"username": name_id} 308 if _format == SAML_NAME_ID_FORMAT_X509: 309 # This attribute is statically set by the LDAP source 310 return {"attributes__distinguishedName": name_id} 311 if _format == SAML_NAME_ID_FORMAT_WINDOWS: 312 if "\\" in name_id: 313 name_id = name_id.split("\\")[1] 314 return {"username": name_id} 315 raise UnsupportedNameIDFormat( 316 f"Assertion contains NameID with unsupported format {_format}." 317 ) 318 319 def prepare_flow_manager(self) -> SourceFlowManager: 320 """Prepare flow plan depending on whether or not the user exists""" 321 name_id_el, name_id = self._get_name_id() 322 # Sanity check, show a warning if NameIDPolicy doesn't match what we go 323 if self._source.name_id_policy != name_id_el.attrib["Format"]: 324 LOGGER.warning( 325 "NameID from IdP doesn't match our policy", 326 expected=self._source.name_id_policy, 327 got=name_id_el.attrib["Format"], 328 ) 329 # transient NameIDs are handled separately as they don't have to go through flows. 330 if name_id_el.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT: 331 return self._handle_name_id_transient() 332 333 return SAMLSourceFlowManager( 334 source=self._source, 335 request=self._http_request, 336 identifier=str(name_id), 337 user_info={ 338 "root": self._root, 339 "assertion": self.get_assertion(), 340 "name_id": name_id_el, 341 }, 342 policy_context={ 343 "saml_response": etree.tostring(self._root), 344 }, 345 ) 346 347 348class SAMLSourceFlowManager(SourceFlowManager): 349 """Source flow manager for SAML Sources""" 350 351 user_connection_type = UserSAMLSourceConnection 352 group_connection_type = GroupSAMLSourceConnection
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
CACHE_SEEN_REQUEST_ID =
'authentik_saml_seen_ids_%s'
class
ResponseProcessor:
62class ResponseProcessor: 63 """SAML Response Processor""" 64 65 _source: SAMLSource 66 67 _root: _Element 68 _root_xml: bytes 69 70 _http_request: HttpRequest 71 72 _assertion: _Element | None = None 73 74 def __init__(self, source: SAMLSource, request: HttpRequest): 75 self._source = source 76 self._http_request = request 77 78 def parse(self): 79 """Check if `request` contains SAML Response data, parse and validate it.""" 80 # First off, check if we have any SAML Data at all. 81 raw_response = self._http_request.POST.get("SAMLResponse", None) 82 if not raw_response: 83 raise MissingSAMLResponse("Request does not contain 'SAMLResponse'") 84 # Check if response is compressed, b64 decode it 85 self._root_xml = b64decode(raw_response.encode()) 86 self._root = fromstring(self._root_xml) 87 88 # Verify response signature BEFORE decryption (signature covers encrypted content) 89 if self._source.verification_kp and self._source.signed_response: 90 self._verify_response_signature() 91 92 if self._source.encryption_kp: 93 self._decrypt_response() 94 95 # Verify assertion signature AFTER decryption (signature is inside encrypted content) 96 if self._source.verification_kp and self._source.signed_assertion: 97 self._verify_assertion_signature() 98 99 self._verify_request_id() 100 self._verify_status() 101 self._verify_conditions() 102 103 def _decrypt_response(self): 104 """Decrypt SAMLResponse EncryptedAssertion Element""" 105 manager = xmlsec.KeysManager() 106 key = xmlsec.Key.from_memory( 107 self._source.encryption_kp.key_data, 108 xmlsec.constants.KeyDataFormatPem, 109 ) 110 111 manager.add_key(key) 112 encryption_context = xmlsec.EncryptionContext(manager) 113 114 encrypted_assertion = self._root.find(f".//{{{NS_SAML_ASSERTION}}}EncryptedAssertion") 115 if encrypted_assertion is None: 116 raise InvalidEncryption() 117 encrypted_data = xmlsec.tree.find_child( 118 encrypted_assertion, "EncryptedData", xmlsec.constants.EncNs 119 ) 120 try: 121 decrypted_assertion = encryption_context.decrypt(encrypted_data) 122 except xmlsec.Error as exc: 123 raise InvalidEncryption() from exc 124 125 index_of = self._root.index(encrypted_assertion) 126 self._root.remove(encrypted_assertion) 127 self._root.insert( 128 index_of, 129 decrypted_assertion, 130 ) 131 self._assertion = decrypted_assertion 132 133 def _verify_conditions(self): 134 conditions = self.get_assertion().find(f"{{{NS_SAML_ASSERTION}}}Conditions") 135 if conditions is None: 136 return 137 _now = now() 138 before = conditions.attrib.get("NotBefore") 139 if before: 140 if datetime.fromisoformat(before).replace(tzinfo=UTC) > _now: 141 raise SAMLException("Assertion is not valid yet or expired.") 142 on_or_after = conditions.attrib.get("NotOnOrAfter") 143 if on_or_after: 144 if datetime.fromisoformat(on_or_after).replace(tzinfo=UTC) < _now: 145 raise SAMLException("Assertion is not valid yet or expired.") 146 147 def _verify_signature(self, signature_node: _Element, target: _Element): 148 """Verify a single signature node against the given target element.""" 149 target_id = target.attrib.get("ID") 150 if not target_id: 151 raise InvalidSignature("Signed element is missing an ID attribute.") 152 refs = signature_node.xpath("./ds:SignedInfo/ds:Reference", namespaces=NS_MAP) 153 if len(refs) != 1: 154 raise InvalidSignature("Signature must contain exactly one Reference.") 155 ref_uri = refs[0].get("URI", "") 156 if ref_uri not in ("", f"#{target_id}"): 157 raise InvalidSignature( 158 "Signature Reference URI does not match the signed element's ID." 159 ) 160 161 xmlsec.tree.add_ids(target, ["ID"]) 162 163 ctx = xmlsec.SignatureContext() 164 key = xmlsec.Key.from_memory( 165 self._source.verification_kp.certificate_data, 166 xmlsec.constants.KeyDataFormatCertPem, 167 ) 168 ctx.key = key 169 170 ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509]) 171 try: 172 ctx.verify(signature_node) 173 except xmlsec.Error as exc: 174 raise InvalidSignature( 175 "The signature of the SAML object is either missing or invalid." 176 ) from exc 177 LOGGER.debug("Successfully verified signature") 178 179 def _verify_response_signature(self): 180 """Verify SAML Response's Signature (before decryption)""" 181 signature_nodes = self._root.xpath("/samlp:Response/ds:Signature", namespaces=NS_MAP) 182 183 if len(signature_nodes) != 1: 184 raise InvalidSignature("Expected exactly one Signature in the Response element.") 185 186 self._verify_signature(signature_nodes[0], self._root) 187 188 def _verify_assertion_signature(self): 189 """Verify SAML Assertion's Signature (after decryption)""" 190 signature_nodes = self._root.xpath( 191 "/samlp:Response/saml:Assertion/ds:Signature", namespaces=NS_MAP 192 ) 193 if len(signature_nodes) != 1: 194 raise InvalidSignature("Expected exactly one signed Assertion in the Response.") 195 signature_node = signature_nodes[0] 196 assertion = signature_node.getparent() 197 198 self._verify_signature(signature_node, assertion) 199 self._assertion = assertion 200 201 def _verify_request_id(self): 202 if self._source.allow_idp_initiated: 203 # If IdP-initiated SSO flows are enabled, we want to cache the Response ID 204 # somewhat mitigate replay attacks 205 seen_ids = cache.get(CACHE_SEEN_REQUEST_ID % self._source.pk, []) 206 if self._root.attrib["ID"] in seen_ids: 207 raise SuspiciousOperation("Replay attack detected") 208 seen_ids.append(self._root.attrib["ID"]) 209 cache.set(CACHE_SEEN_REQUEST_ID % self._source.pk, seen_ids) 210 return 211 if ( 212 SESSION_KEY_REQUEST_ID not in self._http_request.session 213 or "InResponseTo" not in self._root.attrib 214 ): 215 raise MismatchedRequestID( 216 "Missing InResponseTo and IdP-initiated Logins are not allowed" 217 ) 218 if self._http_request.session[SESSION_KEY_REQUEST_ID] != self._root.attrib["InResponseTo"]: 219 raise MismatchedRequestID("Mismatched request ID") 220 221 def _verify_status(self): 222 """Check for SAML Status elements""" 223 status = self._root.find(f"{{{NS_SAML_PROTOCOL}}}Status") 224 if status is None: 225 return 226 status_code = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusCode") 227 message = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusMessage") 228 message_text = message.text if message is not None else None 229 detail = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusDetail") 230 detail_text = etree.tostring(detail, encoding="unicode") if detail is not None else None 231 if status_code.attrib.get("Value") != SAML_STATUS_SUCCESS: 232 if detail_text and message_text: 233 raise ValueError(f"{message_text}: {detail_text}") 234 raise ValueError( 235 detail_text or message_text or f"SAML Status: {status_code.attrib.get('Value')}" 236 ) 237 if message_text or detail_text: 238 LOGGER.debug("SAML Status message", message=message_text, detail=detail_text) 239 240 def _handle_name_id_transient(self) -> SourceFlowManager: 241 """Handle a NameID with the Format of Transient. This is a bit more complex than other 242 formats, as we need to create a temporary User that is used in the session. This 243 user has an attribute that refers to our Source for cleanup. The user is also deleted 244 on logout and periodically.""" 245 # Create a temporary User 246 name_id_el, name_id = self._get_name_id() 247 # trim username to ensure it is max 150 chars 248 username = f"ak-{name_id[: USERNAME_MAX_LENGTH - 14]}-transient" 249 expiry = mktime( 250 (now() + timedelta_from_string(self._source.temporary_user_delete_after)).timetuple() 251 ) 252 user: User = User.objects.create( 253 username=username, 254 attributes={ 255 USER_ATTRIBUTE_GENERATED: True, 256 USER_ATTRIBUTE_SOURCES: [ 257 self._source.name, 258 ], 259 USER_ATTRIBUTE_DELETE_ON_LOGOUT: True, 260 USER_ATTRIBUTE_EXPIRES: expiry, 261 }, 262 path=self._source.get_user_path(), 263 ) 264 LOGGER.debug("Created temporary user for NameID Transient", username=name_id) 265 user.set_unusable_password() 266 user.save() 267 UserSAMLSourceConnection.objects.create(source=self._source, user=user, identifier=name_id) 268 return SAMLSourceFlowManager( 269 source=self._source, 270 request=self._http_request, 271 identifier=str(name_id), 272 user_info={ 273 "root": self._root, 274 "assertion": self.get_assertion(), 275 "name_id": name_id_el, 276 }, 277 policy_context={}, 278 ) 279 280 def get_assertion(self) -> Element | None: 281 """Get assertion element, if we have a signed assertion""" 282 if self._assertion is not None: 283 return self._assertion 284 return self._root.find(f"{{{NS_SAML_ASSERTION}}}Assertion") 285 286 def _get_name_id(self) -> tuple[Element, str]: 287 """Get NameID Element""" 288 assertion = self.get_assertion() 289 if assertion is None: 290 raise ValueError("Assertion element not found") 291 subject = assertion.find(f"{{{NS_SAML_ASSERTION}}}Subject") 292 if subject is None: 293 raise ValueError("Subject element not found") 294 name_id = subject.find(f"{{{NS_SAML_ASSERTION}}}NameID") 295 if name_id is None: 296 raise ValueError("NameID element not found") 297 return name_id, "".join(name_id.itertext()) 298 299 def _get_name_id_filter(self) -> dict[str, str]: 300 """Returns the subject's NameID as a Filter for the `User`""" 301 name_id_el, name_id = self._get_name_id() 302 if not name_id: 303 raise UnsupportedNameIDFormat("Subject's NameID is empty.") 304 _format = name_id_el.attrib["Format"] 305 if _format == SAML_NAME_ID_FORMAT_EMAIL: 306 return {"email": name_id} 307 if _format == SAML_NAME_ID_FORMAT_PERSISTENT: 308 return {"username": name_id} 309 if _format == SAML_NAME_ID_FORMAT_X509: 310 # This attribute is statically set by the LDAP source 311 return {"attributes__distinguishedName": name_id} 312 if _format == SAML_NAME_ID_FORMAT_WINDOWS: 313 if "\\" in name_id: 314 name_id = name_id.split("\\")[1] 315 return {"username": name_id} 316 raise UnsupportedNameIDFormat( 317 f"Assertion contains NameID with unsupported format {_format}." 318 ) 319 320 def prepare_flow_manager(self) -> SourceFlowManager: 321 """Prepare flow plan depending on whether or not the user exists""" 322 name_id_el, name_id = self._get_name_id() 323 # Sanity check, show a warning if NameIDPolicy doesn't match what we go 324 if self._source.name_id_policy != name_id_el.attrib["Format"]: 325 LOGGER.warning( 326 "NameID from IdP doesn't match our policy", 327 expected=self._source.name_id_policy, 328 got=name_id_el.attrib["Format"], 329 ) 330 # transient NameIDs are handled separately as they don't have to go through flows. 331 if name_id_el.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT: 332 return self._handle_name_id_transient() 333 334 return SAMLSourceFlowManager( 335 source=self._source, 336 request=self._http_request, 337 identifier=str(name_id), 338 user_info={ 339 "root": self._root, 340 "assertion": self.get_assertion(), 341 "name_id": name_id_el, 342 }, 343 policy_context={ 344 "saml_response": etree.tostring(self._root), 345 }, 346 )
SAML Response Processor
ResponseProcessor( source: authentik.sources.saml.models.SAMLSource, request: django.http.request.HttpRequest)
def
parse(self):
78 def parse(self): 79 """Check if `request` contains SAML Response data, parse and validate it.""" 80 # First off, check if we have any SAML Data at all. 81 raw_response = self._http_request.POST.get("SAMLResponse", None) 82 if not raw_response: 83 raise MissingSAMLResponse("Request does not contain 'SAMLResponse'") 84 # Check if response is compressed, b64 decode it 85 self._root_xml = b64decode(raw_response.encode()) 86 self._root = fromstring(self._root_xml) 87 88 # Verify response signature BEFORE decryption (signature covers encrypted content) 89 if self._source.verification_kp and self._source.signed_response: 90 self._verify_response_signature() 91 92 if self._source.encryption_kp: 93 self._decrypt_response() 94 95 # Verify assertion signature AFTER decryption (signature is inside encrypted content) 96 if self._source.verification_kp and self._source.signed_assertion: 97 self._verify_assertion_signature() 98 99 self._verify_request_id() 100 self._verify_status() 101 self._verify_conditions()
Check if request contains SAML Response data, parse and validate it.
def
get_assertion(unknown):
280 def get_assertion(self) -> Element | None: 281 """Get assertion element, if we have a signed assertion""" 282 if self._assertion is not None: 283 return self._assertion 284 return self._root.find(f"{{{NS_SAML_ASSERTION}}}Assertion")
Get assertion element, if we have a signed assertion
320 def prepare_flow_manager(self) -> SourceFlowManager: 321 """Prepare flow plan depending on whether or not the user exists""" 322 name_id_el, name_id = self._get_name_id() 323 # Sanity check, show a warning if NameIDPolicy doesn't match what we go 324 if self._source.name_id_policy != name_id_el.attrib["Format"]: 325 LOGGER.warning( 326 "NameID from IdP doesn't match our policy", 327 expected=self._source.name_id_policy, 328 got=name_id_el.attrib["Format"], 329 ) 330 # transient NameIDs are handled separately as they don't have to go through flows. 331 if name_id_el.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT: 332 return self._handle_name_id_transient() 333 334 return SAMLSourceFlowManager( 335 source=self._source, 336 request=self._http_request, 337 identifier=str(name_id), 338 user_info={ 339 "root": self._root, 340 "assertion": self.get_assertion(), 341 "name_id": name_id_el, 342 }, 343 policy_context={ 344 "saml_response": etree.tostring(self._root), 345 }, 346 )
Prepare flow plan depending on whether or not the user exists
349class SAMLSourceFlowManager(SourceFlowManager): 350 """Source flow manager for SAML Sources""" 351 352 user_connection_type = UserSAMLSourceConnection 353 group_connection_type = GroupSAMLSourceConnection
Source flow manager for SAML Sources
user_connection_type =
<class 'authentik.sources.saml.models.UserSAMLSourceConnection'>
group_connection_type =
<class 'authentik.sources.saml.models.GroupSAMLSourceConnection'>