authentik.core.sources.matcher

Source user and group matching

  1"""Source user and group matching"""
  2
  3from dataclasses import dataclass
  4from enum import Enum
  5from typing import Any
  6
  7from django.db.models import Q
  8from structlog import get_logger
  9
 10from authentik.core.models import (
 11    Group,
 12    GroupSourceConnection,
 13    Source,
 14    SourceGroupMatchingModes,
 15    SourceUserMatchingModes,
 16    User,
 17    UserSourceConnection,
 18)
 19
 20
 21class Action(Enum):
 22    """Actions that can be decided based on the request and source settings"""
 23
 24    LINK = "link"
 25    AUTH = "auth"
 26    ENROLL = "enroll"
 27    DENY = "deny"
 28
 29
 30@dataclass
 31class MatchableProperty:
 32    property: str
 33    link_mode: SourceUserMatchingModes | SourceGroupMatchingModes
 34    deny_mode: SourceUserMatchingModes | SourceGroupMatchingModes
 35
 36
 37class SourceMatcher:
 38    def __init__(
 39        self,
 40        source: Source,
 41        user_connection_type: type[UserSourceConnection],
 42        group_connection_type: type[GroupSourceConnection],
 43    ):
 44        self.source = source
 45        self.user_connection_type = user_connection_type
 46        self.group_connection_type = group_connection_type
 47        self._logger = get_logger().bind(source=self.source)
 48
 49    def get_action(
 50        self,
 51        object_type: type[User | Group],
 52        matchable_properties: list[MatchableProperty],
 53        identifier: str,
 54        properties: dict[str, Any | dict[str, Any]],
 55    ) -> tuple[Action, UserSourceConnection | GroupSourceConnection | None]:
 56        connection_type = None
 57        matching_mode = None
 58        identifier_matching_mode = None
 59        if object_type == User:
 60            connection_type = self.user_connection_type
 61            matching_mode = self.source.user_matching_mode
 62            identifier_matching_mode = SourceUserMatchingModes.IDENTIFIER
 63        if object_type == Group:
 64            connection_type = self.group_connection_type
 65            matching_mode = self.source.group_matching_mode
 66            identifier_matching_mode = SourceGroupMatchingModes.IDENTIFIER
 67        if not connection_type or not matching_mode or not identifier_matching_mode:
 68            return Action.DENY, None
 69
 70        new_connection = connection_type(source=self.source, identifier=identifier)
 71
 72        existing_connections = connection_type.objects.filter(
 73            source=self.source, identifier=identifier
 74        )
 75        if existing_connections.exists():
 76            return Action.AUTH, existing_connections.first()
 77        # No connection exists, but we match on identifier, so enroll
 78        if matching_mode == identifier_matching_mode:
 79            # We don't save the connection here cause it doesn't have a user/group assigned yet
 80            return Action.ENROLL, new_connection
 81
 82        # Check for existing users with matching attributes
 83        query = Q()
 84        for matchable_property in matchable_properties:
 85            property = matchable_property.property
 86            if matching_mode in [matchable_property.link_mode, matchable_property.deny_mode]:
 87                if not properties.get(property, None):
 88                    self._logger.warning(
 89                        "Refusing to use none property", identifier=identifier, property=property
 90                    )
 91                    return Action.DENY, None
 92                query_args = {
 93                    f"{property}__exact": properties[property],
 94                }
 95                query = Q(**query_args)
 96        self._logger.debug(
 97            "Trying to link with existing object", query=query, identifier=identifier
 98        )
 99        matching_objects = object_type.objects.filter(query)
100        # Not matching objects, always enroll
101        if not matching_objects.exists():
102            self._logger.debug("No matching objects found, enrolling")
103            return Action.ENROLL, new_connection
104
105        obj = matching_objects.first()
106        if matching_mode in [mp.link_mode for mp in matchable_properties]:
107            attr = None
108            if object_type == User:
109                attr = "user"
110            if object_type == Group:
111                attr = "group"
112            setattr(new_connection, attr, obj)
113            return Action.LINK, new_connection
114        if matching_mode in [mp.deny_mode for mp in matchable_properties]:
115            self._logger.info("Denying source because object exists", obj=obj)
116            return Action.DENY, None
117
118        # Should never get here as default enroll case is returned above.
119        return Action.DENY, None  # pragma: no cover
120
121    def get_user_action(
122        self, identifier: str, properties: dict[str, Any | dict[str, Any]]
123    ) -> tuple[Action, UserSourceConnection | None]:
124        return self.get_action(
125            User,
126            [
127                MatchableProperty(
128                    "username",
129                    SourceUserMatchingModes.USERNAME_LINK,
130                    SourceUserMatchingModes.USERNAME_DENY,
131                ),
132                MatchableProperty(
133                    "email", SourceUserMatchingModes.EMAIL_LINK, SourceUserMatchingModes.EMAIL_DENY
134                ),
135            ],
136            identifier,
137            properties,
138        )
139
140    def get_group_action(
141        self, identifier: str, properties: dict[str, Any | dict[str, Any]]
142    ) -> tuple[Action, GroupSourceConnection | None]:
143        return self.get_action(
144            Group,
145            [
146                MatchableProperty(
147                    "name", SourceGroupMatchingModes.NAME_LINK, SourceGroupMatchingModes.NAME_DENY
148                ),
149            ],
150            identifier,
151            properties,
152        )
class Action(enum.Enum):
22class Action(Enum):
23    """Actions that can be decided based on the request and source settings"""
24
25    LINK = "link"
26    AUTH = "auth"
27    ENROLL = "enroll"
28    DENY = "deny"

Actions that can be decided based on the request and source settings

AUTH = <Action.AUTH: 'auth'>
ENROLL = <Action.ENROLL: 'enroll'>
DENY = <Action.DENY: 'deny'>
@dataclass
class MatchableProperty:
31@dataclass
32class MatchableProperty:
33    property: str
34    link_mode: SourceUserMatchingModes | SourceGroupMatchingModes
35    deny_mode: SourceUserMatchingModes | SourceGroupMatchingModes
property: str
class SourceMatcher:
 38class SourceMatcher:
 39    def __init__(
 40        self,
 41        source: Source,
 42        user_connection_type: type[UserSourceConnection],
 43        group_connection_type: type[GroupSourceConnection],
 44    ):
 45        self.source = source
 46        self.user_connection_type = user_connection_type
 47        self.group_connection_type = group_connection_type
 48        self._logger = get_logger().bind(source=self.source)
 49
 50    def get_action(
 51        self,
 52        object_type: type[User | Group],
 53        matchable_properties: list[MatchableProperty],
 54        identifier: str,
 55        properties: dict[str, Any | dict[str, Any]],
 56    ) -> tuple[Action, UserSourceConnection | GroupSourceConnection | None]:
 57        connection_type = None
 58        matching_mode = None
 59        identifier_matching_mode = None
 60        if object_type == User:
 61            connection_type = self.user_connection_type
 62            matching_mode = self.source.user_matching_mode
 63            identifier_matching_mode = SourceUserMatchingModes.IDENTIFIER
 64        if object_type == Group:
 65            connection_type = self.group_connection_type
 66            matching_mode = self.source.group_matching_mode
 67            identifier_matching_mode = SourceGroupMatchingModes.IDENTIFIER
 68        if not connection_type or not matching_mode or not identifier_matching_mode:
 69            return Action.DENY, None
 70
 71        new_connection = connection_type(source=self.source, identifier=identifier)
 72
 73        existing_connections = connection_type.objects.filter(
 74            source=self.source, identifier=identifier
 75        )
 76        if existing_connections.exists():
 77            return Action.AUTH, existing_connections.first()
 78        # No connection exists, but we match on identifier, so enroll
 79        if matching_mode == identifier_matching_mode:
 80            # We don't save the connection here cause it doesn't have a user/group assigned yet
 81            return Action.ENROLL, new_connection
 82
 83        # Check for existing users with matching attributes
 84        query = Q()
 85        for matchable_property in matchable_properties:
 86            property = matchable_property.property
 87            if matching_mode in [matchable_property.link_mode, matchable_property.deny_mode]:
 88                if not properties.get(property, None):
 89                    self._logger.warning(
 90                        "Refusing to use none property", identifier=identifier, property=property
 91                    )
 92                    return Action.DENY, None
 93                query_args = {
 94                    f"{property}__exact": properties[property],
 95                }
 96                query = Q(**query_args)
 97        self._logger.debug(
 98            "Trying to link with existing object", query=query, identifier=identifier
 99        )
100        matching_objects = object_type.objects.filter(query)
101        # Not matching objects, always enroll
102        if not matching_objects.exists():
103            self._logger.debug("No matching objects found, enrolling")
104            return Action.ENROLL, new_connection
105
106        obj = matching_objects.first()
107        if matching_mode in [mp.link_mode for mp in matchable_properties]:
108            attr = None
109            if object_type == User:
110                attr = "user"
111            if object_type == Group:
112                attr = "group"
113            setattr(new_connection, attr, obj)
114            return Action.LINK, new_connection
115        if matching_mode in [mp.deny_mode for mp in matchable_properties]:
116            self._logger.info("Denying source because object exists", obj=obj)
117            return Action.DENY, None
118
119        # Should never get here as default enroll case is returned above.
120        return Action.DENY, None  # pragma: no cover
121
122    def get_user_action(
123        self, identifier: str, properties: dict[str, Any | dict[str, Any]]
124    ) -> tuple[Action, UserSourceConnection | None]:
125        return self.get_action(
126            User,
127            [
128                MatchableProperty(
129                    "username",
130                    SourceUserMatchingModes.USERNAME_LINK,
131                    SourceUserMatchingModes.USERNAME_DENY,
132                ),
133                MatchableProperty(
134                    "email", SourceUserMatchingModes.EMAIL_LINK, SourceUserMatchingModes.EMAIL_DENY
135                ),
136            ],
137            identifier,
138            properties,
139        )
140
141    def get_group_action(
142        self, identifier: str, properties: dict[str, Any | dict[str, Any]]
143    ) -> tuple[Action, GroupSourceConnection | None]:
144        return self.get_action(
145            Group,
146            [
147                MatchableProperty(
148                    "name", SourceGroupMatchingModes.NAME_LINK, SourceGroupMatchingModes.NAME_DENY
149                ),
150            ],
151            identifier,
152            properties,
153        )
SourceMatcher( source: authentik.core.models.Source, user_connection_type: type[authentik.core.models.UserSourceConnection], group_connection_type: type[authentik.core.models.GroupSourceConnection])
39    def __init__(
40        self,
41        source: Source,
42        user_connection_type: type[UserSourceConnection],
43        group_connection_type: type[GroupSourceConnection],
44    ):
45        self.source = source
46        self.user_connection_type = user_connection_type
47        self.group_connection_type = group_connection_type
48        self._logger = get_logger().bind(source=self.source)
source
user_connection_type
group_connection_type
def get_action( self, object_type: type[authentik.core.models.User | authentik.core.models.Group], matchable_properties: list[MatchableProperty], identifier: str, properties: dict[str, typing.Any | dict[str, typing.Any]]) -> tuple[Action, authentik.core.models.UserSourceConnection | authentik.core.models.GroupSourceConnection | None]:
 50    def get_action(
 51        self,
 52        object_type: type[User | Group],
 53        matchable_properties: list[MatchableProperty],
 54        identifier: str,
 55        properties: dict[str, Any | dict[str, Any]],
 56    ) -> tuple[Action, UserSourceConnection | GroupSourceConnection | None]:
 57        connection_type = None
 58        matching_mode = None
 59        identifier_matching_mode = None
 60        if object_type == User:
 61            connection_type = self.user_connection_type
 62            matching_mode = self.source.user_matching_mode
 63            identifier_matching_mode = SourceUserMatchingModes.IDENTIFIER
 64        if object_type == Group:
 65            connection_type = self.group_connection_type
 66            matching_mode = self.source.group_matching_mode
 67            identifier_matching_mode = SourceGroupMatchingModes.IDENTIFIER
 68        if not connection_type or not matching_mode or not identifier_matching_mode:
 69            return Action.DENY, None
 70
 71        new_connection = connection_type(source=self.source, identifier=identifier)
 72
 73        existing_connections = connection_type.objects.filter(
 74            source=self.source, identifier=identifier
 75        )
 76        if existing_connections.exists():
 77            return Action.AUTH, existing_connections.first()
 78        # No connection exists, but we match on identifier, so enroll
 79        if matching_mode == identifier_matching_mode:
 80            # We don't save the connection here cause it doesn't have a user/group assigned yet
 81            return Action.ENROLL, new_connection
 82
 83        # Check for existing users with matching attributes
 84        query = Q()
 85        for matchable_property in matchable_properties:
 86            property = matchable_property.property
 87            if matching_mode in [matchable_property.link_mode, matchable_property.deny_mode]:
 88                if not properties.get(property, None):
 89                    self._logger.warning(
 90                        "Refusing to use none property", identifier=identifier, property=property
 91                    )
 92                    return Action.DENY, None
 93                query_args = {
 94                    f"{property}__exact": properties[property],
 95                }
 96                query = Q(**query_args)
 97        self._logger.debug(
 98            "Trying to link with existing object", query=query, identifier=identifier
 99        )
100        matching_objects = object_type.objects.filter(query)
101        # Not matching objects, always enroll
102        if not matching_objects.exists():
103            self._logger.debug("No matching objects found, enrolling")
104            return Action.ENROLL, new_connection
105
106        obj = matching_objects.first()
107        if matching_mode in [mp.link_mode for mp in matchable_properties]:
108            attr = None
109            if object_type == User:
110                attr = "user"
111            if object_type == Group:
112                attr = "group"
113            setattr(new_connection, attr, obj)
114            return Action.LINK, new_connection
115        if matching_mode in [mp.deny_mode for mp in matchable_properties]:
116            self._logger.info("Denying source because object exists", obj=obj)
117            return Action.DENY, None
118
119        # Should never get here as default enroll case is returned above.
120        return Action.DENY, None  # pragma: no cover
def get_user_action( self, identifier: str, properties: dict[str, typing.Any | dict[str, typing.Any]]) -> tuple[Action, authentik.core.models.UserSourceConnection | None]:
122    def get_user_action(
123        self, identifier: str, properties: dict[str, Any | dict[str, Any]]
124    ) -> tuple[Action, UserSourceConnection | None]:
125        return self.get_action(
126            User,
127            [
128                MatchableProperty(
129                    "username",
130                    SourceUserMatchingModes.USERNAME_LINK,
131                    SourceUserMatchingModes.USERNAME_DENY,
132                ),
133                MatchableProperty(
134                    "email", SourceUserMatchingModes.EMAIL_LINK, SourceUserMatchingModes.EMAIL_DENY
135                ),
136            ],
137            identifier,
138            properties,
139        )
def get_group_action( self, identifier: str, properties: dict[str, typing.Any | dict[str, typing.Any]]) -> tuple[Action, authentik.core.models.GroupSourceConnection | None]:
141    def get_group_action(
142        self, identifier: str, properties: dict[str, Any | dict[str, Any]]
143    ) -> tuple[Action, GroupSourceConnection | None]:
144        return self.get_action(
145            Group,
146            [
147                MatchableProperty(
148                    "name", SourceGroupMatchingModes.NAME_LINK, SourceGroupMatchingModes.NAME_DENY
149                ),
150            ],
151            identifier,
152            properties,
153        )