authentik.sources.plex.tasks

Plex tasks

 1"""Plex tasks"""
 2
 3from django.utils.translation import gettext_lazy as _
 4from dramatiq.actor import actor
 5from requests import RequestException
 6
 7from authentik.events.models import Event, EventAction
 8from authentik.lib.utils.errors import exception_to_string
 9from authentik.sources.plex.models import PlexSource
10from authentik.sources.plex.plex import PlexAuth
11from authentik.tasks.middleware import CurrentTask
12
13
14@actor(description=_("Check the validity of a Plex source."))
15def check_plex_token(source_pk: str):
16    """Check the validity of a Plex source."""
17    self = CurrentTask.get_task()
18    sources = PlexSource.objects.filter(pk=source_pk)
19    if not sources.exists():
20        return
21    source: PlexSource = sources.first()
22    auth = PlexAuth(source, source.plex_token)
23    try:
24        auth.get_user_info()
25        self.info("Plex token is valid.")
26    except RequestException as exc:
27        error = exception_to_string(exc)
28        if len(source.plex_token) > 0:
29            error = error.replace(source.plex_token, "$PLEX_TOKEN")
30        self.error("Plex token is invalid/an error occurred")
31        self.error(error)
32        Event.new(
33            EventAction.CONFIGURATION_ERROR,
34            message=f"Plex token invalid, please re-authenticate source.\n{error}",
35            source=source,
36        ).save()
check_plex_token = Actor(<function check_plex_token>, queue_name='default', actor_name='check_plex_token')

Check the validity of a Plex source.