authentik.sources.saml.processors.response

authentik saml source processor

  1"""authentik saml source processor"""
  2
  3from base64 import b64decode
  4from time import mktime
  5from typing import TYPE_CHECKING
  6
  7import xmlsec
  8from defusedxml.lxml import fromstring
  9from django.core.cache import cache
 10from django.core.exceptions import SuspiciousOperation
 11from django.http import HttpRequest
 12from django.utils.timezone import now
 13from lxml import etree  # nosec
 14from lxml.etree import _Element  # nosec
 15from structlog.stdlib import get_logger
 16
 17from authentik.common.saml.constants import (
 18    NS_MAP,
 19    NS_SAML_ASSERTION,
 20    NS_SAML_PROTOCOL,
 21    SAML_NAME_ID_FORMAT_EMAIL,
 22    SAML_NAME_ID_FORMAT_PERSISTENT,
 23    SAML_NAME_ID_FORMAT_TRANSIENT,
 24    SAML_NAME_ID_FORMAT_WINDOWS,
 25    SAML_NAME_ID_FORMAT_X509,
 26    SAML_STATUS_SUCCESS,
 27)
 28from authentik.core.models import (
 29    USER_ATTRIBUTE_DELETE_ON_LOGOUT,
 30    USER_ATTRIBUTE_EXPIRES,
 31    USER_ATTRIBUTE_GENERATED,
 32    USER_ATTRIBUTE_SOURCES,
 33    USERNAME_MAX_LENGTH,
 34    User,
 35)
 36from authentik.core.sources.flow_manager import SourceFlowManager
 37from authentik.lib.utils.time import timedelta_from_string
 38from authentik.sources.saml.exceptions import (
 39    InvalidEncryption,
 40    InvalidSignature,
 41    MismatchedRequestID,
 42    MissingSAMLResponse,
 43    UnsupportedNameIDFormat,
 44)
 45from authentik.sources.saml.models import (
 46    GroupSAMLSourceConnection,
 47    SAMLSource,
 48    UserSAMLSourceConnection,
 49)
 50from authentik.sources.saml.processors.request import SESSION_KEY_REQUEST_ID
 51
 52LOGGER = get_logger()
 53if TYPE_CHECKING:
 54    from xml.etree.ElementTree import Element  # nosec
 55
 56CACHE_SEEN_REQUEST_ID = "authentik_saml_seen_ids_%s"
 57
 58
 59class ResponseProcessor:
 60    """SAML Response Processor"""
 61
 62    _source: SAMLSource
 63
 64    _root: _Element
 65    _root_xml: bytes
 66
 67    _http_request: HttpRequest
 68
 69    _assertion: _Element | None = None
 70
 71    def __init__(self, source: SAMLSource, request: HttpRequest):
 72        self._source = source
 73        self._http_request = request
 74
 75    def parse(self):
 76        """Check if `request` contains SAML Response data, parse and validate it."""
 77        # First off, check if we have any SAML Data at all.
 78        raw_response = self._http_request.POST.get("SAMLResponse", None)
 79        if not raw_response:
 80            raise MissingSAMLResponse("Request does not contain 'SAMLResponse'")
 81        # Check if response is compressed, b64 decode it
 82        self._root_xml = b64decode(raw_response.encode())
 83        self._root = fromstring(self._root_xml)
 84
 85        # Verify response signature BEFORE decryption (signature covers encrypted content)
 86        if self._source.verification_kp and self._source.signed_response:
 87            self._verify_response_signature()
 88
 89        if self._source.encryption_kp:
 90            self._decrypt_response()
 91
 92        # Verify assertion signature AFTER decryption (signature is inside encrypted content)
 93        if self._source.verification_kp and self._source.signed_assertion:
 94            self._verify_assertion_signature()
 95
 96        self._verify_request_id()
 97        self._verify_status()
 98
 99    def _decrypt_response(self):
100        """Decrypt SAMLResponse EncryptedAssertion Element"""
101        manager = xmlsec.KeysManager()
102        key = xmlsec.Key.from_memory(
103            self._source.encryption_kp.key_data,
104            xmlsec.constants.KeyDataFormatPem,
105        )
106
107        manager.add_key(key)
108        encryption_context = xmlsec.EncryptionContext(manager)
109
110        encrypted_assertion = self._root.find(f".//{{{NS_SAML_ASSERTION}}}EncryptedAssertion")
111        if encrypted_assertion is None:
112            raise InvalidEncryption()
113        encrypted_data = xmlsec.tree.find_child(
114            encrypted_assertion, "EncryptedData", xmlsec.constants.EncNs
115        )
116        try:
117            decrypted_assertion = encryption_context.decrypt(encrypted_data)
118        except xmlsec.Error as exc:
119            raise InvalidEncryption() from exc
120
121        index_of = self._root.index(encrypted_assertion)
122        self._root.remove(encrypted_assertion)
123        self._root.insert(
124            index_of,
125            decrypted_assertion,
126        )
127        self._assertion = decrypted_assertion
128
129    def _verify_signature(self, signature_node: _Element):
130        """Verify a single signature node"""
131        xmlsec.tree.add_ids(self._root, ["ID"])
132
133        ctx = xmlsec.SignatureContext()
134        key = xmlsec.Key.from_memory(
135            self._source.verification_kp.certificate_data,
136            xmlsec.constants.KeyDataFormatCertPem,
137        )
138        ctx.key = key
139
140        ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509])
141        try:
142            ctx.verify(signature_node)
143        except xmlsec.Error as exc:
144            raise InvalidSignature(
145                "The signature of the SAML object is either missing or invalid."
146            ) from exc
147        LOGGER.debug("Successfully verified signature")
148
149    def _verify_response_signature(self):
150        """Verify SAML Response's Signature (before decryption)"""
151        signature_nodes = self._root.xpath("/samlp:Response/ds:Signature", namespaces=NS_MAP)
152
153        if len(signature_nodes) != 1:
154            raise InvalidSignature("No Signature exists in the Response element.")
155
156        self._verify_signature(signature_nodes[0])
157
158    def _verify_assertion_signature(self):
159        """Verify SAML Assertion's Signature (after decryption)"""
160        signature_nodes = self._root.xpath(
161            "/samlp:Response/saml:Assertion/ds:Signature", namespaces=NS_MAP
162        )
163
164        if len(signature_nodes) != 1:
165            raise InvalidSignature("No Signature exists in the Assertion element.")
166
167        self._verify_signature(signature_nodes[0])
168        parent = signature_nodes[0].getparent()
169        if parent is None or parent.tag != f"{{{NS_SAML_ASSERTION}}}Assertion":
170            raise InvalidSignature("No Signature exists in the Assertion element.")
171        self._assertion = parent
172
173    def _verify_request_id(self):
174        if self._source.allow_idp_initiated:
175            # If IdP-initiated SSO flows are enabled, we want to cache the Response ID
176            # somewhat mitigate replay attacks
177            seen_ids = cache.get(CACHE_SEEN_REQUEST_ID % self._source.pk, [])
178            if self._root.attrib["ID"] in seen_ids:
179                raise SuspiciousOperation("Replay attack detected")
180            seen_ids.append(self._root.attrib["ID"])
181            cache.set(CACHE_SEEN_REQUEST_ID % self._source.pk, seen_ids)
182            return
183        if (
184            SESSION_KEY_REQUEST_ID not in self._http_request.session
185            or "InResponseTo" not in self._root.attrib
186        ):
187            raise MismatchedRequestID(
188                "Missing InResponseTo and IdP-initiated Logins are not allowed"
189            )
190        if self._http_request.session[SESSION_KEY_REQUEST_ID] != self._root.attrib["InResponseTo"]:
191            raise MismatchedRequestID("Mismatched request ID")
192
193    def _verify_status(self):
194        """Check for SAML Status elements"""
195        status = self._root.find(f"{{{NS_SAML_PROTOCOL}}}Status")
196        if status is None:
197            return
198        status_code = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusCode")
199        message = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusMessage")
200        message_text = message.text if message is not None else None
201        detail = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusDetail")
202        detail_text = etree.tostring(detail, encoding="unicode") if detail is not None else None
203        if status_code.attrib.get("Value") != SAML_STATUS_SUCCESS:
204            if detail_text and message_text:
205                raise ValueError(f"{message_text}: {detail_text}")
206            raise ValueError(
207                detail_text or message_text or f"SAML Status: {status_code.attrib.get('Value')}"
208            )
209        if message_text or detail_text:
210            LOGGER.debug("SAML Status message", message=message_text, detail=detail_text)
211
212    def _handle_name_id_transient(self) -> SourceFlowManager:
213        """Handle a NameID with the Format of Transient. This is a bit more complex than other
214        formats, as we need to create a temporary User that is used in the session. This
215        user has an attribute that refers to our Source for cleanup. The user is also deleted
216        on logout and periodically."""
217        # Create a temporary User
218        name_id = self._get_name_id()
219        username = name_id.text
220        # trim username to ensure it is max 150 chars
221        username = f"ak-{username[: USERNAME_MAX_LENGTH - 14]}-transient"
222        expiry = mktime(
223            (now() + timedelta_from_string(self._source.temporary_user_delete_after)).timetuple()
224        )
225        user: User = User.objects.create(
226            username=username,
227            attributes={
228                USER_ATTRIBUTE_GENERATED: True,
229                USER_ATTRIBUTE_SOURCES: [
230                    self._source.name,
231                ],
232                USER_ATTRIBUTE_DELETE_ON_LOGOUT: True,
233                USER_ATTRIBUTE_EXPIRES: expiry,
234            },
235            path=self._source.get_user_path(),
236        )
237        LOGGER.debug("Created temporary user for NameID Transient", username=name_id.text)
238        user.set_unusable_password()
239        user.save()
240        UserSAMLSourceConnection.objects.create(
241            source=self._source, user=user, identifier=name_id.text
242        )
243        return SAMLSourceFlowManager(
244            source=self._source,
245            request=self._http_request,
246            identifier=str(name_id.text),
247            user_info={
248                "root": self._root,
249                "assertion": self.get_assertion(),
250                "name_id": name_id,
251            },
252            policy_context={},
253        )
254
255    def get_assertion(self) -> Element | None:
256        """Get assertion element, if we have a signed assertion"""
257        if self._assertion is not None:
258            return self._assertion
259        return self._root.find(f"{{{NS_SAML_ASSERTION}}}Assertion")
260
261    def _get_name_id(self) -> Element:
262        """Get NameID Element"""
263        assertion = self.get_assertion()
264        if assertion is None:
265            raise ValueError("Assertion element not found")
266        subject = assertion.find(f"{{{NS_SAML_ASSERTION}}}Subject")
267        if subject is None:
268            raise ValueError("Subject element not found")
269        name_id = subject.find(f"{{{NS_SAML_ASSERTION}}}NameID")
270        if name_id is None:
271            raise ValueError("NameID element not found")
272        return name_id
273
274    def _get_name_id_filter(self) -> dict[str, str]:
275        """Returns the subject's NameID as a Filter for the `User`"""
276        name_id_el = self._get_name_id()
277        name_id = name_id_el.text
278        if not name_id:
279            raise UnsupportedNameIDFormat("Subject's NameID is empty.")
280        _format = name_id_el.attrib["Format"]
281        if _format == SAML_NAME_ID_FORMAT_EMAIL:
282            return {"email": name_id}
283        if _format == SAML_NAME_ID_FORMAT_PERSISTENT:
284            return {"username": name_id}
285        if _format == SAML_NAME_ID_FORMAT_X509:
286            # This attribute is statically set by the LDAP source
287            return {"attributes__distinguishedName": name_id}
288        if _format == SAML_NAME_ID_FORMAT_WINDOWS:
289            if "\\" in name_id:
290                name_id = name_id.split("\\")[1]
291            return {"username": name_id}
292        raise UnsupportedNameIDFormat(
293            f"Assertion contains NameID with unsupported format {_format}."
294        )
295
296    def prepare_flow_manager(self) -> SourceFlowManager:
297        """Prepare flow plan depending on whether or not the user exists"""
298        name_id = self._get_name_id()
299        # Sanity check, show a warning if NameIDPolicy doesn't match what we go
300        if self._source.name_id_policy != name_id.attrib["Format"]:
301            LOGGER.warning(
302                "NameID from IdP doesn't match our policy",
303                expected=self._source.name_id_policy,
304                got=name_id.attrib["Format"],
305            )
306        # transient NameIDs are handled separately as they don't have to go through flows.
307        if name_id.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT:
308            return self._handle_name_id_transient()
309
310        return SAMLSourceFlowManager(
311            source=self._source,
312            request=self._http_request,
313            identifier=str(name_id.text),
314            user_info={
315                "root": self._root,
316                "assertion": self.get_assertion(),
317                "name_id": name_id,
318            },
319            policy_context={
320                "saml_response": etree.tostring(self._root),
321            },
322        )
323
324
325class SAMLSourceFlowManager(SourceFlowManager):
326    """Source flow manager for SAML Sources"""
327
328    user_connection_type = UserSAMLSourceConnection
329    group_connection_type = GroupSAMLSourceConnection
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
CACHE_SEEN_REQUEST_ID = 'authentik_saml_seen_ids_%s'
class ResponseProcessor:
 60class ResponseProcessor:
 61    """SAML Response Processor"""
 62
 63    _source: SAMLSource
 64
 65    _root: _Element
 66    _root_xml: bytes
 67
 68    _http_request: HttpRequest
 69
 70    _assertion: _Element | None = None
 71
 72    def __init__(self, source: SAMLSource, request: HttpRequest):
 73        self._source = source
 74        self._http_request = request
 75
 76    def parse(self):
 77        """Check if `request` contains SAML Response data, parse and validate it."""
 78        # First off, check if we have any SAML Data at all.
 79        raw_response = self._http_request.POST.get("SAMLResponse", None)
 80        if not raw_response:
 81            raise MissingSAMLResponse("Request does not contain 'SAMLResponse'")
 82        # Check if response is compressed, b64 decode it
 83        self._root_xml = b64decode(raw_response.encode())
 84        self._root = fromstring(self._root_xml)
 85
 86        # Verify response signature BEFORE decryption (signature covers encrypted content)
 87        if self._source.verification_kp and self._source.signed_response:
 88            self._verify_response_signature()
 89
 90        if self._source.encryption_kp:
 91            self._decrypt_response()
 92
 93        # Verify assertion signature AFTER decryption (signature is inside encrypted content)
 94        if self._source.verification_kp and self._source.signed_assertion:
 95            self._verify_assertion_signature()
 96
 97        self._verify_request_id()
 98        self._verify_status()
 99
100    def _decrypt_response(self):
101        """Decrypt SAMLResponse EncryptedAssertion Element"""
102        manager = xmlsec.KeysManager()
103        key = xmlsec.Key.from_memory(
104            self._source.encryption_kp.key_data,
105            xmlsec.constants.KeyDataFormatPem,
106        )
107
108        manager.add_key(key)
109        encryption_context = xmlsec.EncryptionContext(manager)
110
111        encrypted_assertion = self._root.find(f".//{{{NS_SAML_ASSERTION}}}EncryptedAssertion")
112        if encrypted_assertion is None:
113            raise InvalidEncryption()
114        encrypted_data = xmlsec.tree.find_child(
115            encrypted_assertion, "EncryptedData", xmlsec.constants.EncNs
116        )
117        try:
118            decrypted_assertion = encryption_context.decrypt(encrypted_data)
119        except xmlsec.Error as exc:
120            raise InvalidEncryption() from exc
121
122        index_of = self._root.index(encrypted_assertion)
123        self._root.remove(encrypted_assertion)
124        self._root.insert(
125            index_of,
126            decrypted_assertion,
127        )
128        self._assertion = decrypted_assertion
129
130    def _verify_signature(self, signature_node: _Element):
131        """Verify a single signature node"""
132        xmlsec.tree.add_ids(self._root, ["ID"])
133
134        ctx = xmlsec.SignatureContext()
135        key = xmlsec.Key.from_memory(
136            self._source.verification_kp.certificate_data,
137            xmlsec.constants.KeyDataFormatCertPem,
138        )
139        ctx.key = key
140
141        ctx.set_enabled_key_data([xmlsec.constants.KeyDataX509])
142        try:
143            ctx.verify(signature_node)
144        except xmlsec.Error as exc:
145            raise InvalidSignature(
146                "The signature of the SAML object is either missing or invalid."
147            ) from exc
148        LOGGER.debug("Successfully verified signature")
149
150    def _verify_response_signature(self):
151        """Verify SAML Response's Signature (before decryption)"""
152        signature_nodes = self._root.xpath("/samlp:Response/ds:Signature", namespaces=NS_MAP)
153
154        if len(signature_nodes) != 1:
155            raise InvalidSignature("No Signature exists in the Response element.")
156
157        self._verify_signature(signature_nodes[0])
158
159    def _verify_assertion_signature(self):
160        """Verify SAML Assertion's Signature (after decryption)"""
161        signature_nodes = self._root.xpath(
162            "/samlp:Response/saml:Assertion/ds:Signature", namespaces=NS_MAP
163        )
164
165        if len(signature_nodes) != 1:
166            raise InvalidSignature("No Signature exists in the Assertion element.")
167
168        self._verify_signature(signature_nodes[0])
169        parent = signature_nodes[0].getparent()
170        if parent is None or parent.tag != f"{{{NS_SAML_ASSERTION}}}Assertion":
171            raise InvalidSignature("No Signature exists in the Assertion element.")
172        self._assertion = parent
173
174    def _verify_request_id(self):
175        if self._source.allow_idp_initiated:
176            # If IdP-initiated SSO flows are enabled, we want to cache the Response ID
177            # somewhat mitigate replay attacks
178            seen_ids = cache.get(CACHE_SEEN_REQUEST_ID % self._source.pk, [])
179            if self._root.attrib["ID"] in seen_ids:
180                raise SuspiciousOperation("Replay attack detected")
181            seen_ids.append(self._root.attrib["ID"])
182            cache.set(CACHE_SEEN_REQUEST_ID % self._source.pk, seen_ids)
183            return
184        if (
185            SESSION_KEY_REQUEST_ID not in self._http_request.session
186            or "InResponseTo" not in self._root.attrib
187        ):
188            raise MismatchedRequestID(
189                "Missing InResponseTo and IdP-initiated Logins are not allowed"
190            )
191        if self._http_request.session[SESSION_KEY_REQUEST_ID] != self._root.attrib["InResponseTo"]:
192            raise MismatchedRequestID("Mismatched request ID")
193
194    def _verify_status(self):
195        """Check for SAML Status elements"""
196        status = self._root.find(f"{{{NS_SAML_PROTOCOL}}}Status")
197        if status is None:
198            return
199        status_code = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusCode")
200        message = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusMessage")
201        message_text = message.text if message is not None else None
202        detail = status.find(f"{{{NS_SAML_PROTOCOL}}}StatusDetail")
203        detail_text = etree.tostring(detail, encoding="unicode") if detail is not None else None
204        if status_code.attrib.get("Value") != SAML_STATUS_SUCCESS:
205            if detail_text and message_text:
206                raise ValueError(f"{message_text}: {detail_text}")
207            raise ValueError(
208                detail_text or message_text or f"SAML Status: {status_code.attrib.get('Value')}"
209            )
210        if message_text or detail_text:
211            LOGGER.debug("SAML Status message", message=message_text, detail=detail_text)
212
213    def _handle_name_id_transient(self) -> SourceFlowManager:
214        """Handle a NameID with the Format of Transient. This is a bit more complex than other
215        formats, as we need to create a temporary User that is used in the session. This
216        user has an attribute that refers to our Source for cleanup. The user is also deleted
217        on logout and periodically."""
218        # Create a temporary User
219        name_id = self._get_name_id()
220        username = name_id.text
221        # trim username to ensure it is max 150 chars
222        username = f"ak-{username[: USERNAME_MAX_LENGTH - 14]}-transient"
223        expiry = mktime(
224            (now() + timedelta_from_string(self._source.temporary_user_delete_after)).timetuple()
225        )
226        user: User = User.objects.create(
227            username=username,
228            attributes={
229                USER_ATTRIBUTE_GENERATED: True,
230                USER_ATTRIBUTE_SOURCES: [
231                    self._source.name,
232                ],
233                USER_ATTRIBUTE_DELETE_ON_LOGOUT: True,
234                USER_ATTRIBUTE_EXPIRES: expiry,
235            },
236            path=self._source.get_user_path(),
237        )
238        LOGGER.debug("Created temporary user for NameID Transient", username=name_id.text)
239        user.set_unusable_password()
240        user.save()
241        UserSAMLSourceConnection.objects.create(
242            source=self._source, user=user, identifier=name_id.text
243        )
244        return SAMLSourceFlowManager(
245            source=self._source,
246            request=self._http_request,
247            identifier=str(name_id.text),
248            user_info={
249                "root": self._root,
250                "assertion": self.get_assertion(),
251                "name_id": name_id,
252            },
253            policy_context={},
254        )
255
256    def get_assertion(self) -> Element | None:
257        """Get assertion element, if we have a signed assertion"""
258        if self._assertion is not None:
259            return self._assertion
260        return self._root.find(f"{{{NS_SAML_ASSERTION}}}Assertion")
261
262    def _get_name_id(self) -> Element:
263        """Get NameID Element"""
264        assertion = self.get_assertion()
265        if assertion is None:
266            raise ValueError("Assertion element not found")
267        subject = assertion.find(f"{{{NS_SAML_ASSERTION}}}Subject")
268        if subject is None:
269            raise ValueError("Subject element not found")
270        name_id = subject.find(f"{{{NS_SAML_ASSERTION}}}NameID")
271        if name_id is None:
272            raise ValueError("NameID element not found")
273        return name_id
274
275    def _get_name_id_filter(self) -> dict[str, str]:
276        """Returns the subject's NameID as a Filter for the `User`"""
277        name_id_el = self._get_name_id()
278        name_id = name_id_el.text
279        if not name_id:
280            raise UnsupportedNameIDFormat("Subject's NameID is empty.")
281        _format = name_id_el.attrib["Format"]
282        if _format == SAML_NAME_ID_FORMAT_EMAIL:
283            return {"email": name_id}
284        if _format == SAML_NAME_ID_FORMAT_PERSISTENT:
285            return {"username": name_id}
286        if _format == SAML_NAME_ID_FORMAT_X509:
287            # This attribute is statically set by the LDAP source
288            return {"attributes__distinguishedName": name_id}
289        if _format == SAML_NAME_ID_FORMAT_WINDOWS:
290            if "\\" in name_id:
291                name_id = name_id.split("\\")[1]
292            return {"username": name_id}
293        raise UnsupportedNameIDFormat(
294            f"Assertion contains NameID with unsupported format {_format}."
295        )
296
297    def prepare_flow_manager(self) -> SourceFlowManager:
298        """Prepare flow plan depending on whether or not the user exists"""
299        name_id = self._get_name_id()
300        # Sanity check, show a warning if NameIDPolicy doesn't match what we go
301        if self._source.name_id_policy != name_id.attrib["Format"]:
302            LOGGER.warning(
303                "NameID from IdP doesn't match our policy",
304                expected=self._source.name_id_policy,
305                got=name_id.attrib["Format"],
306            )
307        # transient NameIDs are handled separately as they don't have to go through flows.
308        if name_id.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT:
309            return self._handle_name_id_transient()
310
311        return SAMLSourceFlowManager(
312            source=self._source,
313            request=self._http_request,
314            identifier=str(name_id.text),
315            user_info={
316                "root": self._root,
317                "assertion": self.get_assertion(),
318                "name_id": name_id,
319            },
320            policy_context={
321                "saml_response": etree.tostring(self._root),
322            },
323        )

SAML Response Processor

ResponseProcessor( source: authentik.sources.saml.models.SAMLSource, request: django.http.request.HttpRequest)
72    def __init__(self, source: SAMLSource, request: HttpRequest):
73        self._source = source
74        self._http_request = request
def parse(self):
76    def parse(self):
77        """Check if `request` contains SAML Response data, parse and validate it."""
78        # First off, check if we have any SAML Data at all.
79        raw_response = self._http_request.POST.get("SAMLResponse", None)
80        if not raw_response:
81            raise MissingSAMLResponse("Request does not contain 'SAMLResponse'")
82        # Check if response is compressed, b64 decode it
83        self._root_xml = b64decode(raw_response.encode())
84        self._root = fromstring(self._root_xml)
85
86        # Verify response signature BEFORE decryption (signature covers encrypted content)
87        if self._source.verification_kp and self._source.signed_response:
88            self._verify_response_signature()
89
90        if self._source.encryption_kp:
91            self._decrypt_response()
92
93        # Verify assertion signature AFTER decryption (signature is inside encrypted content)
94        if self._source.verification_kp and self._source.signed_assertion:
95            self._verify_assertion_signature()
96
97        self._verify_request_id()
98        self._verify_status()

Check if request contains SAML Response data, parse and validate it.

def get_assertion(unknown):
256    def get_assertion(self) -> Element | None:
257        """Get assertion element, if we have a signed assertion"""
258        if self._assertion is not None:
259            return self._assertion
260        return self._root.find(f"{{{NS_SAML_ASSERTION}}}Assertion")

Get assertion element, if we have a signed assertion

def prepare_flow_manager(self) -> authentik.core.sources.flow_manager.SourceFlowManager:
297    def prepare_flow_manager(self) -> SourceFlowManager:
298        """Prepare flow plan depending on whether or not the user exists"""
299        name_id = self._get_name_id()
300        # Sanity check, show a warning if NameIDPolicy doesn't match what we go
301        if self._source.name_id_policy != name_id.attrib["Format"]:
302            LOGGER.warning(
303                "NameID from IdP doesn't match our policy",
304                expected=self._source.name_id_policy,
305                got=name_id.attrib["Format"],
306            )
307        # transient NameIDs are handled separately as they don't have to go through flows.
308        if name_id.attrib["Format"] == SAML_NAME_ID_FORMAT_TRANSIENT:
309            return self._handle_name_id_transient()
310
311        return SAMLSourceFlowManager(
312            source=self._source,
313            request=self._http_request,
314            identifier=str(name_id.text),
315            user_info={
316                "root": self._root,
317                "assertion": self.get_assertion(),
318                "name_id": name_id,
319            },
320            policy_context={
321                "saml_response": etree.tostring(self._root),
322            },
323        )

Prepare flow plan depending on whether or not the user exists

class SAMLSourceFlowManager(authentik.core.sources.flow_manager.SourceFlowManager):
326class SAMLSourceFlowManager(SourceFlowManager):
327    """Source flow manager for SAML Sources"""
328
329    user_connection_type = UserSAMLSourceConnection
330    group_connection_type = GroupSAMLSourceConnection

Source flow manager for SAML Sources