authentik.sources.plex.api.source
Plex Source Serializer
1"""Plex Source Serializer""" 2 3from django.shortcuts import get_object_or_404 4from drf_spectacular.types import OpenApiTypes 5from drf_spectacular.utils import OpenApiParameter, OpenApiResponse, extend_schema 6from rest_framework.decorators import action 7from rest_framework.exceptions import PermissionDenied 8from rest_framework.fields import CharField 9from rest_framework.permissions import AllowAny, IsAuthenticated 10from rest_framework.request import Request 11from rest_framework.response import Response 12from rest_framework.serializers import ValidationError 13from rest_framework.viewsets import ModelViewSet 14from structlog.stdlib import get_logger 15 16from authentik.api.validation import validate 17from authentik.core.api.sources import SourceSerializer 18from authentik.core.api.used_by import UsedByMixin 19from authentik.core.api.utils import PassiveSerializer 20from authentik.flows.challenge import RedirectChallenge 21from authentik.flows.views.executor import to_stage_response 22from authentik.rbac.decorators import permission_required 23from authentik.sources.plex.models import PlexSource, UserPlexSourceConnection 24from authentik.sources.plex.plex import PlexAuth, PlexSourceFlowManager 25 26LOGGER = get_logger() 27 28 29class PlexSourceSerializer(SourceSerializer): 30 """Plex Source Serializer""" 31 32 class Meta: 33 model = PlexSource 34 fields = SourceSerializer.Meta.fields + [ 35 "group_matching_mode", 36 "client_id", 37 "allowed_servers", 38 "allow_friends", 39 "plex_token", 40 ] 41 42 43class PlexTokenRedeemSerializer(PassiveSerializer): 44 """Serializer to redeem a plex token""" 45 46 plex_token = CharField() 47 48 49class PlexSourceViewSet(UsedByMixin, ModelViewSet): 50 """Plex source Viewset""" 51 52 queryset = PlexSource.objects.all() 53 serializer_class = PlexSourceSerializer 54 lookup_field = "slug" 55 filterset_fields = [ 56 "pbm_uuid", 57 "name", 58 "slug", 59 "enabled", 60 "authentication_flow", 61 "enrollment_flow", 62 "policy_engine_mode", 63 "user_matching_mode", 64 "group_matching_mode", 65 "client_id", 66 "allow_friends", 67 ] 68 search_fields = ["name", "slug"] 69 ordering = ["name"] 70 71 @permission_required(None) 72 @extend_schema( 73 request=PlexTokenRedeemSerializer(), 74 responses={ 75 200: RedirectChallenge(), 76 400: OpenApiResponse(description="Token not found"), 77 403: OpenApiResponse(description="Access denied"), 78 }, 79 parameters=[ 80 OpenApiParameter( 81 name="slug", 82 location=OpenApiParameter.QUERY, 83 type=OpenApiTypes.STR, 84 ) 85 ], 86 ) 87 @action( 88 methods=["POST"], 89 detail=False, 90 pagination_class=None, 91 filter_backends=[], 92 permission_classes=[AllowAny], 93 ) 94 @validate(PlexTokenRedeemSerializer) 95 def redeem_token(self, request: Request, body: PlexTokenRedeemSerializer) -> Response: 96 """Redeem a plex token, check it's access to resources against what's allowed 97 for the source, and redirect to an authentication/enrollment flow.""" 98 source: PlexSource = get_object_or_404( 99 PlexSource, slug=request.query_params.get("slug", "") 100 ) 101 plex_token = body.validated_data.get("plex_token", None) 102 if not plex_token: 103 raise ValidationError("No plex token given") 104 auth_api = PlexAuth(source, plex_token) 105 user_info, identifier = auth_api.get_user_info() 106 # Check friendship first, then check server overlay 107 friends_allowed = False 108 if source.allow_friends: 109 owner_api = PlexAuth(source, source.plex_token) 110 friends_allowed = owner_api.check_friends_overlap(identifier) 111 servers_allowed = auth_api.check_server_overlap() 112 if any([friends_allowed, servers_allowed]): 113 sfm = PlexSourceFlowManager( 114 source=source, 115 request=request, 116 identifier=str(identifier), 117 user_info={ 118 "info": user_info, 119 "auth_api": auth_api, 120 }, 121 policy_context={}, 122 ) 123 return to_stage_response(request, sfm.get_flow(plex_token=plex_token)) 124 LOGGER.warning( 125 "Denying plex auth because no server overlay and no friends and not owner", 126 user=user_info["username"], 127 ) 128 raise PermissionDenied("Access denied.") 129 130 @extend_schema( 131 request=PlexTokenRedeemSerializer(), 132 responses={ 133 204: OpenApiResponse(), 134 400: OpenApiResponse(description="Token not found"), 135 403: OpenApiResponse(description="Access denied"), 136 }, 137 parameters=[ 138 OpenApiParameter( 139 name="slug", 140 location=OpenApiParameter.QUERY, 141 type=OpenApiTypes.STR, 142 ) 143 ], 144 ) 145 @action( 146 methods=["POST"], 147 detail=False, 148 pagination_class=None, 149 filter_backends=[], 150 permission_classes=[IsAuthenticated], 151 ) 152 @validate(PlexTokenRedeemSerializer) 153 def redeem_token_authenticated( 154 self, request: Request, body: PlexTokenRedeemSerializer 155 ) -> Response: 156 """Redeem a plex token for an authenticated user, creating a connection""" 157 source: PlexSource = get_object_or_404( 158 PlexSource, slug=request.query_params.get("slug", "") 159 ) 160 plex_token = body.validated_data.get("plex_token", None) 161 if not plex_token: 162 raise ValidationError("No plex token given") 163 auth_api = PlexAuth(source, plex_token) 164 user_info, identifier = auth_api.get_user_info() 165 # Check friendship first, then check server overlay 166 friends_allowed = False 167 if source.allow_friends: 168 owner_api = PlexAuth(source, source.plex_token) 169 friends_allowed = owner_api.check_friends_overlap(identifier) 170 servers_allowed = auth_api.check_server_overlap() 171 if any([friends_allowed, servers_allowed]): 172 UserPlexSourceConnection.objects.create( 173 plex_token=plex_token, 174 user=request.user, 175 identifier=identifier, 176 source=source, 177 ) 178 return Response(status=204) 179 LOGGER.warning( 180 "Denying plex connection because no server overlay and no friends and not owner", 181 user=user_info["username"], 182 friends_allowed=friends_allowed, 183 servers_allowed=servers_allowed, 184 ) 185 raise PermissionDenied("Access denied.")
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
30class PlexSourceSerializer(SourceSerializer): 31 """Plex Source Serializer""" 32 33 class Meta: 34 model = PlexSource 35 fields = SourceSerializer.Meta.fields + [ 36 "group_matching_mode", 37 "client_id", 38 "allowed_servers", 39 "allow_friends", 40 "plex_token", 41 ]
Plex Source Serializer
Inherited Members
class
PlexSourceSerializer.Meta:
33 class Meta: 34 model = PlexSource 35 fields = SourceSerializer.Meta.fields + [ 36 "group_matching_mode", 37 "client_id", 38 "allowed_servers", 39 "allow_friends", 40 "plex_token", 41 ]
model =
<class 'authentik.sources.plex.models.PlexSource'>
fields =
['pk', 'name', 'slug', 'enabled', 'promoted', 'authentication_flow', 'enrollment_flow', 'user_property_mappings', 'group_property_mappings', 'component', 'verbose_name', 'verbose_name_plural', 'meta_model_name', 'policy_engine_mode', 'user_matching_mode', 'managed', 'user_path_template', 'icon', 'icon_url', 'icon_themed_urls', 'group_matching_mode', 'client_id', 'allowed_servers', 'allow_friends', 'plex_token']
44class PlexTokenRedeemSerializer(PassiveSerializer): 45 """Serializer to redeem a plex token""" 46 47 plex_token = CharField()
Serializer to redeem a plex token
Inherited Members
class
PlexSourceViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
50class PlexSourceViewSet(UsedByMixin, ModelViewSet): 51 """Plex source Viewset""" 52 53 queryset = PlexSource.objects.all() 54 serializer_class = PlexSourceSerializer 55 lookup_field = "slug" 56 filterset_fields = [ 57 "pbm_uuid", 58 "name", 59 "slug", 60 "enabled", 61 "authentication_flow", 62 "enrollment_flow", 63 "policy_engine_mode", 64 "user_matching_mode", 65 "group_matching_mode", 66 "client_id", 67 "allow_friends", 68 ] 69 search_fields = ["name", "slug"] 70 ordering = ["name"] 71 72 @permission_required(None) 73 @extend_schema( 74 request=PlexTokenRedeemSerializer(), 75 responses={ 76 200: RedirectChallenge(), 77 400: OpenApiResponse(description="Token not found"), 78 403: OpenApiResponse(description="Access denied"), 79 }, 80 parameters=[ 81 OpenApiParameter( 82 name="slug", 83 location=OpenApiParameter.QUERY, 84 type=OpenApiTypes.STR, 85 ) 86 ], 87 ) 88 @action( 89 methods=["POST"], 90 detail=False, 91 pagination_class=None, 92 filter_backends=[], 93 permission_classes=[AllowAny], 94 ) 95 @validate(PlexTokenRedeemSerializer) 96 def redeem_token(self, request: Request, body: PlexTokenRedeemSerializer) -> Response: 97 """Redeem a plex token, check it's access to resources against what's allowed 98 for the source, and redirect to an authentication/enrollment flow.""" 99 source: PlexSource = get_object_or_404( 100 PlexSource, slug=request.query_params.get("slug", "") 101 ) 102 plex_token = body.validated_data.get("plex_token", None) 103 if not plex_token: 104 raise ValidationError("No plex token given") 105 auth_api = PlexAuth(source, plex_token) 106 user_info, identifier = auth_api.get_user_info() 107 # Check friendship first, then check server overlay 108 friends_allowed = False 109 if source.allow_friends: 110 owner_api = PlexAuth(source, source.plex_token) 111 friends_allowed = owner_api.check_friends_overlap(identifier) 112 servers_allowed = auth_api.check_server_overlap() 113 if any([friends_allowed, servers_allowed]): 114 sfm = PlexSourceFlowManager( 115 source=source, 116 request=request, 117 identifier=str(identifier), 118 user_info={ 119 "info": user_info, 120 "auth_api": auth_api, 121 }, 122 policy_context={}, 123 ) 124 return to_stage_response(request, sfm.get_flow(plex_token=plex_token)) 125 LOGGER.warning( 126 "Denying plex auth because no server overlay and no friends and not owner", 127 user=user_info["username"], 128 ) 129 raise PermissionDenied("Access denied.") 130 131 @extend_schema( 132 request=PlexTokenRedeemSerializer(), 133 responses={ 134 204: OpenApiResponse(), 135 400: OpenApiResponse(description="Token not found"), 136 403: OpenApiResponse(description="Access denied"), 137 }, 138 parameters=[ 139 OpenApiParameter( 140 name="slug", 141 location=OpenApiParameter.QUERY, 142 type=OpenApiTypes.STR, 143 ) 144 ], 145 ) 146 @action( 147 methods=["POST"], 148 detail=False, 149 pagination_class=None, 150 filter_backends=[], 151 permission_classes=[IsAuthenticated], 152 ) 153 @validate(PlexTokenRedeemSerializer) 154 def redeem_token_authenticated( 155 self, request: Request, body: PlexTokenRedeemSerializer 156 ) -> Response: 157 """Redeem a plex token for an authenticated user, creating a connection""" 158 source: PlexSource = get_object_or_404( 159 PlexSource, slug=request.query_params.get("slug", "") 160 ) 161 plex_token = body.validated_data.get("plex_token", None) 162 if not plex_token: 163 raise ValidationError("No plex token given") 164 auth_api = PlexAuth(source, plex_token) 165 user_info, identifier = auth_api.get_user_info() 166 # Check friendship first, then check server overlay 167 friends_allowed = False 168 if source.allow_friends: 169 owner_api = PlexAuth(source, source.plex_token) 170 friends_allowed = owner_api.check_friends_overlap(identifier) 171 servers_allowed = auth_api.check_server_overlap() 172 if any([friends_allowed, servers_allowed]): 173 UserPlexSourceConnection.objects.create( 174 plex_token=plex_token, 175 user=request.user, 176 identifier=identifier, 177 source=source, 178 ) 179 return Response(status=204) 180 LOGGER.warning( 181 "Denying plex connection because no server overlay and no friends and not owner", 182 user=user_info["username"], 183 friends_allowed=friends_allowed, 184 servers_allowed=servers_allowed, 185 ) 186 raise PermissionDenied("Access denied.")
Plex source Viewset
serializer_class =
<class 'PlexSourceSerializer'>
filterset_fields =
['pbm_uuid', 'name', 'slug', 'enabled', 'authentication_flow', 'enrollment_flow', 'policy_engine_mode', 'user_matching_mode', 'group_matching_mode', 'client_id', 'allow_friends']
@extend_schema(request=PlexTokenRedeemSerializer(), responses={200: RedirectChallenge(), 400: OpenApiResponse(description='Token not found'), 403: OpenApiResponse(description='Access denied')}, parameters=[OpenApiParameter(name='slug', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR)])
@action(methods=['POST'], detail=False, pagination_class=None, filter_backends=[], permission_classes=[AllowAny])
@validate(PlexTokenRedeemSerializer)
def
redeem_token( self, request: rest_framework.request.Request, body: PlexTokenRedeemSerializer) -> rest_framework.response.Response:
72 @permission_required(None) 73 @extend_schema( 74 request=PlexTokenRedeemSerializer(), 75 responses={ 76 200: RedirectChallenge(), 77 400: OpenApiResponse(description="Token not found"), 78 403: OpenApiResponse(description="Access denied"), 79 }, 80 parameters=[ 81 OpenApiParameter( 82 name="slug", 83 location=OpenApiParameter.QUERY, 84 type=OpenApiTypes.STR, 85 ) 86 ], 87 ) 88 @action( 89 methods=["POST"], 90 detail=False, 91 pagination_class=None, 92 filter_backends=[], 93 permission_classes=[AllowAny], 94 ) 95 @validate(PlexTokenRedeemSerializer) 96 def redeem_token(self, request: Request, body: PlexTokenRedeemSerializer) -> Response: 97 """Redeem a plex token, check it's access to resources against what's allowed 98 for the source, and redirect to an authentication/enrollment flow.""" 99 source: PlexSource = get_object_or_404( 100 PlexSource, slug=request.query_params.get("slug", "") 101 ) 102 plex_token = body.validated_data.get("plex_token", None) 103 if not plex_token: 104 raise ValidationError("No plex token given") 105 auth_api = PlexAuth(source, plex_token) 106 user_info, identifier = auth_api.get_user_info() 107 # Check friendship first, then check server overlay 108 friends_allowed = False 109 if source.allow_friends: 110 owner_api = PlexAuth(source, source.plex_token) 111 friends_allowed = owner_api.check_friends_overlap(identifier) 112 servers_allowed = auth_api.check_server_overlap() 113 if any([friends_allowed, servers_allowed]): 114 sfm = PlexSourceFlowManager( 115 source=source, 116 request=request, 117 identifier=str(identifier), 118 user_info={ 119 "info": user_info, 120 "auth_api": auth_api, 121 }, 122 policy_context={}, 123 ) 124 return to_stage_response(request, sfm.get_flow(plex_token=plex_token)) 125 LOGGER.warning( 126 "Denying plex auth because no server overlay and no friends and not owner", 127 user=user_info["username"], 128 ) 129 raise PermissionDenied("Access denied.")
Redeem a plex token, check it's access to resources against what's allowed for the source, and redirect to an authentication/enrollment flow.
@extend_schema(request=PlexTokenRedeemSerializer(), responses={204: OpenApiResponse(), 400: OpenApiResponse(description='Token not found'), 403: OpenApiResponse(description='Access denied')}, parameters=[OpenApiParameter(name='slug', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR)])
@action(methods=['POST'], detail=False, pagination_class=None, filter_backends=[], permission_classes=[IsAuthenticated])
@validate(PlexTokenRedeemSerializer)
def
redeem_token_authenticated( self, request: rest_framework.request.Request, body: PlexTokenRedeemSerializer) -> rest_framework.response.Response:
131 @extend_schema( 132 request=PlexTokenRedeemSerializer(), 133 responses={ 134 204: OpenApiResponse(), 135 400: OpenApiResponse(description="Token not found"), 136 403: OpenApiResponse(description="Access denied"), 137 }, 138 parameters=[ 139 OpenApiParameter( 140 name="slug", 141 location=OpenApiParameter.QUERY, 142 type=OpenApiTypes.STR, 143 ) 144 ], 145 ) 146 @action( 147 methods=["POST"], 148 detail=False, 149 pagination_class=None, 150 filter_backends=[], 151 permission_classes=[IsAuthenticated], 152 ) 153 @validate(PlexTokenRedeemSerializer) 154 def redeem_token_authenticated( 155 self, request: Request, body: PlexTokenRedeemSerializer 156 ) -> Response: 157 """Redeem a plex token for an authenticated user, creating a connection""" 158 source: PlexSource = get_object_or_404( 159 PlexSource, slug=request.query_params.get("slug", "") 160 ) 161 plex_token = body.validated_data.get("plex_token", None) 162 if not plex_token: 163 raise ValidationError("No plex token given") 164 auth_api = PlexAuth(source, plex_token) 165 user_info, identifier = auth_api.get_user_info() 166 # Check friendship first, then check server overlay 167 friends_allowed = False 168 if source.allow_friends: 169 owner_api = PlexAuth(source, source.plex_token) 170 friends_allowed = owner_api.check_friends_overlap(identifier) 171 servers_allowed = auth_api.check_server_overlap() 172 if any([friends_allowed, servers_allowed]): 173 UserPlexSourceConnection.objects.create( 174 plex_token=plex_token, 175 user=request.user, 176 identifier=identifier, 177 source=source, 178 ) 179 return Response(status=204) 180 LOGGER.warning( 181 "Denying plex connection because no server overlay and no friends and not owner", 182 user=user_info["username"], 183 friends_allowed=friends_allowed, 184 servers_allowed=servers_allowed, 185 ) 186 raise PermissionDenied("Access denied.")
Redeem a plex token for an authenticated user, creating a connection