authentik.sources.plex.tasks
Plex tasks
"""Plex tasks"""

from django.utils.translation import gettext_lazy as _
from dramatiq.actor import actor
from requests import RequestException

from authentik.events.models import Event, EventAction
from authentik.lib.utils.errors import exception_to_string
from authentik.sources.plex.models import PlexSource
from authentik.sources.plex.plex import PlexAuth
from authentik.tasks.middleware import CurrentTask


@actor(description=_("Check the validity of a Plex source."))
def check_plex_token(source_pk: str):
    """Check the validity of a Plex source.

    Looks up the ``PlexSource`` with primary key ``source_pk`` and calls
    ``PlexAuth.get_user_info()`` with the source's stored token. The outcome
    is logged on the current task; when the request raises
    ``RequestException`` a ``CONFIGURATION_ERROR`` event is additionally
    saved for the source.

    Args:
        source_pk: Primary key of the ``PlexSource`` to check. If no such
            source exists the task returns without doing anything.
    """
    self = CurrentTask.get_task()
    # Fetch with a single query. The previous exists() + first() pair issued
    # two queries and could hand back None if the row was deleted in between.
    source: PlexSource | None = PlexSource.objects.filter(pk=source_pk).first()
    if source is None:
        return
    auth = PlexAuth(source, source.plex_token)
    try:
        auth.get_user_info()
    except RequestException as exc:
        error = exception_to_string(exc)
        # Redact the token from the error text before it is logged or stored.
        # The guard matters: str.replace("", ...) would insert the
        # placeholder between every character of the message.
        if source.plex_token:
            error = error.replace(source.plex_token, "$PLEX_TOKEN")
        self.error("Plex token is invalid/an error occurred")
        self.error(error)
        Event.new(
            EventAction.CONFIGURATION_ERROR,
            message=f"Plex token invalid, please re-authenticate source.\n{error}",
            source=source,
        ).save()
    else:
        # Only reached when the request above did not raise.
        self.info("Plex token is valid.")
check_plex_token =
Actor(<function check_plex_token>, queue_name='default', actor_name='check_plex_token')
Check the validity of a Plex source.