authentik.lib.sync.outgoing.signals
"""Signal wiring for outgoing sync providers."""

from contextlib import contextmanager
from contextvars import ContextVar

from django.db.models import Model
from django.db.models.signals import m2m_changed, post_save, pre_delete
from dramatiq.actor import Actor

from authentik.core.models import Group, User
from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
from authentik.lib.utils.reflection import class_to_path

# Flag read by every signal handler below; True means "do not dispatch tasks".
_CTX_INHIBIT_DISPATCH = ContextVar[bool](
    "authentik_sync_outgoing_inhibit_dispatch",
    default=False,
)


@contextmanager
def sync_outgoing_inhibit_dispatch():
    """
    Prevent direct, delete and m2m tasks from being dispatched when
    User/Group/membership change

    The previous value of the flag is restored on exit, so nested uses keep
    dispatch inhibited until the outermost block has exited.
    """
    # Keep the token so the prior value is restored instead of forcing False,
    # which would re-enable dispatch inside an enclosing inhibit block.
    token = _CTX_INHIBIT_DISPATCH.set(True)
    try:
        yield
    finally:
        _CTX_INHIBIT_DISPATCH.reset(token)


def register_signals(
    provider_type: type[OutgoingSyncProvider],
    task_sync_direct_dispatch: Actor[[str, str | int], None],
    task_sync_delete_dispatch: Actor[[str, list[tuple[str, str]]], None],
    task_sync_m2m_dispatch: Actor[[str, str, list[str], bool], None],
):
    """Register sync signals

    Connects post_save and pre_delete receivers for User and Group, and an
    m2m_changed receiver for the User.groups through model. Every receiver is
    registered with ``weak=False`` and a dispatch_uid derived from
    ``provider_type``, and does nothing while dispatch is inhibited via
    ``sync_outgoing_inhibit_dispatch``.

    :param provider_type: Provider model class whose existence/mappings gate dispatch
    :param task_sync_direct_dispatch: Actor sent (model path, pk) after a save
    :param task_sync_delete_dispatch: Actor sent (model path, mappings) before a delete
    :param task_sync_m2m_dispatch: Actor sent (pk, action, pk list, reverse) on
        membership add/remove
    """
    uid = class_to_path(provider_type)

    def model_post_save(
        sender: type[Model],
        instance: User | Group,
        created: bool,
        update_fields: list[str] | None = None,
        **_,
    ):
        """Post save handler"""
        # Special case for user object; don't start sync task when we've only updated `last_login`
        # This primarily happens during user login
        if sender == User and update_fields == {"last_login"}:
            return
        if _CTX_INHIBIT_DISPATCH.get():
            return
        # Skip the task entirely when no provider of this type is configured
        if not provider_type.objects.exists():
            return
        task_sync_direct_dispatch.send(
            class_to_path(instance.__class__),
            instance.pk,
        )

    post_save.connect(model_post_save, User, dispatch_uid=uid, weak=False)
    post_save.connect(model_post_save, Group, dispatch_uid=uid, weak=False)

    def model_pre_delete(sender: type[Model], instance: User | Group, **_):
        """Pre-delete handler"""
        if _CTX_INHIBIT_DISPATCH.get():
            return
        # Mappings are looked up before the delete and handed to the task
        mappings = provider_type.get_object_mappings(instance)
        if not mappings:
            return
        task_sync_delete_dispatch.send(
            class_to_path(instance.__class__),
            mappings,
        )

    pre_delete.connect(model_pre_delete, User, dispatch_uid=uid, weak=False)
    pre_delete.connect(model_pre_delete, Group, dispatch_uid=uid, weak=False)

    def model_m2m_changed(
        sender: type[Model], instance, action: str, pk_set: set, reverse: bool, **kwargs
    ):
        """Sync group membership"""
        # Only react once the membership change has been applied
        if action not in ["post_add", "post_remove"]:
            return
        if _CTX_INHIBIT_DISPATCH.get():
            return
        if not provider_type.objects.exists():
            return
        task_sync_m2m_dispatch.send(instance.pk, action, list(pk_set), reverse)

    m2m_changed.connect(model_m2m_changed, User.groups.through, dispatch_uid=uid, weak=False)
@contextmanager
def
sync_outgoing_inhibit_dispatch():
@contextmanager
def sync_outgoing_inhibit_dispatch():
    """
    Prevent direct and m2m tasks from being dispatched when User/Group/membership change

    While the ``with`` block is active, ``_CTX_INHIBIT_DISPATCH`` is set to ``True``.
    On exit the previous value is restored, so nested uses keep dispatch inhibited
    until the outermost block has exited.
    """
    # Keep the token so the prior value is restored instead of forcing False,
    # which would re-enable dispatch inside an enclosing inhibit block.
    token = _CTX_INHIBIT_DISPATCH.set(True)
    try:
        yield
    finally:
        _CTX_INHIBIT_DISPATCH.reset(token)
Prevent direct and m2m tasks from being dispatched when User/Group/membership change
def
register_signals( provider_type: type[authentik.lib.sync.outgoing.models.OutgoingSyncProvider], task_sync_direct_dispatch: dramatiq.actor.Actor[[str, str | int], None], task_sync_delete_dispatch: dramatiq.actor.Actor[[str, list[tuple[str, str]]], None], task_sync_m2m_dispatch: dramatiq.actor.Actor[[str, str, list[str], bool], None]):
def register_signals(
    provider_type: type[OutgoingSyncProvider],
    task_sync_direct_dispatch: Actor[[str, str | int], None],
    task_sync_delete_dispatch: Actor[[str, list[tuple[str, str]]], None],
    task_sync_m2m_dispatch: Actor[[str, str, list[str], bool], None],
):
    """Register sync signals

    Hooks post_save and pre_delete receivers onto User and Group, and an
    m2m_changed receiver onto the User.groups through model. All receivers
    are connected strongly (``weak=False``) under a dispatch_uid derived from
    ``provider_type`` and return early while ``_CTX_INHIBIT_DISPATCH`` is set.
    """
    uid = class_to_path(provider_type)

    def model_post_save(
        sender: type[Model],
        instance: User | Group,
        created: bool,
        update_fields: list[str] | None = None,
        **_,
    ):
        """Post save handler"""
        # A save that only touched `last_login` on a user (typically a login)
        # must not start a sync task.
        only_last_login = sender == User and update_fields == {"last_login"}
        if only_last_login or _CTX_INHIBIT_DISPATCH.get():
            return
        # Nothing to sync to when no provider of this type exists
        if provider_type.objects.exists():
            model_path = class_to_path(instance.__class__)
            task_sync_direct_dispatch.send(model_path, instance.pk)

    for model in (User, Group):
        post_save.connect(model_post_save, model, dispatch_uid=uid, weak=False)

    def model_pre_delete(sender: type[Model], instance: User | Group, **_):
        """Pre-delete handler"""
        if _CTX_INHIBIT_DISPATCH.get():
            return
        # Mappings are fetched before the delete and passed on to the task
        mappings = provider_type.get_object_mappings(instance)
        if mappings:
            model_path = class_to_path(instance.__class__)
            task_sync_delete_dispatch.send(model_path, mappings)

    for model in (User, Group):
        pre_delete.connect(model_pre_delete, model, dispatch_uid=uid, weak=False)

    def model_m2m_changed(
        sender: type[Model], instance, action: str, pk_set: set, reverse: bool, **kwargs
    ):
        """Sync group membership"""
        # Only the "after add" / "after remove" phases trigger a sync
        if action not in ("post_add", "post_remove"):
            return
        if _CTX_INHIBIT_DISPATCH.get() or not provider_type.objects.exists():
            return
        changed_pks = list(pk_set)
        task_sync_m2m_dispatch.send(instance.pk, action, changed_pks, reverse)

    m2m_changed.connect(
        model_m2m_changed,
        User.groups.through,
        dispatch_uid=uid,
        weak=False,
    )
Register sync signals