authentik.sources.scim.patch.processor
1from typing import Any 2 3from authentik.providers.scim.clients.schema import PatchOp, PatchOperation 4from authentik.sources.scim.constants import SCIM_URN_USER_ENTERPRISE 5from authentik.sources.scim.patch.parser import SCIMPathParser 6 7 8class SCIMPatchProcessor: 9 """Processes SCIM patch operations on Python dictionaries""" 10 11 def __init__(self): 12 self.parser = SCIMPathParser() 13 14 def apply_patches(self, data: dict[str, Any], patches: list[PatchOperation]) -> dict[str, Any]: 15 """Apply a list of patch operations to the data""" 16 result = data.copy() 17 18 for _patch in patches: 19 patch = PatchOperation.model_validate(_patch) 20 if patch.path is None: 21 # Handle operations with no path - value contains attribute paths as keys 22 self._apply_bulk_operation(result, patch.op, patch.value) 23 elif patch.op == PatchOp.add: 24 self._apply_add(result, patch.path, patch.value) 25 elif patch.op == PatchOp.remove: 26 self._apply_remove(result, patch.path) 27 elif patch.op == PatchOp.replace: 28 self._apply_replace(result, patch.path, patch.value) 29 30 return result 31 32 def _apply_bulk_operation( 33 self, data: dict[str, Any], operation: PatchOp, value: dict[str, Any] 34 ): 35 """Apply bulk operations when path is None""" 36 if not isinstance(value, dict): 37 return 38 for path, val in value.items(): 39 if operation == PatchOp.add: 40 self._apply_add(data, path, val) 41 elif operation == PatchOp.remove: 42 self._apply_remove(data, path) 43 elif operation == PatchOp.replace: 44 self._apply_replace(data, path, val) 45 46 def _apply_add(self, data: dict[str, Any], path: str, value: Any): 47 """Apply ADD operation""" 48 components = self.parser.parse_path(path) 49 50 if len(components) == 1 and not components[0]["filter"]: 51 # Simple path 52 attr = components[0]["attribute"] 53 if components[0]["sub_attribute"]: 54 if attr not in data: 55 data[attr] = {} 56 # Somewhat hacky workaround for the manager attribute of the enterprise schema 57 # ideally we'd do this based on the schema 58 if attr == SCIM_URN_USER_ENTERPRISE and components[0]["sub_attribute"] == "manager": 59 data[attr][components[0]["sub_attribute"]] = {"value": value} 60 else: 61 data[attr][components[0]["sub_attribute"]] = value 62 elif attr in data: 63 data[attr].append(value) 64 else: 65 data[attr] = value 66 else: 67 # Complex path with filters 68 self._navigate_and_modify(data, components, value, "add") 69 70 def _apply_remove(self, data: dict[str, Any], path: str): 71 """Apply REMOVE operation""" 72 components = self.parser.parse_path(path) 73 74 if len(components) == 1 and not components[0]["filter"]: 75 # Simple path 76 attr = components[0]["attribute"] 77 if components[0]["sub_attribute"]: 78 if attr in data and isinstance(data[attr], dict): 79 data[attr].pop(components[0]["sub_attribute"], None) 80 else: 81 data.pop(attr, None) 82 else: 83 # Complex path with filters 84 self._navigate_and_modify(data, components, None, "remove") 85 86 def _apply_replace(self, data: dict[str, Any], path: str, value: Any): 87 """Apply REPLACE operation""" 88 components = self.parser.parse_path(path) 89 90 if len(components) == 1 and not components[0]["filter"]: 91 # Simple path 92 attr = components[0]["attribute"] 93 if components[0]["sub_attribute"]: 94 if attr not in data: 95 data[attr] = {} 96 # Somewhat hacky workaround for the manager attribute of the enterprise schema 97 # ideally we'd do this based on the schema 98 if attr == SCIM_URN_USER_ENTERPRISE and components[0]["sub_attribute"] == "manager": 99 data[attr][components[0]["sub_attribute"]] = {"value": value} 100 else: 101 data[attr][components[0]["sub_attribute"]] = value 102 else: 103 data[attr] = value 104 else: 105 # Complex path with filters 106 self._navigate_and_modify(data, components, value, "replace") 107 108 def _navigate_and_modify( # noqa PLR0912 109 self, data: dict[str, Any], components: list[dict[str, Any]], value: Any, operation: str 110 ): 111 """Navigate through complex paths and apply modifications""" 112 current = data 113 114 for i, component in enumerate(components): 115 attr = component["attribute"] 116 filter_expr = component["filter"] 117 sub_attr = component["sub_attribute"] 118 119 if filter_expr: 120 # Handle array with filter 121 if attr not in current: 122 if operation == "add": 123 current[attr] = [] 124 else: 125 return 126 127 if not isinstance(current[attr], list): 128 return 129 130 # Find matching items 131 matching_items = [] 132 for item in current[attr]: 133 if self._matches_filter(item, filter_expr): 134 matching_items.append(item) 135 136 if not matching_items and operation == "add": 137 # Create new item if none match (only for simple comparison filters) 138 if filter_expr.get("type", "comparison") == "comparison": 139 new_item = {filter_expr["attribute"]: filter_expr["value"]} 140 current[attr].append(new_item) 141 matching_items = [new_item] 142 143 # Apply operation to matching items 144 for item in matching_items: 145 if sub_attr: 146 if operation in {"add", "replace"}: 147 item[sub_attr] = value 148 elif operation == "remove": 149 item.pop(sub_attr, None) 150 elif operation in {"add", "replace"}: 151 if isinstance(value, dict): 152 item.update(value) 153 else: 154 # If value is not a dict, we can't merge it 155 pass 156 elif operation == "remove": 157 # Remove the entire item 158 if item in current[attr]: 159 current[attr].remove(item) 160 # Handle simple attribute 161 elif i == len(components) - 1: 162 # Last component 163 if sub_attr: 164 if attr not in current: 165 current[attr] = {} 166 if operation in {"add", "replace"}: 167 current[attr][sub_attr] = value 168 elif operation == "remove": 169 current[attr].pop(sub_attr, None) 170 elif operation in {"add", "replace"}: 171 current[attr] = value 172 elif operation == "remove": 173 current.pop(attr, None) 174 else: 175 # Navigate deeper 176 if attr not in current: 177 current[attr] = {} 178 current = current[attr] 179 180 def _matches_filter(self, item: dict[str, Any], filter_expr: dict[str, Any]) -> bool: 181 """Check if an item matches the filter expression""" 182 if not filter_expr: 183 return True 184 185 filter_type = filter_expr.get("type", "comparison") 186 187 if filter_type == "comparison": 188 return self._matches_comparison(item, filter_expr) 189 elif filter_type == "logical": 190 return self._matches_logical(item, filter_expr) 191 192 return False 193 194 def _matches_comparison( # noqa PLR0912 195 self, item: dict[str, Any], filter_expr: dict[str, Any] 196 ) -> bool: 197 """Check if an item matches a comparison filter""" 198 attr = filter_expr["attribute"] 199 operator = filter_expr["operator"] 200 expected_value = filter_expr["value"] 201 202 if attr not in item: 203 return False 204 205 actual_value = item[attr] 206 207 if operator == "eq": 208 return actual_value == expected_value 209 elif operator == "ne": 210 return actual_value != expected_value 211 elif operator == "co": 212 return str(expected_value) in str(actual_value) 213 elif operator == "sw": 214 return str(actual_value).startswith(str(expected_value)) 215 elif operator == "ew": 216 return str(actual_value).endswith(str(expected_value)) 217 elif operator == "gt": 218 return actual_value > expected_value 219 elif operator == "lt": 220 return actual_value < expected_value 221 elif operator == "ge": 222 return actual_value >= expected_value 223 elif operator == "le": 224 return actual_value <= expected_value 225 elif operator == "pr": 226 return actual_value is not None 227 228 return False 229 230 def _matches_logical(self, item: dict[str, Any], filter_expr: dict[str, Any]) -> bool: 231 """Check if an item matches a logical filter expression""" 232 operator = filter_expr["operator"] 233 234 if operator == "and": 235 left_result = self._matches_filter(item, filter_expr["left"]) 236 right_result = self._matches_filter(item, filter_expr["right"]) 237 return left_result and right_result 238 elif operator == "or": 239 left_result = self._matches_filter(item, filter_expr["left"]) 240 right_result = self._matches_filter(item, filter_expr["right"]) 241 return left_result or right_result 242 elif operator == "not": 243 operand_result = self._matches_filter(item, filter_expr["operand"]) 244 return not operand_result 245 246 return False
class
SCIMPatchProcessor:
9class SCIMPatchProcessor: 10 """Processes SCIM patch operations on Python dictionaries""" 11 12 def __init__(self): 13 self.parser = SCIMPathParser() 14 15 def apply_patches(self, data: dict[str, Any], patches: list[PatchOperation]) -> dict[str, Any]: 16 """Apply a list of patch operations to the data""" 17 result = data.copy() 18 19 for _patch in patches: 20 patch = PatchOperation.model_validate(_patch) 21 if patch.path is None: 22 # Handle operations with no path - value contains attribute paths as keys 23 self._apply_bulk_operation(result, patch.op, patch.value) 24 elif patch.op == PatchOp.add: 25 self._apply_add(result, patch.path, patch.value) 26 elif patch.op == PatchOp.remove: 27 self._apply_remove(result, patch.path) 28 elif patch.op == PatchOp.replace: 29 self._apply_replace(result, patch.path, patch.value) 30 31 return result 32 33 def _apply_bulk_operation( 34 self, data: dict[str, Any], operation: PatchOp, value: dict[str, Any] 35 ): 36 """Apply bulk operations when path is None""" 37 if not isinstance(value, dict): 38 return 39 for path, val in value.items(): 40 if operation == PatchOp.add: 41 self._apply_add(data, path, val) 42 elif operation == PatchOp.remove: 43 self._apply_remove(data, path) 44 elif operation == PatchOp.replace: 45 self._apply_replace(data, path, val) 46 47 def _apply_add(self, data: dict[str, Any], path: str, value: Any): 48 """Apply ADD operation""" 49 components = self.parser.parse_path(path) 50 51 if len(components) == 1 and not components[0]["filter"]: 52 # Simple path 53 attr = components[0]["attribute"] 54 if components[0]["sub_attribute"]: 55 if attr not in data: 56 data[attr] = {} 57 # Somewhat hacky workaround for the manager attribute of the enterprise schema 58 # ideally we'd do this based on the schema 59 if attr == SCIM_URN_USER_ENTERPRISE and components[0]["sub_attribute"] == "manager": 60 data[attr][components[0]["sub_attribute"]] = {"value": value} 61 else: 62 data[attr][components[0]["sub_attribute"]] = value 63 elif attr in data: 64 data[attr].append(value) 65 else: 66 data[attr] = value 67 else: 68 # Complex path with filters 69 self._navigate_and_modify(data, components, value, "add") 70 71 def _apply_remove(self, data: dict[str, Any], path: str): 72 """Apply REMOVE operation""" 73 components = self.parser.parse_path(path) 74 75 if len(components) == 1 and not components[0]["filter"]: 76 # Simple path 77 attr = components[0]["attribute"] 78 if components[0]["sub_attribute"]: 79 if attr in data and isinstance(data[attr], dict): 80 data[attr].pop(components[0]["sub_attribute"], None) 81 else: 82 data.pop(attr, None) 83 else: 84 # Complex path with filters 85 self._navigate_and_modify(data, components, None, "remove") 86 87 def _apply_replace(self, data: dict[str, Any], path: str, value: Any): 88 """Apply REPLACE operation""" 89 components = self.parser.parse_path(path) 90 91 if len(components) == 1 and not components[0]["filter"]: 92 # Simple path 93 attr = components[0]["attribute"] 94 if components[0]["sub_attribute"]: 95 if attr not in data: 96 data[attr] = {} 97 # Somewhat hacky workaround for the manager attribute of the enterprise schema 98 # ideally we'd do this based on the schema 99 if attr == SCIM_URN_USER_ENTERPRISE and components[0]["sub_attribute"] == "manager": 100 data[attr][components[0]["sub_attribute"]] = {"value": value} 101 else: 102 data[attr][components[0]["sub_attribute"]] = value 103 else: 104 data[attr] = value 105 else: 106 # Complex path with filters 107 self._navigate_and_modify(data, components, value, "replace") 108 109 def _navigate_and_modify( # noqa PLR0912 110 self, data: dict[str, Any], components: list[dict[str, Any]], value: Any, operation: str 111 ): 112 """Navigate through complex paths and apply modifications""" 113 current = data 114 115 for i, component in enumerate(components): 116 attr = component["attribute"] 117 filter_expr = component["filter"] 118 sub_attr = component["sub_attribute"] 119 120 if filter_expr: 121 # Handle array with filter 122 if attr not in current: 123 if operation == "add": 124 current[attr] = [] 125 else: 126 return 127 128 if not isinstance(current[attr], list): 129 return 130 131 # Find matching items 132 matching_items = [] 133 for item in current[attr]: 134 if self._matches_filter(item, filter_expr): 135 matching_items.append(item) 136 137 if not matching_items and operation == "add": 138 # Create new item if none match (only for simple comparison filters) 139 if filter_expr.get("type", "comparison") == "comparison": 140 new_item = {filter_expr["attribute"]: filter_expr["value"]} 141 current[attr].append(new_item) 142 matching_items = [new_item] 143 144 # Apply operation to matching items 145 for item in matching_items: 146 if sub_attr: 147 if operation in {"add", "replace"}: 148 item[sub_attr] = value 149 elif operation == "remove": 150 item.pop(sub_attr, None) 151 elif operation in {"add", "replace"}: 152 if isinstance(value, dict): 153 item.update(value) 154 else: 155 # If value is not a dict, we can't merge it 156 pass 157 elif operation == "remove": 158 # Remove the entire item 159 if item in current[attr]: 160 current[attr].remove(item) 161 # Handle simple attribute 162 elif i == len(components) - 1: 163 # Last component 164 if sub_attr: 165 if attr not in current: 166 current[attr] = {} 167 if operation in {"add", "replace"}: 168 current[attr][sub_attr] = value 169 elif operation == "remove": 170 current[attr].pop(sub_attr, None) 171 elif operation in {"add", "replace"}: 172 current[attr] = value 173 elif operation == "remove": 174 current.pop(attr, None) 175 else: 176 # Navigate deeper 177 if attr not in current: 178 current[attr] = {} 179 current = current[attr] 180 181 def _matches_filter(self, item: dict[str, Any], filter_expr: dict[str, Any]) -> bool: 182 """Check if an item matches the filter expression""" 183 if not filter_expr: 184 return True 185 186 filter_type = filter_expr.get("type", "comparison") 187 188 if filter_type == "comparison": 189 return self._matches_comparison(item, filter_expr) 190 elif filter_type == "logical": 191 return self._matches_logical(item, filter_expr) 192 193 return False 194 195 def _matches_comparison( # noqa PLR0912 196 self, item: dict[str, Any], filter_expr: dict[str, Any] 197 ) -> bool: 198 """Check if an item matches a comparison filter""" 199 attr = filter_expr["attribute"] 200 operator = filter_expr["operator"] 201 expected_value = filter_expr["value"] 202 203 if attr not in item: 204 return False 205 206 actual_value = item[attr] 207 208 if operator == "eq": 209 return actual_value == expected_value 210 elif operator == "ne": 211 return actual_value != expected_value 212 elif operator == "co": 213 return str(expected_value) in str(actual_value) 214 elif operator == "sw": 215 return str(actual_value).startswith(str(expected_value)) 216 elif operator == "ew": 217 return str(actual_value).endswith(str(expected_value)) 218 elif operator == "gt": 219 return actual_value > expected_value 220 elif operator == "lt": 221 return actual_value < expected_value 222 elif operator == "ge": 223 return actual_value >= expected_value 224 elif operator == "le": 225 return actual_value <= expected_value 226 elif operator == "pr": 227 return actual_value is not None 228 229 return False 230 231 def _matches_logical(self, item: dict[str, Any], filter_expr: dict[str, Any]) -> bool: 232 """Check if an item matches a logical filter expression""" 233 operator = filter_expr["operator"] 234 235 if operator == "and": 236 left_result = self._matches_filter(item, filter_expr["left"]) 237 right_result = self._matches_filter(item, filter_expr["right"]) 238 return left_result and right_result 239 elif operator == "or": 240 left_result = self._matches_filter(item, filter_expr["left"]) 241 right_result = self._matches_filter(item, filter_expr["right"]) 242 return left_result or right_result 243 elif operator == "not": 244 operand_result = self._matches_filter(item, filter_expr["operand"]) 245 return not operand_result 246 247 return False
Processes SCIM patch operations on Python dictionaries
def
apply_patches( self, data: dict[str, typing.Any], patches: list[authentik.providers.scim.clients.schema.PatchOperation]) -> dict[str, typing.Any]:
15 def apply_patches(self, data: dict[str, Any], patches: list[PatchOperation]) -> dict[str, Any]: 16 """Apply a list of patch operations to the data""" 17 result = data.copy() 18 19 for _patch in patches: 20 patch = PatchOperation.model_validate(_patch) 21 if patch.path is None: 22 # Handle operations with no path - value contains attribute paths as keys 23 self._apply_bulk_operation(result, patch.op, patch.value) 24 elif patch.op == PatchOp.add: 25 self._apply_add(result, patch.path, patch.value) 26 elif patch.op == PatchOp.remove: 27 self._apply_remove(result, patch.path) 28 elif patch.op == PatchOp.replace: 29 self._apply_replace(result, patch.path, patch.value) 30 31 return result
Apply a list of patch operations to the data