authentik.lib.sync.outgoing.signals

 1from contextlib import contextmanager
 2from contextvars import ContextVar
 3
 4from django.db.models import Model
 5from django.db.models.signals import m2m_changed, post_save, pre_delete
 6from dramatiq.actor import Actor
 7
 8from authentik.core.models import Group, User
 9from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
10from authentik.lib.utils.reflection import class_to_path
11
# Context-local flag; True while signal-driven sync task dispatch is suppressed.
_CTX_INHIBIT_DISPATCH = ContextVar[bool](
    "authentik_sync_outgoing_inhibit_dispatch",
    default=False,
)


@contextmanager
def sync_outgoing_inhibit_dispatch():
    """
    Prevent direct, delete and m2m tasks from being dispatched when
    User/Group/membership change

    The flag value seen on entry is restored on exit (instead of being forced
    back to False), so nested usage keeps dispatch inhibited until the
    outermost block is left.
    """
    # Remember the state we were entered with; an enclosing block may already
    # have inhibited dispatch.
    previous = _CTX_INHIBIT_DISPATCH.get()
    _CTX_INHIBIT_DISPATCH.set(True)
    try:
        yield
    finally:
        _CTX_INHIBIT_DISPATCH.set(previous)
28
29
def register_signals(
    provider_type: type[OutgoingSyncProvider],
    task_sync_direct_dispatch: Actor[[str, str | int], None],
    task_sync_delete_dispatch: Actor[[str, list[tuple[str, str]]], None],
    task_sync_m2m_dispatch: Actor[[str, str, list[str], bool], None],
):
    """Register sync signals

    Connects ``post_save`` and ``pre_delete`` handlers for ``User`` and
    ``Group``, and an ``m2m_changed`` handler for ``User.groups.through``.
    Each handler sends the matching dispatch task unless dispatch is currently
    inhibited via ``_CTX_INHIBIT_DISPATCH``.

    All receivers are connected with ``dispatch_uid`` set to the dotted path of
    ``provider_type`` and with ``weak=False``.
    """
    uid = class_to_path(provider_type)

    def model_post_save(
        sender: type[Model],
        instance: User | Group,
        created: bool,
        update_fields: frozenset[str] | list[str] | None = None,
        **_,
    ):
        """Post save handler"""
        # Special case for user object; don't start sync task when we've only updated `last_login`
        # This primarily happens during user login
        # Normalise to a set so the check also matches when `update_fields`
        # arrives as a list or tuple rather than a (frozen)set.
        only_last_login = update_fields is not None and set(update_fields) == {"last_login"}
        if sender == User and only_last_login:
            return
        if _CTX_INHIBIT_DISPATCH.get():
            return
        # Nothing to sync to when no provider of this type is configured
        if not provider_type.objects.exists():
            return
        task_sync_direct_dispatch.send(
            class_to_path(instance.__class__),
            instance.pk,
        )

    post_save.connect(model_post_save, User, dispatch_uid=uid, weak=False)
    post_save.connect(model_post_save, Group, dispatch_uid=uid, weak=False)

    def model_pre_delete(sender: type[Model], instance: User | Group, **_):
        """Pre-delete handler"""
        if _CTX_INHIBIT_DISPATCH.get():
            return
        # Mappings are looked up before the object is deleted; without any
        # there is nothing to dispatch.
        mappings = provider_type.get_object_mappings(instance)
        if not mappings:
            return
        task_sync_delete_dispatch.send(
            class_to_path(instance.__class__),
            mappings,
        )

    pre_delete.connect(model_pre_delete, User, dispatch_uid=uid, weak=False)
    pre_delete.connect(model_pre_delete, Group, dispatch_uid=uid, weak=False)

    def model_m2m_changed(
        sender: type[Model], instance, action: str, pk_set: set, reverse: bool, **_
    ):
        """Sync group membership"""
        # Only completed additions/removals are dispatched; every other action is ignored
        if action not in ["post_add", "post_remove"]:
            return
        if _CTX_INHIBIT_DISPATCH.get():
            return
        if not provider_type.objects.exists():
            return
        task_sync_m2m_dispatch.send(instance.pk, action, list(pk_set), reverse)

    m2m_changed.connect(model_m2m_changed, User.groups.through, dispatch_uid=uid, weak=False)
@contextmanager
def sync_outgoing_inhibit_dispatch():
@contextmanager
def sync_outgoing_inhibit_dispatch():
    """
    Prevent direct, delete and m2m tasks from being dispatched when
    User/Group/membership change

    The flag value seen on entry is restored on exit (instead of being forced
    back to False), so nested usage keeps dispatch inhibited until the
    outermost block is left.
    """
    # Remember the state we were entered with; an enclosing block may already
    # have inhibited dispatch.
    previous = _CTX_INHIBIT_DISPATCH.get()
    _CTX_INHIBIT_DISPATCH.set(True)
    try:
        yield
    finally:
        _CTX_INHIBIT_DISPATCH.set(previous)

Prevent direct and m2m tasks from being dispatched when User/Group/membership change

def register_signals(provider_type: type[authentik.lib.sync.outgoing.models.OutgoingSyncProvider], task_sync_direct_dispatch: dramatiq.actor.Actor[[str, str | int], None], task_sync_delete_dispatch: dramatiq.actor.Actor[[str, list[tuple[str, str]]], None], task_sync_m2m_dispatch: dramatiq.actor.Actor[[str, str, list[str], bool], None]):
def register_signals(
    provider_type: type[OutgoingSyncProvider],
    task_sync_direct_dispatch: Actor[[str, str | int], None],
    task_sync_delete_dispatch: Actor[[str, list[tuple[str, str]]], None],
    task_sync_m2m_dispatch: Actor[[str, str, list[str], bool], None],
):
    """Register sync signals

    Connects ``post_save`` and ``pre_delete`` handlers for ``User`` and
    ``Group``, and an ``m2m_changed`` handler for ``User.groups.through``.
    Each handler sends the matching dispatch task unless dispatch is currently
    inhibited via ``_CTX_INHIBIT_DISPATCH``.

    All receivers are connected with ``dispatch_uid`` set to the dotted path of
    ``provider_type`` and with ``weak=False``.
    """
    uid = class_to_path(provider_type)

    def model_post_save(
        sender: type[Model],
        instance: User | Group,
        created: bool,
        update_fields: frozenset[str] | list[str] | None = None,
        **_,
    ):
        """Post save handler"""
        # Special case for user object; don't start sync task when we've only updated `last_login`
        # This primarily happens during user login
        # Normalise to a set so the check also matches when `update_fields`
        # arrives as a list or tuple rather than a (frozen)set.
        only_last_login = update_fields is not None and set(update_fields) == {"last_login"}
        if sender == User and only_last_login:
            return
        if _CTX_INHIBIT_DISPATCH.get():
            return
        # Nothing to sync to when no provider of this type is configured
        if not provider_type.objects.exists():
            return
        task_sync_direct_dispatch.send(
            class_to_path(instance.__class__),
            instance.pk,
        )

    post_save.connect(model_post_save, User, dispatch_uid=uid, weak=False)
    post_save.connect(model_post_save, Group, dispatch_uid=uid, weak=False)

    def model_pre_delete(sender: type[Model], instance: User | Group, **_):
        """Pre-delete handler"""
        if _CTX_INHIBIT_DISPATCH.get():
            return
        # Mappings are looked up before the object is deleted; without any
        # there is nothing to dispatch.
        mappings = provider_type.get_object_mappings(instance)
        if not mappings:
            return
        task_sync_delete_dispatch.send(
            class_to_path(instance.__class__),
            mappings,
        )

    pre_delete.connect(model_pre_delete, User, dispatch_uid=uid, weak=False)
    pre_delete.connect(model_pre_delete, Group, dispatch_uid=uid, weak=False)

    def model_m2m_changed(
        sender: type[Model], instance, action: str, pk_set: set, reverse: bool, **_
    ):
        """Sync group membership"""
        # Only completed additions/removals are dispatched; every other action is ignored
        if action not in ["post_add", "post_remove"]:
            return
        if _CTX_INHIBIT_DISPATCH.get():
            return
        if not provider_type.objects.exists():
            return
        task_sync_m2m_dispatch.send(instance.pk, action, list(pk_set), reverse)

    m2m_changed.connect(model_m2m_changed, User.groups.through, dispatch_uid=uid, weak=False)

Register sync signals