authentik.lib.sync.outgoing.api
1from django.db.models import Model 2from dramatiq.actor import Actor 3from dramatiq.results.errors import ResultFailure 4from drf_spectacular.utils import extend_schema 5from rest_framework.decorators import action 6from rest_framework.fields import BooleanField, CharField, ChoiceField 7from rest_framework.request import Request 8from rest_framework.response import Response 9 10from authentik.api.validation import validate 11from authentik.core.api.utils import ModelSerializer, PassiveSerializer 12from authentik.core.models import Group, User 13from authentik.events.logs import LogEventSerializer 14from authentik.lib.sync.api import SyncStatusSerializer 15from authentik.lib.sync.outgoing.models import OutgoingSyncProvider 16from authentik.lib.utils.reflection import class_to_path, path_to_class 17from authentik.rbac.filters import ObjectFilter 18from authentik.tasks.models import Task, TaskStatus 19 20 21class SyncObjectSerializer(PassiveSerializer): 22 """Sync object serializer""" 23 24 sync_object_model = ChoiceField( 25 choices=( 26 (class_to_path(User), "user"), 27 (class_to_path(Group), "group"), 28 ) 29 ) 30 sync_object_id = CharField() 31 override_dry_run = BooleanField(default=False) 32 33 34class SyncObjectResultSerializer(PassiveSerializer): 35 """Result of a single object sync""" 36 37 messages = LogEventSerializer(many=True, read_only=True) 38 39 40class OutgoingSyncProviderStatusMixin: 41 """Common API Endpoints for Outgoing sync providers""" 42 43 sync_task: Actor 44 sync_objects_task: Actor 45 46 @extend_schema(responses={200: SyncStatusSerializer()}) 47 @action( 48 methods=["GET"], 49 detail=True, 50 pagination_class=None, 51 url_path="sync/status", 52 filter_backends=[ObjectFilter], 53 ) 54 def sync_status(self, request: Request, pk: int) -> Response: 55 """Get provider's sync status""" 56 provider: OutgoingSyncProvider = self.get_object() 57 58 status = {} 59 60 with provider.sync_lock as lock_acquired: 61 # If we could not acquire the lock, it means a task is using it, and thus is running 62 status["is_running"] = not lock_acquired 63 64 sync_schedule = None 65 for schedule in provider.schedules.all(): 66 if schedule.actor_name == self.sync_task.actor_name: 67 sync_schedule = schedule 68 69 if not sync_schedule: 70 return Response(SyncStatusSerializer(status).data) 71 72 last_task: Task = ( 73 sync_schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED)) 74 .order_by("-mtime") 75 .first() 76 ) 77 last_successful_task: Task = ( 78 sync_schedule.tasks.filter(aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO)) 79 .order_by("-mtime") 80 .first() 81 ) 82 83 if last_task: 84 status["last_sync_status"] = last_task.aggregated_status 85 if last_successful_task: 86 status["last_successful_sync"] = last_successful_task.mtime 87 88 return Response(SyncStatusSerializer(status).data) 89 90 @extend_schema( 91 request=SyncObjectSerializer, 92 responses={200: SyncObjectResultSerializer()}, 93 ) 94 @action( 95 methods=["POST"], 96 detail=True, 97 pagination_class=None, 98 url_path="sync/object", 99 filter_backends=[ObjectFilter], 100 ) 101 @validate(SyncObjectSerializer) 102 def sync_object(self, request: Request, body: SyncObjectSerializer, pk: int) -> Response: 103 """Sync/Re-sync a single user/group object""" 104 provider = self.get_object() 105 object_type = body.validated_data["sync_object_model"] 106 _object_type: type[Model] = path_to_class(object_type) 107 pk = body.validated_data["sync_object_id"] 108 msg = self.sync_objects_task.send_with_options( 109 kwargs={ 110 "object_type": object_type, 111 "page": 1, 112 "provider_pk": provider.pk, 113 "override_dry_run": body.validated_data["override_dry_run"], 114 "pk": pk, 115 }, 116 retries=0, 117 rel_obj=provider, 118 uid=f"{provider.name}:{_object_type._meta.model_name}:{pk}:manual", 119 ) 120 try: 121 msg.get_result(block=True) 122 except ResultFailure: 123 pass 124 task: Task = msg.options["task"] 125 task.refresh_from_db() 126 return Response(SyncObjectResultSerializer(instance={"messages": task._messages}).data) 127 128 129class OutgoingSyncConnectionCreateMixin: 130 """Mixin for connection objects that fetches remote data upon creation""" 131 132 def perform_create(self, serializer: ModelSerializer): 133 super().perform_create(serializer) 134 try: 135 instance = serializer.instance 136 client = instance.provider.client_for_model(instance.__class__) 137 client.update_single_attribute(instance) 138 instance.save() 139 except NotImplementedError: 140 pass
22class SyncObjectSerializer(PassiveSerializer): 23 """Sync object serializer""" 24 25 sync_object_model = ChoiceField( 26 choices=( 27 (class_to_path(User), "user"), 28 (class_to_path(Group), "group"), 29 ) 30 ) 31 sync_object_id = CharField() 32 override_dry_run = BooleanField(default=False)
Sync object serializer
Inherited Members
35class SyncObjectResultSerializer(PassiveSerializer): 36 """Result of a single object sync""" 37 38 messages = LogEventSerializer(many=True, read_only=True)
Result of a single object sync
Inherited Members
class
OutgoingSyncProviderStatusMixin:
41class OutgoingSyncProviderStatusMixin: 42 """Common API Endpoints for Outgoing sync providers""" 43 44 sync_task: Actor 45 sync_objects_task: Actor 46 47 @extend_schema(responses={200: SyncStatusSerializer()}) 48 @action( 49 methods=["GET"], 50 detail=True, 51 pagination_class=None, 52 url_path="sync/status", 53 filter_backends=[ObjectFilter], 54 ) 55 def sync_status(self, request: Request, pk: int) -> Response: 56 """Get provider's sync status""" 57 provider: OutgoingSyncProvider = self.get_object() 58 59 status = {} 60 61 with provider.sync_lock as lock_acquired: 62 # If we could not acquire the lock, it means a task is using it, and thus is running 63 status["is_running"] = not lock_acquired 64 65 sync_schedule = None 66 for schedule in provider.schedules.all(): 67 if schedule.actor_name == self.sync_task.actor_name: 68 sync_schedule = schedule 69 70 if not sync_schedule: 71 return Response(SyncStatusSerializer(status).data) 72 73 last_task: Task = ( 74 sync_schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED)) 75 .order_by("-mtime") 76 .first() 77 ) 78 last_successful_task: Task = ( 79 sync_schedule.tasks.filter(aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO)) 80 .order_by("-mtime") 81 .first() 82 ) 83 84 if last_task: 85 status["last_sync_status"] = last_task.aggregated_status 86 if last_successful_task: 87 status["last_successful_sync"] = last_successful_task.mtime 88 89 return Response(SyncStatusSerializer(status).data) 90 91 @extend_schema( 92 request=SyncObjectSerializer, 93 responses={200: SyncObjectResultSerializer()}, 94 ) 95 @action( 96 methods=["POST"], 97 detail=True, 98 pagination_class=None, 99 url_path="sync/object", 100 filter_backends=[ObjectFilter], 101 ) 102 @validate(SyncObjectSerializer) 103 def sync_object(self, request: Request, body: SyncObjectSerializer, pk: int) -> Response: 104 """Sync/Re-sync a single user/group object""" 105 provider = self.get_object() 106 object_type = body.validated_data["sync_object_model"] 107 _object_type: type[Model] = path_to_class(object_type) 108 pk = body.validated_data["sync_object_id"] 109 msg = self.sync_objects_task.send_with_options( 110 kwargs={ 111 "object_type": object_type, 112 "page": 1, 113 "provider_pk": provider.pk, 114 "override_dry_run": body.validated_data["override_dry_run"], 115 "pk": pk, 116 }, 117 retries=0, 118 rel_obj=provider, 119 uid=f"{provider.name}:{_object_type._meta.model_name}:{pk}:manual", 120 ) 121 try: 122 msg.get_result(block=True) 123 except ResultFailure: 124 pass 125 task: Task = msg.options["task"] 126 task.refresh_from_db() 127 return Response(SyncObjectResultSerializer(instance={"messages": task._messages}).data)
Common API Endpoints for Outgoing sync providers
@extend_schema(responses={200: SyncStatusSerializer()})
@action(methods=['GET'], detail=True, pagination_class=None, url_path='sync/status', filter_backends=[ObjectFilter])
def
sync_status( self, request: rest_framework.request.Request, pk: int) -> rest_framework.response.Response:
47 @extend_schema(responses={200: SyncStatusSerializer()}) 48 @action( 49 methods=["GET"], 50 detail=True, 51 pagination_class=None, 52 url_path="sync/status", 53 filter_backends=[ObjectFilter], 54 ) 55 def sync_status(self, request: Request, pk: int) -> Response: 56 """Get provider's sync status""" 57 provider: OutgoingSyncProvider = self.get_object() 58 59 status = {} 60 61 with provider.sync_lock as lock_acquired: 62 # If we could not acquire the lock, it means a task is using it, and thus is running 63 status["is_running"] = not lock_acquired 64 65 sync_schedule = None 66 for schedule in provider.schedules.all(): 67 if schedule.actor_name == self.sync_task.actor_name: 68 sync_schedule = schedule 69 70 if not sync_schedule: 71 return Response(SyncStatusSerializer(status).data) 72 73 last_task: Task = ( 74 sync_schedule.tasks.filter(state__in=(TaskStatus.DONE, TaskStatus.REJECTED)) 75 .order_by("-mtime") 76 .first() 77 ) 78 last_successful_task: Task = ( 79 sync_schedule.tasks.filter(aggregated_status__in=(TaskStatus.DONE, TaskStatus.INFO)) 80 .order_by("-mtime") 81 .first() 82 ) 83 84 if last_task: 85 status["last_sync_status"] = last_task.aggregated_status 86 if last_successful_task: 87 status["last_successful_sync"] = last_successful_task.mtime 88 89 return Response(SyncStatusSerializer(status).data)
Get provider's sync status
@extend_schema(request=SyncObjectSerializer, responses={200: SyncObjectResultSerializer()})
@action(methods=['POST'], detail=True, pagination_class=None, url_path='sync/object', filter_backends=[ObjectFilter])
@validate(SyncObjectSerializer)
def
sync_object( self, request: rest_framework.request.Request, body: SyncObjectSerializer, pk: int) -> rest_framework.response.Response:
91 @extend_schema( 92 request=SyncObjectSerializer, 93 responses={200: SyncObjectResultSerializer()}, 94 ) 95 @action( 96 methods=["POST"], 97 detail=True, 98 pagination_class=None, 99 url_path="sync/object", 100 filter_backends=[ObjectFilter], 101 ) 102 @validate(SyncObjectSerializer) 103 def sync_object(self, request: Request, body: SyncObjectSerializer, pk: int) -> Response: 104 """Sync/Re-sync a single user/group object""" 105 provider = self.get_object() 106 object_type = body.validated_data["sync_object_model"] 107 _object_type: type[Model] = path_to_class(object_type) 108 pk = body.validated_data["sync_object_id"] 109 msg = self.sync_objects_task.send_with_options( 110 kwargs={ 111 "object_type": object_type, 112 "page": 1, 113 "provider_pk": provider.pk, 114 "override_dry_run": body.validated_data["override_dry_run"], 115 "pk": pk, 116 }, 117 retries=0, 118 rel_obj=provider, 119 uid=f"{provider.name}:{_object_type._meta.model_name}:{pk}:manual", 120 ) 121 try: 122 msg.get_result(block=True) 123 except ResultFailure: 124 pass 125 task: Task = msg.options["task"] 126 task.refresh_from_db() 127 return Response(SyncObjectResultSerializer(instance={"messages": task._messages}).data)
Sync/Re-sync a single user/group object
class
OutgoingSyncConnectionCreateMixin:
130class OutgoingSyncConnectionCreateMixin: 131 """Mixin for connection objects that fetches remote data upon creation""" 132 133 def perform_create(self, serializer: ModelSerializer): 134 super().perform_create(serializer) 135 try: 136 instance = serializer.instance 137 client = instance.provider.client_for_model(instance.__class__) 138 client.update_single_attribute(instance) 139 instance.save() 140 except NotImplementedError: 141 pass
Mixin for connection objects that fetches remote data upon creation
133 def perform_create(self, serializer: ModelSerializer): 134 super().perform_create(serializer) 135 try: 136 instance = serializer.instance 137 client = instance.provider.client_for_model(instance.__class__) 138 client.update_single_attribute(instance) 139 instance.save() 140 except NotImplementedError: 141 pass