authentik.sources.scim.patch.processor

  1from typing import Any
  2
  3from authentik.providers.scim.clients.schema import PatchOp, PatchOperation
  4from authentik.sources.scim.constants import SCIM_URN_USER_ENTERPRISE
  5from authentik.sources.scim.patch.parser import SCIMPathParser
  6
  7
  8class SCIMPatchProcessor:
  9    """Processes SCIM patch operations on Python dictionaries"""
 10
 11    def __init__(self):
 12        self.parser = SCIMPathParser()
 13
 14    def apply_patches(self, data: dict[str, Any], patches: list[PatchOperation]) -> dict[str, Any]:
 15        """Apply a list of patch operations to the data"""
 16        result = data.copy()
 17
 18        for _patch in patches:
 19            patch = PatchOperation.model_validate(_patch)
 20            if patch.path is None:
 21                # Handle operations with no path - value contains attribute paths as keys
 22                self._apply_bulk_operation(result, patch.op, patch.value)
 23            elif patch.op == PatchOp.add:
 24                self._apply_add(result, patch.path, patch.value)
 25            elif patch.op == PatchOp.remove:
 26                self._apply_remove(result, patch.path)
 27            elif patch.op == PatchOp.replace:
 28                self._apply_replace(result, patch.path, patch.value)
 29
 30        return result
 31
 32    def _apply_bulk_operation(
 33        self, data: dict[str, Any], operation: PatchOp, value: dict[str, Any]
 34    ):
 35        """Apply bulk operations when path is None"""
 36        if not isinstance(value, dict):
 37            return
 38        for path, val in value.items():
 39            if operation == PatchOp.add:
 40                self._apply_add(data, path, val)
 41            elif operation == PatchOp.remove:
 42                self._apply_remove(data, path)
 43            elif operation == PatchOp.replace:
 44                self._apply_replace(data, path, val)
 45
 46    def _apply_add(self, data: dict[str, Any], path: str, value: Any):
 47        """Apply ADD operation"""
 48        components = self.parser.parse_path(path)
 49
 50        if len(components) == 1 and not components[0]["filter"]:
 51            # Simple path
 52            attr = components[0]["attribute"]
 53            if components[0]["sub_attribute"]:
 54                if attr not in data:
 55                    data[attr] = {}
 56                # Somewhat hacky workaround for the manager attribute of the enterprise schema
 57                # ideally we'd do this based on the schema
 58                if attr == SCIM_URN_USER_ENTERPRISE and components[0]["sub_attribute"] == "manager":
 59                    data[attr][components[0]["sub_attribute"]] = {"value": value}
 60                else:
 61                    data[attr][components[0]["sub_attribute"]] = value
 62            elif attr in data:
 63                data[attr].append(value)
 64            else:
 65                data[attr] = value
 66        else:
 67            # Complex path with filters
 68            self._navigate_and_modify(data, components, value, "add")
 69
 70    def _apply_remove(self, data: dict[str, Any], path: str):
 71        """Apply REMOVE operation"""
 72        components = self.parser.parse_path(path)
 73
 74        if len(components) == 1 and not components[0]["filter"]:
 75            # Simple path
 76            attr = components[0]["attribute"]
 77            if components[0]["sub_attribute"]:
 78                if attr in data and isinstance(data[attr], dict):
 79                    data[attr].pop(components[0]["sub_attribute"], None)
 80            else:
 81                data.pop(attr, None)
 82        else:
 83            # Complex path with filters
 84            self._navigate_and_modify(data, components, None, "remove")
 85
 86    def _apply_replace(self, data: dict[str, Any], path: str, value: Any):
 87        """Apply REPLACE operation"""
 88        components = self.parser.parse_path(path)
 89
 90        if len(components) == 1 and not components[0]["filter"]:
 91            # Simple path
 92            attr = components[0]["attribute"]
 93            if components[0]["sub_attribute"]:
 94                if attr not in data:
 95                    data[attr] = {}
 96                # Somewhat hacky workaround for the manager attribute of the enterprise schema
 97                # ideally we'd do this based on the schema
 98                if attr == SCIM_URN_USER_ENTERPRISE and components[0]["sub_attribute"] == "manager":
 99                    data[attr][components[0]["sub_attribute"]] = {"value": value}
100                else:
101                    data[attr][components[0]["sub_attribute"]] = value
102            else:
103                data[attr] = value
104        else:
105            # Complex path with filters
106            self._navigate_and_modify(data, components, value, "replace")
107
108    def _navigate_and_modify(  # noqa PLR0912
109        self, data: dict[str, Any], components: list[dict[str, Any]], value: Any, operation: str
110    ):
111        """Navigate through complex paths and apply modifications"""
112        current = data
113
114        for i, component in enumerate(components):
115            attr = component["attribute"]
116            filter_expr = component["filter"]
117            sub_attr = component["sub_attribute"]
118
119            if filter_expr:
120                # Handle array with filter
121                if attr not in current:
122                    if operation == "add":
123                        current[attr] = []
124                    else:
125                        return
126
127                if not isinstance(current[attr], list):
128                    return
129
130                # Find matching items
131                matching_items = []
132                for item in current[attr]:
133                    if self._matches_filter(item, filter_expr):
134                        matching_items.append(item)
135
136                if not matching_items and operation == "add":
137                    # Create new item if none match (only for simple comparison filters)
138                    if filter_expr.get("type", "comparison") == "comparison":
139                        new_item = {filter_expr["attribute"]: filter_expr["value"]}
140                        current[attr].append(new_item)
141                        matching_items = [new_item]
142
143                # Apply operation to matching items
144                for item in matching_items:
145                    if sub_attr:
146                        if operation in {"add", "replace"}:
147                            item[sub_attr] = value
148                        elif operation == "remove":
149                            item.pop(sub_attr, None)
150                    elif operation in {"add", "replace"}:
151                        if isinstance(value, dict):
152                            item.update(value)
153                        else:
154                            # If value is not a dict, we can't merge it
155                            pass
156                    elif operation == "remove":
157                        # Remove the entire item
158                        if item in current[attr]:
159                            current[attr].remove(item)
160            # Handle simple attribute
161            elif i == len(components) - 1:
162                # Last component
163                if sub_attr:
164                    if attr not in current:
165                        current[attr] = {}
166                    if operation in {"add", "replace"}:
167                        current[attr][sub_attr] = value
168                    elif operation == "remove":
169                        current[attr].pop(sub_attr, None)
170                elif operation in {"add", "replace"}:
171                    current[attr] = value
172                elif operation == "remove":
173                    current.pop(attr, None)
174            else:
175                # Navigate deeper
176                if attr not in current:
177                    current[attr] = {}
178                current = current[attr]
179
180    def _matches_filter(self, item: dict[str, Any], filter_expr: dict[str, Any]) -> bool:
181        """Check if an item matches the filter expression"""
182        if not filter_expr:
183            return True
184
185        filter_type = filter_expr.get("type", "comparison")
186
187        if filter_type == "comparison":
188            return self._matches_comparison(item, filter_expr)
189        elif filter_type == "logical":
190            return self._matches_logical(item, filter_expr)
191
192        return False
193
194    def _matches_comparison(  # noqa PLR0912
195        self, item: dict[str, Any], filter_expr: dict[str, Any]
196    ) -> bool:
197        """Check if an item matches a comparison filter"""
198        attr = filter_expr["attribute"]
199        operator = filter_expr["operator"]
200        expected_value = filter_expr["value"]
201
202        if attr not in item:
203            return False
204
205        actual_value = item[attr]
206
207        if operator == "eq":
208            return actual_value == expected_value
209        elif operator == "ne":
210            return actual_value != expected_value
211        elif operator == "co":
212            return str(expected_value) in str(actual_value)
213        elif operator == "sw":
214            return str(actual_value).startswith(str(expected_value))
215        elif operator == "ew":
216            return str(actual_value).endswith(str(expected_value))
217        elif operator == "gt":
218            return actual_value > expected_value
219        elif operator == "lt":
220            return actual_value < expected_value
221        elif operator == "ge":
222            return actual_value >= expected_value
223        elif operator == "le":
224            return actual_value <= expected_value
225        elif operator == "pr":
226            return actual_value is not None
227
228        return False
229
230    def _matches_logical(self, item: dict[str, Any], filter_expr: dict[str, Any]) -> bool:
231        """Check if an item matches a logical filter expression"""
232        operator = filter_expr["operator"]
233
234        if operator == "and":
235            left_result = self._matches_filter(item, filter_expr["left"])
236            right_result = self._matches_filter(item, filter_expr["right"])
237            return left_result and right_result
238        elif operator == "or":
239            left_result = self._matches_filter(item, filter_expr["left"])
240            right_result = self._matches_filter(item, filter_expr["right"])
241            return left_result or right_result
242        elif operator == "not":
243            operand_result = self._matches_filter(item, filter_expr["operand"])
244            return not operand_result
245
246        return False
class SCIMPatchProcessor:
  9class SCIMPatchProcessor:
 10    """Processes SCIM patch operations on Python dictionaries"""
 11
 12    def __init__(self):
 13        self.parser = SCIMPathParser()
 14
 15    def apply_patches(self, data: dict[str, Any], patches: list[PatchOperation]) -> dict[str, Any]:
 16        """Apply a list of patch operations to the data"""
 17        result = data.copy()
 18
 19        for _patch in patches:
 20            patch = PatchOperation.model_validate(_patch)
 21            if patch.path is None:
 22                # Handle operations with no path - value contains attribute paths as keys
 23                self._apply_bulk_operation(result, patch.op, patch.value)
 24            elif patch.op == PatchOp.add:
 25                self._apply_add(result, patch.path, patch.value)
 26            elif patch.op == PatchOp.remove:
 27                self._apply_remove(result, patch.path)
 28            elif patch.op == PatchOp.replace:
 29                self._apply_replace(result, patch.path, patch.value)
 30
 31        return result
 32
 33    def _apply_bulk_operation(
 34        self, data: dict[str, Any], operation: PatchOp, value: dict[str, Any]
 35    ):
 36        """Apply bulk operations when path is None"""
 37        if not isinstance(value, dict):
 38            return
 39        for path, val in value.items():
 40            if operation == PatchOp.add:
 41                self._apply_add(data, path, val)
 42            elif operation == PatchOp.remove:
 43                self._apply_remove(data, path)
 44            elif operation == PatchOp.replace:
 45                self._apply_replace(data, path, val)
 46
 47    def _apply_add(self, data: dict[str, Any], path: str, value: Any):
 48        """Apply ADD operation"""
 49        components = self.parser.parse_path(path)
 50
 51        if len(components) == 1 and not components[0]["filter"]:
 52            # Simple path
 53            attr = components[0]["attribute"]
 54            if components[0]["sub_attribute"]:
 55                if attr not in data:
 56                    data[attr] = {}
 57                # Somewhat hacky workaround for the manager attribute of the enterprise schema
 58                # ideally we'd do this based on the schema
 59                if attr == SCIM_URN_USER_ENTERPRISE and components[0]["sub_attribute"] == "manager":
 60                    data[attr][components[0]["sub_attribute"]] = {"value": value}
 61                else:
 62                    data[attr][components[0]["sub_attribute"]] = value
 63            elif attr in data:
 64                data[attr].append(value)
 65            else:
 66                data[attr] = value
 67        else:
 68            # Complex path with filters
 69            self._navigate_and_modify(data, components, value, "add")
 70
 71    def _apply_remove(self, data: dict[str, Any], path: str):
 72        """Apply REMOVE operation"""
 73        components = self.parser.parse_path(path)
 74
 75        if len(components) == 1 and not components[0]["filter"]:
 76            # Simple path
 77            attr = components[0]["attribute"]
 78            if components[0]["sub_attribute"]:
 79                if attr in data and isinstance(data[attr], dict):
 80                    data[attr].pop(components[0]["sub_attribute"], None)
 81            else:
 82                data.pop(attr, None)
 83        else:
 84            # Complex path with filters
 85            self._navigate_and_modify(data, components, None, "remove")
 86
 87    def _apply_replace(self, data: dict[str, Any], path: str, value: Any):
 88        """Apply REPLACE operation"""
 89        components = self.parser.parse_path(path)
 90
 91        if len(components) == 1 and not components[0]["filter"]:
 92            # Simple path
 93            attr = components[0]["attribute"]
 94            if components[0]["sub_attribute"]:
 95                if attr not in data:
 96                    data[attr] = {}
 97                # Somewhat hacky workaround for the manager attribute of the enterprise schema
 98                # ideally we'd do this based on the schema
 99                if attr == SCIM_URN_USER_ENTERPRISE and components[0]["sub_attribute"] == "manager":
100                    data[attr][components[0]["sub_attribute"]] = {"value": value}
101                else:
102                    data[attr][components[0]["sub_attribute"]] = value
103            else:
104                data[attr] = value
105        else:
106            # Complex path with filters
107            self._navigate_and_modify(data, components, value, "replace")
108
109    def _navigate_and_modify(  # noqa PLR0912
110        self, data: dict[str, Any], components: list[dict[str, Any]], value: Any, operation: str
111    ):
112        """Navigate through complex paths and apply modifications"""
113        current = data
114
115        for i, component in enumerate(components):
116            attr = component["attribute"]
117            filter_expr = component["filter"]
118            sub_attr = component["sub_attribute"]
119
120            if filter_expr:
121                # Handle array with filter
122                if attr not in current:
123                    if operation == "add":
124                        current[attr] = []
125                    else:
126                        return
127
128                if not isinstance(current[attr], list):
129                    return
130
131                # Find matching items
132                matching_items = []
133                for item in current[attr]:
134                    if self._matches_filter(item, filter_expr):
135                        matching_items.append(item)
136
137                if not matching_items and operation == "add":
138                    # Create new item if none match (only for simple comparison filters)
139                    if filter_expr.get("type", "comparison") == "comparison":
140                        new_item = {filter_expr["attribute"]: filter_expr["value"]}
141                        current[attr].append(new_item)
142                        matching_items = [new_item]
143
144                # Apply operation to matching items
145                for item in matching_items:
146                    if sub_attr:
147                        if operation in {"add", "replace"}:
148                            item[sub_attr] = value
149                        elif operation == "remove":
150                            item.pop(sub_attr, None)
151                    elif operation in {"add", "replace"}:
152                        if isinstance(value, dict):
153                            item.update(value)
154                        else:
155                            # If value is not a dict, we can't merge it
156                            pass
157                    elif operation == "remove":
158                        # Remove the entire item
159                        if item in current[attr]:
160                            current[attr].remove(item)
161            # Handle simple attribute
162            elif i == len(components) - 1:
163                # Last component
164                if sub_attr:
165                    if attr not in current:
166                        current[attr] = {}
167                    if operation in {"add", "replace"}:
168                        current[attr][sub_attr] = value
169                    elif operation == "remove":
170                        current[attr].pop(sub_attr, None)
171                elif operation in {"add", "replace"}:
172                    current[attr] = value
173                elif operation == "remove":
174                    current.pop(attr, None)
175            else:
176                # Navigate deeper
177                if attr not in current:
178                    current[attr] = {}
179                current = current[attr]
180
181    def _matches_filter(self, item: dict[str, Any], filter_expr: dict[str, Any]) -> bool:
182        """Check if an item matches the filter expression"""
183        if not filter_expr:
184            return True
185
186        filter_type = filter_expr.get("type", "comparison")
187
188        if filter_type == "comparison":
189            return self._matches_comparison(item, filter_expr)
190        elif filter_type == "logical":
191            return self._matches_logical(item, filter_expr)
192
193        return False
194
195    def _matches_comparison(  # noqa PLR0912
196        self, item: dict[str, Any], filter_expr: dict[str, Any]
197    ) -> bool:
198        """Check if an item matches a comparison filter"""
199        attr = filter_expr["attribute"]
200        operator = filter_expr["operator"]
201        expected_value = filter_expr["value"]
202
203        if attr not in item:
204            return False
205
206        actual_value = item[attr]
207
208        if operator == "eq":
209            return actual_value == expected_value
210        elif operator == "ne":
211            return actual_value != expected_value
212        elif operator == "co":
213            return str(expected_value) in str(actual_value)
214        elif operator == "sw":
215            return str(actual_value).startswith(str(expected_value))
216        elif operator == "ew":
217            return str(actual_value).endswith(str(expected_value))
218        elif operator == "gt":
219            return actual_value > expected_value
220        elif operator == "lt":
221            return actual_value < expected_value
222        elif operator == "ge":
223            return actual_value >= expected_value
224        elif operator == "le":
225            return actual_value <= expected_value
226        elif operator == "pr":
227            return actual_value is not None
228
229        return False
230
231    def _matches_logical(self, item: dict[str, Any], filter_expr: dict[str, Any]) -> bool:
232        """Check if an item matches a logical filter expression"""
233        operator = filter_expr["operator"]
234
235        if operator == "and":
236            left_result = self._matches_filter(item, filter_expr["left"])
237            right_result = self._matches_filter(item, filter_expr["right"])
238            return left_result and right_result
239        elif operator == "or":
240            left_result = self._matches_filter(item, filter_expr["left"])
241            right_result = self._matches_filter(item, filter_expr["right"])
242            return left_result or right_result
243        elif operator == "not":
244            operand_result = self._matches_filter(item, filter_expr["operand"])
245            return not operand_result
246
247        return False

Processes SCIM patch operations on Python dictionaries

parser
def apply_patches( self, data: dict[str, typing.Any], patches: list[authentik.providers.scim.clients.schema.PatchOperation]) -> dict[str, typing.Any]:
15    def apply_patches(self, data: dict[str, Any], patches: list[PatchOperation]) -> dict[str, Any]:
16        """Apply a list of patch operations to the data"""
17        result = data.copy()
18
19        for _patch in patches:
20            patch = PatchOperation.model_validate(_patch)
21            if patch.path is None:
22                # Handle operations with no path - value contains attribute paths as keys
23                self._apply_bulk_operation(result, patch.op, patch.value)
24            elif patch.op == PatchOp.add:
25                self._apply_add(result, patch.path, patch.value)
26            elif patch.op == PatchOp.remove:
27                self._apply_remove(result, patch.path)
28            elif patch.op == PatchOp.replace:
29                self._apply_replace(result, patch.path, patch.value)
30
31        return result

Apply a list of patch operations to the data