authentik.lib.sync.outgoing.base
Basic outgoing sync Client
1"""Basic outgoing sync Client""" 2 3from enum import StrEnum 4from typing import TYPE_CHECKING 5 6from deepmerge import always_merger 7from django.db import DatabaseError 8from structlog.stdlib import get_logger 9 10from authentik.core.expression.exceptions import ( 11 PropertyMappingExpressionException, 12) 13from authentik.events.models import Event, EventAction 14from authentik.lib.expression.exceptions import ControlFlowException 15from authentik.lib.sync.mapper import PropertyMappingManager 16from authentik.lib.sync.outgoing.exceptions import NotFoundSyncException, StopSync 17 18if TYPE_CHECKING: 19 from django.db.models import Model 20 21 from authentik.lib.sync.outgoing.models import OutgoingSyncProvider 22 23 24class Direction(StrEnum): 25 add = "add" 26 remove = "remove" 27 28 29SAFE_METHODS = [ 30 "GET", 31 "HEAD", 32 "OPTIONS", 33 "TRACE", 34] 35 36 37class BaseOutgoingSyncClient[ 38 TModel: "Model", 39 TConnection: "Model", 40 TSchema: dict, 41 TProvider: "OutgoingSyncProvider", 42]: 43 """Basic Outgoing sync client Client""" 44 45 provider: TProvider 46 connection_type: type[TConnection] 47 connection_type_query: str 48 mapper: PropertyMappingManager 49 50 can_discover = False 51 52 def __init__(self, provider: TProvider): 53 self.logger = get_logger().bind(provider=provider.name) 54 self.provider = provider 55 56 def create(self, obj: TModel) -> TConnection: 57 """Create object in remote destination""" 58 raise NotImplementedError() 59 60 def update(self, obj: TModel, connection: TConnection): 61 """Update object in remote destination""" 62 raise NotImplementedError() 63 64 def write(self, obj: TModel) -> tuple[TConnection, bool]: 65 """Write object to destination. Uses self.create and self.update, but 66 can be overwritten for further logic""" 67 connection = self.connection_type.objects.filter( 68 provider=self.provider, **{self.connection_type_query: obj} 69 ).first() 70 try: 71 if not connection: 72 connection = self.create(obj) 73 return connection, True 74 try: 75 self.update(obj, connection) 76 return connection, False 77 except NotFoundSyncException: 78 connection.delete() 79 connection = self.create(obj) 80 return connection, True 81 except DatabaseError as exc: 82 self.logger.warning("Failed to write object", obj=obj, exc=exc) 83 if connection: 84 connection.delete() 85 return None, False 86 87 def delete(self, identifier: str): 88 """Delete object from destination""" 89 raise NotImplementedError() 90 91 def to_schema(self, obj: TModel, connection: TConnection | None, **defaults) -> TSchema: 92 """Convert object to destination schema""" 93 raw_final_object = {} 94 try: 95 eval_kwargs = { 96 "request": None, 97 "provider": self.provider, 98 "connection": connection, 99 obj._meta.model_name: obj, 100 } 101 eval_kwargs.setdefault("user", None) 102 for value in self.mapper.iter_eval(**eval_kwargs): 103 always_merger.merge(raw_final_object, value) 104 except ControlFlowException as exc: 105 raise exc from exc 106 except PropertyMappingExpressionException as exc: 107 # Value error can be raised when assigning invalid data to an attribute 108 Event.new( 109 EventAction.CONFIGURATION_ERROR, 110 message="Failed to evaluate property-mapping", 111 mapping=exc.mapping, 112 ).with_exception(exc).save() 113 raise StopSync(exc, obj, exc.mapping) from exc 114 if not raw_final_object: 115 raise StopSync(ValueError("No mappings configured"), obj) 116 for key, value in defaults.items(): 117 raw_final_object.setdefault(key, value) 118 return raw_final_object 119 120 def discover(self): 121 """Optional method. Can be used to implement a "discovery" where 122 upon creation of this provider, this function will be called and can 123 pre-link any users/groups in the remote system with the respective 124 object in authentik based on a common identifier""" 125 raise NotImplementedError() 126 127 def update_single_attribute(self, connection: TConnection): 128 """Update connection attributes on a connection object, when the connection 129 is manually created""" 130 raise NotImplementedError
class
Direction(enum.StrEnum):
add =
<Direction.add: 'add'>
remove =
<Direction.remove: 'remove'>
SAFE_METHODS =
['GET', 'HEAD', 'OPTIONS', 'TRACE']
class
BaseOutgoingSyncClient(typing.Generic[TModel, TConnection, TSchema, TProvider]):
38class BaseOutgoingSyncClient[ 39 TModel: "Model", 40 TConnection: "Model", 41 TSchema: dict, 42 TProvider: "OutgoingSyncProvider", 43]: 44 """Basic Outgoing sync client Client""" 45 46 provider: TProvider 47 connection_type: type[TConnection] 48 connection_type_query: str 49 mapper: PropertyMappingManager 50 51 can_discover = False 52 53 def __init__(self, provider: TProvider): 54 self.logger = get_logger().bind(provider=provider.name) 55 self.provider = provider 56 57 def create(self, obj: TModel) -> TConnection: 58 """Create object in remote destination""" 59 raise NotImplementedError() 60 61 def update(self, obj: TModel, connection: TConnection): 62 """Update object in remote destination""" 63 raise NotImplementedError() 64 65 def write(self, obj: TModel) -> tuple[TConnection, bool]: 66 """Write object to destination. Uses self.create and self.update, but 67 can be overwritten for further logic""" 68 connection = self.connection_type.objects.filter( 69 provider=self.provider, **{self.connection_type_query: obj} 70 ).first() 71 try: 72 if not connection: 73 connection = self.create(obj) 74 return connection, True 75 try: 76 self.update(obj, connection) 77 return connection, False 78 except NotFoundSyncException: 79 connection.delete() 80 connection = self.create(obj) 81 return connection, True 82 except DatabaseError as exc: 83 self.logger.warning("Failed to write object", obj=obj, exc=exc) 84 if connection: 85 connection.delete() 86 return None, False 87 88 def delete(self, identifier: str): 89 """Delete object from destination""" 90 raise NotImplementedError() 91 92 def to_schema(self, obj: TModel, connection: TConnection | None, **defaults) -> TSchema: 93 """Convert object to destination schema""" 94 raw_final_object = {} 95 try: 96 eval_kwargs = { 97 "request": None, 98 "provider": self.provider, 99 "connection": connection, 100 obj._meta.model_name: obj, 101 } 102 eval_kwargs.setdefault("user", None) 103 for value in self.mapper.iter_eval(**eval_kwargs): 104 always_merger.merge(raw_final_object, value) 105 except ControlFlowException as exc: 106 raise exc from exc 107 except PropertyMappingExpressionException as exc: 108 # Value error can be raised when assigning invalid data to an attribute 109 Event.new( 110 EventAction.CONFIGURATION_ERROR, 111 message="Failed to evaluate property-mapping", 112 mapping=exc.mapping, 113 ).with_exception(exc).save() 114 raise StopSync(exc, obj, exc.mapping) from exc 115 if not raw_final_object: 116 raise StopSync(ValueError("No mappings configured"), obj) 117 for key, value in defaults.items(): 118 raw_final_object.setdefault(key, value) 119 return raw_final_object 120 121 def discover(self): 122 """Optional method. Can be used to implement a "discovery" where 123 upon creation of this provider, this function will be called and can 124 pre-link any users/groups in the remote system with the respective 125 object in authentik based on a common identifier""" 126 raise NotImplementedError() 127 128 def update_single_attribute(self, connection: TConnection): 129 """Update connection attributes on a connection object, when the connection 130 is manually created""" 131 raise NotImplementedError
Basic Outgoing sync client Client
def
create(self, obj: TModel) -> TConnection:
57 def create(self, obj: TModel) -> TConnection: 58 """Create object in remote destination""" 59 raise NotImplementedError()
Create object in remote destination
def
update(self, obj: TModel, connection: TConnection):
61 def update(self, obj: TModel, connection: TConnection): 62 """Update object in remote destination""" 63 raise NotImplementedError()
Update object in remote destination
def
write(self, obj: TModel) -> tuple[TConnection, bool]:
65 def write(self, obj: TModel) -> tuple[TConnection, bool]: 66 """Write object to destination. Uses self.create and self.update, but 67 can be overwritten for further logic""" 68 connection = self.connection_type.objects.filter( 69 provider=self.provider, **{self.connection_type_query: obj} 70 ).first() 71 try: 72 if not connection: 73 connection = self.create(obj) 74 return connection, True 75 try: 76 self.update(obj, connection) 77 return connection, False 78 except NotFoundSyncException: 79 connection.delete() 80 connection = self.create(obj) 81 return connection, True 82 except DatabaseError as exc: 83 self.logger.warning("Failed to write object", obj=obj, exc=exc) 84 if connection: 85 connection.delete() 86 return None, False
Write object to destination. Uses self.create and self.update, but can be overwritten for further logic
def
delete(self, identifier: str):
88 def delete(self, identifier: str): 89 """Delete object from destination""" 90 raise NotImplementedError()
Delete object from destination
def
to_schema(self, obj: TModel, connection: TConnection | None, **defaults) -> TSchema:
92 def to_schema(self, obj: TModel, connection: TConnection | None, **defaults) -> TSchema: 93 """Convert object to destination schema""" 94 raw_final_object = {} 95 try: 96 eval_kwargs = { 97 "request": None, 98 "provider": self.provider, 99 "connection": connection, 100 obj._meta.model_name: obj, 101 } 102 eval_kwargs.setdefault("user", None) 103 for value in self.mapper.iter_eval(**eval_kwargs): 104 always_merger.merge(raw_final_object, value) 105 except ControlFlowException as exc: 106 raise exc from exc 107 except PropertyMappingExpressionException as exc: 108 # Value error can be raised when assigning invalid data to an attribute 109 Event.new( 110 EventAction.CONFIGURATION_ERROR, 111 message="Failed to evaluate property-mapping", 112 mapping=exc.mapping, 113 ).with_exception(exc).save() 114 raise StopSync(exc, obj, exc.mapping) from exc 115 if not raw_final_object: 116 raise StopSync(ValueError("No mappings configured"), obj) 117 for key, value in defaults.items(): 118 raw_final_object.setdefault(key, value) 119 return raw_final_object
Convert object to destination schema
def
discover(self):
121 def discover(self): 122 """Optional method. Can be used to implement a "discovery" where 123 upon creation of this provider, this function will be called and can 124 pre-link any users/groups in the remote system with the respective 125 object in authentik based on a common identifier""" 126 raise NotImplementedError()
Optional method. Can be used to implement a "discovery" where upon creation of this provider, this function will be called and can pre-link any users/groups in the remote system with the respective object in authentik based on a common identifier
def
update_single_attribute(self, connection: TConnection):
128 def update_single_attribute(self, connection: TConnection): 129 """Update connection attributes on a connection object, when the connection 130 is manually created""" 131 raise NotImplementedError
Update connection attributes on a connection object, when the connection is manually created