authentik.lib.sync.outgoing.tasks
from django.core.paginator import Paginator
from django.db.models import Model, QuerySet
from django.db.models.query import Q
from dramatiq.actor import Actor
from dramatiq.composition import group
from dramatiq.errors import Retry
from structlog.stdlib import BoundLogger, get_logger

from authentik.core.expression.exceptions import SkipObjectException
from authentik.core.models import Group, User
from authentik.events.utils import sanitize_item
from authentik.lib.sync.outgoing.base import Direction
from authentik.lib.sync.outgoing.exceptions import (
    BadRequestSyncException,
    DryRunRejected,
    NotFoundSyncException,
    StopSync,
    TransientSyncException,
)
from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
from authentik.lib.utils.errors import exception_to_dict
from authentik.lib.utils.reflection import class_to_path, path_to_class
from authentik.lib.utils.time import timedelta_from_string
from authentik.tasks.middleware import CurrentTask
from authentik.tasks.models import Task


class SyncTasks:
    """Container for all sync 'tasks'.

    This class doesn't actually contain tasks due to dramatiq's magic; instead, it
    exposes a number of functions to be called from tasks. All providers are looked
    up through the ``provider_model`` passed to the constructor.
    """

    # Structured logger, re-bound with context at the start of each entry point
    logger: BoundLogger

    def __init__(self, provider_model: type[OutgoingSyncProvider]) -> None:
        super().__init__()
        self._provider_model = provider_model

    @staticmethod
    def _assigned_to_application() -> Q:
        """Filter for providers whose ``application`` or ``backchannel_application`` is set."""
        return Q(backchannel_application__isnull=False) | Q(application__isnull=False)

    def _get_provider(self, provider_pk: int) -> OutgoingSyncProvider | None:
        """Return the provider with ``provider_pk`` if it is assigned to an application."""
        return self._provider_model.objects.filter(
            self._assigned_to_application(),
            pk=provider_pk,
        ).first()

    def sync_paginator(
        self,
        current_task: Task,
        provider: OutgoingSyncProvider,
        sync_objects: Actor[[str, int, int, bool], None],
        paginator: Paginator,
        object_type: type[User | Group],
        **options,
    ):
        """Build (without sending) one ``sync_objects`` message per page of ``paginator``.

        Each message receives ``(object type path, page number, provider pk)`` as
        arguments and the provider's per-page timeout (in milliseconds) as
        ``time_limit``. Extra ``options`` are forwarded to ``message_with_options``.

        :returns: list of messages, one per page.
        """
        tasks = []
        # sync_page_timeout is a duration string; dramatiq expects milliseconds
        time_limit = timedelta_from_string(provider.sync_page_timeout).total_seconds() * 1000
        for page in paginator.page_range:
            page_sync = sync_objects.message_with_options(
                args=(class_to_path(object_type), page, provider.pk),
                time_limit=time_limit,
                # Assign tasks to the same schedule as the current one
                rel_obj=current_task.rel_obj,
                uid=f"{provider.name}:{object_type._meta.model_name}:{page}",
                **options,
            )
            tasks.append(page_sync)
        return tasks

    def sync(
        self,
        provider_pk: int,
        sync_objects: Actor[[str, int, int, bool], None],
    ):
        """Run a full sync of all users and groups for one provider.

        Pages of users are run and awaited first, then pages of groups, then remote
        objects that are no longer in scope are cleaned up. The whole run happens
        while holding ``provider.sync_lock``; if the lock can't be acquired the sync
        is skipped.

        :raises Retry: when a ``TransientSyncException`` occurs.
        """
        task = CurrentTask.get_task()
        self.logger = get_logger().bind(
            provider_type=class_to_path(self._provider_model),
            provider_pk=provider_pk,
        )
        provider = self._get_provider(provider_pk)
        if not provider:
            task.warning("No provider found. Is it assigned to an application?")
            return
        task.info("Starting full provider sync")
        self.logger.debug("Starting provider sync")
        with provider.sync_lock as lock_acquired:
            if not lock_acquired:
                task.info("Synchronization is already running. Skipping.")
                self.logger.debug("Failed to acquire sync lock, skipping", provider=provider.name)
                return
            try:
                users_tasks = group(
                    self.sync_paginator(
                        current_task=task,
                        provider=provider,
                        sync_objects=sync_objects,
                        paginator=provider.get_paginator(User),
                        object_type=User,
                    )
                )
                group_tasks = group(
                    self.sync_paginator(
                        current_task=task,
                        provider=provider,
                        sync_objects=sync_objects,
                        paginator=provider.get_paginator(Group),
                        object_type=Group,
                    )
                )
                users_tasks.run().wait(timeout=provider.get_object_sync_time_limit_ms(User))
                group_tasks.run().wait(timeout=provider.get_object_sync_time_limit_ms(Group))
                self._sync_cleanup(provider, task)
            except TransientSyncException as exc:
                self.logger.warning("transient sync exception", exc=exc)
                task.warning("Sync encountered a transient exception. Retrying", exc=exc)
                raise Retry() from exc
            except StopSync as exc:
                task.error(exc)
                return

    def _sync_cleanup(self, provider: OutgoingSyncProvider, task: Task):
        """Delete remote objects that are no longer in scope.

        For users and groups, every connection object of ``provider`` whose local
        object is not part of ``provider.get_object_qs`` is deleted remotely. Objects
        already gone remotely are ignored and dry-run rejections are logged.

        :raises Retry: when a ``TransientSyncException`` occurs while deleting.
        """
        for object_type in (User, Group):
            try:
                client = provider.client_for_model(object_type)
            except TransientSyncException:
                # No client available right now; skip cleanup for this object type
                continue
            in_scope_pks = set(provider.get_object_qs(object_type).values_list("pk", flat=True))
            stale = client.connection_type.objects.filter(provider=provider).exclude(
                **{f"{client.connection_type_query}__pk__in": in_scope_pks}
            )
            for connection in stale:
                # NOTE(review): assumes every connection_type exposes ``scim_id``;
                # confirm for non-SCIM providers.
                try:
                    client.delete(connection.scim_id)
                    task.info(
                        f"Deleted out-of-scope {object_type._meta.verbose_name}",
                        scim_id=connection.scim_id,
                    )
                except NotFoundSyncException:
                    # Already deleted remotely; nothing to clean up
                    pass
                except TransientSyncException as exc:
                    self.logger.warning("transient error during cleanup", exc=exc)
                    task.warning("Cleanup encountered a transient exception. Retrying", exc=exc)
                    raise Retry() from exc
                except DryRunRejected as exc:
                    self.logger.info("Rejected dry-run cleanup event", exc=exc)

    def sync_objects(
        self,
        object_type: str,
        page: int,
        provider_pk: int,
        override_dry_run=False,
        **filters,
    ):
        """Sync one page of objects of ``object_type`` to a provider.

        :param object_type: dotted path of the model class to sync.
        :param page: page number within a ``Paginator`` of ``provider.sync_page_size``.
        :param provider_pk: primary key of the provider.
        :param override_dry_run: perform writes even if the provider is in dry-run
            mode (the provider is not saved, so the setting is not persisted).
        :param filters: extra filters forwarded to ``provider.get_object_qs``.

        Per-object errors are logged and recorded on the task; ``StopSync`` aborts
        the remainder of the page.
        """
        task = CurrentTask.get_task()
        _object_type: type[Model] = path_to_class(object_type)
        self.logger = get_logger().bind(
            provider_type=class_to_path(self._provider_model),
            provider_pk=provider_pk,
            object_type=object_type,
        )
        provider = self._get_provider(provider_pk)
        if not provider:
            task.warning("No provider found. Is it assigned to an application?")
            return
        # Override dry run mode if requested, however don't save the provider
        # so that scheduled sync tasks still run in dry_run mode
        if override_dry_run:
            provider.dry_run = False
        try:
            client = provider.client_for_model(_object_type)
        except TransientSyncException:
            return
        paginator = Paginator(
            provider.get_object_qs(_object_type, **filters),
            provider.sync_page_size,
        )
        if client.can_discover:
            self.logger.debug("starting discover")
            client.discover()
        self.logger.debug("starting sync for page", page=page)
        task.info(f"Syncing page {page} of {_object_type._meta.verbose_name_plural}")
        for obj in paginator.page(page).object_list:
            obj: Model
            try:
                client.write(obj)
            except SkipObjectException:
                self.logger.debug("skipping object due to SkipObject", obj=obj)
                continue
            except DryRunRejected as exc:
                task.info(
                    "Dropping mutating request due to dry run",
                    obj=sanitize_item(obj),
                    method=exc.method,
                    url=exc.url,
                    body=exc.body,
                )
            except BadRequestSyncException as exc:
                self.logger.warning("failed to sync object", exc=exc, obj=obj)
                task.warning(
                    f"Failed to sync {str(obj)} due to error: {str(exc)}",
                    arguments=exc.args[1:],
                    obj=sanitize_item(obj),
                    exception=exception_to_dict(exc),
                )
            except TransientSyncException as exc:
                self.logger.warning("failed to sync object", exc=exc, obj=obj)
                task.warning(
                    f"Failed to sync {str(obj)} due to transient error: {str(exc)}",
                    obj=sanitize_item(obj),
                    exception=exception_to_dict(exc),
                )
            except StopSync as exc:
                self.logger.warning("Stopping sync", exc=exc)
                task.warning(
                    f"Stopping sync due to error: {exc.detail()}",
                    obj=sanitize_item(obj),
                )
                break

    def sync_signal_direct_dispatch(
        self,
        task_sync_signal_direct: Actor[[str, str | int, int], None],
        model: str,
        pk: str | int,
    ):
        """Enqueue ``task_sync_signal_direct`` for one object on every assigned provider."""
        model_class: type[Model] = path_to_class(model)
        for provider in self._provider_model.objects.filter(self._assigned_to_application()):
            task_sync_signal_direct.send_with_options(
                args=(model, pk, provider.pk),
                rel_obj=provider,
                uid=f"{provider.name}:{model_class._meta.model_name}:{pk}:direct",
            )

    def sync_signal_direct(
        self,
        model: str,
        pk: str | int,
        provider_pk: int,
    ):
        """Write a single object to a single provider.

        Does nothing if the object no longer exists or is outside the provider's
        scope (``provider.get_object_qs``).

        :raises Retry: when a ``TransientSyncException`` occurs.
        """
        task = CurrentTask.get_task()
        self.logger = get_logger().bind(
            provider_type=class_to_path(self._provider_model),
        )
        model_class: type[Model] = path_to_class(model)
        instance = model_class.objects.filter(pk=pk).first()
        if not instance:
            return
        provider = self._get_provider(provider_pk)
        if not provider:
            task.warning("No provider found. Is it assigned to an application?")
            return
        client = provider.client_for_model(instance.__class__)
        # Check if the object is allowed within the provider's restrictions
        queryset = provider.get_object_qs(instance.__class__, pk=instance.pk)
        # The queryset we get from the provider must include the instance we've got given
        # otherwise ignore this provider
        if not queryset or not queryset.exists():
            return

        try:
            client.write(instance)
        except TransientSyncException as exc:
            raise Retry() from exc
        except SkipObjectException:
            return
        except DryRunRejected as exc:
            self.logger.info("Rejected dry-run event", exc=exc)
        except StopSync as exc:
            self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)

    def sync_signal_delete_dispatch(
        self,
        task_sync_signal_delete: Actor[[str, str, int], None],
        model: str,
        mappings: list[tuple[str, str]],
    ):
        """Enqueue ``task_sync_signal_delete`` for each ``(provider_pk, identifier)`` pair.

        Providers are looked up by primary key only; pairs whose provider no longer
        exists are skipped. Application assignment is checked by ``sync_signal_delete``.
        """
        model_class: type[Model] = path_to_class(model)
        for provider_pk, identifier in mappings:
            provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
                pk=provider_pk
            ).first()
            if not provider:
                continue
            task_sync_signal_delete.send_with_options(
                args=(model, identifier, provider_pk),
                rel_obj=provider,
                uid=f"{provider.name}:{model_class._meta.model_name}:{identifier}:delete",
            )

    def sync_signal_delete(
        self,
        model: str,
        identifier: str,
        provider_pk: int,
    ):
        """Delete an object from a provider's remote side.

        :param model: dotted path of the deleted model class.
        :param identifier: identifier passed to ``client.delete``.
        :param provider_pk: primary key of the provider.
        :raises Retry: when a ``TransientSyncException`` occurs.

        A ``NotFoundSyncException`` and dry-run rejections are logged and ignored.
        """
        task = CurrentTask.get_task()
        self.logger = get_logger().bind(
            provider_type=class_to_path(self._provider_model),
        )
        model_class: type[Model] = path_to_class(model)
        provider = self._get_provider(provider_pk)
        if not provider:
            task.warning("No provider found. Is it assigned to an application?")
            return
        client = provider.client_for_model(model_class)

        try:
            client.delete(identifier)
        except NotFoundSyncException as exc:
            self.logger.info(
                "Object not found in remote provider",
                model_name=model_class._meta.model_name,
                identifier=identifier,
                exc=exc,
                provider_pk=provider.pk,
            )
        except TransientSyncException as exc:
            raise Retry() from exc
        except DryRunRejected as exc:
            self.logger.info("Rejected dry-run event", exc=exc)

    def sync_signal_m2m_dispatch(
        self,
        task_sync_signal_m2m: Actor[[str, int, str, list[int]], None],
        instance_pk: str,
        action: str,
        pk_set: list[int],
        reverse: bool,
    ):
        """Enqueue ``task_sync_signal_m2m`` for a membership change on every assigned provider.

        Messages are always keyed by group: one message for the group when
        ``reverse`` is set, otherwise one message per group in ``pk_set``.
        """
        for provider in self._provider_model.objects.filter(self._assigned_to_application()):
            # reverse: instance is a Group, pk_set is a list of user pks
            # non-reverse: instance is a User, pk_set is a list of groups
            if reverse:
                task_sync_signal_m2m.send_with_options(
                    args=(instance_pk, provider.pk, action, list(pk_set)),
                    rel_obj=provider,
                    uid=f"{provider.name}:group:{instance_pk}:m2m",
                )
            else:
                for pk in pk_set:
                    task_sync_signal_m2m.send_with_options(
                        args=(pk, provider.pk, action, [instance_pk]),
                        rel_obj=provider,
                        uid=f"{provider.name}:group:{pk}:m2m",
                    )

    def sync_signal_m2m(
        self,
        group_pk: str,
        provider_pk: int,
        action: str,
        pk_set: list[int],
    ):
        """Propagate a group membership change to one provider.

        :param group_pk: primary key of the group whose members changed.
        :param provider_pk: primary key of the provider.
        :param action: m2m signal action; ``post_add``/``post_remove`` map to
            ``Direction.add``/``Direction.remove``.
        :param pk_set: primary keys of the users added or removed.
        :raises Retry: when a ``TransientSyncException`` occurs.
        """
        task = CurrentTask.get_task()
        self.logger = get_logger().bind(
            provider_type=class_to_path(self._provider_model),
        )
        # Named target_group so it does not shadow dramatiq's ``group`` import
        target_group = Group.objects.filter(pk=group_pk).first()
        if not target_group:
            return
        provider = self._get_provider(provider_pk)
        if not provider:
            task.warning("No provider found. Is it assigned to an application?")
            return

        # Check if the object is allowed within the provider's restrictions
        queryset: QuerySet = provider.get_object_qs(Group, pk=group_pk)
        # The queryset we get from the provider must include the instance we've got given
        # otherwise ignore this provider. exists() reuses the result cache filled by the
        # truthiness check, so no extra query is issued.
        if not queryset or not queryset.exists():
            return

        client = provider.client_for_model(Group)
        try:
            operation = None
            if action == "post_add":
                operation = Direction.add
            if action == "post_remove":
                operation = Direction.remove
            # NOTE(review): any other action reaches update_group with operation=None
            client.update_group(target_group, operation, pk_set)
        except TransientSyncException as exc:
            raise Retry() from exc
        except SkipObjectException:
            return
        except DryRunRejected as exc:
            self.logger.info("Rejected dry-run event", exc=exc)
        except StopSync as exc:
            self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)
class
SyncTasks:
class SyncTasks:
    """Container for all sync 'tasks'.

    This class doesn't actually contain tasks due to dramatiq's magic; instead, it
    exposes a number of functions to be called from tasks. All providers are looked
    up through the ``provider_model`` passed to the constructor.
    """

    # Structured logger, re-bound with context at the start of each entry point
    logger: BoundLogger

    def __init__(self, provider_model: type[OutgoingSyncProvider]) -> None:
        super().__init__()
        self._provider_model = provider_model

    @staticmethod
    def _assigned_to_application() -> Q:
        """Filter for providers whose ``application`` or ``backchannel_application`` is set."""
        return Q(backchannel_application__isnull=False) | Q(application__isnull=False)

    def _get_provider(self, provider_pk: int) -> OutgoingSyncProvider | None:
        """Return the provider with ``provider_pk`` if it is assigned to an application."""
        return self._provider_model.objects.filter(
            self._assigned_to_application(),
            pk=provider_pk,
        ).first()

    def sync_paginator(
        self,
        current_task: Task,
        provider: OutgoingSyncProvider,
        sync_objects: Actor[[str, int, int, bool], None],
        paginator: Paginator,
        object_type: type[User | Group],
        **options,
    ):
        """Build (without sending) one ``sync_objects`` message per page of ``paginator``.

        Each message receives ``(object type path, page number, provider pk)`` as
        arguments and the provider's per-page timeout (in milliseconds) as
        ``time_limit``. Extra ``options`` are forwarded to ``message_with_options``.

        :returns: list of messages, one per page.
        """
        tasks = []
        # sync_page_timeout is a duration string; dramatiq expects milliseconds
        time_limit = timedelta_from_string(provider.sync_page_timeout).total_seconds() * 1000
        for page in paginator.page_range:
            page_sync = sync_objects.message_with_options(
                args=(class_to_path(object_type), page, provider.pk),
                time_limit=time_limit,
                # Assign tasks to the same schedule as the current one
                rel_obj=current_task.rel_obj,
                uid=f"{provider.name}:{object_type._meta.model_name}:{page}",
                **options,
            )
            tasks.append(page_sync)
        return tasks

    def sync(
        self,
        provider_pk: int,
        sync_objects: Actor[[str, int, int, bool], None],
    ):
        """Run a full sync of all users and groups for one provider.

        Pages of users are run and awaited first, then pages of groups, then remote
        objects that are no longer in scope are cleaned up. The whole run happens
        while holding ``provider.sync_lock``; if the lock can't be acquired the sync
        is skipped.

        :raises Retry: when a ``TransientSyncException`` occurs.
        """
        task = CurrentTask.get_task()
        self.logger = get_logger().bind(
            provider_type=class_to_path(self._provider_model),
            provider_pk=provider_pk,
        )
        provider = self._get_provider(provider_pk)
        if not provider:
            task.warning("No provider found. Is it assigned to an application?")
            return
        task.info("Starting full provider sync")
        self.logger.debug("Starting provider sync")
        with provider.sync_lock as lock_acquired:
            if not lock_acquired:
                task.info("Synchronization is already running. Skipping.")
                self.logger.debug("Failed to acquire sync lock, skipping", provider=provider.name)
                return
            try:
                users_tasks = group(
                    self.sync_paginator(
                        current_task=task,
                        provider=provider,
                        sync_objects=sync_objects,
                        paginator=provider.get_paginator(User),
                        object_type=User,
                    )
                )
                group_tasks = group(
                    self.sync_paginator(
                        current_task=task,
                        provider=provider,
                        sync_objects=sync_objects,
                        paginator=provider.get_paginator(Group),
                        object_type=Group,
                    )
                )
                users_tasks.run().wait(timeout=provider.get_object_sync_time_limit_ms(User))
                group_tasks.run().wait(timeout=provider.get_object_sync_time_limit_ms(Group))
                self._sync_cleanup(provider, task)
            except TransientSyncException as exc:
                self.logger.warning("transient sync exception", exc=exc)
                task.warning("Sync encountered a transient exception. Retrying", exc=exc)
                raise Retry() from exc
            except StopSync as exc:
                task.error(exc)
                return

    def _sync_cleanup(self, provider: OutgoingSyncProvider, task: Task):
        """Delete remote objects that are no longer in scope.

        For users and groups, every connection object of ``provider`` whose local
        object is not part of ``provider.get_object_qs`` is deleted remotely. Objects
        already gone remotely are ignored and dry-run rejections are logged.

        :raises Retry: when a ``TransientSyncException`` occurs while deleting.
        """
        for object_type in (User, Group):
            try:
                client = provider.client_for_model(object_type)
            except TransientSyncException:
                # No client available right now; skip cleanup for this object type
                continue
            in_scope_pks = set(provider.get_object_qs(object_type).values_list("pk", flat=True))
            stale = client.connection_type.objects.filter(provider=provider).exclude(
                **{f"{client.connection_type_query}__pk__in": in_scope_pks}
            )
            for connection in stale:
                # NOTE(review): assumes every connection_type exposes ``scim_id``;
                # confirm for non-SCIM providers.
                try:
                    client.delete(connection.scim_id)
                    task.info(
                        f"Deleted out-of-scope {object_type._meta.verbose_name}",
                        scim_id=connection.scim_id,
                    )
                except NotFoundSyncException:
                    # Already deleted remotely; nothing to clean up
                    pass
                except TransientSyncException as exc:
                    self.logger.warning("transient error during cleanup", exc=exc)
                    task.warning("Cleanup encountered a transient exception. Retrying", exc=exc)
                    raise Retry() from exc
                except DryRunRejected as exc:
                    self.logger.info("Rejected dry-run cleanup event", exc=exc)

    def sync_objects(
        self,
        object_type: str,
        page: int,
        provider_pk: int,
        override_dry_run=False,
        **filters,
    ):
        """Sync one page of objects of ``object_type`` to a provider.

        :param object_type: dotted path of the model class to sync.
        :param page: page number within a ``Paginator`` of ``provider.sync_page_size``.
        :param provider_pk: primary key of the provider.
        :param override_dry_run: perform writes even if the provider is in dry-run
            mode (the provider is not saved, so the setting is not persisted).
        :param filters: extra filters forwarded to ``provider.get_object_qs``.

        Per-object errors are logged and recorded on the task; ``StopSync`` aborts
        the remainder of the page.
        """
        task = CurrentTask.get_task()
        _object_type: type[Model] = path_to_class(object_type)
        self.logger = get_logger().bind(
            provider_type=class_to_path(self._provider_model),
            provider_pk=provider_pk,
            object_type=object_type,
        )
        provider = self._get_provider(provider_pk)
        if not provider:
            task.warning("No provider found. Is it assigned to an application?")
            return
        # Override dry run mode if requested, however don't save the provider
        # so that scheduled sync tasks still run in dry_run mode
        if override_dry_run:
            provider.dry_run = False
        try:
            client = provider.client_for_model(_object_type)
        except TransientSyncException:
            return
        paginator = Paginator(
            provider.get_object_qs(_object_type, **filters),
            provider.sync_page_size,
        )
        if client.can_discover:
            self.logger.debug("starting discover")
            client.discover()
        self.logger.debug("starting sync for page", page=page)
        task.info(f"Syncing page {page} of {_object_type._meta.verbose_name_plural}")
        for obj in paginator.page(page).object_list:
            obj: Model
            try:
                client.write(obj)
            except SkipObjectException:
                self.logger.debug("skipping object due to SkipObject", obj=obj)
                continue
            except DryRunRejected as exc:
                task.info(
                    "Dropping mutating request due to dry run",
                    obj=sanitize_item(obj),
                    method=exc.method,
                    url=exc.url,
                    body=exc.body,
                )
            except BadRequestSyncException as exc:
                self.logger.warning("failed to sync object", exc=exc, obj=obj)
                task.warning(
                    f"Failed to sync {str(obj)} due to error: {str(exc)}",
                    arguments=exc.args[1:],
                    obj=sanitize_item(obj),
                    exception=exception_to_dict(exc),
                )
            except TransientSyncException as exc:
                self.logger.warning("failed to sync object", exc=exc, obj=obj)
                task.warning(
                    f"Failed to sync {str(obj)} due to transient error: {str(exc)}",
                    obj=sanitize_item(obj),
                    exception=exception_to_dict(exc),
                )
            except StopSync as exc:
                self.logger.warning("Stopping sync", exc=exc)
                task.warning(
                    f"Stopping sync due to error: {exc.detail()}",
                    obj=sanitize_item(obj),
                )
                break

    def sync_signal_direct_dispatch(
        self,
        task_sync_signal_direct: Actor[[str, str | int, int], None],
        model: str,
        pk: str | int,
    ):
        """Enqueue ``task_sync_signal_direct`` for one object on every assigned provider."""
        model_class: type[Model] = path_to_class(model)
        for provider in self._provider_model.objects.filter(self._assigned_to_application()):
            task_sync_signal_direct.send_with_options(
                args=(model, pk, provider.pk),
                rel_obj=provider,
                uid=f"{provider.name}:{model_class._meta.model_name}:{pk}:direct",
            )

    def sync_signal_direct(
        self,
        model: str,
        pk: str | int,
        provider_pk: int,
    ):
        """Write a single object to a single provider.

        Does nothing if the object no longer exists or is outside the provider's
        scope (``provider.get_object_qs``).

        :raises Retry: when a ``TransientSyncException`` occurs.
        """
        task = CurrentTask.get_task()
        self.logger = get_logger().bind(
            provider_type=class_to_path(self._provider_model),
        )
        model_class: type[Model] = path_to_class(model)
        instance = model_class.objects.filter(pk=pk).first()
        if not instance:
            return
        provider = self._get_provider(provider_pk)
        if not provider:
            task.warning("No provider found. Is it assigned to an application?")
            return
        client = provider.client_for_model(instance.__class__)
        # Check if the object is allowed within the provider's restrictions
        queryset = provider.get_object_qs(instance.__class__, pk=instance.pk)
        # The queryset we get from the provider must include the instance we've got given
        # otherwise ignore this provider
        if not queryset or not queryset.exists():
            return

        try:
            client.write(instance)
        except TransientSyncException as exc:
            raise Retry() from exc
        except SkipObjectException:
            return
        except DryRunRejected as exc:
            self.logger.info("Rejected dry-run event", exc=exc)
        except StopSync as exc:
            self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)

    def sync_signal_delete_dispatch(
        self,
        task_sync_signal_delete: Actor[[str, str, int], None],
        model: str,
        mappings: list[tuple[str, str]],
    ):
        """Enqueue ``task_sync_signal_delete`` for each ``(provider_pk, identifier)`` pair.

        Providers are looked up by primary key only; pairs whose provider no longer
        exists are skipped. Application assignment is checked by ``sync_signal_delete``.
        """
        model_class: type[Model] = path_to_class(model)
        for provider_pk, identifier in mappings:
            provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
                pk=provider_pk
            ).first()
            if not provider:
                continue
            task_sync_signal_delete.send_with_options(
                args=(model, identifier, provider_pk),
                rel_obj=provider,
                uid=f"{provider.name}:{model_class._meta.model_name}:{identifier}:delete",
            )

    def sync_signal_delete(
        self,
        model: str,
        identifier: str,
        provider_pk: int,
    ):
        """Delete an object from a provider's remote side.

        :param model: dotted path of the deleted model class.
        :param identifier: identifier passed to ``client.delete``.
        :param provider_pk: primary key of the provider.
        :raises Retry: when a ``TransientSyncException`` occurs.

        A ``NotFoundSyncException`` and dry-run rejections are logged and ignored.
        """
        task = CurrentTask.get_task()
        self.logger = get_logger().bind(
            provider_type=class_to_path(self._provider_model),
        )
        model_class: type[Model] = path_to_class(model)
        provider = self._get_provider(provider_pk)
        if not provider:
            task.warning("No provider found. Is it assigned to an application?")
            return
        client = provider.client_for_model(model_class)

        try:
            client.delete(identifier)
        except NotFoundSyncException as exc:
            self.logger.info(
                "Object not found in remote provider",
                model_name=model_class._meta.model_name,
                identifier=identifier,
                exc=exc,
                provider_pk=provider.pk,
            )
        except TransientSyncException as exc:
            raise Retry() from exc
        except DryRunRejected as exc:
            self.logger.info("Rejected dry-run event", exc=exc)

    def sync_signal_m2m_dispatch(
        self,
        task_sync_signal_m2m: Actor[[str, int, str, list[int]], None],
        instance_pk: str,
        action: str,
        pk_set: list[int],
        reverse: bool,
    ):
        """Enqueue ``task_sync_signal_m2m`` for a membership change on every assigned provider.

        Messages are always keyed by group: one message for the group when
        ``reverse`` is set, otherwise one message per group in ``pk_set``.
        """
        for provider in self._provider_model.objects.filter(self._assigned_to_application()):
            # reverse: instance is a Group, pk_set is a list of user pks
            # non-reverse: instance is a User, pk_set is a list of groups
            if reverse:
                task_sync_signal_m2m.send_with_options(
                    args=(instance_pk, provider.pk, action, list(pk_set)),
                    rel_obj=provider,
                    uid=f"{provider.name}:group:{instance_pk}:m2m",
                )
            else:
                for pk in pk_set:
                    task_sync_signal_m2m.send_with_options(
                        args=(pk, provider.pk, action, [instance_pk]),
                        rel_obj=provider,
                        uid=f"{provider.name}:group:{pk}:m2m",
                    )

    def sync_signal_m2m(
        self,
        group_pk: str,
        provider_pk: int,
        action: str,
        pk_set: list[int],
    ):
        """Propagate a group membership change to one provider.

        :param group_pk: primary key of the group whose members changed.
        :param provider_pk: primary key of the provider.
        :param action: m2m signal action; ``post_add``/``post_remove`` map to
            ``Direction.add``/``Direction.remove``.
        :param pk_set: primary keys of the users added or removed.
        :raises Retry: when a ``TransientSyncException`` occurs.
        """
        task = CurrentTask.get_task()
        self.logger = get_logger().bind(
            provider_type=class_to_path(self._provider_model),
        )
        # Named target_group so it does not shadow dramatiq's ``group`` import
        target_group = Group.objects.filter(pk=group_pk).first()
        if not target_group:
            return
        provider = self._get_provider(provider_pk)
        if not provider:
            task.warning("No provider found. Is it assigned to an application?")
            return

        # Check if the object is allowed within the provider's restrictions
        queryset: QuerySet = provider.get_object_qs(Group, pk=group_pk)
        # The queryset we get from the provider must include the instance we've got given
        # otherwise ignore this provider. exists() reuses the result cache filled by the
        # truthiness check, so no extra query is issued.
        if not queryset or not queryset.exists():
            return

        client = provider.client_for_model(Group)
        try:
            operation = None
            if action == "post_add":
                operation = Direction.add
            if action == "post_remove":
                operation = Direction.remove
            # NOTE(review): any other action reaches update_group with operation=None
            client.update_group(target_group, operation, pk_set)
        except TransientSyncException as exc:
            raise Retry() from exc
        except SkipObjectException:
            return
        except DryRunRejected as exc:
            self.logger.info("Rejected dry-run event", exc=exc)
        except StopSync as exc:
            self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)
Container for all sync 'tasks' (this class doesn't actually contain tasks due to dramatiq's magic; instead, it exposes a number of functions to be called from tasks)
SyncTasks( provider_model: type[authentik.lib.sync.outgoing.models.OutgoingSyncProvider])
def
sync_paginator( self, current_task: authentik.tasks.models.Task, provider: authentik.lib.sync.outgoing.models.OutgoingSyncProvider, sync_objects: dramatiq.actor.Actor[[<class 'str'>, <class 'int'>, <class 'int'>, <class 'bool'>], NoneType], paginator: django.core.paginator.Paginator, object_type: type[authentik.core.models.User | authentik.core.models.Group], **options):
def sync_paginator(
    self,
    current_task: Task,
    provider: OutgoingSyncProvider,
    sync_objects: Actor[[str, int, int, bool], None],
    paginator: Paginator,
    object_type: type[User | Group],
    **options,
):
    """Create, without sending, one ``sync_objects`` message for each page of ``paginator``.

    Every message gets ``(object type path, page number, provider pk)`` as its
    arguments, the provider's page timeout converted to milliseconds as
    ``time_limit``, and the ``rel_obj`` of ``current_task``. Any extra ``options``
    are passed straight to ``message_with_options``.

    :returns: list of messages in page order.
    """
    page_timeout_ms = timedelta_from_string(provider.sync_page_timeout).total_seconds() * 1000
    return [
        sync_objects.message_with_options(
            args=(class_to_path(object_type), page_number, provider.pk),
            time_limit=page_timeout_ms,
            # Attach each page to the same schedule as the task that spawned it
            rel_obj=current_task.rel_obj,
            uid=f"{provider.name}:{object_type._meta.model_name}:{page_number}",
            **options,
        )
        for page_number in paginator.page_range
    ]
def
sync( self, provider_pk: int, sync_objects: dramatiq.actor.Actor[[<class 'str'>, <class 'int'>, <class 'int'>, <class 'bool'>], NoneType]):
def sync(
    self,
    provider_pk: int,
    sync_objects: Actor[[str, int, int, bool], None],
):
    """Full sync of every user and group for a single provider.

    Under ``provider.sync_lock``, all per-page message groups are built first (users,
    then groups). Each group is then run and awaited in that order, and finally
    out-of-scope remote objects are cleaned up. A sync that can't take the lock is
    skipped.

    :raises Retry: when a ``TransientSyncException`` occurs.
    """
    current = CurrentTask.get_task()
    self.logger = get_logger().bind(
        provider_type=class_to_path(self._provider_model),
        provider_pk=provider_pk,
    )
    provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
        Q(backchannel_application__isnull=False) | Q(application__isnull=False),
        pk=provider_pk,
    ).first()
    if not provider:
        current.warning("No provider found. Is it assigned to an application?")
        return
    current.info("Starting full provider sync")
    self.logger.debug("Starting provider sync")
    with provider.sync_lock as lock_acquired:
        if not lock_acquired:
            current.info("Synchronization is already running. Skipping.")
            self.logger.debug("Failed to acquire sync lock, skipping", provider=provider.name)
            return
        object_types = (User, Group)
        try:
            # Build every batch up front before running any of them
            batches = [
                group(
                    self.sync_paginator(
                        current_task=current,
                        provider=provider,
                        sync_objects=sync_objects,
                        paginator=provider.get_paginator(object_type),
                        object_type=object_type,
                    )
                )
                for object_type in object_types
            ]
            for object_type, batch in zip(object_types, batches):
                batch.run().wait(timeout=provider.get_object_sync_time_limit_ms(object_type))
            self._sync_cleanup(provider, current)
        except TransientSyncException as exc:
            self.logger.warning("transient sync exception", exc=exc)
            current.warning("Sync encountered a transient exception. Retrying", exc=exc)
            raise Retry() from exc
        except StopSync as exc:
            current.error(exc)
            return
def
sync_objects( self, object_type: str, page: int, provider_pk: int, override_dry_run=False, **filter):
def sync_objects(
    self,
    object_type: str,
    page: int,
    provider_pk: int,
    override_dry_run=False,
    **filters,
):
    """Sync one page of objects of ``object_type`` to a provider.

    :param object_type: dotted path of the model class to sync.
    :param page: page number within a ``Paginator`` of ``provider.sync_page_size``.
    :param provider_pk: primary key of the provider.
    :param override_dry_run: perform writes even if the provider is in dry-run mode
        (the provider is not saved, so the setting is not persisted).
    :param filters: extra filters forwarded to ``provider.get_object_qs``.

    Per-object errors are logged and recorded on the task; ``StopSync`` aborts the
    remainder of the page.
    """
    task = CurrentTask.get_task()
    _object_type: type[Model] = path_to_class(object_type)
    self.logger = get_logger().bind(
        provider_type=class_to_path(self._provider_model),
        provider_pk=provider_pk,
        object_type=object_type,
    )
    provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
        Q(backchannel_application__isnull=False) | Q(application__isnull=False),
        pk=provider_pk,
    ).first()
    if not provider:
        task.warning("No provider found. Is it assigned to an application?")
        return
    # Override dry run mode if requested, however don't save the provider
    # so that scheduled sync tasks still run in dry_run mode
    if override_dry_run:
        provider.dry_run = False
    try:
        client = provider.client_for_model(_object_type)
    except TransientSyncException:
        return
    paginator = Paginator(
        provider.get_object_qs(_object_type, **filters),
        provider.sync_page_size,
    )
    if client.can_discover:
        self.logger.debug("starting discover")
        client.discover()
    self.logger.debug("starting sync for page", page=page)
    task.info(f"Syncing page {page} of {_object_type._meta.verbose_name_plural}")
    for obj in paginator.page(page).object_list:
        obj: Model
        try:
            client.write(obj)
        except SkipObjectException:
            self.logger.debug("skipping object due to SkipObject", obj=obj)
            continue
        except DryRunRejected as exc:
            task.info(
                "Dropping mutating request due to dry run",
                obj=sanitize_item(obj),
                method=exc.method,
                url=exc.url,
                body=exc.body,
            )
        except BadRequestSyncException as exc:
            self.logger.warning("failed to sync object", exc=exc, obj=obj)
            task.warning(
                f"Failed to sync {str(obj)} due to error: {str(exc)}",
                arguments=exc.args[1:],
                obj=sanitize_item(obj),
                exception=exception_to_dict(exc),
            )
        except TransientSyncException as exc:
            # Same log key as the other handlers; obj may be a User or a Group
            self.logger.warning("failed to sync object", exc=exc, obj=obj)
            task.warning(
                f"Failed to sync {str(obj)} due to transient error: {str(exc)}",
                obj=sanitize_item(obj),
                exception=exception_to_dict(exc),
            )
        except StopSync as exc:
            self.logger.warning("Stopping sync", exc=exc)
            task.warning(
                f"Stopping sync due to error: {exc.detail()}",
                obj=sanitize_item(obj),
            )
            break
def
sync_signal_direct_dispatch( self, task_sync_signal_direct: dramatiq.actor.Actor[[<class 'str'>, str | int, <class 'int'>], NoneType], model: str, pk: str | int):
def sync_signal_direct_dispatch(
    self,
    task_sync_signal_direct: Actor[[str, str | int, int], None],
    model: str,
    pk: str | int,
):
    """Send one ``task_sync_signal_direct`` message per provider for a changed object.

    Only providers with ``application`` or ``backchannel_application`` set receive
    a message; each message is attached to its provider via ``rel_obj``.
    """
    model_name = path_to_class(model)._meta.model_name
    assigned = Q(backchannel_application__isnull=False) | Q(application__isnull=False)
    for provider in self._provider_model.objects.filter(assigned):
        task_sync_signal_direct.send_with_options(
            args=(model, pk, provider.pk),
            rel_obj=provider,
            uid=f"{provider.name}:{model_name}:{pk}:direct",
        )
def sync_signal_direct(
    self,
    model: str,
    pk: str | int,
    provider_pk: int,
):
    """Sync a single object to a single provider.

    Returns without doing anything when:
    - the object cannot be found;
    - the provider cannot be found; or
    - the provider's object queryset does not include the object.

    :param model: dotted path of the object's model class.
    :param pk: primary key of the object.
    :param provider_pk: primary key of the provider. Only providers assigned to an
        application or as a backchannel provider are considered.
    :raises Retry: when the write fails with ``TransientSyncException``.
    """
    task = CurrentTask.get_task()
    self.logger = get_logger().bind(
        provider_type=class_to_path(self._provider_model),
    )
    model_class: type[Model] = path_to_class(model)
    instance = model_class.objects.filter(pk=pk).first()
    if not instance:
        return
    provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
        Q(backchannel_application__isnull=False) | Q(application__isnull=False),
        pk=provider_pk,
    ).first()
    if not provider:
        task.warning("No provider found. Is it assigned to an application?")
        return
    # Check if the object is allowed within the provider's restrictions
    queryset = provider.get_object_qs(instance.__class__, pk=instance.pk)
    # The queryset we get from the provider must include the instance we've got given
    # otherwise ignore this provider. ``exists()`` avoids loading rows just to test
    # emptiness (truthiness of a QuerySet evaluates it).
    if queryset is None or not queryset.exists():
        return

    # Only build a client once the object is known to be in scope
    client = provider.client_for_model(instance.__class__)
    try:
        client.write(instance)
    except TransientSyncException as exc:
        raise Retry() from exc
    except SkipObjectException:
        return
    except DryRunRejected as exc:
        self.logger.info("Rejected dry-run event", exc=exc)
    except StopSync as exc:
        self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)
def sync_signal_delete_dispatch(
    self,
    task_sync_signal_delete: Actor[[str, str, int], None],
    model: str,
    mappings: list[tuple[str, str]],
):
    """Fan out remote deletions for a locally deleted object.

    For each ``(provider_pk, identifier)`` pair whose provider still exists, send one
    ``task_sync_signal_delete`` message with arguments ``(model, identifier, provider_pk)``.
    Providers are only looked up by pk here. The delete actor re-checks whether the
    provider is assigned to an application.

    :param task_sync_signal_delete: actor performing the per-provider delete. Its type
        hint matches the argument order actually sent: ``(str, str, int)``.
    :param model: dotted path of the deleted object's model class.
    :param mappings: ``(provider_pk, identifier)`` pairs; missing providers are skipped.
    """
    model_class: type[Model] = path_to_class(model)
    for provider_pk, identifier in mappings:
        provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
            pk=provider_pk
        ).first()
        if not provider:
            continue
        task_sync_signal_delete.send_with_options(
            args=(model, identifier, provider_pk),
            rel_obj=provider,
            uid=f"{provider.name}:{model_class._meta.model_name}:{identifier}:delete",
        )
def sync_signal_delete(
    self,
    model: str,
    identifier: str,
    provider_pk: int,
):
    """Delete a single remote object through one provider's client.

    :param model: dotted path of the model class used to select the provider client.
    :param identifier: identifier passed to ``client.delete``.
    :param provider_pk: primary key of the provider. Only providers assigned to an
        application or as a backchannel provider are considered.
    :raises Retry: when the delete fails with ``TransientSyncException``.

    A remote "not found" is logged, not treated as an error. Dry-run rejections and
    ``StopSync`` are logged the same way as in the other signal handlers.
    """
    task = CurrentTask.get_task()
    self.logger = get_logger().bind(
        provider_type=class_to_path(self._provider_model),
    )
    model_class: type[Model] = path_to_class(model)
    provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
        Q(backchannel_application__isnull=False) | Q(application__isnull=False),
        pk=provider_pk,
    ).first()
    if not provider:
        task.warning("No provider found. Is it assigned to an application?")
        return
    client = provider.client_for_model(model_class)

    try:
        client.delete(identifier)
    except NotFoundSyncException as exc:
        self.logger.info(
            "Object not found in remote provider",
            model_name=model_class._meta.model_name,
            identifier=identifier,
            exc=exc,
            provider_pk=provider.pk,
        )
    except TransientSyncException as exc:
        raise Retry() from exc
    except DryRunRejected as exc:
        self.logger.info("Rejected dry-run event", exc=exc)
    except StopSync as exc:
        # Handled like sync_signal_direct / sync_signal_m2m instead of failing the task
        self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)
def sync_signal_m2m_dispatch(
    self,
    task_sync_signal_m2m: Actor[[str, int, str, list[int]], None],
    instance_pk: str,
    action: str,
    pk_set: list[int],
    reverse: bool,
):
    """Fan out group-membership changes to every eligible provider.

    Messages are always keyed by group. Behaviour depends on ``reverse``:
    - ``reverse`` set: ``instance_pk`` is a group and ``pk_set`` holds user pks, so one
      message per provider carries the whole set.
    - ``reverse`` not set: ``instance_pk`` is a user and ``pk_set`` holds group pks, so
      one message per group is sent, with the user as the only member pk.

    Only providers assigned to an application or as a backchannel provider are used.
    """
    eligible = Q(backchannel_application__isnull=False) | Q(application__isnull=False)
    for target in self._provider_model.objects.filter(eligible):
        if reverse:
            batches = [(instance_pk, list(pk_set))]
        else:
            batches = [(group_pk, [instance_pk]) for group_pk in pk_set]
        for group_pk, member_pks in batches:
            task_sync_signal_m2m.send_with_options(
                args=(group_pk, target.pk, action, member_pks),
                rel_obj=target,
                uid=f"{target.name}:group:{group_pk}:m2m",
            )
def sync_signal_m2m(
    self,
    group_pk: str,
    provider_pk: int,
    action: str,
    pk_set: list[int],
):
    """Apply a membership change for one group to one provider.

    :param group_pk: primary key of the group whose membership changed.
    :param provider_pk: primary key of the provider. Only providers assigned to an
        application or as a backchannel provider are considered.
    :param action: m2m signal action. ``"post_add"`` maps to ``Direction.add`` and
        ``"post_remove"`` maps to ``Direction.remove``. Any other value is passed to
        ``update_group`` as ``None``.
    :param pk_set: user primary keys that were added or removed.
    :raises Retry: when the update fails with ``TransientSyncException``.
    """
    task = CurrentTask.get_task()
    self.logger = get_logger().bind(
        provider_type=class_to_path(self._provider_model),
    )
    group = Group.objects.filter(pk=group_pk).first()
    if not group:
        return
    provider: OutgoingSyncProvider | None = self._provider_model.objects.filter(
        Q(backchannel_application__isnull=False) | Q(application__isnull=False),
        pk=provider_pk,
    ).first()
    if not provider:
        task.warning("No provider found. Is it assigned to an application?")
        return

    # Check if the object is allowed within the provider's restrictions
    queryset: QuerySet | None = provider.get_object_qs(Group, pk=group_pk)
    # The queryset we get from the provider must include the instance we've got given
    # otherwise ignore this provider. ``exists()`` avoids loading rows just to test
    # emptiness (truthiness of a QuerySet evaluates it).
    if queryset is None or not queryset.exists():
        return

    # Pure mapping, cannot raise sync exceptions, so it stays outside the try
    operation = None
    if action == "post_add":
        operation = Direction.add
    elif action == "post_remove":
        operation = Direction.remove

    client = provider.client_for_model(Group)
    try:
        client.update_group(group, operation, pk_set)
    except TransientSyncException as exc:
        raise Retry() from exc
    except SkipObjectException:
        return
    except DryRunRejected as exc:
        self.logger.info("Rejected dry-run event", exc=exc)
    except StopSync as exc:
        self.logger.warning("Stopping sync", exc=exc, provider_pk=provider.pk)