authentik.lib.sync.outgoing.models
1from typing import Any, Self 2 3import pglock 4from django.core.paginator import Paginator 5from django.core.validators import MinValueValidator 6from django.db import connection, models 7from django.db.models import Model, QuerySet, TextChoices 8from django.utils.translation import gettext_lazy as _ 9from dramatiq.actor import Actor 10 11from authentik.core.models import Group, User 12from authentik.lib.sync.outgoing.base import BaseOutgoingSyncClient 13from authentik.lib.utils.time import fqdn_rand, timedelta_from_string, timedelta_string_validator 14from authentik.tasks.schedules.common import ScheduleSpec 15from authentik.tasks.schedules.models import ScheduledModel 16 17 18class OutgoingSyncDeleteAction(TextChoices): 19 """Action taken when a user/group is deleted in authentik. Suspend is not available for groups, 20 and will be treated as `do_nothing`""" 21 22 DO_NOTHING = "do_nothing" 23 DELETE = "delete" 24 SUSPEND = "suspend" 25 26 27class OutgoingSyncProvider(ScheduledModel, Model): 28 """Base abstract models for providers implementing outgoing sync""" 29 30 sync_page_size = models.PositiveIntegerField( 31 help_text=_("Controls the number of objects synced in a single task"), 32 default=100, 33 validators=[MinValueValidator(1)], 34 ) 35 sync_page_timeout = models.TextField( 36 help_text=_("Timeout for synchronization of a single page"), 37 default="minutes=30", 38 validators=[timedelta_string_validator], 39 ) 40 41 dry_run = models.BooleanField( 42 default=False, 43 help_text=_( 44 "When enabled, provider will not modify or create objects in the remote system." 45 ), 46 ) 47 48 class Meta: 49 abstract = True 50 51 def client_for_model[T: User | Group]( 52 self, model: type[T] 53 ) -> BaseOutgoingSyncClient[T, Any, Any, Self]: 54 raise NotImplementedError 55 56 def get_object_qs[T: User | Group](self, type: type[T], **kwargs) -> QuerySet[T]: 57 raise NotImplementedError 58 59 @classmethod 60 def get_object_mappings(cls, obj: User | Group) -> list[tuple[str, str]]: 61 """ 62 Get a list of mapping between User/Group and ProviderUser/Group: 63 [("provider_pk", "obj_pk")] 64 """ 65 raise NotImplementedError 66 67 def get_paginator[T: User | Group](self, type: type[T]) -> Paginator: 68 return Paginator(self.get_object_qs(type), self.sync_page_size) 69 70 def get_object_sync_time_limit_ms[T: User | Group](self, type: type[T]) -> int: 71 num_pages: int = self.get_paginator(type).num_pages 72 page_timeout_ms = timedelta_from_string(self.sync_page_timeout).total_seconds() * 1000 73 return int(num_pages * page_timeout_ms * 1.5) 74 75 def get_sync_time_limit_ms(self) -> int: 76 return int( 77 (self.get_object_sync_time_limit_ms(User) + self.get_object_sync_time_limit_ms(Group)) 78 * 1.5 79 ) 80 81 @property 82 def sync_lock(self) -> pglock.advisory: 83 """Postgres lock for syncing to prevent multiple parallel syncs happening""" 84 return pglock.advisory( 85 lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}", 86 timeout=0, 87 side_effect=pglock.Return, 88 ) 89 90 @property 91 def sync_actor(self) -> Actor: 92 raise NotImplementedError 93 94 def sync_dispatch(self) -> None: 95 for schedule in self.schedules.all(): 96 schedule.send() 97 98 @property 99 def schedule_specs(self) -> list[ScheduleSpec]: 100 return [ 101 ScheduleSpec( 102 actor=self.sync_actor, 103 uid=self.name, 104 args=(self.pk,), 105 options={ 106 "time_limit": self.get_sync_time_limit_ms(), 107 }, 108 send_on_save=True, 109 crontab=f"{fqdn_rand(self.pk)} */4 * * *", 110 ), 111 ]
19class OutgoingSyncDeleteAction(TextChoices): 20 """Action taken when a user/group is deleted in authentik. Suspend is not available for groups, 21 and will be treated as `do_nothing`""" 22 23 DO_NOTHING = "do_nothing" 24 DELETE = "delete" 25 SUSPEND = "suspend"
Action taken when a user/group is deleted in authentik. Suspend is not available for groups,
and will be treated as do_nothing
28class OutgoingSyncProvider(ScheduledModel, Model): 29 """Base abstract models for providers implementing outgoing sync""" 30 31 sync_page_size = models.PositiveIntegerField( 32 help_text=_("Controls the number of objects synced in a single task"), 33 default=100, 34 validators=[MinValueValidator(1)], 35 ) 36 sync_page_timeout = models.TextField( 37 help_text=_("Timeout for synchronization of a single page"), 38 default="minutes=30", 39 validators=[timedelta_string_validator], 40 ) 41 42 dry_run = models.BooleanField( 43 default=False, 44 help_text=_( 45 "When enabled, provider will not modify or create objects in the remote system." 46 ), 47 ) 48 49 class Meta: 50 abstract = True 51 52 def client_for_model[T: User | Group]( 53 self, model: type[T] 54 ) -> BaseOutgoingSyncClient[T, Any, Any, Self]: 55 raise NotImplementedError 56 57 def get_object_qs[T: User | Group](self, type: type[T], **kwargs) -> QuerySet[T]: 58 raise NotImplementedError 59 60 @classmethod 61 def get_object_mappings(cls, obj: User | Group) -> list[tuple[str, str]]: 62 """ 63 Get a list of mapping between User/Group and ProviderUser/Group: 64 [("provider_pk", "obj_pk")] 65 """ 66 raise NotImplementedError 67 68 def get_paginator[T: User | Group](self, type: type[T]) -> Paginator: 69 return Paginator(self.get_object_qs(type), self.sync_page_size) 70 71 def get_object_sync_time_limit_ms[T: User | Group](self, type: type[T]) -> int: 72 num_pages: int = self.get_paginator(type).num_pages 73 page_timeout_ms = timedelta_from_string(self.sync_page_timeout).total_seconds() * 1000 74 return int(num_pages * page_timeout_ms * 1.5) 75 76 def get_sync_time_limit_ms(self) -> int: 77 return int( 78 (self.get_object_sync_time_limit_ms(User) + self.get_object_sync_time_limit_ms(Group)) 79 * 1.5 80 ) 81 82 @property 83 def sync_lock(self) -> pglock.advisory: 84 """Postgres lock for syncing to prevent multiple parallel syncs happening""" 85 return pglock.advisory( 86 lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}", 87 timeout=0, 88 side_effect=pglock.Return, 89 ) 90 91 @property 92 def sync_actor(self) -> Actor: 93 raise NotImplementedError 94 95 def sync_dispatch(self) -> None: 96 for schedule in self.schedules.all(): 97 schedule.send() 98 99 @property 100 def schedule_specs(self) -> list[ScheduleSpec]: 101 return [ 102 ScheduleSpec( 103 actor=self.sync_actor, 104 uid=self.name, 105 args=(self.pk,), 106 options={ 107 "time_limit": self.get_sync_time_limit_ms(), 108 }, 109 send_on_save=True, 110 crontab=f"{fqdn_rand(self.pk)} */4 * * *", 111 ), 112 ]
Base abstract models for providers implementing outgoing sync
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
60 @classmethod 61 def get_object_mappings(cls, obj: User | Group) -> list[tuple[str, str]]: 62 """ 63 Get a list of mapping between User/Group and ProviderUser/Group: 64 [("provider_pk", "obj_pk")] 65 """ 66 raise NotImplementedError
Get a list of mapping between User/Group and ProviderUser/Group: [("provider_pk", "obj_pk")]
82 @property 83 def sync_lock(self) -> pglock.advisory: 84 """Postgres lock for syncing to prevent multiple parallel syncs happening""" 85 return pglock.advisory( 86 lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}", 87 timeout=0, 88 side_effect=pglock.Return, 89 )
Postgres lock for syncing to prevent multiple parallel syncs happening
99 @property 100 def schedule_specs(self) -> list[ScheduleSpec]: 101 return [ 102 ScheduleSpec( 103 actor=self.sync_actor, 104 uid=self.name, 105 args=(self.pk,), 106 options={ 107 "time_limit": self.get_sync_time_limit_ms(), 108 }, 109 send_on_save=True, 110 crontab=f"{fqdn_rand(self.pk)} */4 * * *", 111 ), 112 ]
Accessor to the related objects manager on the one-to-many relation created by GenericRelation.
In the example::
class Post(Model):
comments = GenericRelation(Comment)
post.comments is a ReverseGenericManyToOneDescriptor instance.
Accessor to the related objects manager on the one-to-many relation created by GenericRelation.
In the example::
class Post(Model):
comments = GenericRelation(Comment)
post.comments is a ReverseGenericManyToOneDescriptor instance.