authentik.lib.sync.outgoing.tasks

  1from django.core.paginator import Paginator
  2from django.db.models import Model, QuerySet
  3from django.db.models.query import Q
  4from dramatiq.actor import Actor
  5from dramatiq.composition import group
  6from dramatiq.errors import Retry
  7from structlog.stdlib import BoundLogger, get_logger
  8
  9from authentik.core.expression.exceptions import SkipObjectException
 10from authentik.core.models import Group, User
 11from authentik.events.utils import sanitize_item
 12from authentik.lib.sync.outgoing.base import Direction
 13from authentik.lib.sync.outgoing.exceptions import (
 14    BadRequestSyncException,
 15    DryRunRejected,
 16    NotFoundSyncException,
 17    StopSync,
 18    TransientSyncException,
 19)
 20from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
 21from authentik.lib.utils.errors import exception_to_dict
 22from authentik.lib.utils.reflection import class_to_path, path_to_class
 23from authentik.lib.utils.time import timedelta_from_string
 24from authentik.tasks.middleware import CurrentTask
 25from authentik.tasks.models import Task
 26
 27
 28class SyncTasks:
 29    """Container for all sync 'tasks' (this class doesn't actually contain
 30    tasks due to dramatiq's magic, however exposes a number of functions to be called from tasks)"""
 31
 32    logger: BoundLogger
 33
 34    def __init__(self, provider_model: type[OutgoingSyncProvider]) -> None:
 35        super().__init__()
 36        self._provider_model = provider_model
 37
 38    def sync_paginator(
 39        self,
 40        current_task: Task,
 41        provider: OutgoingSyncProvider,
 42        sync_objects: Actor[[str, int, int, bool], None],
 43        paginator: Paginator,
 44        object_type: type[User | Group],
 45        **options,
 46    ):
 47        tasks = []
 48        time_limit = timedelta_from_string(provider.sync_page_timeout).total_seconds() * 1000
 49        for page in paginator.page_range:
 50            page_sync = sync_objects.message_with_options(
 51                args=(class_to_path(object_type), page, provider.pk),
 52                time_limit=time_limit,
 53                # Assign tasks to the same schedule as the current one
 54                rel_obj=current_task.rel_obj,
 55                uid=f"{provider.name}:{object_type._meta.model_name}:{page}",
 56                **options,
 57            )
 58            tasks.append(page_sync)
 59        return tasks
 60
 61    def sync(
 62        self,
 63        provider_pk: int,
 64        sync_objects: Actor[[str, int, int, bool], None],
 65    ):
 66        task = CurrentTask.get_task()
 67        self.logger = get_logger().bind(
 68            provider_type=class_to_path(self._provider_model),
 69            provider_pk=provider_pk,
 70        )
 71        provider: OutgoingSyncProvider = self._provider_model.objects.filter(
 72            Q(backchannel_application__isnull=False) | Q(application__isnull=False),
 73            pk=provider_pk,
 74        ).first()
 75        if not provider:
 76            task.warning("No provider found. Is it assigned to an application?")
 77            return
 78        task.info("Starting full provider sync")
 79        self.logger.debug("Starting provider sync")
 80        with provider.sync_lock as lock_acquired:
 81            if not lock_acquired:
 82                task.info("Synchronization is already running. Skipping.")
 83                self.logger.debug("Failed to acquire sync lock, skipping", provider=provider.name)
 84                return
 85            try:
 86                users_tasks = group(
 87                    self.sync_paginator(
 88                        current_task=task,
 89                        provider=provider,
 90                        sync_objects=sync_objects,
 91                        paginator=provider.get_paginator(User),
 92                        object_type=User,
 93                    )
 94                )
 95                group_tasks = group(
 96                    self.sync_paginator(
 97                        current_task=task,
 98                        provider=provider,
 99                        sync_objects=sync_objects,
100                        paginator=provider.get_paginator(Group),
101                        object_type=Group,
102                    )
103                )
104                users_tasks.run().wait(timeout=provider.get_object_sync_time_limit_ms(User))
105                group_tasks.run().wait(timeout=provider.get_object_sync_time_limit_ms(Group))
106                self._sync_cleanup(provider, task)
107            except TransientSyncException as exc:
108                self.logger.warning("transient sync exception", exc=exc)
109                task.warning("Sync encountered a transient exception. Retrying", exc=exc)
110                raise Retry() from exc
111            except StopSync as exc:
112                task.error(exc)
113                return
114
115    def _sync_cleanup(self, provider: OutgoingSyncProvider, task: Task):
116        """Delete remote objects that are no longer in scope"""
117        for object_type in (User, Group):
118            try:
119                client = provider.client_for_model(object_type)
120            except TransientSyncException:
121                continue
122            in_scope_pks = set(provider.get_object_qs(object_type).values_list("pk", flat=True))
123            stale = client.connection_type.objects.filter(provider=provider).exclude(
124                **{f"{client.connection_type_query}__pk__in": in_scope_pks}
125            )
126            for connection in stale:
127                try:
128                    client.delete(connection.scim_id)
129                    task.info(
130                        f"Deleted out-of-scope {object_type._meta.verbose_name}",
131                        scim_id=connection.scim_id,
132                    )
133                except NotFoundSyncException:
134                    pass
135                except TransientSyncException as exc:
136                    self.logger.warning("transient error during cleanup", exc=exc)
137                    self.logger.warning(
138                        "Cleanup encountered a transient exception. Retrying", exc=exc
139                    )
140                    raise Retry() from exc
141                except DryRunRejected as exc:
142                    self.logger.info("Rejected dry-run cleanup event", exc=exc)
143
144    def sync_objects(
145        self,
146        object_type: str,
147        page: int,
148        provider_pk: int,
149        override_dry_run=False,
150        **filter,
151    ):
152        task = CurrentTask.get_task()
153        _object_type: type[Model] = path_to_class(object_type)
154        self.logger = get_logger().bind(
155            provider_type=class_to_path(self._provider_model),
156            provider_pk=provider_pk,
157            object_type=object_type,
158        )
159        provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
160            Q(backchannel_application__isnull=False) | Q(application__isnull=False),
161            pk=provider_pk,
162        ).first()
163        if not provider:
164            task.warning("No provider found. Is it assigned to an application?")
165            return
166        # Override dry run mode if requested, however don't save the provider
167        # so that scheduled sync tasks still run in dry_run mode
168        if override_dry_run:
169            provider.dry_run = False
170        try:
171            client = provider.client_for_model(_object_type)
172        except TransientSyncException:
173            return
174        paginator = Paginator(
175            provider.get_object_qs(_object_type, **filter),
176            provider.sync_page_size,
177        )
178        if client.can_discover:
179            self.logger.debug("starting discover")
180            client.discover()
181        self.logger.debug("starting sync for page", page=page)
182        task.info(f"Syncing page {page} or {_object_type._meta.verbose_name_plural}")
183        for obj in paginator.page(page).object_list:
184            obj: Model
185            try:
186                client.write(obj)
187            except SkipObjectException:
188                self.logger.debug("skipping object due to SkipObject", obj=obj)
189                continue
190            except DryRunRejected as exc:
191                task.info(
192                    "Dropping mutating request due to dry run",
193                    obj=sanitize_item(obj),
194                    method=exc.method,
195                    url=exc.url,
196                    body=exc.body,
197                )
198            except BadRequestSyncException as exc:
199                self.logger.warning("failed to sync object", exc=exc, obj=obj)
200                task.warning(
201                    f"Failed to sync {str(obj)} due to error: {str(exc)}",
202                    arguments=exc.args[1:],
203                    obj=sanitize_item(obj),
204                    exception=exception_to_dict(exc),
205                )
206            except TransientSyncException as exc:
207                self.logger.warning("failed to sync object", exc=exc, user=obj)
208                task.warning(
209                    f"Failed to sync {str(obj)} due to transient error: {str(exc)}",
210                    obj=sanitize_item(obj),
211                    exception=exception_to_dict(exc),
212                )
213            except StopSync as exc:
214                self.logger.warning("Stopping sync", exc=exc)
215                task.warning(
216                    f"Stopping sync due to error: {exc.detail()}",
217                    obj=sanitize_item(obj),
218                )
219                break
220
221    def sync_signal_direct_dispatch(
222        self,
223        task_sync_signal_direct: Actor[[str, str | int, int], None],
224        model: str,
225        pk: str | int,
226    ):
227        model_class: type[Model] = path_to_class(model)
228        for provider in self._provider_model.objects.filter(
229            Q(backchannel_application__isnull=False) | Q(application__isnull=False)
230        ):
231            task_sync_signal_direct.send_with_options(
232                args=(model, pk, provider.pk),
233                rel_obj=provider,
234                uid=f"{provider.name}:{model_class._meta.model_name}:{pk}:direct",
235            )
236
237    def sync_signal_direct(
238        self,
239        model: str,
240        pk: str | int,
241        provider_pk: int,
242    ):
243        task = CurrentTask.get_task()
244        self.logger = get_logger().bind(
245            provider_type=class_to_path(self._provider_model),
246        )
247        model_class: type[Model] = path_to_class(model)
248        instance = model_class.objects.filter(pk=pk).first()
249        if not instance:
250            return
251        provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
252            Q(backchannel_application__isnull=False) | Q(application__isnull=False),
253            pk=provider_pk,
254        ).first()
255        if not provider:
256            task.warning("No provider found. Is it assigned to an application?")
257            return
258        client = provider.client_for_model(instance.__class__)
259        # Check if the object is allowed within the provider's restrictions
260        queryset = provider.get_object_qs(instance.__class__, pk=instance.pk)
261        # The queryset we get from the provider must include the instance we've got given
262        # otherwise ignore this provider
263        if not queryset or not queryset.exists():
264            return
265
266        try:
267            client.write(instance)
268        except TransientSyncException as exc:
269            raise Retry() from exc
270        except SkipObjectException:
271            return
272        except DryRunRejected as exc:
273            self.logger.info("Rejected dry-run event", exc=exc)
274        except StopSync as exc:
275            self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)
276
277    def sync_signal_delete_dispatch(
278        self,
279        task_sync_signal_delete: Actor[[str, int, str], None],
280        model: str,
281        mappings: list[tuple[str, str]],
282    ):
283        model_class: type[Model] = path_to_class(model)
284        for provider_pk, identifier in mappings:
285            provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
286                pk=provider_pk
287            ).first()
288            if not provider:
289                continue
290            task_sync_signal_delete.send_with_options(
291                args=(model, identifier, provider_pk),
292                rel_obj=provider,
293                uid=f"{provider.name}:{model_class._meta.model_name}:{identifier}:delete",
294            )
295
296    def sync_signal_delete(
297        self,
298        model: str,
299        identifier: str,
300        provider_pk: int,
301    ):
302        task = CurrentTask.get_task()
303        self.logger = get_logger().bind(
304            provider_type=class_to_path(self._provider_model),
305        )
306        model_class: type[Model] = path_to_class(model)
307        provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
308            Q(backchannel_application__isnull=False) | Q(application__isnull=False),
309            pk=provider_pk,
310        ).first()
311        if not provider:
312            task.warning("No provider found. Is it assigned to an application?")
313            return
314        client = provider.client_for_model(model_class)
315
316        try:
317            client.delete(identifier)
318        except NotFoundSyncException as exc:
319            self.logger.info(
320                "Object not found in remote provider",
321                model_name=model_class._meta.model_name,
322                identifier=identifier,
323                exc=exc,
324                provider_pk=provider.pk,
325            )
326        except TransientSyncException as exc:
327            raise Retry() from exc
328        except DryRunRejected as exc:
329            self.logger.info("Rejected dry-run event", exc=exc)
330
331    def sync_signal_m2m_dispatch(
332        self,
333        task_sync_signal_m2m: Actor[[str, int, str, list[int]], None],
334        instance_pk: str,
335        action: str,
336        pk_set: list[int],
337        reverse: bool,
338    ):
339        for provider in self._provider_model.objects.filter(
340            Q(backchannel_application__isnull=False) | Q(application__isnull=False)
341        ):
342            # reverse: instance is a Group, pk_set is a list of user pks
343            # non-reverse: instance is a User, pk_set is a list of groups
344            if reverse:
345                task_sync_signal_m2m.send_with_options(
346                    args=(instance_pk, provider.pk, action, list(pk_set)),
347                    rel_obj=provider,
348                    uid=f"{provider.name}:group:{instance_pk}:m2m",
349                )
350            else:
351                for pk in pk_set:
352                    task_sync_signal_m2m.send_with_options(
353                        args=(pk, provider.pk, action, [instance_pk]),
354                        rel_obj=provider,
355                        uid=f"{provider.name}:group:{pk}:m2m",
356                    )
357
358    def sync_signal_m2m(
359        self,
360        group_pk: str,
361        provider_pk: int,
362        action: str,
363        pk_set: list[int],
364    ):
365        task = CurrentTask.get_task()
366        self.logger = get_logger().bind(
367            provider_type=class_to_path(self._provider_model),
368        )
369        group = Group.objects.filter(pk=group_pk).first()
370        if not group:
371            return
372        provider: OutgoingSyncProvider = self._provider_model.objects.filter(
373            Q(backchannel_application__isnull=False) | Q(application__isnull=False),
374            pk=provider_pk,
375        ).first()
376        if not provider:
377            task.warning("No provider found. Is it assigned to an application?")
378            return
379
380        # Check if the object is allowed within the provider's restrictions
381        queryset: QuerySet = provider.get_object_qs(Group, pk=group_pk)
382        # The queryset we get from the provider must include the instance we've got given
383        # otherwise ignore this provider
384        if not queryset or not queryset.filter().exists():
385            return
386
387        client = provider.client_for_model(Group)
388        try:
389            operation = None
390            if action == "post_add":
391                operation = Direction.add
392            if action == "post_remove":
393                operation = Direction.remove
394            client.update_group(group, operation, pk_set)
395        except TransientSyncException as exc:
396            raise Retry() from exc
397        except SkipObjectException:
398            return
399        except DryRunRejected as exc:
400            self.logger.info("Rejected dry-run event", exc=exc)
401        except StopSync as exc:
402            self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)
class SyncTasks:
 29class SyncTasks:
 30    """Container for all sync 'tasks' (this class doesn't actually contain
 31    tasks due to dramatiq's magic, however exposes a number of functions to be called from tasks)"""
 32
 33    logger: BoundLogger
 34
 35    def __init__(self, provider_model: type[OutgoingSyncProvider]) -> None:
 36        super().__init__()
 37        self._provider_model = provider_model
 38
 39    def sync_paginator(
 40        self,
 41        current_task: Task,
 42        provider: OutgoingSyncProvider,
 43        sync_objects: Actor[[str, int, int, bool], None],
 44        paginator: Paginator,
 45        object_type: type[User | Group],
 46        **options,
 47    ):
 48        tasks = []
 49        time_limit = timedelta_from_string(provider.sync_page_timeout).total_seconds() * 1000
 50        for page in paginator.page_range:
 51            page_sync = sync_objects.message_with_options(
 52                args=(class_to_path(object_type), page, provider.pk),
 53                time_limit=time_limit,
 54                # Assign tasks to the same schedule as the current one
 55                rel_obj=current_task.rel_obj,
 56                uid=f"{provider.name}:{object_type._meta.model_name}:{page}",
 57                **options,
 58            )
 59            tasks.append(page_sync)
 60        return tasks
 61
 62    def sync(
 63        self,
 64        provider_pk: int,
 65        sync_objects: Actor[[str, int, int, bool], None],
 66    ):
 67        task = CurrentTask.get_task()
 68        self.logger = get_logger().bind(
 69            provider_type=class_to_path(self._provider_model),
 70            provider_pk=provider_pk,
 71        )
 72        provider: OutgoingSyncProvider = self._provider_model.objects.filter(
 73            Q(backchannel_application__isnull=False) | Q(application__isnull=False),
 74            pk=provider_pk,
 75        ).first()
 76        if not provider:
 77            task.warning("No provider found. Is it assigned to an application?")
 78            return
 79        task.info("Starting full provider sync")
 80        self.logger.debug("Starting provider sync")
 81        with provider.sync_lock as lock_acquired:
 82            if not lock_acquired:
 83                task.info("Synchronization is already running. Skipping.")
 84                self.logger.debug("Failed to acquire sync lock, skipping", provider=provider.name)
 85                return
 86            try:
 87                users_tasks = group(
 88                    self.sync_paginator(
 89                        current_task=task,
 90                        provider=provider,
 91                        sync_objects=sync_objects,
 92                        paginator=provider.get_paginator(User),
 93                        object_type=User,
 94                    )
 95                )
 96                group_tasks = group(
 97                    self.sync_paginator(
 98                        current_task=task,
 99                        provider=provider,
100                        sync_objects=sync_objects,
101                        paginator=provider.get_paginator(Group),
102                        object_type=Group,
103                    )
104                )
105                users_tasks.run().wait(timeout=provider.get_object_sync_time_limit_ms(User))
106                group_tasks.run().wait(timeout=provider.get_object_sync_time_limit_ms(Group))
107                self._sync_cleanup(provider, task)
108            except TransientSyncException as exc:
109                self.logger.warning("transient sync exception", exc=exc)
110                task.warning("Sync encountered a transient exception. Retrying", exc=exc)
111                raise Retry() from exc
112            except StopSync as exc:
113                task.error(exc)
114                return
115
116    def _sync_cleanup(self, provider: OutgoingSyncProvider, task: Task):
117        """Delete remote objects that are no longer in scope"""
118        for object_type in (User, Group):
119            try:
120                client = provider.client_for_model(object_type)
121            except TransientSyncException:
122                continue
123            in_scope_pks = set(provider.get_object_qs(object_type).values_list("pk", flat=True))
124            stale = client.connection_type.objects.filter(provider=provider).exclude(
125                **{f"{client.connection_type_query}__pk__in": in_scope_pks}
126            )
127            for connection in stale:
128                try:
129                    client.delete(connection.scim_id)
130                    task.info(
131                        f"Deleted out-of-scope {object_type._meta.verbose_name}",
132                        scim_id=connection.scim_id,
133                    )
134                except NotFoundSyncException:
135                    pass
136                except TransientSyncException as exc:
137                    self.logger.warning("transient error during cleanup", exc=exc)
138                    self.logger.warning(
139                        "Cleanup encountered a transient exception. Retrying", exc=exc
140                    )
141                    raise Retry() from exc
142                except DryRunRejected as exc:
143                    self.logger.info("Rejected dry-run cleanup event", exc=exc)
144
145    def sync_objects(
146        self,
147        object_type: str,
148        page: int,
149        provider_pk: int,
150        override_dry_run=False,
151        **filter,
152    ):
153        task = CurrentTask.get_task()
154        _object_type: type[Model] = path_to_class(object_type)
155        self.logger = get_logger().bind(
156            provider_type=class_to_path(self._provider_model),
157            provider_pk=provider_pk,
158            object_type=object_type,
159        )
160        provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
161            Q(backchannel_application__isnull=False) | Q(application__isnull=False),
162            pk=provider_pk,
163        ).first()
164        if not provider:
165            task.warning("No provider found. Is it assigned to an application?")
166            return
167        # Override dry run mode if requested, however don't save the provider
168        # so that scheduled sync tasks still run in dry_run mode
169        if override_dry_run:
170            provider.dry_run = False
171        try:
172            client = provider.client_for_model(_object_type)
173        except TransientSyncException:
174            return
175        paginator = Paginator(
176            provider.get_object_qs(_object_type, **filter),
177            provider.sync_page_size,
178        )
179        if client.can_discover:
180            self.logger.debug("starting discover")
181            client.discover()
182        self.logger.debug("starting sync for page", page=page)
183        task.info(f"Syncing page {page} or {_object_type._meta.verbose_name_plural}")
184        for obj in paginator.page(page).object_list:
185            obj: Model
186            try:
187                client.write(obj)
188            except SkipObjectException:
189                self.logger.debug("skipping object due to SkipObject", obj=obj)
190                continue
191            except DryRunRejected as exc:
192                task.info(
193                    "Dropping mutating request due to dry run",
194                    obj=sanitize_item(obj),
195                    method=exc.method,
196                    url=exc.url,
197                    body=exc.body,
198                )
199            except BadRequestSyncException as exc:
200                self.logger.warning("failed to sync object", exc=exc, obj=obj)
201                task.warning(
202                    f"Failed to sync {str(obj)} due to error: {str(exc)}",
203                    arguments=exc.args[1:],
204                    obj=sanitize_item(obj),
205                    exception=exception_to_dict(exc),
206                )
207            except TransientSyncException as exc:
208                self.logger.warning("failed to sync object", exc=exc, user=obj)
209                task.warning(
210                    f"Failed to sync {str(obj)} due to transient error: {str(exc)}",
211                    obj=sanitize_item(obj),
212                    exception=exception_to_dict(exc),
213                )
214            except StopSync as exc:
215                self.logger.warning("Stopping sync", exc=exc)
216                task.warning(
217                    f"Stopping sync due to error: {exc.detail()}",
218                    obj=sanitize_item(obj),
219                )
220                break
221
222    def sync_signal_direct_dispatch(
223        self,
224        task_sync_signal_direct: Actor[[str, str | int, int], None],
225        model: str,
226        pk: str | int,
227    ):
228        model_class: type[Model] = path_to_class(model)
229        for provider in self._provider_model.objects.filter(
230            Q(backchannel_application__isnull=False) | Q(application__isnull=False)
231        ):
232            task_sync_signal_direct.send_with_options(
233                args=(model, pk, provider.pk),
234                rel_obj=provider,
235                uid=f"{provider.name}:{model_class._meta.model_name}:{pk}:direct",
236            )
237
238    def sync_signal_direct(
239        self,
240        model: str,
241        pk: str | int,
242        provider_pk: int,
243    ):
244        task = CurrentTask.get_task()
245        self.logger = get_logger().bind(
246            provider_type=class_to_path(self._provider_model),
247        )
248        model_class: type[Model] = path_to_class(model)
249        instance = model_class.objects.filter(pk=pk).first()
250        if not instance:
251            return
252        provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
253            Q(backchannel_application__isnull=False) | Q(application__isnull=False),
254            pk=provider_pk,
255        ).first()
256        if not provider:
257            task.warning("No provider found. Is it assigned to an application?")
258            return
259        client = provider.client_for_model(instance.__class__)
260        # Check if the object is allowed within the provider's restrictions
261        queryset = provider.get_object_qs(instance.__class__, pk=instance.pk)
262        # The queryset we get from the provider must include the instance we've got given
263        # otherwise ignore this provider
264        if not queryset or not queryset.exists():
265            return
266
267        try:
268            client.write(instance)
269        except TransientSyncException as exc:
270            raise Retry() from exc
271        except SkipObjectException:
272            return
273        except DryRunRejected as exc:
274            self.logger.info("Rejected dry-run event", exc=exc)
275        except StopSync as exc:
276            self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)
277
278    def sync_signal_delete_dispatch(
279        self,
280        task_sync_signal_delete: Actor[[str, int, str], None],
281        model: str,
282        mappings: list[tuple[str, str]],
283    ):
284        model_class: type[Model] = path_to_class(model)
285        for provider_pk, identifier in mappings:
286            provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
287                pk=provider_pk
288            ).first()
289            if not provider:
290                continue
291            task_sync_signal_delete.send_with_options(
292                args=(model, identifier, provider_pk),
293                rel_obj=provider,
294                uid=f"{provider.name}:{model_class._meta.model_name}:{identifier}:delete",
295            )
296
297    def sync_signal_delete(
298        self,
299        model: str,
300        identifier: str,
301        provider_pk: int,
302    ):
303        task = CurrentTask.get_task()
304        self.logger = get_logger().bind(
305            provider_type=class_to_path(self._provider_model),
306        )
307        model_class: type[Model] = path_to_class(model)
308        provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
309            Q(backchannel_application__isnull=False) | Q(application__isnull=False),
310            pk=provider_pk,
311        ).first()
312        if not provider:
313            task.warning("No provider found. Is it assigned to an application?")
314            return
315        client = provider.client_for_model(model_class)
316
317        try:
318            client.delete(identifier)
319        except NotFoundSyncException as exc:
320            self.logger.info(
321                "Object not found in remote provider",
322                model_name=model_class._meta.model_name,
323                identifier=identifier,
324                exc=exc,
325                provider_pk=provider.pk,
326            )
327        except TransientSyncException as exc:
328            raise Retry() from exc
329        except DryRunRejected as exc:
330            self.logger.info("Rejected dry-run event", exc=exc)
331
332    def sync_signal_m2m_dispatch(
333        self,
334        task_sync_signal_m2m: Actor[[str, int, str, list[int]], None],
335        instance_pk: str,
336        action: str,
337        pk_set: list[int],
338        reverse: bool,
339    ):
340        for provider in self._provider_model.objects.filter(
341            Q(backchannel_application__isnull=False) | Q(application__isnull=False)
342        ):
343            # reverse: instance is a Group, pk_set is a list of user pks
344            # non-reverse: instance is a User, pk_set is a list of groups
345            if reverse:
346                task_sync_signal_m2m.send_with_options(
347                    args=(instance_pk, provider.pk, action, list(pk_set)),
348                    rel_obj=provider,
349                    uid=f"{provider.name}:group:{instance_pk}:m2m",
350                )
351            else:
352                for pk in pk_set:
353                    task_sync_signal_m2m.send_with_options(
354                        args=(pk, provider.pk, action, [instance_pk]),
355                        rel_obj=provider,
356                        uid=f"{provider.name}:group:{pk}:m2m",
357                    )
358
359    def sync_signal_m2m(
360        self,
361        group_pk: str,
362        provider_pk: int,
363        action: str,
364        pk_set: list[int],
365    ):
366        task = CurrentTask.get_task()
367        self.logger = get_logger().bind(
368            provider_type=class_to_path(self._provider_model),
369        )
370        group = Group.objects.filter(pk=group_pk).first()
371        if not group:
372            return
373        provider: OutgoingSyncProvider = self._provider_model.objects.filter(
374            Q(backchannel_application__isnull=False) | Q(application__isnull=False),
375            pk=provider_pk,
376        ).first()
377        if not provider:
378            task.warning("No provider found. Is it assigned to an application?")
379            return
380
381        # Check if the object is allowed within the provider's restrictions
382        queryset: QuerySet = provider.get_object_qs(Group, pk=group_pk)
383        # The queryset we get from the provider must include the instance we've got given
384        # otherwise ignore this provider
385        if not queryset or not queryset.filter().exists():
386            return
387
388        client = provider.client_for_model(Group)
389        try:
390            operation = None
391            if action == "post_add":
392                operation = Direction.add
393            if action == "post_remove":
394                operation = Direction.remove
395            client.update_group(group, operation, pk_set)
396        except TransientSyncException as exc:
397            raise Retry() from exc
398        except SkipObjectException:
399            return
400        except DryRunRejected as exc:
401            self.logger.info("Rejected dry-run event", exc=exc)
402        except StopSync as exc:
403            self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)

Container for all sync 'tasks'. This class doesn't actually contain tasks (due to dramatiq's magic); however, it exposes a number of functions to be called from tasks.

SyncTasks( provider_model: type[authentik.lib.sync.outgoing.models.OutgoingSyncProvider])
def __init__(self, provider_model: type[OutgoingSyncProvider]) -> None:
    """Bind these helpers to ``provider_model``.

    :param provider_model: concrete ``OutgoingSyncProvider`` subclass whose
        instances are looked up by every helper.
    """
    super().__init__()
    self._provider_model = provider_model
logger: structlog.stdlib.BoundLogger
def sync_paginator(
    self,
    current_task: Task,
    provider: OutgoingSyncProvider,
    sync_objects: Actor[[str, int, int, bool], None],
    paginator: Paginator,
    object_type: type[User | Group],
    **options,
):
    """Build one ``sync_objects`` message per page of *paginator*.

    The messages are only constructed here, not sent; in ``sync`` they are
    wrapped in a dramatiq ``group`` and run from there.

    Args:
        current_task: The task currently executing; its ``rel_obj`` is copied
            onto every page message.
        provider: Provider being synced. Supplies the per-page timeout, the
            primary key passed to each page task and the name used in the uid.
        sync_objects: Actor that syncs a single page.
        paginator: Paginator whose ``page_range`` determines the pages.
        object_type: ``User`` or ``Group``.
        **options: Extra options forwarded to ``message_with_options``.

    Returns:
        A list of messages, one per page, in page order.
    """
    # The provider stores the page timeout as a duration string; convert it to ms.
    page_timeout_ms = (
        timedelta_from_string(provider.sync_page_timeout).total_seconds() * 1000
    )
    return [
        sync_objects.message_with_options(
            args=(class_to_path(object_type), page_number, provider.pk),
            time_limit=page_timeout_ms,
            # Assign tasks to the same schedule as the current one
            rel_obj=current_task.rel_obj,
            uid=f"{provider.name}:{object_type._meta.model_name}:{page_number}",
            **options,
        )
        for page_number in paginator.page_range
    ]
def sync(
    self,
    provider_pk: int,
    sync_objects: Actor[[str, int, int, bool], None],
):
    """Run a full sync of all users and groups for one provider.

    The provider must be assigned to an application (directly or as a
    backchannel). The provider's sync lock is taken, then one ``sync_objects``
    message per page is built for users and for groups. The user pages are
    run and waited for first, then the group pages, and finally
    ``_sync_cleanup`` is called.

    Args:
        provider_pk: Primary key of the provider to sync.
        sync_objects: Actor that syncs a single page of objects.

    Raises:
        Retry: when a ``TransientSyncException`` is raised, so the whole sync
            is retried.
    """
    task = CurrentTask.get_task()
    self.logger = get_logger().bind(
        provider_type=class_to_path(self._provider_model),
        provider_pk=provider_pk,
    )
    assigned_to_app = Q(backchannel_application__isnull=False) | Q(
        application__isnull=False
    )
    provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
        assigned_to_app, pk=provider_pk
    ).first()
    if not provider:
        task.warning("No provider found. Is it assigned to an application?")
        return
    task.info("Starting full provider sync")
    self.logger.debug("Starting provider sync")
    with provider.sync_lock as lock_acquired:
        if not lock_acquired:
            task.info("Synchronization is already running. Skipping.")
            self.logger.debug("Failed to acquire sync lock, skipping", provider=provider.name)
            return
        try:
            # Both fan-outs are built before either is started; they are then run
            # strictly one after the other: all user pages, then all group pages.
            fan_outs = [
                (
                    model,
                    group(
                        self.sync_paginator(
                            current_task=task,
                            provider=provider,
                            sync_objects=sync_objects,
                            paginator=provider.get_paginator(model),
                            object_type=model,
                        )
                    ),
                )
                for model in (User, Group)
            ]
            for model, page_group in fan_outs:
                page_group.run().wait(timeout=provider.get_object_sync_time_limit_ms(model))
            self._sync_cleanup(provider, task)
        except TransientSyncException as exc:
            self.logger.warning("transient sync exception", exc=exc)
            task.warning("Sync encountered a transient exception. Retrying", exc=exc)
            raise Retry() from exc
        except StopSync as exc:
            task.error(exc)
            return
def sync_objects(
    self,
    object_type: str,
    page: int,
    provider_pk: int,
    override_dry_run=False,
    **filters,
):
    """Sync a single page of objects of one model to one provider.

    Args:
        object_type: Dotted path of the model class to sync (resolved with
            ``path_to_class``).
        page: Page number, as produced by ``Paginator.page_range`` in
            ``sync_paginator``.
        provider_pk: Primary key of the provider.
        override_dry_run: When true, dry-run mode is disabled for this
            invocation only; the provider is not saved.
        **filters: Extra lookups passed to ``provider.get_object_qs`` to narrow
            the objects being synced.

    Per-object errors are logged on the task and do not abort the page, except
    ``StopSync``, which stops processing the remaining objects on the page.
    """
    # Local import: the module header only imports ``Paginator``.
    from django.core.paginator import EmptyPage

    task = CurrentTask.get_task()
    _object_type: type[Model] = path_to_class(object_type)
    self.logger = get_logger().bind(
        provider_type=class_to_path(self._provider_model),
        provider_pk=provider_pk,
        object_type=object_type,
    )
    provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
        Q(backchannel_application__isnull=False) | Q(application__isnull=False),
        pk=provider_pk,
    ).first()
    if not provider:
        task.warning("No provider found. Is it assigned to an application?")
        return
    # Override dry run mode if requested, however don't save the provider
    # so that scheduled sync tasks still run in dry_run mode
    if override_dry_run:
        provider.dry_run = False
    try:
        client = provider.client_for_model(_object_type)
    except TransientSyncException as exc:
        # Don't fail the task, but make the skipped page visible instead of
        # returning silently.
        self.logger.warning("failed to create sync client", exc=exc)
        task.warning(
            f"Failed to create client for {_object_type._meta.verbose_name_plural} "
            f"due to transient error: {str(exc)}",
            exception=exception_to_dict(exc),
        )
        return
    paginator = Paginator(
        provider.get_object_qs(_object_type, **filters),
        provider.sync_page_size,
    )
    if client.can_discover:
        self.logger.debug("starting discover")
        client.discover()
    self.logger.debug("starting sync for page", page=page)
    task.info(f"Syncing page {page} of {_object_type._meta.verbose_name_plural}")
    try:
        page_objects = paginator.page(page).object_list
    except EmptyPage:
        # The page range was computed when the page tasks were dispatched; if
        # objects were removed since then, this page may no longer exist.
        self.logger.debug("page no longer exists, nothing to sync", page=page)
        return
    for obj in page_objects:
        try:
            client.write(obj)
        except SkipObjectException:
            self.logger.debug("skipping object due to SkipObject", obj=obj)
            continue
        except DryRunRejected as exc:
            task.info(
                "Dropping mutating request due to dry run",
                obj=sanitize_item(obj),
                method=exc.method,
                url=exc.url,
                body=exc.body,
            )
        except BadRequestSyncException as exc:
            self.logger.warning("failed to sync object", exc=exc, obj=obj)
            task.warning(
                f"Failed to sync {str(obj)} due to error: {str(exc)}",
                arguments=exc.args[1:],
                obj=sanitize_item(obj),
                exception=exception_to_dict(exc),
            )
        except TransientSyncException as exc:
            # Objects may be users or groups; use the same ``obj`` key as above.
            self.logger.warning("failed to sync object", exc=exc, obj=obj)
            task.warning(
                f"Failed to sync {str(obj)} due to transient error: {str(exc)}",
                obj=sanitize_item(obj),
                exception=exception_to_dict(exc),
            )
        except StopSync as exc:
            self.logger.warning("Stopping sync", exc=exc)
            task.warning(
                f"Stopping sync due to error: {exc.detail()}",
                obj=sanitize_item(obj),
            )
            break
def sync_signal_direct_dispatch(
    self,
    task_sync_signal_direct: Actor[[str, str | int, int], None],
    model: str,
    pk: str | int,
):
    """Send one ``task_sync_signal_direct`` message per application-assigned provider.

    Args:
        task_sync_signal_direct: Actor that syncs one object to one provider.
        model: Dotted path of the changed object's model class.
        pk: Primary key of the changed object.
    """
    model_name = path_to_class(model)._meta.model_name
    assigned_providers = self._provider_model.objects.filter(
        Q(backchannel_application__isnull=False) | Q(application__isnull=False)
    )
    for provider in assigned_providers:
        task_sync_signal_direct.send_with_options(
            args=(model, pk, provider.pk),
            rel_obj=provider,
            uid=f"{provider.name}:{model_name}:{pk}:direct",
        )
def sync_signal_direct(
    self,
    model: str,
    pk: str | int,
    provider_pk: int,
):
    """Sync a single object to a single provider.

    Nothing happens when the object no longer exists, when the provider is not
    assigned to an application, or when the object is outside the provider's
    object restrictions (``get_object_qs``).

    Args:
        model: Dotted path of the object's model class.
        pk: Primary key of the object.
        provider_pk: Primary key of the provider.

    Raises:
        Retry: when a ``TransientSyncException`` is raised while creating the
            client or writing the object.
    """
    task = CurrentTask.get_task()
    self.logger = get_logger().bind(
        provider_type=class_to_path(self._provider_model),
    )
    model_class: type[Model] = path_to_class(model)
    instance = model_class.objects.filter(pk=pk).first()
    if not instance:
        return
    provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
        Q(backchannel_application__isnull=False) | Q(application__isnull=False),
        pk=provider_pk,
    ).first()
    if not provider:
        task.warning("No provider found. Is it assigned to an application?")
        return
    # Check if the object is allowed within the provider's restrictions
    queryset = provider.get_object_qs(instance.__class__, pk=instance.pk)
    # The queryset we get from the provider must include the instance we've got given
    # otherwise ignore this provider
    if not queryset or not queryset.exists():
        return

    try:
        # Build the client only once the object is known to be in scope, and
        # inside the try so a transient failure here is retried like a failed write.
        client = provider.client_for_model(instance.__class__)
        client.write(instance)
    except TransientSyncException as exc:
        raise Retry() from exc
    except SkipObjectException:
        return
    except DryRunRejected as exc:
        self.logger.info("Rejected dry-run event", exc=exc)
    except StopSync as exc:
        self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)
def sync_signal_delete_dispatch(
    self,
    task_sync_signal_delete: Actor[[str, str, int], None],
    model: str,
    mappings: list[tuple[str, str]],
):
    """Send one ``task_sync_signal_delete`` message per (provider, identifier) mapping.

    Args:
        task_sync_signal_delete: Actor that deletes one object from one
            provider. It is called as ``(model, identifier, provider_pk)``.
            The annotation previously listed the argument types in the wrong order.
        model: Dotted path of the deleted object's model class.
        mappings: ``(provider_pk, identifier)`` pairs for the deleted object.

    Mappings whose provider no longer exists, or is not assigned to an
    application, are skipped. ``sync_signal_delete`` applies the same
    application filter and would discard such tasks anyway.
    """
    model_class: type[Model] = path_to_class(model)
    for provider_pk, identifier in mappings:
        provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
            Q(backchannel_application__isnull=False) | Q(application__isnull=False),
            pk=provider_pk,
        ).first()
        if not provider:
            continue
        task_sync_signal_delete.send_with_options(
            args=(model, identifier, provider_pk),
            rel_obj=provider,
            uid=f"{provider.name}:{model_class._meta.model_name}:{identifier}:delete",
        )
def sync_signal_delete(
    self,
    model: str,
    identifier: str,
    provider_pk: int,
):
    """Delete a single object from a single provider.

    Args:
        model: Dotted path of the deleted object's model class.
        identifier: Value passed to ``client.delete``; it comes from the stored
            provider mapping (presumably the remote-side ID; confirm against the
            mapping model).
        provider_pk: Primary key of the provider.

    Raises:
        Retry: when a ``TransientSyncException`` is raised while creating the
            client or deleting the object.
    """
    task = CurrentTask.get_task()
    self.logger = get_logger().bind(
        provider_type=class_to_path(self._provider_model),
    )
    model_class: type[Model] = path_to_class(model)
    provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
        Q(backchannel_application__isnull=False) | Q(application__isnull=False),
        pk=provider_pk,
    ).first()
    if not provider:
        task.warning("No provider found. Is it assigned to an application?")
        return

    try:
        # Inside the try so a transient failure while building the client is
        # retried as well.
        client = provider.client_for_model(model_class)
        client.delete(identifier)
    except NotFoundSyncException as exc:
        self.logger.info(
            "Object not found in remote provider",
            model_name=model_class._meta.model_name,
            identifier=identifier,
            exc=exc,
            provider_pk=provider.pk,
        )
    except TransientSyncException as exc:
        raise Retry() from exc
    except DryRunRejected as exc:
        self.logger.info("Rejected dry-run event", exc=exc)
    except StopSync as exc:
        # Same handling as the other signal-driven sync helpers.
        self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)
def sync_signal_m2m_dispatch(
    self,
    task_sync_signal_m2m: Actor[[str, int, str, list[int]], None],
    instance_pk: str,
    action: str,
    pk_set: list[int],
    reverse: bool,
):
    """Fan a membership change out to every application-assigned provider.

    Every message is phrased from one group's point of view: the group's
    primary key plus the user primary keys affected.

    Args:
        task_sync_signal_m2m: Actor applying one group's membership change on
            one provider.
        instance_pk: Primary key of the instance the change was reported for.
        action: Change action (e.g. ``post_add``/``post_remove``), forwarded
            unchanged.
        pk_set: Primary keys on the other side of the relation.
        reverse: True when ``instance_pk`` is a group and ``pk_set`` holds user
            keys; False when ``instance_pk`` is a user and ``pk_set`` holds
            group keys.
    """
    for provider in self._provider_model.objects.filter(
        Q(backchannel_application__isnull=False) | Q(application__isnull=False)
    ):
        if reverse:
            # A single group gaining/losing all users in pk_set.
            batches = [(instance_pk, list(pk_set))]
        else:
            # Each group in pk_set gaining/losing the single user.
            batches = [(group_pk, [instance_pk]) for group_pk in pk_set]
        for group_pk, member_pks in batches:
            task_sync_signal_m2m.send_with_options(
                args=(group_pk, provider.pk, action, member_pks),
                rel_obj=provider,
                uid=f"{provider.name}:group:{group_pk}:m2m",
            )
def sync_signal_m2m(
    self,
    group_pk: str,
    provider_pk: int,
    action: str,
    pk_set: list[int],
):
    """Apply one group's membership change to one provider.

    Args:
        group_pk: Primary key of the group whose members changed.
        provider_pk: Primary key of the provider.
        action: ``"post_add"`` maps to ``Direction.add`` and ``"post_remove"``
            to ``Direction.remove``; any other value is passed on as ``None``.
        pk_set: Primary keys of the users added to or removed from the group.

    Raises:
        Retry: when a ``TransientSyncException`` is raised while creating the
            client or updating the group.
    """
    task = CurrentTask.get_task()
    self.logger = get_logger().bind(
        provider_type=class_to_path(self._provider_model),
    )
    # Named ``group_obj`` so it does not shadow dramatiq's ``group`` import.
    group_obj = Group.objects.filter(pk=group_pk).first()
    if not group_obj:
        return
    provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
        Q(backchannel_application__isnull=False) | Q(application__isnull=False),
        pk=provider_pk,
    ).first()
    if not provider:
        task.warning("No provider found. Is it assigned to an application?")
        return

    # Check if the object is allowed within the provider's restrictions
    queryset: QuerySet = provider.get_object_qs(Group, pk=group_pk)
    # The queryset we get from the provider must include the instance we've got given
    # otherwise ignore this provider
    if not queryset or not queryset.exists():
        return

    # Unknown actions keep the previous behaviour of passing ``None`` through.
    operation = {
        "post_add": Direction.add,
        "post_remove": Direction.remove,
    }.get(action)
    try:
        # Inside the try so a transient failure while building the client is
        # retried as well.
        client = provider.client_for_model(Group)
        client.update_group(group_obj, operation, pk_set)
    except TransientSyncException as exc:
        raise Retry() from exc
    except SkipObjectException:
        return
    except DryRunRejected as exc:
        self.logger.info("Rejected dry-run event", exc=exc)
    except StopSync as exc:
        self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)