authentik.sources.saml.processors.response

authentik saml source processor

  1"""authentik saml source processor"""
  2
  3from base64 import b64decode
  4from datetime import UTC, datetime
  5from time import mktime
  6from typing import TYPE_CHECKING
  7
  8import xmlsec
  9from defusedxml.lxml import fromstring
 10from django.core.cache import cache
 11from django.core.exceptions import SuspiciousOperation
 12from django.http import HttpRequest
 13from django.utils.timezone import now
 14from lxml import etree  # nosec
 15from lxml.etree import _Element  # nosec
 16from structlog.stdlib import get_logger
 17
 18from authentik.common.saml.constants import (
 19    NS_MAP,
 20    NS_SAML_ASSERTION,
 21    NS_SAML_PROTOCOL,
 22    SAML_NAME_ID_FORMAT_EMAIL,
 23    SAML_NAME_ID_FORMAT_PERSISTENT,
 24    SAML_NAME_ID_FORMAT_TRANSIENT,
 25    SAML_NAME_ID_FORMAT_WINDOWS,
 26    SAML_NAME_ID_FORMAT_X509,
 27    SAML_STATUS_SUCCESS,
 28)
 29from authentik.core.models import (
 30    USER_ATTRIBUTE_DELETE_ON_LOGOUT,
 31    USER_ATTRIBUTE_EXPIRES,
 32    USER_ATTRIBUTE_GENERATED,
 33    USER_ATTRIBUTE_SOURCES,
 34    USERNAME_MAX_LENGTH,
 35    User,
 36)
 37from authentik.core.sources.flow_manager import SourceFlowManager
 38from authentik.lib.utils.time import timedelta_from_string
 39from authentik.sources.saml.exceptions import (
 40    InvalidEncryption,
 41    InvalidSignature,
 42    MismatchedRequestID,
 43    MissingSAMLResponse,
 44    SAMLException,
 45    UnsupportedNameIDFormat,
 46)
 47from authentik.sources.saml.models import (
 48    GroupSAMLSourceConnection,
 49    SAMLSource,
 50    UserSAMLSourceConnection,
 51)
 52from authentik.sources.saml.processors.request import SESSION_KEY_REQUEST_ID
 53
# Module-level structlog logger used by the processor below.
LOGGER = get_logger()
if TYPE_CHECKING:
    # Needed only for the `Element` annotations on `get_assertion` and
    # `_get_name_id`; not imported at runtime.
    from xml.etree.ElementTree import Element  # nosec

# Cache key template, formatted with the SAML source's primary key. The cached
# value is the list of Response IDs already seen for that source.
CACHE_SEEN_REQUEST_ID = "authentik_saml_seen_ids_%s"
 59
 60
 61class ResponseProcessor:
 62    """SAML Response Processor"""
 63
 64    _source: SAMLSource
 65
 66    _root: _Element
 67    _root_xml: bytes
 68
 69    _http_request: HttpRequest
 70
 71    _assertion: _Element | None = None
 72
 73    def __init__(self, source: SAMLSource, request: HttpRequest):
 74        self._source = source
 75        self._http_request = request
 76
 77    def parse(self):
 78        """Check if `request` contains SAML Response data, parse and validate it."""
 79        # First off, check if we have any SAML Data at all.
 80        raw_response = self._http_request.POST.get("SAMLResponse", None)
 81        if not raw_response:
 82            raise MissingSAMLResponse("Request does not contain 'SAMLResponse'")
 83        # Check if response is compressed, b64 decode it
 84        self._root_xml = b64decode(raw_response.encode())
 85        self._root = fromstring(self._root_xml)
 86
 87        # Verify response signature BEFORE decryption (signature covers encrypted content)
 88        if self._source.verification_kp and self._source.signed_response:
 89            self._verify_response_signature()
 90
 91        if self._source.encryption_kp:
 92            self._decrypt_response()
 93
 94        # Verify assertion signature AFTER decryption (signature is inside encrypted content)
 95        if self._source.verification_kp and self._source.signed_assertion:
 96            self._verify_assertion_signature()
 97
 98        self._verify_request_id()
 99        self._verify_status()
100        self._verify_conditions()
101
102    def _decrypt_response(self):
103        """Decrypt SAMLResponse EncryptedAssertion Element"""
104        manager = xmlsec.KeysManager()
105        key = xmlsec.Key.from_memory(
106            self._source.encryption_kp.key_data,
107            xmlsec.constants.KeyDataFormatPem,
108        )
109
110        manager.add_key(key)
111        encryption_context = xmlsec.EncryptionContext(manager)
112
113        encrypted_assertion = self._root.find(f".//{{{NS_SAML_ASSERTION}}}EncryptedAssertion")
114        if encrypted_assertion is None:
115            raise InvalidEncryption()
116        encrypted_data = xmlsec.tree.find_child(
117            encrypted_assertion, "EncryptedData", xmlsec.constants.EncNs
118        )
119        try:
120            decrypted_assertion = encryption_context.decrypt(encrypted_data)
121        except xmlsec.Error as exc:
122            raise InvalidEncryption() from exc
123
124        index_of = self._root.index(encrypted_assertion)
125        self._root.remove(encrypted_assertion)
126        self._root.insert(
127            index_of,
128            decrypted_assertion,
129        )
130        self._assertion = decrypted_assertion
131
132    def _verify_conditions(self):
133        conditions = self.get_assertion().find(f"{{{NS_SAML_ASSERTION}}}Conditions")
134        if conditions is None:
135            return
136        _now = now()
137        before = conditions.attrib.get("NotBefore")
138        if before:
139            if datetime.fromisoformat(before).replace(tzinfo=UTC) > _now:
140                raise SAMLException("Assertion is not valid yet or expired.")
141        on_or_after = conditions.attrib.get("NotOnOrAfter")
142        if on_or_after:
143            if datetime.fromisoformat(on_or_after).replace(tzinfo=UTC) < _now:
144                raise SAMLException("Assertion is not valid yet or expired.")
145
146    def _verify_signature(self, signature_node: _Element, target: _Element):
147        """Verify a single signature node against the given target element."""
148        target_id = target.attrib.get("ID")
149        if not target_id:
150            raise InvalidSignature("Signed element is missing an ID attribute.")
151        refs = signature_node.xpath("./ds:SignedInfo/ds:Reference", namespaces=NS_MAP)
152        if len(refs) != 1:
153            raise InvalidSignature("Signature must contain exactly one Reference.")
154        ref_uri = refs[0].get("URI", "")
155        if ref_uri not in ("", f"#{target_id}"):
156            raise InvalidSignature(
157                "Signature Reference URI does not match the signed element's ID."
158            )
159
160        xmlsec.tree.add_ids(target, ["ID"])
161
162        ctx = xmlsec.SignatureContext()
163        key = xmlsec.Key.from_memory(
164            self._source.verification_kp.certificate_data,
165            xmlsec.constants.KeyDataFormatCertPem,
166        )
167        ctx.key = key
168
169        ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509])
170        try:
171            ctx.verify(signature_node)
172        except xmlsec.Error as exc:
173            raise InvalidSignature(
174                "The signature of the SAML object is either missing or invalid."
175            ) from exc
176        LOGGER.debug("Successfully verified signature")
177
178    def _verify_response_signature(self):
179        """Verify SAML Response's Signature (before decryption)"""
180        signature_nodes = self._root.xpath("/samlp:Response/ds:Signature", namespaces=NS_MAP)
181
182        if len(signature_nodes) != 1:
183            raise InvalidSignature("Expected exactly one Signature in the Response element.")
184
185        self._verify_signature(signature_nodes[0], self._root)
186
187    def _verify_assertion_signature(self):
188        """Verify SAML Assertion's Signature (after decryption)"""
189        signature_nodes = self._root.xpath(
190            "/samlp:Response/saml:Assertion/ds:Signature", namespaces=NS_MAP
191        )
192        if len(signature_nodes) != 1:
193            raise InvalidSignature("Expected exactly one signed Assertion in the Response.")
194        signature_node = signature_nodes[0]
195        assertion = signature_node.getparent()
196
197        self._verify_signature(signature_node, assertion)
198        self._assertion = assertion
199
200    def _verify_request_id(self):
201        if self._source.allow_idp_initiated:
202            # If IdP-initiated SSO flows are enabled, we want to cache the Response ID
203            # somewhat mitigate replay attacks
204            seen_ids = cache.get(CACHE_SEEN_REQUEST_ID % self._source.pk, [])
205            if self._root.attrib["ID"] in seen_ids:
206                raise SuspiciousOperation("Replay attack detected")
207            seen_ids.append(self._root.attrib["ID"])
208            cache.set(CACHE_SEEN_REQUEST_ID % self._source.pk, seen_ids)
209            return
210        if (
211            SESSION_KEY_REQUEST_ID not in self._http_request.session
212            or "InResponseTo" not in self._root.attrib
213        ):
214            raise MismatchedRequestID(
215                "Missing InResponseTo and IdP-initiated Logins are not allowed"
216            )
217        if self._http_request.session[SESSION_KEY_REQUEST_ID] != self._root.attrib["InResponseTo"]:
218            raise MismatchedRequestID("Mismatched request ID")
219
220    def _verify_status(self):
221        """Check for SAML Status elements"""
222        status = self._root.find(f"{{{NS_SAML_PROTOCOL}}}Status")
223        if status is None:
224            return
225        status_code = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusCode")
226        message = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusMessage")
227        message_text = message.text if message is not None else None
228        detail = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusDetail")
229        detail_text = etree.tostring(detail, encoding="unicode") if detail is not None else None
230        if status_code.attrib.get("Value") != SAML_STATUS_SUCCESS:
231            if detail_text and message_text:
232                raise ValueError(f"{message_text}: {detail_text}")
233            raise ValueError(
234                detail_text or message_text or f"SAML Status: {status_code.attrib.get('Value')}"
235            )
236        if message_text or detail_text:
237            LOGGER.debug("SAML Status message", message=message_text, detail=detail_text)
238
239    def _handle_name_id_transient(self) -> SourceFlowManager:
240        """Handle a NameID with the Format of Transient. This is a bit more complex than other
241        formats, as we need to create a temporary User that is used in the session. This
242        user has an attribute that refers to our Source for cleanup. The user is also deleted
243        on logout and periodically."""
244        # Create a temporary User
245        name_id_el, name_id = self._get_name_id()
246        # trim username to ensure it is max 150 chars
247        username = f"ak-{name_id[: USERNAME_MAX_LENGTH - 14]}-transient"
248        expiry = mktime(
249            (now() + timedelta_from_string(self._source.temporary_user_delete_after)).timetuple()
250        )
251        user: User = User.objects.create(
252            username=username,
253            attributes={
254                USER_ATTRIBUTE_GENERATED: True,
255                USER_ATTRIBUTE_SOURCES: [
256                    self._source.name,
257                ],
258                USER_ATTRIBUTE_DELETE_ON_LOGOUT: True,
259                USER_ATTRIBUTE_EXPIRES: expiry,
260            },
261            path=self._source.get_user_path(),
262        )
263        LOGGER.debug("Created temporary user for NameID Transient", username=name_id)
264        user.set_unusable_password()
265        user.save()
266        UserSAMLSourceConnection.objects.create(source=self._source, user=user, identifier=name_id)
267        return SAMLSourceFlowManager(
268            source=self._source,
269            request=self._http_request,
270            identifier=str(name_id),
271            user_info={
272                "root": self._root,
273                "assertion": self.get_assertion(),
274                "name_id": name_id_el,
275            },
276            policy_context={},
277        )
278
279    def get_assertion(self) -> Element | None:
280        """Get assertion element, if we have a signed assertion"""
281        if self._assertion is not None:
282            return self._assertion
283        return self._root.find(f"{{{NS_SAML_ASSERTION}}}Assertion")
284
285    def _get_name_id(self) -> tuple[Element, str]:
286        """Get NameID Element"""
287        assertion = self.get_assertion()
288        if assertion is None:
289            raise ValueError("Assertion element not found")
290        subject = assertion.find(f"{{{NS_SAML_ASSERTION}}}Subject")
291        if subject is None:
292            raise ValueError("Subject element not found")
293        name_id = subject.find(f"{{{NS_SAML_ASSERTION}}}NameID")
294        if name_id is None:
295            raise ValueError("NameID element not found")
296        return name_id, "".join(name_id.itertext())
297
298    def _get_name_id_filter(self) -> dict[str, str]:
299        """Returns the subject's NameID as a Filter for the `User`"""
300        name_id_el, name_id = self._get_name_id()
301        if not name_id:
302            raise UnsupportedNameIDFormat("Subject's NameID is empty.")
303        _format = name_id_el.attrib["Format"]
304        if _format == SAML_NAME_ID_FORMAT_EMAIL:
305            return {"email": name_id}
306        if _format == SAML_NAME_ID_FORMAT_PERSISTENT:
307            return {"username": name_id}
308        if _format == SAML_NAME_ID_FORMAT_X509:
309            # This attribute is statically set by the LDAP source
310            return {"attributes__distinguishedName": name_id}
311        if _format == SAML_NAME_ID_FORMAT_WINDOWS:
312            if "\\" in name_id:
313                name_id = name_id.split("\\")[1]
314            return {"username": name_id}
315        raise UnsupportedNameIDFormat(
316            f"Assertion contains NameID with unsupported format {_format}."
317        )
318
319    def prepare_flow_manager(self) -> SourceFlowManager:
320        """Prepare flow plan depending on whether or not the user exists"""
321        name_id_el, name_id = self._get_name_id()
322        # Sanity check, show a warning if NameIDPolicy doesn't match what we go
323        if self._source.name_id_policy != name_id_el.attrib["Format"]:
324            LOGGER.warning(
325                "NameID from IdP doesn't match our policy",
326                expected=self._source.name_id_policy,
327                got=name_id_el.attrib["Format"],
328            )
329        # transient NameIDs are handled separately as they don't have to go through flows.
330        if name_id_el.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT:
331            return self._handle_name_id_transient()
332
333        return SAMLSourceFlowManager(
334            source=self._source,
335            request=self._http_request,
336            identifier=str(name_id),
337            user_info={
338                "root": self._root,
339                "assertion": self.get_assertion(),
340                "name_id": name_id_el,
341            },
342            policy_context={
343                "saml_response": etree.tostring(self._root),
344            },
345        )
346
347
class SAMLSourceFlowManager(SourceFlowManager):
    """Flow manager specialised for SAML sources.

    Only selects the SAML connection models for users and groups; all behaviour
    comes from `SourceFlowManager`.
    """

    group_connection_type = GroupSAMLSourceConnection
    user_connection_type = UserSAMLSourceConnection
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
CACHE_SEEN_REQUEST_ID = 'authentik_saml_seen_ids_%s'
class ResponseProcessor:
 62class ResponseProcessor:
 63    """SAML Response Processor"""
 64
 65    _source: SAMLSource
 66
 67    _root: _Element
 68    _root_xml: bytes
 69
 70    _http_request: HttpRequest
 71
 72    _assertion: _Element | None = None
 73
 74    def __init__(self, source: SAMLSource, request: HttpRequest):
 75        self._source = source
 76        self._http_request = request
 77
 78    def parse(self):
 79        """Check if `request` contains SAML Response data, parse and validate it."""
 80        # First off, check if we have any SAML Data at all.
 81        raw_response = self._http_request.POST.get("SAMLResponse", None)
 82        if not raw_response:
 83            raise MissingSAMLResponse("Request does not contain 'SAMLResponse'")
 84        # Check if response is compressed, b64 decode it
 85        self._root_xml = b64decode(raw_response.encode())
 86        self._root = fromstring(self._root_xml)
 87
 88        # Verify response signature BEFORE decryption (signature covers encrypted content)
 89        if self._source.verification_kp and self._source.signed_response:
 90            self._verify_response_signature()
 91
 92        if self._source.encryption_kp:
 93            self._decrypt_response()
 94
 95        # Verify assertion signature AFTER decryption (signature is inside encrypted content)
 96        if self._source.verification_kp and self._source.signed_assertion:
 97            self._verify_assertion_signature()
 98
 99        self._verify_request_id()
100        self._verify_status()
101        self._verify_conditions()
102
103    def _decrypt_response(self):
104        """Decrypt SAMLResponse EncryptedAssertion Element"""
105        manager = xmlsec.KeysManager()
106        key = xmlsec.Key.from_memory(
107            self._source.encryption_kp.key_data,
108            xmlsec.constants.KeyDataFormatPem,
109        )
110
111        manager.add_key(key)
112        encryption_context = xmlsec.EncryptionContext(manager)
113
114        encrypted_assertion = self._root.find(f".//{{{NS_SAML_ASSERTION}}}EncryptedAssertion")
115        if encrypted_assertion is None:
116            raise InvalidEncryption()
117        encrypted_data = xmlsec.tree.find_child(
118            encrypted_assertion, "EncryptedData", xmlsec.constants.EncNs
119        )
120        try:
121            decrypted_assertion = encryption_context.decrypt(encrypted_data)
122        except xmlsec.Error as exc:
123            raise InvalidEncryption() from exc
124
125        index_of = self._root.index(encrypted_assertion)
126        self._root.remove(encrypted_assertion)
127        self._root.insert(
128            index_of,
129            decrypted_assertion,
130        )
131        self._assertion = decrypted_assertion
132
133    def _verify_conditions(self):
134        conditions = self.get_assertion().find(f"{{{NS_SAML_ASSERTION}}}Conditions")
135        if conditions is None:
136            return
137        _now = now()
138        before = conditions.attrib.get("NotBefore")
139        if before:
140            if datetime.fromisoformat(before).replace(tzinfo=UTC) > _now:
141                raise SAMLException("Assertion is not valid yet or expired.")
142        on_or_after = conditions.attrib.get("NotOnOrAfter")
143        if on_or_after:
144            if datetime.fromisoformat(on_or_after).replace(tzinfo=UTC) < _now:
145                raise SAMLException("Assertion is not valid yet or expired.")
146
147    def _verify_signature(self, signature_node: _Element, target: _Element):
148        """Verify a single signature node against the given target element."""
149        target_id = target.attrib.get("ID")
150        if not target_id:
151            raise InvalidSignature("Signed element is missing an ID attribute.")
152        refs = signature_node.xpath("./ds:SignedInfo/ds:Reference", namespaces=NS_MAP)
153        if len(refs) != 1:
154            raise InvalidSignature("Signature must contain exactly one Reference.")
155        ref_uri = refs[0].get("URI", "")
156        if ref_uri not in ("", f"#{target_id}"):
157            raise InvalidSignature(
158                "Signature Reference URI does not match the signed element's ID."
159            )
160
161        xmlsec.tree.add_ids(target, ["ID"])
162
163        ctx = xmlsec.SignatureContext()
164        key = xmlsec.Key.from_memory(
165            self._source.verification_kp.certificate_data,
166            xmlsec.constants.KeyDataFormatCertPem,
167        )
168        ctx.key = key
169
170        ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509])
171        try:
172            ctx.verify(signature_node)
173        except xmlsec.Error as exc:
174            raise InvalidSignature(
175                "The signature of the SAML object is either missing or invalid."
176            ) from exc
177        LOGGER.debug("Successfully verified signature")
178
179    def _verify_response_signature(self):
180        """Verify SAML Response's Signature (before decryption)"""
181        signature_nodes = self._root.xpath("/samlp:Response/ds:Signature", namespaces=NS_MAP)
182
183        if len(signature_nodes) != 1:
184            raise InvalidSignature("Expected exactly one Signature in the Response element.")
185
186        self._verify_signature(signature_nodes[0], self._root)
187
188    def _verify_assertion_signature(self):
189        """Verify SAML Assertion's Signature (after decryption)"""
190        signature_nodes = self._root.xpath(
191            "/samlp:Response/saml:Assertion/ds:Signature", namespaces=NS_MAP
192        )
193        if len(signature_nodes) != 1:
194            raise InvalidSignature("Expected exactly one signed Assertion in the Response.")
195        signature_node = signature_nodes[0]
196        assertion = signature_node.getparent()
197
198        self._verify_signature(signature_node, assertion)
199        self._assertion = assertion
200
201    def _verify_request_id(self):
202        if self._source.allow_idp_initiated:
203            # If IdP-initiated SSO flows are enabled, we want to cache the Response ID
204            # somewhat mitigate replay attacks
205            seen_ids = cache.get(CACHE_SEEN_REQUEST_ID % self._source.pk, [])
206            if self._root.attrib["ID"] in seen_ids:
207                raise SuspiciousOperation("Replay attack detected")
208            seen_ids.append(self._root.attrib["ID"])
209            cache.set(CACHE_SEEN_REQUEST_ID % self._source.pk, seen_ids)
210            return
211        if (
212            SESSION_KEY_REQUEST_ID not in self._http_request.session
213            or "InResponseTo" not in self._root.attrib
214        ):
215            raise MismatchedRequestID(
216                "Missing InResponseTo and IdP-initiated Logins are not allowed"
217            )
218        if self._http_request.session[SESSION_KEY_REQUEST_ID] != self._root.attrib["InResponseTo"]:
219            raise MismatchedRequestID("Mismatched request ID")
220
221    def _verify_status(self):
222        """Check for SAML Status elements"""
223        status = self._root.find(f"{{{NS_SAML_PROTOCOL}}}Status")
224        if status is None:
225            return
226        status_code = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusCode")
227        message = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusMessage")
228        message_text = message.text if message is not None else None
229        detail = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusDetail")
230        detail_text = etree.tostring(detail, encoding="unicode") if detail is not None else None
231        if status_code.attrib.get("Value") != SAML_STATUS_SUCCESS:
232            if detail_text and message_text:
233                raise ValueError(f"{message_text}: {detail_text}")
234            raise ValueError(
235                detail_text or message_text or f"SAML Status: {status_code.attrib.get('Value')}"
236            )
237        if message_text or detail_text:
238            LOGGER.debug("SAML Status message", message=message_text, detail=detail_text)
239
240    def _handle_name_id_transient(self) -> SourceFlowManager:
241        """Handle a NameID with the Format of Transient. This is a bit more complex than other
242        formats, as we need to create a temporary User that is used in the session. This
243        user has an attribute that refers to our Source for cleanup. The user is also deleted
244        on logout and periodically."""
245        # Create a temporary User
246        name_id_el, name_id = self._get_name_id()
247        # trim username to ensure it is max 150 chars
248        username = f"ak-{name_id[: USERNAME_MAX_LENGTH - 14]}-transient"
249        expiry = mktime(
250            (now() + timedelta_from_string(self._source.temporary_user_delete_after)).timetuple()
251        )
252        user: User = User.objects.create(
253            username=username,
254            attributes={
255                USER_ATTRIBUTE_GENERATED: True,
256                USER_ATTRIBUTE_SOURCES: [
257                    self._source.name,
258                ],
259                USER_ATTRIBUTE_DELETE_ON_LOGOUT: True,
260                USER_ATTRIBUTE_EXPIRES: expiry,
261            },
262            path=self._source.get_user_path(),
263        )
264        LOGGER.debug("Created temporary user for NameID Transient", username=name_id)
265        user.set_unusable_password()
266        user.save()
267        UserSAMLSourceConnection.objects.create(source=self._source, user=user, identifier=name_id)
268        return SAMLSourceFlowManager(
269            source=self._source,
270            request=self._http_request,
271            identifier=str(name_id),
272            user_info={
273                "root": self._root,
274                "assertion": self.get_assertion(),
275                "name_id": name_id_el,
276            },
277            policy_context={},
278        )
279
280    def get_assertion(self) -> Element | None:
281        """Get assertion element, if we have a signed assertion"""
282        if self._assertion is not None:
283            return self._assertion
284        return self._root.find(f"{{{NS_SAML_ASSERTION}}}Assertion")
285
286    def _get_name_id(self) -> tuple[Element, str]:
287        """Get NameID Element"""
288        assertion = self.get_assertion()
289        if assertion is None:
290            raise ValueError("Assertion element not found")
291        subject = assertion.find(f"{{{NS_SAML_ASSERTION}}}Subject")
292        if subject is None:
293            raise ValueError("Subject element not found")
294        name_id = subject.find(f"{{{NS_SAML_ASSERTION}}}NameID")
295        if name_id is None:
296            raise ValueError("NameID element not found")
297        return name_id, "".join(name_id.itertext())
298
299    def _get_name_id_filter(self) -> dict[str, str]:
300        """Returns the subject's NameID as a Filter for the `User`"""
301        name_id_el, name_id = self._get_name_id()
302        if not name_id:
303            raise UnsupportedNameIDFormat("Subject's NameID is empty.")
304        _format = name_id_el.attrib["Format"]
305        if _format == SAML_NAME_ID_FORMAT_EMAIL:
306            return {"email": name_id}
307        if _format == SAML_NAME_ID_FORMAT_PERSISTENT:
308            return {"username": name_id}
309        if _format == SAML_NAME_ID_FORMAT_X509:
310            # This attribute is statically set by the LDAP source
311            return {"attributes__distinguishedName": name_id}
312        if _format == SAML_NAME_ID_FORMAT_WINDOWS:
313            if "\\" in name_id:
314                name_id = name_id.split("\\")[1]
315            return {"username": name_id}
316        raise UnsupportedNameIDFormat(
317            f"Assertion contains NameID with unsupported format {_format}."
318        )
319
320    def prepare_flow_manager(self) -> SourceFlowManager:
321        """Prepare flow plan depending on whether or not the user exists"""
322        name_id_el, name_id = self._get_name_id()
323        # Sanity check, show a warning if NameIDPolicy doesn't match what we go
324        if self._source.name_id_policy != name_id_el.attrib["Format"]:
325            LOGGER.warning(
326                "NameID from IdP doesn't match our policy",
327                expected=self._source.name_id_policy,
328                got=name_id_el.attrib["Format"],
329            )
330        # transient NameIDs are handled separately as they don't have to go through flows.
331        if name_id_el.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT:
332            return self._handle_name_id_transient()
333
334        return SAMLSourceFlowManager(
335            source=self._source,
336            request=self._http_request,
337            identifier=str(name_id),
338            user_info={
339                "root": self._root,
340                "assertion": self.get_assertion(),
341                "name_id": name_id_el,
342            },
343            policy_context={
344                "saml_response": etree.tostring(self._root),
345            },
346        )

SAML Response Processor

ResponseProcessor( source: authentik.sources.saml.models.SAMLSource, request: django.http.request.HttpRequest)
74    def __init__(self, source: SAMLSource, request: HttpRequest):
75        self._source = source
76        self._http_request = request
def parse(self):
 78    def parse(self):
 79        """Check if `request` contains SAML Response data, parse and validate it."""
 80        # First off, check if we have any SAML Data at all.
 81        raw_response = self._http_request.POST.get("SAMLResponse", None)
 82        if not raw_response:
 83            raise MissingSAMLResponse("Request does not contain 'SAMLResponse'")
 84        # Check if response is compressed, b64 decode it
 85        self._root_xml = b64decode(raw_response.encode())
 86        self._root = fromstring(self._root_xml)
 87
 88        # Verify response signature BEFORE decryption (signature covers encrypted content)
 89        if self._source.verification_kp and self._source.signed_response:
 90            self._verify_response_signature()
 91
 92        if self._source.encryption_kp:
 93            self._decrypt_response()
 94
 95        # Verify assertion signature AFTER decryption (signature is inside encrypted content)
 96        if self._source.verification_kp and self._source.signed_assertion:
 97            self._verify_assertion_signature()
 98
 99        self._verify_request_id()
100        self._verify_status()
101        self._verify_conditions()

Check if request contains SAML Response data, parse and validate it.

def get_assertion(self) -> Element | None:
280    def get_assertion(self) -> Element | None:
281        """Get assertion element, if we have a signed assertion"""
282        if self._assertion is not None:
283            return self._assertion
284        return self._root.find(f"{{{NS_SAML_ASSERTION}}}Assertion")

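Get the assertion element: the decrypted or signature-verified assertion when one has been stored, otherwise the first Assertion child of the response (None if there is none).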

def prepare_flow_manager(self) -> authentik.core.sources.flow_manager.SourceFlowManager:
320    def prepare_flow_manager(self) -> SourceFlowManager:
321        """Prepare flow plan depending on whether or not the user exists"""
322        name_id_el, name_id = self._get_name_id()
323        # Sanity check, show a warning if NameIDPolicy doesn't match what we go
324        if self._source.name_id_policy != name_id_el.attrib["Format"]:
325            LOGGER.warning(
326                "NameID from IdP doesn't match our policy",
327                expected=self._source.name_id_policy,
328                got=name_id_el.attrib["Format"],
329            )
330        # transient NameIDs are handled separately as they don't have to go through flows.
331        if name_id_el.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT:
332            return self._handle_name_id_transient()
333
334        return SAMLSourceFlowManager(
335            source=self._source,
336            request=self._http_request,
337            identifier=str(name_id),
338            user_info={
339                "root": self._root,
340                "assertion": self.get_assertion(),
341                "name_id": name_id_el,
342            },
343            policy_context={
344                "saml_response": etree.tostring(self._root),
345            },
346        )

Prepare flow plan depending on whether or not the user exists

class SAMLSourceFlowManager(authentik.core.sources.flow_manager.SourceFlowManager):
349class SAMLSourceFlowManager(SourceFlowManager):
350    """Source flow manager for SAML Sources"""
351
352    user_connection_type = UserSAMLSourceConnection
353    group_connection_type = GroupSAMLSourceConnection

Source flow manager for SAML Sources