authentik.sources.scim.patch.parser

  1from typing import Any
  2
  3from authentik.sources.scim.patch.lexer import SCIMPathLexer, TokenType
  4
  5
  6class SCIMPathParser:
  7    """Parser for SCIM paths including filter expressions"""
  8
  9    def __init__(self):
 10        self.lexer = None
 11        self.current_token = None
 12
 13    def parse_path(self, path: str | None) -> list[dict[str, Any]]:
 14        """Parse a SCIM path into components"""
 15        self.lexer = SCIMPathLexer(path)
 16        self.current_token = self.lexer.get_next_token()
 17
 18        components = []
 19
 20        while self.current_token.type != TokenType.EOF:
 21            component = self._parse_path_component()
 22            if component:
 23                components.append(component)
 24
 25        return components
 26
 27    def _parse_path_component(self) -> dict[str, Any] | None:
 28        """Parse a single path component"""
 29        if self.current_token.type != TokenType.ATTRIBUTE:
 30            return None
 31
 32        attribute = self.current_token.value
 33        self._consume(TokenType.ATTRIBUTE)
 34
 35        filter_expr = None
 36        sub_attribute = None
 37
 38        # Check for filter expression
 39        if self.current_token.type == TokenType.LBRACKET:
 40            self._consume(TokenType.LBRACKET)
 41            filter_expr = self._parse_filter_expression()
 42            self._consume(TokenType.RBRACKET)
 43
 44        # Check for sub-attribute
 45        if self.current_token.type == TokenType.DOT:
 46            self._consume(TokenType.DOT)
 47            if self.current_token.type == TokenType.ATTRIBUTE:
 48                sub_attribute = self.current_token.value
 49                self._consume(TokenType.ATTRIBUTE)
 50
 51        return {"attribute": attribute, "filter": filter_expr, "sub_attribute": sub_attribute}
 52
 53    def _parse_filter_expression(self) -> dict[str, Any] | None:
 54        """Parse a filter expression like 'primary eq true' or
 55        'type eq "work" and primary eq true'"""
 56        return self._parse_or_expression()
 57
 58    def _parse_or_expression(self) -> dict[str, Any] | None:
 59        """Parse OR expressions"""
 60        left = self._parse_and_expression()
 61
 62        while self.current_token.type == TokenType.OR:
 63            self._consume(TokenType.OR)
 64            right = self._parse_and_expression()
 65            left = {"type": "logical", "operator": "or", "left": left, "right": right}
 66
 67        return left
 68
 69    def _parse_and_expression(self) -> dict[str, Any] | None:
 70        """Parse AND expressions"""
 71        left = self._parse_primary_expression()
 72
 73        while self.current_token.type == TokenType.AND:
 74            self._consume(TokenType.AND)
 75            right = self._parse_primary_expression()
 76            left = {"type": "logical", "operator": "and", "left": left, "right": right}
 77
 78        return left
 79
 80    def _parse_primary_expression(self) -> dict[str, Any] | None:
 81        """Parse primary expressions (attribute operator value)"""
 82        if self.current_token.type == TokenType.LPAREN:
 83            self._consume(TokenType.LPAREN)
 84            expr = self._parse_or_expression()
 85            self._consume(TokenType.RPAREN)
 86            return expr
 87
 88        if self.current_token.type == TokenType.NOT:
 89            self._consume(TokenType.NOT)
 90            expr = self._parse_primary_expression()
 91            return {"type": "logical", "operator": "not", "operand": expr}
 92
 93        if self.current_token.type != TokenType.ATTRIBUTE:
 94            return None
 95
 96        attribute = self.current_token.value
 97        self._consume(TokenType.ATTRIBUTE)
 98
 99        if self.current_token.type != TokenType.OPERATOR:
100            return None
101
102        operator = self.current_token.value
103        self._consume(TokenType.OPERATOR)
104
105        # Parse value
106        value = None
107        if self.current_token.type == TokenType.STRING:
108            value = self.current_token.value
109            self._consume(TokenType.STRING)
110        elif self.current_token.type == TokenType.NUMBER:
111            value = (
112                float(self.current_token.value)
113                if "." in self.current_token.value
114                else int(self.current_token.value)
115            )
116            self._consume(TokenType.NUMBER)
117        elif self.current_token.type == TokenType.BOOLEAN:
118            value = self.current_token.value
119            self._consume(TokenType.BOOLEAN)
120        elif self.current_token.type == TokenType.NULL:
121            value = None
122            self._consume(TokenType.NULL)
123
124        return {"type": "comparison", "attribute": attribute, "operator": operator, "value": value}
125
126    def _consume(self, expected_type: TokenType):
127        """Consume a token of the expected type"""
128        if self.current_token.type == expected_type:
129            self.current_token = self.lexer.get_next_token()
130        else:
131            raise ValueError(f"Expected {expected_type}, got {self.current_token.type}")
class SCIMPathParser:
    """Parser for SCIM paths including filter expressions.

    Turns a path like ``emails[type eq "work"].value`` into a list of
    component dicts (``attribute`` / ``filter`` / ``sub_attribute``).
    """

    def __init__(self):
        # Set up by parse_path() for each parse run.
        self.lexer = None
        self.current_token = None

    def parse_path(self, path: str | None) -> list[dict[str, Any]]:
        """Parse a SCIM path into components."""
        self.lexer = SCIMPathLexer(path)
        self.current_token = self.lexer.get_next_token()

        components = []

        while self.current_token.type != TokenType.EOF:
            component = self._parse_path_component()
            if component:
                components.append(component)
            else:
                # A failed component parse consumes nothing (current token is
                # not an attribute). Skip that token so the loop always
                # reaches EOF instead of hanging.
                self.current_token = self.lexer.get_next_token()

        return components

    def _parse_path_component(self) -> dict[str, Any] | None:
        """Parse a single path component."""
        if self.current_token.type != TokenType.ATTRIBUTE:
            return None

        attribute = self.current_token.value
        self._consume(TokenType.ATTRIBUTE)

        filter_expr = None
        sub_attribute = None

        # Check for filter expression
        if self.current_token.type == TokenType.LBRACKET:
            self._consume(TokenType.LBRACKET)
            filter_expr = self._parse_filter_expression()
            self._consume(TokenType.RBRACKET)

        # Check for sub-attribute
        if self.current_token.type == TokenType.DOT:
            self._consume(TokenType.DOT)
            if self.current_token.type == TokenType.ATTRIBUTE:
                sub_attribute = self.current_token.value
                self._consume(TokenType.ATTRIBUTE)

        return {"attribute": attribute, "filter": filter_expr, "sub_attribute": sub_attribute}

    def _parse_filter_expression(self) -> dict[str, Any] | None:
        """Parse a filter expression like 'primary eq true' or
        'type eq "work" and primary eq true'"""
        return self._parse_or_expression()

    def _parse_or_expression(self) -> dict[str, Any] | None:
        """Parse OR expressions (lowest precedence)."""
        left = self._parse_and_expression()

        while self.current_token.type == TokenType.OR:
            self._consume(TokenType.OR)
            right = self._parse_and_expression()
            left = {"type": "logical", "operator": "or", "left": left, "right": right}

        return left

    def _parse_and_expression(self) -> dict[str, Any] | None:
        """Parse AND expressions."""
        left = self._parse_primary_expression()

        while self.current_token.type == TokenType.AND:
            self._consume(TokenType.AND)
            right = self._parse_primary_expression()
            left = {"type": "logical", "operator": "and", "left": left, "right": right}

        return left

    def _parse_primary_expression(self) -> dict[str, Any] | None:
        """Parse primary expressions (attribute operator value)."""
        if self.current_token.type == TokenType.LPAREN:
            self._consume(TokenType.LPAREN)
            expr = self._parse_or_expression()
            self._consume(TokenType.RPAREN)
            return expr

        if self.current_token.type == TokenType.NOT:
            self._consume(TokenType.NOT)
            expr = self._parse_primary_expression()
            return {"type": "logical", "operator": "not", "operand": expr}

        if self.current_token.type != TokenType.ATTRIBUTE:
            return None

        attribute = self.current_token.value
        self._consume(TokenType.ATTRIBUTE)

        if self.current_token.type != TokenType.OPERATOR:
            return None

        operator = self.current_token.value
        self._consume(TokenType.OPERATOR)

        # Parse value; comparisons with no trailing literal leave value=None.
        value = None
        if self.current_token.type == TokenType.STRING:
            value = self.current_token.value
            self._consume(TokenType.STRING)
        elif self.current_token.type == TokenType.NUMBER:
            # Use int unless the literal contains a decimal point.
            value = (
                float(self.current_token.value)
                if "." in self.current_token.value
                else int(self.current_token.value)
            )
            self._consume(TokenType.NUMBER)
        elif self.current_token.type == TokenType.BOOLEAN:
            value = self.current_token.value
            self._consume(TokenType.BOOLEAN)
        elif self.current_token.type == TokenType.NULL:
            value = None
            self._consume(TokenType.NULL)

        return {"type": "comparison", "attribute": attribute, "operator": operator, "value": value}

    def _consume(self, expected_type: TokenType):
        """Consume a token of the expected type; raise ValueError otherwise."""
        if self.current_token.type == expected_type:
            self.current_token = self.lexer.get_next_token()
        else:
            raise ValueError(f"Expected {expected_type}, got {self.current_token.type}")

Parser for SCIM paths including filter expressions.

Attributes: ``lexer`` (the active SCIMPathLexer instance) and
``current_token`` (the one-token lookahead used by the parser).
def parse_path(self, path: str | None) -> list[dict[str, Any]]:
    """Parse a SCIM path into components.

    Collects component dicts until the lexer reports EOF.
    """
    self.lexer = SCIMPathLexer(path)
    self.current_token = self.lexer.get_next_token()

    components = []

    while self.current_token.type != TokenType.EOF:
        component = self._parse_path_component()
        if component:
            components.append(component)
        else:
            # Tokens that cannot start a component are consumed and skipped;
            # without this the loop never advances and hangs before EOF.
            self.current_token = self.lexer.get_next_token()

    return components

Parse a SCIM path into components