authentik.lib.sync.outgoing.api

  1from django.db.models import Model
  2from dramatiq.actor import Actor
  3from dramatiq.results.errors import ResultFailure
  4from drf_spectacular.utils import extend_schema
  5from rest_framework.decorators import action
  6from rest_framework.fields import BooleanField, CharField, ChoiceField
  7from rest_framework.request import Request
  8from rest_framework.response import Response
  9
 10from authentik.api.validation import validate
 11from authentik.core.api.utils import ModelSerializer, PassiveSerializer
 12from authentik.core.models import Group, User
 13from authentik.events.logs import LogEventSerializer
 14from authentik.lib.sync.api import SyncStatusSerializer
 15from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
 16from authentik.lib.utils.reflection import class_to_path, path_to_class
 17from authentik.rbac.filters import ObjectFilter
 18from authentik.tasks.models import Task, TaskStatus
 19
 20
 21class SyncObjectSerializer(PassiveSerializer):
 22    """Sync object serializer"""
 23
 24    sync_object_model = ChoiceField(
 25        choices=(
 26            (class_to_path(User), "user"),
 27            (class_to_path(Group), "group"),
 28        )
 29    )
 30    sync_object_id = CharField()
 31    override_dry_run = BooleanField(default=False)
 32
 33
 34class SyncObjectResultSerializer(PassiveSerializer):
 35    """Result of a single object sync"""
 36
 37    messages = LogEventSerializer(many=True, read_only=True)
 38
 39
 40class OutgoingSyncProviderStatusMixin:
 41    """Common API Endpoints for Outgoing sync providers"""
 42
 43    sync_task: Actor
 44    sync_objects_task: Actor
 45
 46    @extend_schema(responses={200: SyncStatusSerializer()})
 47    @action(
 48        methods=["GET"],
 49        detail=True,
 50        pagination_class=None,
 51        url_path="sync/status",
 52        filter_backends=[ObjectFilter],
 53    )
 54    def sync_status(self, request: Request, pk: int) -> Response:
 55        """Get provider's sync status"""
 56        provider: OutgoingSyncProvider = self.get_object()
 57
 58        status = {}
 59
 60        with provider.sync_lock as lock_acquired:
 61            # If we could not acquire the lock, it means a task is using it, and thus is running
 62            status["is_running"] = not lock_acquired
 63
 64        sync_schedule = None
 65        for schedule in provider.schedules.all():
 66            if schedule.actor_name == self.sync_task.actor_name:
 67                sync_schedule = schedule
 68
 69        if not sync_schedule:
 70            return Response(SyncStatusSerializer(status).data)
 71
 72        last_task: Task = (
 73            sync_schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED))
 74            .order_by("-mtime")
 75            .first()
 76        )
 77        last_successful_task: Task = (
 78            sync_schedule.tasks.filter(aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO))
 79            .order_by("-mtime")
 80            .first()
 81        )
 82
 83        if last_task:
 84            status["last_sync_status"] = last_task.aggregated_status
 85        if last_successful_task:
 86            status["last_successful_sync"] = last_successful_task.mtime
 87
 88        return Response(SyncStatusSerializer(status).data)
 89
 90    @extend_schema(
 91        request=SyncObjectSerializer,
 92        responses={200: SyncObjectResultSerializer()},
 93    )
 94    @action(
 95        methods=["POST"],
 96        detail=True,
 97        pagination_class=None,
 98        url_path="sync/object",
 99        filter_backends=[ObjectFilter],
100    )
101    @validate(SyncObjectSerializer)
102    def sync_object(self, request: Request, body: SyncObjectSerializer, pk: int) -> Response:
103        """Sync/Re-sync a single user/group object"""
104        provider = self.get_object()
105        object_type = body.validated_data["sync_object_model"]
106        _object_type: type[Model] = path_to_class(object_type)
107        pk = body.validated_data["sync_object_id"]
108        msg = self.sync_objects_task.send_with_options(
109            kwargs={
110                "object_type": object_type,
111                "page": 1,
112                "provider_pk": provider.pk,
113                "override_dry_run": body.validated_data["override_dry_run"],
114                "pk": pk,
115            },
116            retries=0,
117            rel_obj=provider,
118            uid=f"{provider.name}:{_object_type._meta.model_name}:{pk}:manual",
119        )
120        try:
121            msg.get_result(block=True)
122        except ResultFailure:
123            pass
124        task: Task = msg.options["task"]
125        task.refresh_from_db()
126        return Response(SyncObjectResultSerializer(instance={"messages": task._messages}).data)
127
128
class OutgoingSyncConnectionCreateMixin:
    """Viewset mixin for connection objects: remote data is fetched
    right after the object has been created"""

    def perform_create(self, serializer: ModelSerializer):
        """Create the object, then have the provider's client update it and save it again.

        A ``NotImplementedError`` raised during that follow-up step is ignored,
        leaving the object as the serializer created it.
        """
        super().perform_create(serializer)
        try:
            created = serializer.instance
            # Client is looked up from the connection's provider for this model class
            created.provider.client_for_model(created.__class__).update_single_attribute(created)
            created.save()
        except NotImplementedError:
            pass
class SyncObjectSerializer(authentik.core.api.utils.PassiveSerializer):
22class SyncObjectSerializer(PassiveSerializer):
23    """Sync object serializer"""
24
25    sync_object_model = ChoiceField(
26        choices=(
27            (class_to_path(User), "user"),
28            (class_to_path(Group), "group"),
29        )
30    )
31    sync_object_id = CharField()
32    override_dry_run = BooleanField(default=False)

Sync object serializer

sync_object_model
sync_object_id
override_dry_run
class SyncObjectResultSerializer(authentik.core.api.utils.PassiveSerializer):
35class SyncObjectResultSerializer(PassiveSerializer):
36    """Result of a single object sync"""
37
38    messages = LogEventSerializer(many=True, read_only=True)

Result of a single object sync

messages
class OutgoingSyncProviderStatusMixin:
 41class OutgoingSyncProviderStatusMixin:
 42    """Common API Endpoints for Outgoing sync providers"""
 43
 44    sync_task: Actor
 45    sync_objects_task: Actor
 46
 47    @extend_schema(responses={200: SyncStatusSerializer()})
 48    @action(
 49        methods=["GET"],
 50        detail=True,
 51        pagination_class=None,
 52        url_path="sync/status",
 53        filter_backends=[ObjectFilter],
 54    )
 55    def sync_status(self, request: Request, pk: int) -> Response:
 56        """Get provider's sync status"""
 57        provider: OutgoingSyncProvider = self.get_object()
 58
 59        status = {}
 60
 61        with provider.sync_lock as lock_acquired:
 62            # If we could not acquire the lock, it means a task is using it, and thus is running
 63            status["is_running"] = not lock_acquired
 64
 65        sync_schedule = None
 66        for schedule in provider.schedules.all():
 67            if schedule.actor_name == self.sync_task.actor_name:
 68                sync_schedule = schedule
 69
 70        if not sync_schedule:
 71            return Response(SyncStatusSerializer(status).data)
 72
 73        last_task: Task = (
 74            sync_schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED))
 75            .order_by("-mtime")
 76            .first()
 77        )
 78        last_successful_task: Task = (
 79            sync_schedule.tasks.filter(aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO))
 80            .order_by("-mtime")
 81            .first()
 82        )
 83
 84        if last_task:
 85            status["last_sync_status"] = last_task.aggregated_status
 86        if last_successful_task:
 87            status["last_successful_sync"] = last_successful_task.mtime
 88
 89        return Response(SyncStatusSerializer(status).data)
 90
 91    @extend_schema(
 92        request=SyncObjectSerializer,
 93        responses={200: SyncObjectResultSerializer()},
 94    )
 95    @action(
 96        methods=["POST"],
 97        detail=True,
 98        pagination_class=None,
 99        url_path="sync/object",
100        filter_backends=[ObjectFilter],
101    )
102    @validate(SyncObjectSerializer)
103    def sync_object(self, request: Request, body: SyncObjectSerializer, pk: int) -> Response:
104        """Sync/Re-sync a single user/group object"""
105        provider = self.get_object()
106        object_type = body.validated_data["sync_object_model"]
107        _object_type: type[Model] = path_to_class(object_type)
108        pk = body.validated_data["sync_object_id"]
109        msg = self.sync_objects_task.send_with_options(
110            kwargs={
111                "object_type": object_type,
112                "page": 1,
113                "provider_pk": provider.pk,
114                "override_dry_run": body.validated_data["override_dry_run"],
115                "pk": pk,
116            },
117            retries=0,
118            rel_obj=provider,
119            uid=f"{provider.name}:{_object_type._meta.model_name}:{pk}:manual",
120        )
121        try:
122            msg.get_result(block=True)
123        except ResultFailure:
124            pass
125        task: Task = msg.options["task"]
126        task.refresh_from_db()
127        return Response(SyncObjectResultSerializer(instance={"messages": task._messages}).data)

Common API Endpoints for Outgoing sync providers

sync_task: dramatiq.actor.Actor
sync_objects_task: dramatiq.actor.Actor
@extend_schema(responses={200: SyncStatusSerializer()})
@action(methods=['GET'], detail=True, pagination_class=None, url_path='sync/status', filter_backends=[ObjectFilter])
def sync_status( self, request: rest_framework.request.Request, pk: int) -> rest_framework.response.Response:
47    @extend_schema(responses={200: SyncStatusSerializer()})
48    @action(
49        methods=["GET"],
50        detail=True,
51        pagination_class=None,
52        url_path="sync/status",
53        filter_backends=[ObjectFilter],
54    )
55    def sync_status(self, request: Request, pk: int) -> Response:
56        """Get provider's sync status"""
57        provider: OutgoingSyncProvider = self.get_object()
58
59        status = {}
60
61        with provider.sync_lock as lock_acquired:
62            # If we could not acquire the lock, it means a task is using it, and thus is running
63            status["is_running"] = not lock_acquired
64
65        sync_schedule = None
66        for schedule in provider.schedules.all():
67            if schedule.actor_name == self.sync_task.actor_name:
68                sync_schedule = schedule
69
70        if not sync_schedule:
71            return Response(SyncStatusSerializer(status).data)
72
73        last_task: Task = (
74            sync_schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED))
75            .order_by("-mtime")
76            .first()
77        )
78        last_successful_task: Task = (
79            sync_schedule.tasks.filter(aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO))
80            .order_by("-mtime")
81            .first()
82        )
83
84        if last_task:
85            status["last_sync_status"] = last_task.aggregated_status
86        if last_successful_task:
87            status["last_successful_sync"] = last_successful_task.mtime
88
89        return Response(SyncStatusSerializer(status).data)

Get provider's sync status

@extend_schema(request=SyncObjectSerializer, responses={200: SyncObjectResultSerializer()})
@action(methods=['POST'], detail=True, pagination_class=None, url_path='sync/object', filter_backends=[ObjectFilter])
@validate(SyncObjectSerializer)
def sync_object( self, request: rest_framework.request.Request, body: SyncObjectSerializer, pk: int) -> rest_framework.response.Response:
 91    @extend_schema(
 92        request=SyncObjectSerializer,
 93        responses={200: SyncObjectResultSerializer()},
 94    )
 95    @action(
 96        methods=["POST"],
 97        detail=True,
 98        pagination_class=None,
 99        url_path="sync/object",
100        filter_backends=[ObjectFilter],
101    )
102    @validate(SyncObjectSerializer)
103    def sync_object(self, request: Request, body: SyncObjectSerializer, pk: int) -> Response:
104        """Sync/Re-sync a single user/group object"""
105        provider = self.get_object()
106        object_type = body.validated_data["sync_object_model"]
107        _object_type: type[Model] = path_to_class(object_type)
108        pk = body.validated_data["sync_object_id"]
109        msg = self.sync_objects_task.send_with_options(
110            kwargs={
111                "object_type": object_type,
112                "page": 1,
113                "provider_pk": provider.pk,
114                "override_dry_run": body.validated_data["override_dry_run"],
115                "pk": pk,
116            },
117            retries=0,
118            rel_obj=provider,
119            uid=f"{provider.name}:{_object_type._meta.model_name}:{pk}:manual",
120        )
121        try:
122            msg.get_result(block=True)
123        except ResultFailure:
124            pass
125        task: Task = msg.options["task"]
126        task.refresh_from_db()
127        return Response(SyncObjectResultSerializer(instance={"messages": task._messages}).data)

Sync/Re-sync a single user/group object

class OutgoingSyncConnectionCreateMixin:
130class OutgoingSyncConnectionCreateMixin:
131    """Mixin for connection objects that fetches remote data upon creation"""
132
133    def perform_create(self, serializer: ModelSerializer):
134        super().perform_create(serializer)
135        try:
136            instance = serializer.instance
137            client = instance.provider.client_for_model(instance.__class__)
138            client.update_single_attribute(instance)
139            instance.save()
140        except NotImplementedError:
141            pass

Mixin for connection objects that fetches remote data upon creation

def perform_create(self, serializer: authentik.core.api.utils.ModelSerializer):
133    def perform_create(self, serializer: ModelSerializer):
134        super().perform_create(serializer)
135        try:
136            instance = serializer.instance
137            client = instance.provider.client_for_model(instance.__class__)
138            client.update_single_attribute(instance)
139            instance.save()
140        except NotImplementedError:
141            pass