authentik.lib.sync.outgoing.models

  1from typing import Any, Self
  2
  3import pglock
  4from django.core.paginator import Paginator
  5from django.core.validators import MinValueValidator
  6from django.db import connection, models
  7from django.db.models import Model, QuerySet, TextChoices
  8from django.utils.translation import gettext_lazy as _
  9from dramatiq.actor import Actor
 10
 11from authentik.core.models import Group, User
 12from authentik.lib.sync.outgoing.base import BaseOutgoingSyncClient
 13from authentik.lib.utils.time import fqdn_rand, timedelta_from_string, timedelta_string_validator
 14from authentik.tasks.schedules.common import ScheduleSpec
 15from authentik.tasks.schedules.models import ScheduledModel
 16
 17
class OutgoingSyncDeleteAction(TextChoices):
    """Action taken when a user/group is deleted in authentik.

    Suspend is not available for groups, and will be treated as `do_nothing`.
    """

    # Each member's value is the literal string shown on the right.
    DO_NOTHING = "do_nothing"
    # NOTE(review): presumably removes the remote counterpart - confirm in the sync clients.
    DELETE = "delete"
    # Only meaningful for users; for groups this is treated as `do_nothing` (see docstring).
    SUSPEND = "suspend"
 25
 26
 27class OutgoingSyncProvider(ScheduledModel, Model):
 28    """Base abstract models for providers implementing outgoing sync"""
 29
 30    sync_page_size = models.PositiveIntegerField(
 31        help_text=_("Controls the number of objects synced in a single task"),
 32        default=100,
 33        validators=[MinValueValidator(1)],
 34    )
 35    sync_page_timeout = models.TextField(
 36        help_text=_("Timeout for synchronization of a single page"),
 37        default="minutes=30",
 38        validators=[timedelta_string_validator],
 39    )
 40
 41    dry_run = models.BooleanField(
 42        default=False,
 43        help_text=_(
 44            "When enabled, provider will not modify or create objects in the remote system."
 45        ),
 46    )
 47
 48    class Meta:
 49        abstract = True
 50
 51    def client_for_model[T: User | Group](
 52        self, model: type[T]
 53    ) -> BaseOutgoingSyncClient[T, Any, Any, Self]:
 54        raise NotImplementedError
 55
 56    def get_object_qs[T: User | Group](self, type: type[T], **kwargs) -> QuerySet[T]:
 57        raise NotImplementedError
 58
 59    @classmethod
 60    def get_object_mappings(cls, obj: User | Group) -> list[tuple[str, str]]:
 61        """
 62        Get a list of mapping between User/Group and ProviderUser/Group:
 63        [("provider_pk", "obj_pk")]
 64        """
 65        raise NotImplementedError
 66
 67    def get_paginator[T: User | Group](self, type: type[T]) -> Paginator:
 68        return Paginator(self.get_object_qs(type), self.sync_page_size)
 69
 70    def get_object_sync_time_limit_ms[T: User | Group](self, type: type[T]) -> int:
 71        num_pages: int = self.get_paginator(type).num_pages
 72        page_timeout_ms = timedelta_from_string(self.sync_page_timeout).total_seconds() * 1000
 73        return int(num_pages * page_timeout_ms * 1.5)
 74
 75    def get_sync_time_limit_ms(self) -> int:
 76        return int(
 77            (self.get_object_sync_time_limit_ms(User) + self.get_object_sync_time_limit_ms(Group))
 78            * 1.5
 79        )
 80
 81    @property
 82    def sync_lock(self) -> pglock.advisory:
 83        """Postgres lock for syncing to prevent multiple parallel syncs happening"""
 84        return pglock.advisory(
 85            lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}",
 86            timeout=0,
 87            side_effect=pglock.Return,
 88        )
 89
 90    @property
 91    def sync_actor(self) -> Actor:
 92        raise NotImplementedError
 93
 94    def sync_dispatch(self) -> None:
 95        for schedule in self.schedules.all():
 96            schedule.send()
 97
 98    @property
 99    def schedule_specs(self) -> list[ScheduleSpec]:
100        return [
101            ScheduleSpec(
102                actor=self.sync_actor,
103                uid=self.name,
104                args=(self.pk,),
105                options={
106                    "time_limit": self.get_sync_time_limit_ms(),
107                },
108                send_on_save=True,
109                crontab=f"{fqdn_rand(self.pk)} */4 * * *",
110            ),
111        ]
class OutgoingSyncDeleteAction(django.db.models.enums.TextChoices):
class OutgoingSyncDeleteAction(TextChoices):
    """Action taken when a user/group is deleted in authentik.

    Suspend is not available for groups, and will be treated as `do_nothing`.
    """

    # Each member's value is the literal string shown on the right.
    DO_NOTHING = "do_nothing"
    # NOTE(review): presumably removes the remote counterpart - confirm in the sync clients.
    DELETE = "delete"
    # Only meaningful for users; for groups this is treated as `do_nothing` (see docstring).
    SUSPEND = "suspend"

Action taken when a user/group is deleted in authentik. Suspend is not available for groups, and will be treated as do_nothing

class OutgoingSyncProvider(authentik.tasks.schedules.models.ScheduledModel, django.db.models.base.Model):
 28class OutgoingSyncProvider(ScheduledModel, Model):
 29    """Base abstract models for providers implementing outgoing sync"""
 30
 31    sync_page_size = models.PositiveIntegerField(
 32        help_text=_("Controls the number of objects synced in a single task"),
 33        default=100,
 34        validators=[MinValueValidator(1)],
 35    )
 36    sync_page_timeout = models.TextField(
 37        help_text=_("Timeout for synchronization of a single page"),
 38        default="minutes=30",
 39        validators=[timedelta_string_validator],
 40    )
 41
 42    dry_run = models.BooleanField(
 43        default=False,
 44        help_text=_(
 45            "When enabled, provider will not modify or create objects in the remote system."
 46        ),
 47    )
 48
 49    class Meta:
 50        abstract = True
 51
 52    def client_for_model[T: User | Group](
 53        self, model: type[T]
 54    ) -> BaseOutgoingSyncClient[T, Any, Any, Self]:
 55        raise NotImplementedError
 56
 57    def get_object_qs[T: User | Group](self, type: type[T], **kwargs) -> QuerySet[T]:
 58        raise NotImplementedError
 59
 60    @classmethod
 61    def get_object_mappings(cls, obj: User | Group) -> list[tuple[str, str]]:
 62        """
 63        Get a list of mapping between User/Group and ProviderUser/Group:
 64        [("provider_pk", "obj_pk")]
 65        """
 66        raise NotImplementedError
 67
 68    def get_paginator[T: User | Group](self, type: type[T]) -> Paginator:
 69        return Paginator(self.get_object_qs(type), self.sync_page_size)
 70
 71    def get_object_sync_time_limit_ms[T: User | Group](self, type: type[T]) -> int:
 72        num_pages: int = self.get_paginator(type).num_pages
 73        page_timeout_ms = timedelta_from_string(self.sync_page_timeout).total_seconds() * 1000
 74        return int(num_pages * page_timeout_ms * 1.5)
 75
 76    def get_sync_time_limit_ms(self) -> int:
 77        return int(
 78            (self.get_object_sync_time_limit_ms(User) + self.get_object_sync_time_limit_ms(Group))
 79            * 1.5
 80        )
 81
 82    @property
 83    def sync_lock(self) -> pglock.advisory:
 84        """Postgres lock for syncing to prevent multiple parallel syncs happening"""
 85        return pglock.advisory(
 86            lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}",
 87            timeout=0,
 88            side_effect=pglock.Return,
 89        )
 90
 91    @property
 92    def sync_actor(self) -> Actor:
 93        raise NotImplementedError
 94
 95    def sync_dispatch(self) -> None:
 96        for schedule in self.schedules.all():
 97            schedule.send()
 98
 99    @property
100    def schedule_specs(self) -> list[ScheduleSpec]:
101        return [
102            ScheduleSpec(
103                actor=self.sync_actor,
104                uid=self.name,
105                args=(self.pk,),
106                options={
107                    "time_limit": self.get_sync_time_limit_ms(),
108                },
109                send_on_save=True,
110                crontab=f"{fqdn_rand(self.pk)} */4 * * *",
111            ),
112        ]

Base abstract model for providers implementing outgoing sync.

def sync_page_size(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_page_timeout(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def dry_run(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def client_for_model( self, model: type[T]) -> authentik.lib.sync.outgoing.base.BaseOutgoingSyncClient[T, typing.Any, typing.Any, typing.Self]:
52    def client_for_model[T: User | Group](
53        self, model: type[T]
54    ) -> BaseOutgoingSyncClient[T, Any, Any, Self]:
55        raise NotImplementedError
def get_object_qs(self, type: type[T], **kwargs) -> django.db.models.query.QuerySet:
57    def get_object_qs[T: User | Group](self, type: type[T], **kwargs) -> QuerySet[T]:
58        raise NotImplementedError
@classmethod
def get_object_mappings( cls, obj: authentik.core.models.User | authentik.core.models.Group) -> list[tuple[str, str]]:
60    @classmethod
61    def get_object_mappings(cls, obj: User | Group) -> list[tuple[str, str]]:
62        """
63        Get a list of mapping between User/Group and ProviderUser/Group:
64        [("provider_pk", "obj_pk")]
65        """
66        raise NotImplementedError

Get a list of mappings between User/Group and ProviderUser/Group: [("provider_pk", "obj_pk")]

def get_paginator(self, type: type[T]) -> django.core.paginator.Paginator:
68    def get_paginator[T: User | Group](self, type: type[T]) -> Paginator:
69        return Paginator(self.get_object_qs(type), self.sync_page_size)
def get_object_sync_time_limit_ms(self, type: type[T]) -> int:
71    def get_object_sync_time_limit_ms[T: User | Group](self, type: type[T]) -> int:
72        num_pages: int = self.get_paginator(type).num_pages
73        page_timeout_ms = timedelta_from_string(self.sync_page_timeout).total_seconds() * 1000
74        return int(num_pages * page_timeout_ms * 1.5)
def get_sync_time_limit_ms(self) -> int:
76    def get_sync_time_limit_ms(self) -> int:
77        return int(
78            (self.get_object_sync_time_limit_ms(User) + self.get_object_sync_time_limit_ms(Group))
79            * 1.5
80        )
sync_lock: pglock.core.advisory
82    @property
83    def sync_lock(self) -> pglock.advisory:
84        """Postgres lock for syncing to prevent multiple parallel syncs happening"""
85        return pglock.advisory(
86            lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}",
87            timeout=0,
88            side_effect=pglock.Return,
89        )

Postgres lock for syncing to prevent multiple parallel syncs happening

sync_actor: dramatiq.actor.Actor
91    @property
92    def sync_actor(self) -> Actor:
93        raise NotImplementedError
def sync_dispatch(self) -> None:
95    def sync_dispatch(self) -> None:
96        for schedule in self.schedules.all():
97            schedule.send()
schedule_specs: list[authentik.tasks.schedules.common.ScheduleSpec]
 99    @property
100    def schedule_specs(self) -> list[ScheduleSpec]:
101        return [
102            ScheduleSpec(
103                actor=self.sync_actor,
104                uid=self.name,
105                args=(self.pk,),
106                options={
107                    "time_limit": self.get_sync_time_limit_ms(),
108                },
109                send_on_save=True,
110                crontab=f"{fqdn_rand(self.pk)} */4 * * *",
111            ),
112        ]
schedules

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

tasks

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

class OutgoingSyncProvider.Meta:
    class Meta:
        # Declares the enclosing model abstract.
        abstract = True
abstract = False