authentik.lib.sync.outgoing.models

  1import math
  2from typing import Any, Self
  3
  4import pglock
  5from django.core.paginator import Paginator
  6from django.core.validators import MinValueValidator
  7from django.db import connection, models
  8from django.db.models import Model, QuerySet, TextChoices
  9from django.utils.translation import gettext_lazy as _
 10from dramatiq.actor import Actor
 11
 12from authentik.core.models import Group, User
 13from authentik.lib.sync.outgoing.base import BaseOutgoingSyncClient
 14from authentik.lib.utils.time import fqdn_rand, timedelta_from_string, timedelta_string_validator
 15from authentik.tasks.schedules.common import ScheduleSpec
 16from authentik.tasks.schedules.models import ScheduledModel
 17
 18
 19class OutgoingSyncDeleteAction(TextChoices):
 20    """Action taken when a user/group is deleted in authentik. Suspend is not available for groups,
 21    and will be treated as `do_nothing`"""
 22
 23    DO_NOTHING = "do_nothing"
 24    DELETE = "delete"
 25    SUSPEND = "suspend"
 26
 27
 28class OutgoingSyncProvider(ScheduledModel, Model):
 29    """Base abstract models for providers implementing outgoing sync"""
 30
 31    sync_page_size = models.PositiveIntegerField(
 32        help_text=_("Controls the number of objects synced in a single task"),
 33        default=100,
 34        validators=[MinValueValidator(1)],
 35    )
 36    sync_page_timeout = models.TextField(
 37        help_text=_("Timeout for synchronization of a single page"),
 38        default="minutes=30",
 39        validators=[timedelta_string_validator],
 40    )
 41
 42    dry_run = models.BooleanField(
 43        default=False,
 44        help_text=_(
 45            "When enabled, provider will not modify or create objects in the remote system."
 46        ),
 47    )
 48
 49    class Meta:
 50        abstract = True
 51
 52    def client_for_model[T: User | Group](
 53        self, model: type[T]
 54    ) -> BaseOutgoingSyncClient[T, Any, Any, Self]:
 55        raise NotImplementedError
 56
 57    def get_object_qs[T: User | Group](self, type: type[T], **kwargs) -> QuerySet[T]:
 58        raise NotImplementedError
 59
 60    @classmethod
 61    def get_object_mappings(cls, obj: User | Group) -> list[tuple[str, str]]:
 62        """
 63        Get a list of mapping between User/Group and ProviderUser/Group:
 64        [("provider_pk", "obj_pk")]
 65        """
 66        raise NotImplementedError
 67
 68    def get_paginator[T: User | Group](self, type: type[T]) -> Paginator:
 69        return Paginator(self.get_object_qs(type), self.sync_page_size)
 70
 71    def get_object_sync_time_limit_ms[T: User | Group](self, type: type[T]) -> int:
 72        # Use a simple COUNT(*) on the model instead of materializing get_object_qs(),
 73        # which for some providers (e.g. SCIM) runs PolicyEngine per-user and is
 74        # extremely expensive. The time limit is an upper-bound estimate, so using
 75        # the total count (without policy filtering) is a safe overestimate.
 76        total_count = type.objects.count()
 77        num_pages = math.ceil(total_count / self.sync_page_size) if total_count > 0 else 1
 78        page_timeout_ms = timedelta_from_string(self.sync_page_timeout).total_seconds() * 1000
 79        return int(num_pages * page_timeout_ms * 1.5)
 80
 81    def get_sync_time_limit_ms(self) -> int:
 82        return int(
 83            (self.get_object_sync_time_limit_ms(User) + self.get_object_sync_time_limit_ms(Group))
 84            * 1.5
 85        )
 86
 87    @property
 88    def sync_lock(self) -> pglock.advisory:
 89        """Postgres lock for syncing to prevent multiple parallel syncs happening"""
 90        return pglock.advisory(
 91            lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}",
 92            timeout=0,
 93            side_effect=pglock.Return,
 94        )
 95
 96    @property
 97    def sync_actor(self) -> Actor:
 98        raise NotImplementedError
 99
100    def sync_dispatch(self) -> None:
101        for schedule in self.schedules.all():
102            schedule.send()
103
104    @property
105    def schedule_specs(self) -> list[ScheduleSpec]:
106        return [
107            ScheduleSpec(
108                actor=self.sync_actor,
109                uid=self.name,
110                args=(self.pk,),
111                options={
112                    "time_limit": self.get_sync_time_limit_ms(),
113                },
114                send_on_save=True,
115                crontab=f"{fqdn_rand(self.pk)} */4 * * *",
116            ),
117        ]
class OutgoingSyncDeleteAction(django.db.models.enums.TextChoices):
20class OutgoingSyncDeleteAction(TextChoices):
21    """Action taken when a user/group is deleted in authentik. Suspend is not available for groups,
22    and will be treated as `do_nothing`"""
23
24    DO_NOTHING = "do_nothing"
25    DELETE = "delete"
26    SUSPEND = "suspend"

Action taken when a user/group is deleted in authentik. Suspend is not available for groups, and will be treated as `do_nothing`.

class OutgoingSyncProvider(authentik.tasks.schedules.models.ScheduledModel, django.db.models.base.Model):
 29class OutgoingSyncProvider(ScheduledModel, Model):
 30    """Base abstract models for providers implementing outgoing sync"""
 31
 32    sync_page_size = models.PositiveIntegerField(
 33        help_text=_("Controls the number of objects synced in a single task"),
 34        default=100,
 35        validators=[MinValueValidator(1)],
 36    )
 37    sync_page_timeout = models.TextField(
 38        help_text=_("Timeout for synchronization of a single page"),
 39        default="minutes=30",
 40        validators=[timedelta_string_validator],
 41    )
 42
 43    dry_run = models.BooleanField(
 44        default=False,
 45        help_text=_(
 46            "When enabled, provider will not modify or create objects in the remote system."
 47        ),
 48    )
 49
 50    class Meta:
 51        abstract = True
 52
 53    def client_for_model[T: User | Group](
 54        self, model: type[T]
 55    ) -> BaseOutgoingSyncClient[T, Any, Any, Self]:
 56        raise NotImplementedError
 57
 58    def get_object_qs[T: User | Group](self, type: type[T], **kwargs) -> QuerySet[T]:
 59        raise NotImplementedError
 60
 61    @classmethod
 62    def get_object_mappings(cls, obj: User | Group) -> list[tuple[str, str]]:
 63        """
 64        Get a list of mapping between User/Group and ProviderUser/Group:
 65        [("provider_pk", "obj_pk")]
 66        """
 67        raise NotImplementedError
 68
 69    def get_paginator[T: User | Group](self, type: type[T]) -> Paginator:
 70        return Paginator(self.get_object_qs(type), self.sync_page_size)
 71
 72    def get_object_sync_time_limit_ms[T: User | Group](self, type: type[T]) -> int:
 73        # Use a simple COUNT(*) on the model instead of materializing get_object_qs(),
 74        # which for some providers (e.g. SCIM) runs PolicyEngine per-user and is
 75        # extremely expensive. The time limit is an upper-bound estimate, so using
 76        # the total count (without policy filtering) is a safe overestimate.
 77        total_count = type.objects.count()
 78        num_pages = math.ceil(total_count / self.sync_page_size) if total_count > 0 else 1
 79        page_timeout_ms = timedelta_from_string(self.sync_page_timeout).total_seconds() * 1000
 80        return int(num_pages * page_timeout_ms * 1.5)
 81
 82    def get_sync_time_limit_ms(self) -> int:
 83        return int(
 84            (self.get_object_sync_time_limit_ms(User) + self.get_object_sync_time_limit_ms(Group))
 85            * 1.5
 86        )
 87
 88    @property
 89    def sync_lock(self) -> pglock.advisory:
 90        """Postgres lock for syncing to prevent multiple parallel syncs happening"""
 91        return pglock.advisory(
 92            lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}",
 93            timeout=0,
 94            side_effect=pglock.Return,
 95        )
 96
 97    @property
 98    def sync_actor(self) -> Actor:
 99        raise NotImplementedError
100
101    def sync_dispatch(self) -> None:
102        for schedule in self.schedules.all():
103            schedule.send()
104
105    @property
106    def schedule_specs(self) -> list[ScheduleSpec]:
107        return [
108            ScheduleSpec(
109                actor=self.sync_actor,
110                uid=self.name,
111                args=(self.pk,),
112                options={
113                    "time_limit": self.get_sync_time_limit_ms(),
114                },
115                send_on_save=True,
116                crontab=f"{fqdn_rand(self.pk)} */4 * * *",
117            ),
118        ]

Base abstract model for providers implementing outgoing sync.

def sync_page_size(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_page_timeout(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def dry_run(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def client_for_model( self, model: type[T]) -> authentik.lib.sync.outgoing.base.BaseOutgoingSyncClient[T, typing.Any, typing.Any, typing.Self]:
53    def client_for_model[T: User | Group](
54        self, model: type[T]
55    ) -> BaseOutgoingSyncClient[T, Any, Any, Self]:
56        raise NotImplementedError
def get_object_qs(self, type: type[T], **kwargs) -> django.db.models.query.QuerySet:
58    def get_object_qs[T: User | Group](self, type: type[T], **kwargs) -> QuerySet[T]:
59        raise NotImplementedError
@classmethod
def get_object_mappings( cls, obj: authentik.core.models.User | authentik.core.models.Group) -> list[tuple[str, str]]:
61    @classmethod
62    def get_object_mappings(cls, obj: User | Group) -> list[tuple[str, str]]:
63        """
64        Get a list of mapping between User/Group and ProviderUser/Group:
65        [("provider_pk", "obj_pk")]
66        """
67        raise NotImplementedError

Get a list of mappings between User/Group and ProviderUser/Group: `[("provider_pk", "obj_pk")]`

def get_paginator(self, type: type[T]) -> django.core.paginator.Paginator:
69    def get_paginator[T: User | Group](self, type: type[T]) -> Paginator:
70        return Paginator(self.get_object_qs(type), self.sync_page_size)
def get_object_sync_time_limit_ms(self, type: type[T]) -> int:
72    def get_object_sync_time_limit_ms[T: User | Group](self, type: type[T]) -> int:
73        # Use a simple COUNT(*) on the model instead of materializing get_object_qs(),
74        # which for some providers (e.g. SCIM) runs PolicyEngine per-user and is
75        # extremely expensive. The time limit is an upper-bound estimate, so using
76        # the total count (without policy filtering) is a safe overestimate.
77        total_count = type.objects.count()
78        num_pages = math.ceil(total_count / self.sync_page_size) if total_count > 0 else 1
79        page_timeout_ms = timedelta_from_string(self.sync_page_timeout).total_seconds() * 1000
80        return int(num_pages * page_timeout_ms * 1.5)
def get_sync_time_limit_ms(self) -> int:
82    def get_sync_time_limit_ms(self) -> int:
83        return int(
84            (self.get_object_sync_time_limit_ms(User) + self.get_object_sync_time_limit_ms(Group))
85            * 1.5
86        )
sync_lock: pglock.core.advisory
88    @property
89    def sync_lock(self) -> pglock.advisory:
90        """Postgres lock for syncing to prevent multiple parallel syncs happening"""
91        return pglock.advisory(
92            lock_id=f"goauthentik.io/{connection.schema_name}/providers/outgoing-sync/{str(self.pk)}",
93            timeout=0,
94            side_effect=pglock.Return,
95        )

Postgres lock for syncing to prevent multiple parallel syncs happening

sync_actor: dramatiq.actor.Actor
97    @property
98    def sync_actor(self) -> Actor:
99        raise NotImplementedError
def sync_dispatch(self) -> None:
101    def sync_dispatch(self) -> None:
102        for schedule in self.schedules.all():
103            schedule.send()
schedule_specs: list[authentik.tasks.schedules.common.ScheduleSpec]
105    @property
106    def schedule_specs(self) -> list[ScheduleSpec]:
107        return [
108            ScheduleSpec(
109                actor=self.sync_actor,
110                uid=self.name,
111                args=(self.pk,),
112                options={
113                    "time_limit": self.get_sync_time_limit_ms(),
114                },
115                send_on_save=True,
116                crontab=f"{fqdn_rand(self.pk)} */4 * * *",
117            ),
118        ]
schedules

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

tasks

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

class OutgoingSyncProvider.Meta:
50    class Meta:
51        abstract = True
abstract = False