authentik.sources.scim.views.v2.base
SCIM Utils
"""SCIM Utils"""

from typing import Any
from uuid import UUID

from django.core.paginator import Page, Paginator
from django.db.models import Q, QuerySet
from django.http import HttpRequest
from rest_framework.parsers import JSONParser
from rest_framework.permissions import IsAuthenticated
from rest_framework.renderers import JSONRenderer
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from scim2_filter_parser.transpilers.django_q_object import get_query
from structlog import BoundLogger
from structlog.stdlib import get_logger

from authentik.core.models import Group, User
from authentik.core.sources.mapper import SourceMapper
from authentik.lib.sync.mapper import PropertyMappingManager
from authentik.sources.scim.models import SCIMSource
from authentik.sources.scim.views.v2.auth import SCIMTokenAuth
from authentik.sources.scim.views.v2.exceptions import SCIMNotFoundError

SCIM_CONTENT_TYPE = "application/scim+json"


class SCIMParser(JSONParser):
    """SCIM clients use a custom content type"""

    media_type = SCIM_CONTENT_TYPE


class SCIMRenderer(JSONRenderer):
    """SCIM clients also expect a custom content type"""

    media_type = SCIM_CONTENT_TYPE


class SCIMView(APIView):
    """Base class for SCIM Views"""

    # Annotated only; no value is assigned to `source` anywhere in this class.
    source: SCIMSource
    # Bound in `setup()` for every request.
    logger: BoundLogger

    permission_classes = [IsAuthenticated]
    # Accept both the SCIM media type and plain JSON request bodies.
    parser_classes = [SCIMParser, JSONParser]
    renderer_classes = [SCIMRenderer]

    def setup(self, request: HttpRequest, *args: Any, **kwargs: Any) -> None:
        """Bind a logger to the view, then run the parent `setup`."""
        self.logger = get_logger().bind()
        super().setup(request, *args, **kwargs)

    def get_authenticators(self):
        """Return a single `SCIMTokenAuth` authenticator constructed with this view."""
        return [SCIMTokenAuth(self)]

    def remove_excluded_attributes(self, data: dict):
        """Remove the keys named in the `excludedAttributes` query parameter from `data`.

        The parameter is split on commas and each name is stripped of surrounding
        whitespace. `data` is modified in place and returned.
        """
        excluded: str = self.request.query_params.get("excludedAttributes", "")
        for key in excluded.split(","):
            data.pop(key.strip(), None)
        return data

    def filter_parse(self, request: Request):
        """Translate the `filter` query parameter into a Django `Q` object.

        Returns an empty `Q()` when no filter is given. Otherwise the filter expression
        is passed to `get_query` together with an attribute map selected by
        `self.model` (empty for models other than `User` and `Group`).
        """
        path = request.query_params.get("filter")
        if not path:
            return Q()
        attr_map = {}
        if self.model == User:
            attr_map = {
                ("userName", None, None): "user__username",
                ("active", None, None): "user__is_active",
                ("name", "familyName", None): "attributes__familyName",
            }
        elif self.model == Group:
            attr_map = {
                ("displayName", None, None): "group__name",
                ("members", None, None): "group__users",
            }
        return get_query(
            path,
            attr_map,
        )

    def paginate_query(self, query: QuerySet) -> Page:
        """Return the page of `query` that contains the requested `startIndex`.

        The page size is read from `request.tenant.pagination_default_page_size`.
        `startIndex` is read from the query string; a value that is not an integer
        falls back to 1. Any exception raised by `Paginator.page` for a page that
        does not exist is propagated unchanged.
        """
        per_page = int(self.request.tenant.pagination_default_page_size)
        try:
            start_index = int(self.request.query_params.get("startIndex", 1))
        except ValueError:
            start_index = 1
        # startIndex is the 1-based index of the first result (RFC 7644, 3.4.2.4);
        # anything below 1 is treated as 1.
        start_index = max(start_index, 1)
        # Map the 1-based result index onto the 1-based page that contains it, so that
        # e.g. startIndex == per_page + 1 selects page 2 instead of repeating page 1.
        page_number = (start_index - 1) // per_page + 1
        paginator = Paginator(query, per_page=per_page)
        return paginator.page(page_number)


class SCIMObjectView(SCIMView):
    """Base SCIM View for object management"""

    # Both are assigned in `initial()`.
    mapper: SourceMapper
    manager: PropertyMappingManager

    # Annotated only; no value is assigned in this class.
    model: type[User | Group]

    def initial(self, request: Request, *args, **kwargs) -> None:
        """Run the parent `initial`, build the mapper/manager, and validate `*_id` kwargs.

        Raises:
            SCIMNotFoundError: if a keyword argument whose name ends in `_id` is not a
                valid UUID string.
        """
        super().initial(request, *args, **kwargs)
        # This needs to happen after authentication has happened, because we don't have
        # a source attribute before
        self.mapper = SourceMapper(self.source)
        self.manager = self.mapper.get_manager(self.model, ["data"])
        for key, value in kwargs.items():
            if key.endswith("_id"):
                try:
                    UUID(value)
                except ValueError:
                    raise SCIMNotFoundError("Invalid ID") from None

    def build_object_properties(self, data: dict[str, Any]) -> dict[str, Any | dict[str, Any]]:
        """Pass `data` to the mapper's `build_object_properties` for this view's model."""
        return self.mapper.build_object_properties(
            object_type=self.model,
            manager=self.manager,
            user=None,
            request=self.request,
            data=data,
        )


class SCIMRootView(SCIMView):
    """Root SCIM View"""

    def dispatch(self, request: Request, *args, **kwargs) -> Response:
        """Return a fixed informational message; the parent `dispatch` is not called."""
        # NOTE(review): because the parent dispatch is bypassed, nothing here sets a
        # renderer on the Response - confirm this response renders as expected.
        return Response({"message": "Use this base-URL with a SCIM-compatible system."})
SCIM_CONTENT_TYPE =
'application/scim+json'
class
SCIMParser(rest_framework.parsers.JSONParser):
class SCIMParser(JSONParser):
    """SCIM clients use a custom content type"""

    # Only the media type differs from the parent JSON parser.
    media_type = SCIM_CONTENT_TYPE
SCIM clients use a custom content type
class
SCIMRenderer(rest_framework.renderers.JSONRenderer):
class SCIMRenderer(JSONRenderer):
    """SCIM clients also expect a custom content type"""

    # Only the media type differs from the parent JSON renderer.
    media_type = SCIM_CONTENT_TYPE
SCIM clients also expect a custom content type
class
SCIMView(rest_framework.views.APIView):
class SCIMView(APIView):
    """Base class for SCIM Views"""

    # Annotated only; no value is assigned to `source` anywhere in this class.
    source: SCIMSource
    # Bound in `setup()` for every request.
    logger: BoundLogger

    permission_classes = [IsAuthenticated]
    # Accept both the SCIM media type and plain JSON request bodies.
    parser_classes = [SCIMParser, JSONParser]
    renderer_classes = [SCIMRenderer]

    def setup(self, request: HttpRequest, *args: Any, **kwargs: Any) -> None:
        """Bind a logger to the view, then run the parent `setup`."""
        self.logger = get_logger().bind()
        super().setup(request, *args, **kwargs)

    def get_authenticators(self):
        """Return a single `SCIMTokenAuth` authenticator constructed with this view."""
        return [SCIMTokenAuth(self)]

    def remove_excluded_attributes(self, data: dict):
        """Remove the keys named in the `excludedAttributes` query parameter from `data`.

        The parameter is split on commas and each name is stripped of surrounding
        whitespace. `data` is modified in place and returned.
        """
        excluded: str = self.request.query_params.get("excludedAttributes", "")
        for key in excluded.split(","):
            data.pop(key.strip(), None)
        return data

    def filter_parse(self, request: Request):
        """Translate the `filter` query parameter into a Django `Q` object.

        Returns an empty `Q()` when no filter is given. Otherwise the filter expression
        is passed to `get_query` together with an attribute map selected by
        `self.model` (empty for models other than `User` and `Group`).
        """
        path = request.query_params.get("filter")
        if not path:
            return Q()
        attr_map = {}
        if self.model == User:
            attr_map = {
                ("userName", None, None): "user__username",
                ("active", None, None): "user__is_active",
                ("name", "familyName", None): "attributes__familyName",
            }
        elif self.model == Group:
            attr_map = {
                ("displayName", None, None): "group__name",
                ("members", None, None): "group__users",
            }
        return get_query(
            path,
            attr_map,
        )

    def paginate_query(self, query: QuerySet) -> Page:
        """Return the page of `query` that contains the requested `startIndex`.

        The page size is read from `request.tenant.pagination_default_page_size`.
        `startIndex` is read from the query string; a value that is not an integer
        falls back to 1. Any exception raised by `Paginator.page` for a page that
        does not exist is propagated unchanged.
        """
        per_page = int(self.request.tenant.pagination_default_page_size)
        try:
            start_index = int(self.request.query_params.get("startIndex", 1))
        except ValueError:
            start_index = 1
        # startIndex is the 1-based index of the first result (RFC 7644, 3.4.2.4);
        # anything below 1 is treated as 1.
        start_index = max(start_index, 1)
        # Map the 1-based result index onto the 1-based page that contains it, so that
        # e.g. startIndex == per_page + 1 selects page 2 instead of repeating page 1.
        page_number = (start_index - 1) // per_page + 1
        paginator = Paginator(query, per_page=per_page)
        return paginator.page(page_number)
Base class for SCIM Views
parser_classes =
[<class 'SCIMParser'>, <class 'rest_framework.parsers.JSONParser'>]
renderer_classes =
[<class 'SCIMRenderer'>]
def
setup( self, request: django.http.request.HttpRequest, *args: Any, **kwargs: Any) -> None:
def setup(self, request: HttpRequest, *args: Any, **kwargs: Any) -> None:
    """Attach a freshly bound logger to the view, then defer to the parent `setup`."""
    bound_logger = get_logger().bind()
    self.logger = bound_logger
    super().setup(request, *args, **kwargs)
Initialize attributes shared by all view methods.
def
get_authenticators(self):
Instantiates and returns the list of authenticators that this view can use.
def
remove_excluded_attributes(self, data: dict):
def remove_excluded_attributes(self, data: dict):
    """Drop every key listed in the `excludedAttributes` query parameter from `data`.

    The parameter is treated as a comma-separated list; whitespace around each name
    is ignored and names missing from `data` are skipped silently. `data` is mutated
    in place and the same object is returned.
    """
    raw_names: str = self.request.query_params.get("excludedAttributes", "")
    stripped_names = (name.strip() for name in raw_names.split(","))
    for name in stripped_names:
        data.pop(name, None)
    return data
Remove attributes specified in excludedAttributes
def
filter_parse(self, request: rest_framework.request.Request):
def filter_parse(self, request: Request):
    """Translate the request's `filter` query parameter into a Django `Q` object.

    An absent or empty filter yields an empty `Q()`. Otherwise the expression is
    handed to `get_query` along with an attribute map chosen by `self.model`;
    models other than `User` and `Group` get an empty map.
    """
    filter_expr = request.query_params.get("filter")
    if not filter_expr:
        return Q()

    if self.model == User:
        attr_map = {
            ("userName", None, None): "user__username",
            ("active", None, None): "user__is_active",
            ("name", "familyName", None): "attributes__familyName",
        }
    elif self.model == Group:
        attr_map = {
            ("displayName", None, None): "group__name",
            ("members", None, None): "group__users",
        }
    else:
        attr_map = {}

    return get_query(filter_expr, attr_map)
Parse the `filter` query parameter into a Django Q object
def
paginate_query( self, query: django.db.models.query.QuerySet) -> django.core.paginator.Page:
def paginate_query(self, query: QuerySet) -> Page:
    """Return the page of `query` that contains the requested `startIndex`.

    The page size is read from `request.tenant.pagination_default_page_size`.
    `startIndex` is read from the query string; a value that is not an integer
    falls back to 1. Any exception raised by `Paginator.page` for a page that
    does not exist is propagated unchanged.
    """
    per_page = int(self.request.tenant.pagination_default_page_size)
    try:
        start_index = int(self.request.query_params.get("startIndex", 1))
    except ValueError:
        start_index = 1
    # startIndex is the 1-based index of the first result (RFC 7644, 3.4.2.4);
    # anything below 1 is treated as 1.
    start_index = max(start_index, 1)
    # Map the 1-based result index onto the 1-based page that contains it, so that
    # e.g. startIndex == per_page + 1 selects page 2 instead of repeating page 1.
    page_number = (start_index - 1) // per_page + 1
    paginator = Paginator(query, per_page=per_page)
    return paginator.page(page_number)
class SCIMObjectView(SCIMView):
    """Base SCIM View for object management"""

    # Both are assigned in `initial()`.
    mapper: SourceMapper
    manager: PropertyMappingManager

    # Annotated only; no value is assigned in this class.
    model: type[User | Group]

    def initial(self, request: Request, *args, **kwargs) -> None:
        """Run the parent `initial`, create the mapper and manager, then check `*_id` kwargs.

        Raises:
            SCIMNotFoundError: when a keyword argument whose name ends in `_id` cannot
                be parsed as a UUID.
        """
        super().initial(request, *args, **kwargs)
        # The source attribute does not exist before authentication has happened,
        # so the mapper and manager cannot be created any earlier.
        self.mapper = SourceMapper(self.source)
        self.manager = self.mapper.get_manager(self.model, ["data"])
        id_values = (value for key, value in kwargs.items() if key.endswith("_id"))
        for candidate in id_values:
            try:
                UUID(candidate)
            except ValueError:
                raise SCIMNotFoundError("Invalid ID") from None

    def build_object_properties(self, data: dict[str, Any]) -> dict[str, Any | dict[str, Any]]:
        """Hand `data` to the mapper's `build_object_properties` for this view's model."""
        build = self.mapper.build_object_properties
        mapper_kwargs = {
            "object_type": self.model,
            "manager": self.manager,
            "user": None,
            "request": self.request,
            "data": data,
        }
        return build(**mapper_kwargs)
Base SCIM View for object management
model: type[authentik.core.models.User | authentik.core.models.Group]
def
initial(self, request: rest_framework.request.Request, *args, **kwargs) -> None:
def initial(self, request: Request, *args, **kwargs) -> None:
    """Run the parent `initial`, create the mapper and manager, then check `*_id` kwargs.

    Raises:
        SCIMNotFoundError: when a keyword argument whose name ends in `_id` cannot
            be parsed as a UUID.
    """
    super().initial(request, *args, **kwargs)
    # The source attribute does not exist before authentication has happened,
    # so the mapper and manager cannot be created any earlier.
    self.mapper = SourceMapper(self.source)
    self.manager = self.mapper.get_manager(self.model, ["data"])
    id_values = (value for key, value in kwargs.items() if key.endswith("_id"))
    for candidate in id_values:
        try:
            UUID(candidate)
        except ValueError:
            raise SCIMNotFoundError("Invalid ID") from None
Runs anything that needs to occur prior to calling the method handler.
class SCIMRootView(SCIMView):
    """Root SCIM View"""

    def dispatch(self, request: Request, *args, **kwargs) -> Response:
        """Answer with a fixed informational message; the parent `dispatch` is not called."""
        # NOTE(review): because the parent dispatch is bypassed, nothing here sets a
        # renderer on the Response - confirm this response renders as expected.
        payload = {"message": "Use this base-URL with a SCIM-compatible system."}
        return Response(payload)
Root SCIM View
def
dispatch( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
def dispatch(self, request: Request, *args, **kwargs) -> Response:
    """Answer with a fixed informational message; the parent `dispatch` is not called."""
    # NOTE(review): because the parent dispatch is bypassed, nothing here sets a
    # renderer on the Response - confirm this response renders as expected.
    payload = {"message": "Use this base-URL with a SCIM-compatible system."}
    return Response(payload)
.dispatch() is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.