authentik.lib.sync.outgoing.models
1import math 2from typing import Any, Self 3 4import pglock 5from django.core.paginator import Paginator 6from django.core.validators import MinValueValidator 7from django.db import connection, models 8from django.db.models import Model, QuerySet, TextChoices 9from django.utils.translation import gettext_lazy as _ 10from dramatiq.actor import Actor 11 12from authentik.core.models import Group, User 13from authentik.lib.sync.outgoing.base import BaseOutgoingSyncClient 14from authentik.lib.utils.time import fqdn_rand, timedelta_from_string, timedelta_string_validator 15from authentik.tasks.schedules.common import ScheduleSpec 16from authentik.tasks.schedules.models import ScheduledModel 17 18 19class OutgoingSyncDeleteAction(TextChoices): 20 """Action taken when a user/group is deleted in authentik. Suspend is not available for groups, 21 and will be treated as `do_nothing`""" 22 23 DO_NOTHING = "do_nothing" 24 DELETE = "delete" 25 SUSPEND = "suspend" 26 27 28class OutgoingSyncProvider(ScheduledModel, Model): 29 """Base abstract models for providers implementing outgoing sync""" 30 31 sync_page_size = models.PositiveIntegerField( 32 help_text=_("Controls the number of objects synced in a single task"), 33 default=100, 34 validators=[MinValueValidator(1)], 35 ) 36 sync_page_timeout = models.TextField( 37 help_text=_("Timeout for synchronization of a single page"), 38 default="minutes=30", 39 validators=[timedelta_string_validator], 40 ) 41 42 dry_run = models.BooleanField( 43 default=False, 44 help_text=_( 45 "When enabled, provider will not modify or create objects in the remote system." 46 ), 47 ) 48 49 class Meta: 50 abstract = True 51 52 def client_for_model[T: User | Group]( 53 self, model: type[T] 54 ) -> BaseOutgoingSyncClient[T, Any, Any, Self]: 55 raise NotImplementedError 56 57 def get_object_qs[T: User | Group](self, type: type[T], **kwargs) -> QuerySet[T]: 58 raise NotImplementedError 59 60 @classmethod 61 def get_object_mappings(cls, obj: User | Group) -> list[tuple[str, str]]: 62 """ 63 Get a list of mapping between User/Group and ProviderUser/Group: 64 [("provider_pk", "obj_pk")] 65 """ 66 raise NotImplementedError 67 68 def get_paginator[T: User | Group](self, type: type[T]) -> Paginator: 69 return Paginator(self.get_object_qs(type), self.sync_page_size) 70 71 def get_object_sync_time_limit_ms[T: User | Group](self, type: type[T]) -> int: 72 # Use a simple COUNT(*) on the model instead of materializing get_object_qs(), 73 # which for some providers (e.g. SCIM) runs PolicyEngine per-user and is 74 # extremely expensive. The time limit is an upper-bound estimate, so using 75 # the total count (without policy filtering) is a safe overestimate. 76 total_count = type.objects.count() 77 num_pages = math.ceil(total_count / self.sync_page_size) if total_count > 0 else 1 78 page_timeout_ms = timedelta_from_string(self.sync_page_timeout).total_seconds() * 1000 79 return int(num_pages * page_timeout_ms * 1.5) 80 81 def get_sync_time_limit_ms(self) -> int: 82 return int( 83 (self.get_object_sync_time_limit_ms(User) + self.get_object_sync_time_limit_ms(Group)) 84 * 1.5 85 ) 86 87 @property 88 def sync_lock(self) -> pglock.advisory: 89 """Postgres lock for syncing to prevent multiple parallel syncs happening""" 90 return pglock.advisory( 91 lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}", 92 timeout=0, 93 side_effect=pglock.Return, 94 ) 95 96 @property 97 def sync_actor(self) -> Actor: 98 raise NotImplementedError 99 100 def sync_dispatch(self) -> None: 101 for schedule in self.schedules.all(): 102 schedule.send() 103 104 @property 105 def schedule_specs(self) -> list[ScheduleSpec]: 106 return [ 107 ScheduleSpec( 108 actor=self.sync_actor, 109 uid=self.name, 110 args=(self.pk,), 111 options={ 112 "time_limit": self.get_sync_time_limit_ms(), 113 }, 114 send_on_save=True, 115 crontab=f"{fqdn_rand(self.pk)} */4 * * *", 116 ), 117 ]
20class OutgoingSyncDeleteAction(TextChoices): 21 """Action taken when a user/group is deleted in authentik. Suspend is not available for groups, 22 and will be treated as `do_nothing`""" 23 24 DO_NOTHING = "do_nothing" 25 DELETE = "delete" 26 SUSPEND = "suspend"
Action taken when a user/group is deleted in authentik. Suspend is not available for groups,
and will be treated as do_nothing
29class OutgoingSyncProvider(ScheduledModel, Model): 30 """Base abstract models for providers implementing outgoing sync""" 31 32 sync_page_size = models.PositiveIntegerField( 33 help_text=_("Controls the number of objects synced in a single task"), 34 default=100, 35 validators=[MinValueValidator(1)], 36 ) 37 sync_page_timeout = models.TextField( 38 help_text=_("Timeout for synchronization of a single page"), 39 default="minutes=30", 40 validators=[timedelta_string_validator], 41 ) 42 43 dry_run = models.BooleanField( 44 default=False, 45 help_text=_( 46 "When enabled, provider will not modify or create objects in the remote system." 47 ), 48 ) 49 50 class Meta: 51 abstract = True 52 53 def client_for_model[T: User | Group]( 54 self, model: type[T] 55 ) -> BaseOutgoingSyncClient[T, Any, Any, Self]: 56 raise NotImplementedError 57 58 def get_object_qs[T: User | Group](self, type: type[T], **kwargs) -> QuerySet[T]: 59 raise NotImplementedError 60 61 @classmethod 62 def get_object_mappings(cls, obj: User | Group) -> list[tuple[str, str]]: 63 """ 64 Get a list of mapping between User/Group and ProviderUser/Group: 65 [("provider_pk", "obj_pk")] 66 """ 67 raise NotImplementedError 68 69 def get_paginator[T: User | Group](self, type: type[T]) -> Paginator: 70 return Paginator(self.get_object_qs(type), self.sync_page_size) 71 72 def get_object_sync_time_limit_ms[T: User | Group](self, type: type[T]) -> int: 73 # Use a simple COUNT(*) on the model instead of materializing get_object_qs(), 74 # which for some providers (e.g. SCIM) runs PolicyEngine per-user and is 75 # extremely expensive. The time limit is an upper-bound estimate, so using 76 # the total count (without policy filtering) is a safe overestimate. 77 total_count = type.objects.count() 78 num_pages = math.ceil(total_count / self.sync_page_size) if total_count > 0 else 1 79 page_timeout_ms = timedelta_from_string(self.sync_page_timeout).total_seconds() * 1000 80 return int(num_pages * page_timeout_ms * 1.5) 81 82 def get_sync_time_limit_ms(self) -> int: 83 return int( 84 (self.get_object_sync_time_limit_ms(User) + self.get_object_sync_time_limit_ms(Group)) 85 * 1.5 86 ) 87 88 @property 89 def sync_lock(self) -> pglock.advisory: 90 """Postgres lock for syncing to prevent multiple parallel syncs happening""" 91 return pglock.advisory( 92 lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}", 93 timeout=0, 94 side_effect=pglock.Return, 95 ) 96 97 @property 98 def sync_actor(self) -> Actor: 99 raise NotImplementedError 100 101 def sync_dispatch(self) -> None: 102 for schedule in self.schedules.all(): 103 schedule.send() 104 105 @property 106 def schedule_specs(self) -> list[ScheduleSpec]: 107 return [ 108 ScheduleSpec( 109 actor=self.sync_actor, 110 uid=self.name, 111 args=(self.pk,), 112 options={ 113 "time_limit": self.get_sync_time_limit_ms(), 114 }, 115 send_on_save=True, 116 crontab=f"{fqdn_rand(self.pk)} */4 * * *", 117 ), 118 ]
Base abstract models for providers implementing outgoing sync
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
61 @classmethod 62 def get_object_mappings(cls, obj: User | Group) -> list[tuple[str, str]]: 63 """ 64 Get a list of mapping between User/Group and ProviderUser/Group: 65 [("provider_pk", "obj_pk")] 66 """ 67 raise NotImplementedError
Get a list of mapping between User/Group and ProviderUser/Group: [("provider_pk", "obj_pk")]
72 def get_object_sync_time_limit_ms[T: User | Group](self, type: type[T]) -> int: 73 # Use a simple COUNT(*) on the model instead of materializing get_object_qs(), 74 # which for some providers (e.g. SCIM) runs PolicyEngine per-user and is 75 # extremely expensive. The time limit is an upper-bound estimate, so using 76 # the total count (without policy filtering) is a safe overestimate. 77 total_count = type.objects.count() 78 num_pages = math.ceil(total_count / self.sync_page_size) if total_count > 0 else 1 79 page_timeout_ms = timedelta_from_string(self.sync_page_timeout).total_seconds() * 1000 80 return int(num_pages * page_timeout_ms * 1.5)
88 @property 89 def sync_lock(self) -> pglock.advisory: 90 """Postgres lock for syncing to prevent multiple parallel syncs happening""" 91 return pglock.advisory( 92 lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}", 93 timeout=0, 94 side_effect=pglock.Return, 95 )
Postgres lock for syncing to prevent multiple parallel syncs happening
105 @property 106 def schedule_specs(self) -> list[ScheduleSpec]: 107 return [ 108 ScheduleSpec( 109 actor=self.sync_actor, 110 uid=self.name, 111 args=(self.pk,), 112 options={ 113 "time_limit": self.get_sync_time_limit_ms(), 114 }, 115 send_on_save=True, 116 crontab=f"{fqdn_rand(self.pk)} */4 * * *", 117 ), 118 ]
Accessor to the related objects manager on the one-to-many relation created by GenericRelation.
In the example::
class Post(Model):
comments = GenericRelation(Comment)
post.comments is a ReverseGenericManyToOneDescriptor instance.
Accessor to the related objects manager on the one-to-many relation created by GenericRelation.
In the example::
class Post(Model):
comments = GenericRelation(Comment)
post.comments is a ReverseGenericManyToOneDescriptor instance.