authentik.core.sources.matcher
Source user and group matching
"""Source user and group matching"""

from dataclasses import dataclass
from enum import Enum
from typing import Any

from django.db.models import Q
from structlog import get_logger

from authentik.core.models import (
    Group,
    GroupSourceConnection,
    Source,
    SourceGroupMatchingModes,
    SourceUserMatchingModes,
    User,
    UserSourceConnection,
)


class Action(Enum):
    """Actions that can be decided based on the request and source settings"""

    LINK = "link"
    AUTH = "auth"
    ENROLL = "enroll"
    DENY = "deny"


@dataclass
class MatchableProperty:
    """A property that objects can be matched on, with the modes that enable the match."""

    # Name of the model field / incoming property used for the lookup
    property: str
    # Matching mode under which an existing object with the same value is linked
    link_mode: SourceUserMatchingModes | SourceGroupMatchingModes
    # Matching mode under which an existing object with the same value causes a denial
    deny_mode: SourceUserMatchingModes | SourceGroupMatchingModes


class SourceMatcher:
    """Decide which action to take for a user or group coming from a source."""

    def __init__(
        self,
        source: Source,
        user_connection_type: type[UserSourceConnection],
        group_connection_type: type[GroupSourceConnection],
    ):
        """Store the source, the connection model classes and a logger bound to the source."""
        self.source = source
        self.user_connection_type = user_connection_type
        self.group_connection_type = group_connection_type
        self._logger = get_logger().bind(source=self.source)

    def get_action(
        self,
        object_type: type[User | Group],
        matchable_properties: list[MatchableProperty],
        identifier: str,
        properties: dict[str, Any | dict[str, Any]],
    ) -> tuple[Action, UserSourceConnection | GroupSourceConnection | None]:
        """Decide the action for an object with the given identifier and properties.

        Returns a tuple of the action and the connection it applies to:

        - ``AUTH`` with the stored connection, when one exists for this source and
          identifier.
        - ``ENROLL`` with a new, unsaved connection, when the source matches on the
          identifier or when no existing object matches the configured property.
        - ``LINK`` with a new, unsaved connection pointing at the matched object, when
          the matching mode is a link mode.
        - ``DENY`` with ``None``, when ``object_type`` is neither ``User`` nor ``Group``,
          when the property to match on is missing or empty, or when the matching mode
          is a deny mode and a matching object exists.
        """
        if object_type == User:
            connection_type = self.user_connection_type
            matching_mode = self.source.user_matching_mode
            identifier_matching_mode = SourceUserMatchingModes.IDENTIFIER
            connection_attr = "user"
        elif object_type == Group:
            connection_type = self.group_connection_type
            matching_mode = self.source.group_matching_mode
            identifier_matching_mode = SourceGroupMatchingModes.IDENTIFIER
            connection_attr = "group"
        else:
            return Action.DENY, None
        if not connection_type or not matching_mode or not identifier_matching_mode:
            return Action.DENY, None

        new_connection = connection_type(source=self.source, identifier=identifier)

        # Fetch once instead of exists() + first(): one query less, and no window in
        # which the row can vanish between the check and the fetch.
        existing_connection = connection_type.objects.filter(
            source=self.source, identifier=identifier
        ).first()
        if existing_connection is not None:
            return Action.AUTH, existing_connection
        # No connection exists, but we match on identifier, so enroll
        if matching_mode == identifier_matching_mode:
            # We don't save the connection here cause it doesn't have a user/group assigned yet
            return Action.ENROLL, new_connection

        # Check for existing objects with matching attributes
        query = Q()
        for candidate in matchable_properties:
            if matching_mode not in [candidate.link_mode, candidate.deny_mode]:
                continue
            property_name = candidate.property
            value = properties.get(property_name, None)
            if not value:
                self._logger.warning(
                    "Refusing to use none property",
                    identifier=identifier,
                    property=property_name,
                )
                return Action.DENY, None
            query = Q(**{f"{property_name}__exact": value})
        self._logger.debug(
            "Trying to link with existing object", query=query, identifier=identifier
        )
        obj = object_type.objects.filter(query).first()
        # No matching objects, always enroll
        if obj is None:
            self._logger.debug("No matching objects found, enrolling")
            return Action.ENROLL, new_connection

        if matching_mode in [mp.link_mode for mp in matchable_properties]:
            setattr(new_connection, connection_attr, obj)
            return Action.LINK, new_connection
        if matching_mode in [mp.deny_mode for mp in matchable_properties]:
            self._logger.info("Denying source because object exists", obj=obj)
            return Action.DENY, None

        # Only reached when the matching mode is not covered by matchable_properties.
        return Action.DENY, None  # pragma: no cover

    def get_user_action(
        self, identifier: str, properties: dict[str, Any | dict[str, Any]]
    ) -> tuple[Action, UserSourceConnection | None]:
        """Decide the action for a user, matching on username or email."""
        return self.get_action(
            User,
            [
                MatchableProperty(
                    "username",
                    SourceUserMatchingModes.USERNAME_LINK,
                    SourceUserMatchingModes.USERNAME_DENY,
                ),
                MatchableProperty(
                    "email", SourceUserMatchingModes.EMAIL_LINK, SourceUserMatchingModes.EMAIL_DENY
                ),
            ],
            identifier,
            properties,
        )

    def get_group_action(
        self, identifier: str, properties: dict[str, Any | dict[str, Any]]
    ) -> tuple[Action, GroupSourceConnection | None]:
        """Decide the action for a group, matching on name."""
        return self.get_action(
            Group,
            [
                MatchableProperty(
                    "name", SourceGroupMatchingModes.NAME_LINK, SourceGroupMatchingModes.NAME_DENY
                ),
            ],
            identifier,
            properties,
        )
class
Action(enum.Enum):
class Action(Enum):
    """Outcome chosen for an incoming object, based on the request and source settings."""

    # Attach the incoming identity to an object that already exists
    LINK = "link"
    # A connection already exists; authenticate against it
    AUTH = "auth"
    # Nothing matched; enroll a new object
    ENROLL = "enroll"
    # Refuse the request
    DENY = "deny"
Actions that can be decided based on the request and source settings
LINK =
<Action.LINK: 'link'>
AUTH =
<Action.AUTH: 'auth'>
ENROLL =
<Action.ENROLL: 'enroll'>
DENY =
<Action.DENY: 'deny'>
@dataclass
class
MatchableProperty:
@dataclass
class MatchableProperty:
    """Describes one property that can be used to match against existing objects."""

    # Name of the property to compare
    property: str
    # Matching mode that selects this property with "link" behaviour
    link_mode: SourceUserMatchingModes | SourceGroupMatchingModes
    # Matching mode that selects this property with "deny" behaviour
    deny_mode: SourceUserMatchingModes | SourceGroupMatchingModes
MatchableProperty( property: str, link_mode: authentik.core.models.SourceUserMatchingModes | authentik.core.models.SourceGroupMatchingModes, deny_mode: authentik.core.models.SourceUserMatchingModes | authentik.core.models.SourceGroupMatchingModes)
class
SourceMatcher:
class SourceMatcher:
    """Decide which action to take for a user or group coming from a source."""

    def __init__(
        self,
        source: Source,
        user_connection_type: type[UserSourceConnection],
        group_connection_type: type[GroupSourceConnection],
    ):
        """Store the source, the connection model classes and a logger bound to the source."""
        self.source = source
        self.user_connection_type = user_connection_type
        self.group_connection_type = group_connection_type
        self._logger = get_logger().bind(source=self.source)

    def get_action(
        self,
        object_type: type[User | Group],
        matchable_properties: list[MatchableProperty],
        identifier: str,
        properties: dict[str, Any | dict[str, Any]],
    ) -> tuple[Action, UserSourceConnection | GroupSourceConnection | None]:
        """Decide the action for an object with the given identifier and properties.

        Returns a tuple of the action and the connection it applies to:

        - ``AUTH`` with the stored connection, when one exists for this source and
          identifier.
        - ``ENROLL`` with a new, unsaved connection, when the source matches on the
          identifier or when no existing object matches the configured property.
        - ``LINK`` with a new, unsaved connection pointing at the matched object, when
          the matching mode is a link mode.
        - ``DENY`` with ``None``, when ``object_type`` is neither ``User`` nor ``Group``,
          when the property to match on is missing or empty, or when the matching mode
          is a deny mode and a matching object exists.
        """
        if object_type == User:
            connection_type = self.user_connection_type
            matching_mode = self.source.user_matching_mode
            identifier_matching_mode = SourceUserMatchingModes.IDENTIFIER
            connection_attr = "user"
        elif object_type == Group:
            connection_type = self.group_connection_type
            matching_mode = self.source.group_matching_mode
            identifier_matching_mode = SourceGroupMatchingModes.IDENTIFIER
            connection_attr = "group"
        else:
            return Action.DENY, None
        if not connection_type or not matching_mode or not identifier_matching_mode:
            return Action.DENY, None

        new_connection = connection_type(source=self.source, identifier=identifier)

        # Fetch once instead of exists() + first(): one query less, and no window in
        # which the row can vanish between the check and the fetch.
        existing_connection = connection_type.objects.filter(
            source=self.source, identifier=identifier
        ).first()
        if existing_connection is not None:
            return Action.AUTH, existing_connection
        # No connection exists, but we match on identifier, so enroll
        if matching_mode == identifier_matching_mode:
            # We don't save the connection here cause it doesn't have a user/group assigned yet
            return Action.ENROLL, new_connection

        # Check for existing objects with matching attributes
        query = Q()
        for candidate in matchable_properties:
            if matching_mode not in [candidate.link_mode, candidate.deny_mode]:
                continue
            property_name = candidate.property
            value = properties.get(property_name, None)
            if not value:
                self._logger.warning(
                    "Refusing to use none property",
                    identifier=identifier,
                    property=property_name,
                )
                return Action.DENY, None
            query = Q(**{f"{property_name}__exact": value})
        self._logger.debug(
            "Trying to link with existing object", query=query, identifier=identifier
        )
        obj = object_type.objects.filter(query).first()
        # No matching objects, always enroll
        if obj is None:
            self._logger.debug("No matching objects found, enrolling")
            return Action.ENROLL, new_connection

        if matching_mode in [mp.link_mode for mp in matchable_properties]:
            setattr(new_connection, connection_attr, obj)
            return Action.LINK, new_connection
        if matching_mode in [mp.deny_mode for mp in matchable_properties]:
            self._logger.info("Denying source because object exists", obj=obj)
            return Action.DENY, None

        # Only reached when the matching mode is not covered by matchable_properties.
        return Action.DENY, None  # pragma: no cover

    def get_user_action(
        self, identifier: str, properties: dict[str, Any | dict[str, Any]]
    ) -> tuple[Action, UserSourceConnection | None]:
        """Decide the action for a user, matching on username or email."""
        return self.get_action(
            User,
            [
                MatchableProperty(
                    "username",
                    SourceUserMatchingModes.USERNAME_LINK,
                    SourceUserMatchingModes.USERNAME_DENY,
                ),
                MatchableProperty(
                    "email", SourceUserMatchingModes.EMAIL_LINK, SourceUserMatchingModes.EMAIL_DENY
                ),
            ],
            identifier,
            properties,
        )

    def get_group_action(
        self, identifier: str, properties: dict[str, Any | dict[str, Any]]
    ) -> tuple[Action, GroupSourceConnection | None]:
        """Decide the action for a group, matching on name."""
        return self.get_action(
            Group,
            [
                MatchableProperty(
                    "name", SourceGroupMatchingModes.NAME_LINK, SourceGroupMatchingModes.NAME_DENY
                ),
            ],
            identifier,
            properties,
        )
SourceMatcher( source: authentik.core.models.Source, user_connection_type: type[authentik.core.models.UserSourceConnection], group_connection_type: type[authentik.core.models.GroupSourceConnection])
def __init__(
    self,
    source: Source,
    user_connection_type: type[UserSourceConnection],
    group_connection_type: type[GroupSourceConnection],
):
    """Keep the source and the connection model classes used for matching.

    A logger with ``source`` bound as context is stored on ``_logger``.
    """
    self.source = source
    self._logger = get_logger().bind(source=source)
    self.user_connection_type = user_connection_type
    self.group_connection_type = group_connection_type
def
get_action( self, object_type: type[authentik.core.models.User | authentik.core.models.Group], matchable_properties: list[MatchableProperty], identifier: str, properties: dict[str, typing.Any | dict[str, typing.Any]]) -> tuple[Action, authentik.core.models.UserSourceConnection | authentik.core.models.GroupSourceConnection | None]:
def get_action(
    self,
    object_type: type[User | Group],
    matchable_properties: list[MatchableProperty],
    identifier: str,
    properties: dict[str, Any | dict[str, Any]],
) -> tuple[Action, UserSourceConnection | GroupSourceConnection | None]:
    """Decide the action for an object with the given identifier and properties.

    Returns a tuple of the action and the connection it applies to:

    - ``AUTH`` with the stored connection, when one exists for this source and
      identifier.
    - ``ENROLL`` with a new, unsaved connection, when the source matches on the
      identifier or when no existing object matches the configured property.
    - ``LINK`` with a new, unsaved connection pointing at the matched object, when
      the matching mode is a link mode.
    - ``DENY`` with ``None``, when ``object_type`` is neither ``User`` nor ``Group``,
      when the property to match on is missing or empty, or when the matching mode
      is a deny mode and a matching object exists.
    """
    if object_type == User:
        connection_type = self.user_connection_type
        matching_mode = self.source.user_matching_mode
        identifier_matching_mode = SourceUserMatchingModes.IDENTIFIER
        connection_attr = "user"
    elif object_type == Group:
        connection_type = self.group_connection_type
        matching_mode = self.source.group_matching_mode
        identifier_matching_mode = SourceGroupMatchingModes.IDENTIFIER
        connection_attr = "group"
    else:
        return Action.DENY, None
    if not connection_type or not matching_mode or not identifier_matching_mode:
        return Action.DENY, None

    new_connection = connection_type(source=self.source, identifier=identifier)

    # Fetch once instead of exists() + first(): one query less, and no window in
    # which the row can vanish between the check and the fetch.
    existing_connection = connection_type.objects.filter(
        source=self.source, identifier=identifier
    ).first()
    if existing_connection is not None:
        return Action.AUTH, existing_connection
    # No connection exists, but we match on identifier, so enroll
    if matching_mode == identifier_matching_mode:
        # We don't save the connection here cause it doesn't have a user/group assigned yet
        return Action.ENROLL, new_connection

    # Check for existing objects with matching attributes
    query = Q()
    for candidate in matchable_properties:
        if matching_mode not in [candidate.link_mode, candidate.deny_mode]:
            continue
        # Named property_name to avoid shadowing the `property` builtin
        property_name = candidate.property
        value = properties.get(property_name, None)
        if not value:
            self._logger.warning(
                "Refusing to use none property", identifier=identifier, property=property_name
            )
            return Action.DENY, None
        query = Q(**{f"{property_name}__exact": value})
    self._logger.debug(
        "Trying to link with existing object", query=query, identifier=identifier
    )
    obj = object_type.objects.filter(query).first()
    # No matching objects, always enroll
    if obj is None:
        self._logger.debug("No matching objects found, enrolling")
        return Action.ENROLL, new_connection

    if matching_mode in [mp.link_mode for mp in matchable_properties]:
        setattr(new_connection, connection_attr, obj)
        return Action.LINK, new_connection
    if matching_mode in [mp.deny_mode for mp in matchable_properties]:
        self._logger.info("Denying source because object exists", obj=obj)
        return Action.DENY, None

    # Only reached when the matching mode is not covered by matchable_properties.
    return Action.DENY, None  # pragma: no cover
def
get_user_action( self, identifier: str, properties: dict[str, typing.Any | dict[str, typing.Any]]) -> tuple[Action, authentik.core.models.UserSourceConnection | None]:
def get_user_action(
    self, identifier: str, properties: dict[str, Any | dict[str, Any]]
) -> tuple[Action, UserSourceConnection | None]:
    """Run get_action for a User, with username and email as the matchable properties."""
    by_username = MatchableProperty(
        property="username",
        link_mode=SourceUserMatchingModes.USERNAME_LINK,
        deny_mode=SourceUserMatchingModes.USERNAME_DENY,
    )
    by_email = MatchableProperty(
        property="email",
        link_mode=SourceUserMatchingModes.EMAIL_LINK,
        deny_mode=SourceUserMatchingModes.EMAIL_DENY,
    )
    return self.get_action(User, [by_username, by_email], identifier, properties)
def
get_group_action( self, identifier: str, properties: dict[str, typing.Any | dict[str, typing.Any]]) -> tuple[Action, authentik.core.models.GroupSourceConnection | None]:
def get_group_action(
    self, identifier: str, properties: dict[str, Any | dict[str, Any]]
) -> tuple[Action, GroupSourceConnection | None]:
    """Run get_action for a Group, with name as the only matchable property."""
    by_name = MatchableProperty(
        property="name",
        link_mode=SourceGroupMatchingModes.NAME_LINK,
        deny_mode=SourceGroupMatchingModes.NAME_DENY,
    )
    return self.get_action(Group, [by_name], identifier, properties)