authentik.sources.saml.processors.response
authentik saml source processor
1"""authentik saml source processor""" 2 3from base64 import b64decode 4from time import mktime 5from typing import TYPE_CHECKING 6 7import xmlsec 8from defusedxml.lxml import fromstring 9from django.core.cache import cache 10from django.core.exceptions import SuspiciousOperation 11from django.http import HttpRequest 12from django.utils.timezone import now 13from lxml import etree # nosec 14from lxml.etree import _Element # nosec 15from structlog.stdlib import get_logger 16 17from authentik.common.saml.constants import ( 18 NS_MAP, 19 NS_SAML_ASSERTION, 20 NS_SAML_PROTOCOL, 21 SAML_NAME_ID_FORMAT_EMAIL, 22 SAML_NAME_ID_FORMAT_PERSISTENT, 23 SAML_NAME_ID_FORMAT_TRANSIENT, 24 SAML_NAME_ID_FORMAT_WINDOWS, 25 SAML_NAME_ID_FORMAT_X509, 26 SAML_STATUS_SUCCESS, 27) 28from authentik.core.models import ( 29 USER_ATTRIBUTE_DELETE_ON_LOGOUT, 30 USER_ATTRIBUTE_EXPIRES, 31 USER_ATTRIBUTE_GENERATED, 32 USER_ATTRIBUTE_SOURCES, 33 USERNAME_MAX_LENGTH, 34 User, 35) 36from authentik.core.sources.flow_manager import SourceFlowManager 37from authentik.lib.utils.time import timedelta_from_string 38from authentik.sources.saml.exceptions import ( 39 InvalidEncryption, 40 InvalidSignature, 41 MismatchedRequestID, 42 MissingSAMLResponse, 43 UnsupportedNameIDFormat, 44) 45from authentik.sources.saml.models import ( 46 GroupSAMLSourceConnection, 47 SAMLSource, 48 UserSAMLSourceConnection, 49) 50from authentik.sources.saml.processors.request import SESSION_KEY_REQUEST_ID 51 52LOGGER = get_logger() 53if TYPE_CHECKING: 54 from xml.etree.ElementTree import Element # nosec 55 56CACHE_SEEN_REQUEST_ID = "authentik_saml_seen_ids_%s" 57 58 59class ResponseProcessor: 60 """SAML Response Processor""" 61 62 _source: SAMLSource 63 64 _root: _Element 65 _root_xml: bytes 66 67 _http_request: HttpRequest 68 69 _assertion: _Element | None = None 70 71 def __init__(self, source: SAMLSource, request: HttpRequest): 72 self._source = source 73 self._http_request = request 74 75 def parse(self): 76 """Check if `request` contains SAML Response data, parse and validate it.""" 77 # First off, check if we have any SAML Data at all. 78 raw_response = self._http_request.POST.get("SAMLResponse", None) 79 if not raw_response: 80 raise MissingSAMLResponse("Request does not contain 'SAMLResponse'") 81 # Check if response is compressed, b64 decode it 82 self._root_xml = b64decode(raw_response.encode()) 83 self._root = fromstring(self._root_xml) 84 85 # Verify response signature BEFORE decryption (signature covers encrypted content) 86 if self._source.verification_kp and self._source.signed_response: 87 self._verify_response_signature() 88 89 if self._source.encryption_kp: 90 self._decrypt_response() 91 92 # Verify assertion signature AFTER decryption (signature is inside encrypted content) 93 if self._source.verification_kp and self._source.signed_assertion: 94 self._verify_assertion_signature() 95 96 self._verify_request_id() 97 self._verify_status() 98 99 def _decrypt_response(self): 100 """Decrypt SAMLResponse EncryptedAssertion Element""" 101 manager = xmlsec.KeysManager() 102 key = xmlsec.Key.from_memory( 103 self._source.encryption_kp.key_data, 104 xmlsec.constants.KeyDataFormatPem, 105 ) 106 107 manager.add_key(key) 108 encryption_context = xmlsec.EncryptionContext(manager) 109 110 encrypted_assertion = self._root.find(f".//{{{NS_SAML_ASSERTION}}}EncryptedAssertion") 111 if encrypted_assertion is None: 112 raise InvalidEncryption() 113 encrypted_data = xmlsec.tree.find_child( 114 encrypted_assertion, "EncryptedData", xmlsec.constants.EncNs 115 ) 116 try: 117 decrypted_assertion = encryption_context.decrypt(encrypted_data) 118 except xmlsec.Error as exc: 119 raise InvalidEncryption() from exc 120 121 index_of = self._root.index(encrypted_assertion) 122 self._root.remove(encrypted_assertion) 123 self._root.insert( 124 index_of, 125 decrypted_assertion, 126 ) 127 self._assertion = decrypted_assertion 128 129 def _verify_signature(self, signature_node: _Element): 130 """Verify a single signature node""" 131 xmlsec.tree.add_ids(self._root, ["ID"]) 132 133 ctx = xmlsec.SignatureContext() 134 key = xmlsec.Key.from_memory( 135 self._source.verification_kp.certificate_data, 136 xmlsec.constants.KeyDataFormatCertPem, 137 ) 138 ctx.key = key 139 140 ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509]) 141 try: 142 ctx.verify(signature_node) 143 except xmlsec.Error as exc: 144 raise InvalidSignature( 145 "The signature of the SAML object is either missing or invalid." 146 ) from exc 147 LOGGER.debug("Successfully verified signature") 148 149 def _verify_response_signature(self): 150 """Verify SAML Response's Signature (before decryption)""" 151 signature_nodes = self._root.xpath("/samlp:Response/ds:Signature", namespaces=NS_MAP) 152 153 if len(signature_nodes) != 1: 154 raise InvalidSignature("No Signature exists in the Response element.") 155 156 self._verify_signature(signature_nodes[0]) 157 158 def _verify_assertion_signature(self): 159 """Verify SAML Assertion's Signature (after decryption)""" 160 signature_nodes = self._root.xpath( 161 "/samlp:Response/saml:Assertion/ds:Signature", namespaces=NS_MAP 162 ) 163 164 if len(signature_nodes) != 1: 165 raise InvalidSignature("No Signature exists in the Assertion element.") 166 167 self._verify_signature(signature_nodes[0]) 168 parent = signature_nodes[0].getparent() 169 if parent is None or parent.tag != f"{{{NS_SAML_ASSERTION}}}Assertion": 170 raise InvalidSignature("No Signature exists in the Assertion element.") 171 self._assertion = parent 172 173 def _verify_request_id(self): 174 if self._source.allow_idp_initiated: 175 # If IdP-initiated SSO flows are enabled, we want to cache the Response ID 176 # somewhat mitigate replay attacks 177 seen_ids = cache.get(CACHE_SEEN_REQUEST_ID % self._source.pk, []) 178 if self._root.attrib["ID"] in seen_ids: 179 raise SuspiciousOperation("Replay attack detected") 180 seen_ids.append(self._root.attrib["ID"]) 181 cache.set(CACHE_SEEN_REQUEST_ID % self._source.pk, seen_ids) 182 return 183 if ( 184 SESSION_KEY_REQUEST_ID not in self._http_request.session 185 or "InResponseTo" not in self._root.attrib 186 ): 187 raise MismatchedRequestID( 188 "Missing InResponseTo and IdP-initiated Logins are not allowed" 189 ) 190 if self._http_request.session[SESSION_KEY_REQUEST_ID] != self._root.attrib["InResponseTo"]: 191 raise MismatchedRequestID("Mismatched request ID") 192 193 def _verify_status(self): 194 """Check for SAML Status elements""" 195 status = self._root.find(f"{{{NS_SAML_PROTOCOL}}}Status") 196 if status is None: 197 return 198 status_code = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusCode") 199 message = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusMessage") 200 message_text = message.text if message is not None else None 201 detail = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusDetail") 202 detail_text = etree.tostring(detail, encoding="unicode") if detail is not None else None 203 if status_code.attrib.get("Value") != SAML_STATUS_SUCCESS: 204 if detail_text and message_text: 205 raise ValueError(f"{message_text}: {detail_text}") 206 raise ValueError( 207 detail_text or message_text or f"SAML Status: {status_code.attrib.get('Value')}" 208 ) 209 if message_text or detail_text: 210 LOGGER.debug("SAML Status message", message=message_text, detail=detail_text) 211 212 def _handle_name_id_transient(self) -> SourceFlowManager: 213 """Handle a NameID with the Format of Transient. This is a bit more complex than other 214 formats, as we need to create a temporary User that is used in the session. This 215 user has an attribute that refers to our Source for cleanup. The user is also deleted 216 on logout and periodically.""" 217 # Create a temporary User 218 name_id = self._get_name_id() 219 username = name_id.text 220 # trim username to ensure it is max 150 chars 221 username = f"ak-{username[: USERNAME_MAX_LENGTH - 14]}-transient" 222 expiry = mktime( 223 (now() + timedelta_from_string(self._source.temporary_user_delete_after)).timetuple() 224 ) 225 user: User = User.objects.create( 226 username=username, 227 attributes={ 228 USER_ATTRIBUTE_GENERATED: True, 229 USER_ATTRIBUTE_SOURCES: [ 230 self._source.name, 231 ], 232 USER_ATTRIBUTE_DELETE_ON_LOGOUT: True, 233 USER_ATTRIBUTE_EXPIRES: expiry, 234 }, 235 path=self._source.get_user_path(), 236 ) 237 LOGGER.debug("Created temporary user for NameID Transient", username=name_id.text) 238 user.set_unusable_password() 239 user.save() 240 UserSAMLSourceConnection.objects.create( 241 source=self._source, user=user, identifier=name_id.text 242 ) 243 return SAMLSourceFlowManager( 244 source=self._source, 245 request=self._http_request, 246 identifier=str(name_id.text), 247 user_info={ 248 "root": self._root, 249 "assertion": self.get_assertion(), 250 "name_id": name_id, 251 }, 252 policy_context={}, 253 ) 254 255 def get_assertion(self) -> Element | None: 256 """Get assertion element, if we have a signed assertion""" 257 if self._assertion is not None: 258 return self._assertion 259 return self._root.find(f"{{{NS_SAML_ASSERTION}}}Assertion") 260 261 def _get_name_id(self) -> Element: 262 """Get NameID Element""" 263 assertion = self.get_assertion() 264 if assertion is None: 265 raise ValueError("Assertion element not found") 266 subject = assertion.find(f"{{{NS_SAML_ASSERTION}}}Subject") 267 if subject is None: 268 raise ValueError("Subject element not found") 269 name_id = subject.find(f"{{{NS_SAML_ASSERTION}}}NameID") 270 if name_id is None: 271 raise ValueError("NameID element not found") 272 return name_id 273 274 def _get_name_id_filter(self) -> dict[str, str]: 275 """Returns the subject's NameID as a Filter for the `User`""" 276 name_id_el = self._get_name_id() 277 name_id = name_id_el.text 278 if not name_id: 279 raise UnsupportedNameIDFormat("Subject's NameID is empty.") 280 _format = name_id_el.attrib["Format"] 281 if _format == SAML_NAME_ID_FORMAT_EMAIL: 282 return {"email": name_id} 283 if _format == SAML_NAME_ID_FORMAT_PERSISTENT: 284 return {"username": name_id} 285 if _format == SAML_NAME_ID_FORMAT_X509: 286 # This attribute is statically set by the LDAP source 287 return {"attributes__distinguishedName": name_id} 288 if _format == SAML_NAME_ID_FORMAT_WINDOWS: 289 if "\\" in name_id: 290 name_id = name_id.split("\\")[1] 291 return {"username": name_id} 292 raise UnsupportedNameIDFormat( 293 f"Assertion contains NameID with unsupported format {_format}." 294 ) 295 296 def prepare_flow_manager(self) -> SourceFlowManager: 297 """Prepare flow plan depending on whether or not the user exists""" 298 name_id = self._get_name_id() 299 # Sanity check, show a warning if NameIDPolicy doesn't match what we go 300 if self._source.name_id_policy != name_id.attrib["Format"]: 301 LOGGER.warning( 302 "NameID from IdP doesn't match our policy", 303 expected=self._source.name_id_policy, 304 got=name_id.attrib["Format"], 305 ) 306 # transient NameIDs are handled separately as they don't have to go through flows. 307 if name_id.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT: 308 return self._handle_name_id_transient() 309 310 return SAMLSourceFlowManager( 311 source=self._source, 312 request=self._http_request, 313 identifier=str(name_id.text), 314 user_info={ 315 "root": self._root, 316 "assertion": self.get_assertion(), 317 "name_id": name_id, 318 }, 319 policy_context={ 320 "saml_response": etree.tostring(self._root), 321 }, 322 ) 323 324 325class SAMLSourceFlowManager(SourceFlowManager): 326 """Source flow manager for SAML Sources""" 327 328 user_connection_type = UserSAMLSourceConnection 329 group_connection_type = GroupSAMLSourceConnection
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
CACHE_SEEN_REQUEST_ID =
'authentik_saml_seen_ids_%s'
class
ResponseProcessor:
60class ResponseProcessor: 61 """SAML Response Processor""" 62 63 _source: SAMLSource 64 65 _root: _Element 66 _root_xml: bytes 67 68 _http_request: HttpRequest 69 70 _assertion: _Element | None = None 71 72 def __init__(self, source: SAMLSource, request: HttpRequest): 73 self._source = source 74 self._http_request = request 75 76 def parse(self): 77 """Check if `request` contains SAML Response data, parse and validate it.""" 78 # First off, check if we have any SAML Data at all. 79 raw_response = self._http_request.POST.get("SAMLResponse", None) 80 if not raw_response: 81 raise MissingSAMLResponse("Request does not contain 'SAMLResponse'") 82 # Check if response is compressed, b64 decode it 83 self._root_xml = b64decode(raw_response.encode()) 84 self._root = fromstring(self._root_xml) 85 86 # Verify response signature BEFORE decryption (signature covers encrypted content) 87 if self._source.verification_kp and self._source.signed_response: 88 self._verify_response_signature() 89 90 if self._source.encryption_kp: 91 self._decrypt_response() 92 93 # Verify assertion signature AFTER decryption (signature is inside encrypted content) 94 if self._source.verification_kp and self._source.signed_assertion: 95 self._verify_assertion_signature() 96 97 self._verify_request_id() 98 self._verify_status() 99 100 def _decrypt_response(self): 101 """Decrypt SAMLResponse EncryptedAssertion Element""" 102 manager = xmlsec.KeysManager() 103 key = xmlsec.Key.from_memory( 104 self._source.encryption_kp.key_data, 105 xmlsec.constants.KeyDataFormatPem, 106 ) 107 108 manager.add_key(key) 109 encryption_context = xmlsec.EncryptionContext(manager) 110 111 encrypted_assertion = self._root.find(f".//{{{NS_SAML_ASSERTION}}}EncryptedAssertion") 112 if encrypted_assertion is None: 113 raise InvalidEncryption() 114 encrypted_data = xmlsec.tree.find_child( 115 encrypted_assertion, "EncryptedData", xmlsec.constants.EncNs 116 ) 117 try: 118 decrypted_assertion = encryption_context.decrypt(encrypted_data) 119 except xmlsec.Error as exc: 120 raise InvalidEncryption() from exc 121 122 index_of = self._root.index(encrypted_assertion) 123 self._root.remove(encrypted_assertion) 124 self._root.insert( 125 index_of, 126 decrypted_assertion, 127 ) 128 self._assertion = decrypted_assertion 129 130 def _verify_signature(self, signature_node: _Element): 131 """Verify a single signature node""" 132 xmlsec.tree.add_ids(self._root, ["ID"]) 133 134 ctx = xmlsec.SignatureContext() 135 key = xmlsec.Key.from_memory( 136 self._source.verification_kp.certificate_data, 137 xmlsec.constants.KeyDataFormatCertPem, 138 ) 139 ctx.key = key 140 141 ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509]) 142 try: 143 ctx.verify(signature_node) 144 except xmlsec.Error as exc: 145 raise InvalidSignature( 146 "The signature of the SAML object is either missing or invalid." 147 ) from exc 148 LOGGER.debug("Successfully verified signature") 149 150 def _verify_response_signature(self): 151 """Verify SAML Response's Signature (before decryption)""" 152 signature_nodes = self._root.xpath("/samlp:Response/ds:Signature", namespaces=NS_MAP) 153 154 if len(signature_nodes) != 1: 155 raise InvalidSignature("No Signature exists in the Response element.") 156 157 self._verify_signature(signature_nodes[0]) 158 159 def _verify_assertion_signature(self): 160 """Verify SAML Assertion's Signature (after decryption)""" 161 signature_nodes = self._root.xpath( 162 "/samlp:Response/saml:Assertion/ds:Signature", namespaces=NS_MAP 163 ) 164 165 if len(signature_nodes) != 1: 166 raise InvalidSignature("No Signature exists in the Assertion element.") 167 168 self._verify_signature(signature_nodes[0]) 169 parent = signature_nodes[0].getparent() 170 if parent is None or parent.tag != f"{{{NS_SAML_ASSERTION}}}Assertion": 171 raise InvalidSignature("No Signature exists in the Assertion element.") 172 self._assertion = parent 173 174 def _verify_request_id(self): 175 if self._source.allow_idp_initiated: 176 # If IdP-initiated SSO flows are enabled, we want to cache the Response ID 177 # somewhat mitigate replay attacks 178 seen_ids = cache.get(CACHE_SEEN_REQUEST_ID % self._source.pk, []) 179 if self._root.attrib["ID"] in seen_ids: 180 raise SuspiciousOperation("Replay attack detected") 181 seen_ids.append(self._root.attrib["ID"]) 182 cache.set(CACHE_SEEN_REQUEST_ID % self._source.pk, seen_ids) 183 return 184 if ( 185 SESSION_KEY_REQUEST_ID not in self._http_request.session 186 or "InResponseTo" not in self._root.attrib 187 ): 188 raise MismatchedRequestID( 189 "Missing InResponseTo and IdP-initiated Logins are not allowed" 190 ) 191 if self._http_request.session[SESSION_KEY_REQUEST_ID] != self._root.attrib["InResponseTo"]: 192 raise MismatchedRequestID("Mismatched request ID") 193 194 def _verify_status(self): 195 """Check for SAML Status elements""" 196 status = self._root.find(f"{{{NS_SAML_PROTOCOL}}}Status") 197 if status is None: 198 return 199 status_code = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusCode") 200 message = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusMessage") 201 message_text = message.text if message is not None else None 202 detail = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusDetail") 203 detail_text = etree.tostring(detail, encoding="unicode") if detail is not None else None 204 if status_code.attrib.get("Value") != SAML_STATUS_SUCCESS: 205 if detail_text and message_text: 206 raise ValueError(f"{message_text}: {detail_text}") 207 raise ValueError( 208 detail_text or message_text or f"SAML Status: {status_code.attrib.get('Value')}" 209 ) 210 if message_text or detail_text: 211 LOGGER.debug("SAML Status message", message=message_text, detail=detail_text) 212 213 def _handle_name_id_transient(self) -> SourceFlowManager: 214 """Handle a NameID with the Format of Transient. This is a bit more complex than other 215 formats, as we need to create a temporary User that is used in the session. This 216 user has an attribute that refers to our Source for cleanup. The user is also deleted 217 on logout and periodically.""" 218 # Create a temporary User 219 name_id = self._get_name_id() 220 username = name_id.text 221 # trim username to ensure it is max 150 chars 222 username = f"ak-{username[: USERNAME_MAX_LENGTH - 14]}-transient" 223 expiry = mktime( 224 (now() + timedelta_from_string(self._source.temporary_user_delete_after)).timetuple() 225 ) 226 user: User = User.objects.create( 227 username=username, 228 attributes={ 229 USER_ATTRIBUTE_GENERATED: True, 230 USER_ATTRIBUTE_SOURCES: [ 231 self._source.name, 232 ], 233 USER_ATTRIBUTE_DELETE_ON_LOGOUT: True, 234 USER_ATTRIBUTE_EXPIRES: expiry, 235 }, 236 path=self._source.get_user_path(), 237 ) 238 LOGGER.debug("Created temporary user for NameID Transient", username=name_id.text) 239 user.set_unusable_password() 240 user.save() 241 UserSAMLSourceConnection.objects.create( 242 source=self._source, user=user, identifier=name_id.text 243 ) 244 return SAMLSourceFlowManager( 245 source=self._source, 246 request=self._http_request, 247 identifier=str(name_id.text), 248 user_info={ 249 "root": self._root, 250 "assertion": self.get_assertion(), 251 "name_id": name_id, 252 }, 253 policy_context={}, 254 ) 255 256 def get_assertion(self) -> Element | None: 257 """Get assertion element, if we have a signed assertion""" 258 if self._assertion is not None: 259 return self._assertion 260 return self._root.find(f"{{{NS_SAML_ASSERTION}}}Assertion") 261 262 def _get_name_id(self) -> Element: 263 """Get NameID Element""" 264 assertion = self.get_assertion() 265 if assertion is None: 266 raise ValueError("Assertion element not found") 267 subject = assertion.find(f"{{{NS_SAML_ASSERTION}}}Subject") 268 if subject is None: 269 raise ValueError("Subject element not found") 270 name_id = subject.find(f"{{{NS_SAML_ASSERTION}}}NameID") 271 if name_id is None: 272 raise ValueError("NameID element not found") 273 return name_id 274 275 def _get_name_id_filter(self) -> dict[str, str]: 276 """Returns the subject's NameID as a Filter for the `User`""" 277 name_id_el = self._get_name_id() 278 name_id = name_id_el.text 279 if not name_id: 280 raise UnsupportedNameIDFormat("Subject's NameID is empty.") 281 _format = name_id_el.attrib["Format"] 282 if _format == SAML_NAME_ID_FORMAT_EMAIL: 283 return {"email": name_id} 284 if _format == SAML_NAME_ID_FORMAT_PERSISTENT: 285 return {"username": name_id} 286 if _format == SAML_NAME_ID_FORMAT_X509: 287 # This attribute is statically set by the LDAP source 288 return {"attributes__distinguishedName": name_id} 289 if _format == SAML_NAME_ID_FORMAT_WINDOWS: 290 if "\\" in name_id: 291 name_id = name_id.split("\\")[1] 292 return {"username": name_id} 293 raise UnsupportedNameIDFormat( 294 f"Assertion contains NameID with unsupported format {_format}." 295 ) 296 297 def prepare_flow_manager(self) -> SourceFlowManager: 298 """Prepare flow plan depending on whether or not the user exists""" 299 name_id = self._get_name_id() 300 # Sanity check, show a warning if NameIDPolicy doesn't match what we go 301 if self._source.name_id_policy != name_id.attrib["Format"]: 302 LOGGER.warning( 303 "NameID from IdP doesn't match our policy", 304 expected=self._source.name_id_policy, 305 got=name_id.attrib["Format"], 306 ) 307 # transient NameIDs are handled separately as they don't have to go through flows. 308 if name_id.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT: 309 return self._handle_name_id_transient() 310 311 return SAMLSourceFlowManager( 312 source=self._source, 313 request=self._http_request, 314 identifier=str(name_id.text), 315 user_info={ 316 "root": self._root, 317 "assertion": self.get_assertion(), 318 "name_id": name_id, 319 }, 320 policy_context={ 321 "saml_response": etree.tostring(self._root), 322 }, 323 )
SAML Response Processor
ResponseProcessor( source: authentik.sources.saml.models.SAMLSource, request: django.http.request.HttpRequest)
def
parse(self):
76 def parse(self): 77 """Check if `request` contains SAML Response data, parse and validate it.""" 78 # First off, check if we have any SAML Data at all. 79 raw_response = self._http_request.POST.get("SAMLResponse", None) 80 if not raw_response: 81 raise MissingSAMLResponse("Request does not contain 'SAMLResponse'") 82 # Check if response is compressed, b64 decode it 83 self._root_xml = b64decode(raw_response.encode()) 84 self._root = fromstring(self._root_xml) 85 86 # Verify response signature BEFORE decryption (signature covers encrypted content) 87 if self._source.verification_kp and self._source.signed_response: 88 self._verify_response_signature() 89 90 if self._source.encryption_kp: 91 self._decrypt_response() 92 93 # Verify assertion signature AFTER decryption (signature is inside encrypted content) 94 if self._source.verification_kp and self._source.signed_assertion: 95 self._verify_assertion_signature() 96 97 self._verify_request_id() 98 self._verify_status()
Check if request contains SAML Response data, parse and validate it.
def
get_assertion(unknown):
256 def get_assertion(self) -> Element | None: 257 """Get assertion element, if we have a signed assertion""" 258 if self._assertion is not None: 259 return self._assertion 260 return self._root.find(f"{{{NS_SAML_ASSERTION}}}Assertion")
Get assertion element, if we have a signed assertion
297 def prepare_flow_manager(self) -> SourceFlowManager: 298 """Prepare flow plan depending on whether or not the user exists""" 299 name_id = self._get_name_id() 300 # Sanity check, show a warning if NameIDPolicy doesn't match what we go 301 if self._source.name_id_policy != name_id.attrib["Format"]: 302 LOGGER.warning( 303 "NameID from IdP doesn't match our policy", 304 expected=self._source.name_id_policy, 305 got=name_id.attrib["Format"], 306 ) 307 # transient NameIDs are handled separately as they don't have to go through flows. 308 if name_id.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT: 309 return self._handle_name_id_transient() 310 311 return SAMLSourceFlowManager( 312 source=self._source, 313 request=self._http_request, 314 identifier=str(name_id.text), 315 user_info={ 316 "root": self._root, 317 "assertion": self.get_assertion(), 318 "name_id": name_id, 319 }, 320 policy_context={ 321 "saml_response": etree.tostring(self._root), 322 }, 323 )
Prepare flow plan depending on whether or not the user exists
326class SAMLSourceFlowManager(SourceFlowManager): 327 """Source flow manager for SAML Sources""" 328 329 user_connection_type = UserSAMLSourceConnection 330 group_connection_type = GroupSAMLSourceConnection
Source flow manager for SAML Sources
user_connection_type =
<class 'authentik.sources.saml.models.UserSAMLSourceConnection'>
group_connection_type =
<class 'authentik.sources.saml.models.GroupSAMLSourceConnection'>