authentik.sources.scim.views.v2.base

SCIM Utils

  1"""SCIM Utils"""
  2
  3from typing import Any
  4from uuid import UUID
  5
  6from django.core.paginator import Page, Paginator
  7from django.db.models import Q, QuerySet
  8from django.http import HttpRequest
  9from rest_framework.parsers import JSONParser
 10from rest_framework.permissions import IsAuthenticated
 11from rest_framework.renderers import JSONRenderer
 12from rest_framework.request import Request
 13from rest_framework.response import Response
 14from rest_framework.views import APIView
 15from scim2_filter_parser.transpilers.django_q_object import get_query
 16from structlog import BoundLogger
 17from structlog.stdlib import get_logger
 18
 19from authentik.core.models import Group, User
 20from authentik.core.sources.mapper import SourceMapper
 21from authentik.lib.sync.mapper import PropertyMappingManager
 22from authentik.sources.scim.models import SCIMSource
 23from authentik.sources.scim.views.v2.auth import SCIMTokenAuth
 24from authentik.sources.scim.views.v2.exceptions import SCIMNotFoundError
 25
 26SCIM_CONTENT_TYPE = "application/scim+json"
 27
 28
 29class SCIMParser(JSONParser):
 30    """SCIM clients use a custom content type"""
 31
 32    media_type = SCIM_CONTENT_TYPE
 33
 34
 35class SCIMRenderer(JSONRenderer):
 36    """SCIM clients also expect a custom content type"""
 37
 38    media_type = SCIM_CONTENT_TYPE
 39
 40
 41class SCIMView(APIView):
 42    """Base class for SCIM Views"""
 43
 44    source: SCIMSource
 45    logger: BoundLogger
 46
 47    permission_classes = [IsAuthenticated]
 48    parser_classes = [SCIMParser, JSONParser]
 49    renderer_classes = [SCIMRenderer]
 50
 51    def setup(self, request: HttpRequest, *args: Any, **kwargs: Any) -> None:
 52        self.logger = get_logger().bind()
 53        super().setup(request, *args, **kwargs)
 54
 55    def get_authenticators(self):
 56        return [SCIMTokenAuth(self)]
 57
 58    def remove_excluded_attributes(self, data: dict):
 59        """Remove attributes specified in excludedAttributes"""
 60        excluded: str = self.request.query_params.get("excludedAttributes", "")
 61        for key in excluded.split(","):
 62            data.pop(key.strip(), None)
 63        return data
 64
 65    def filter_parse(self, request: Request):
 66        """Parse the path of a Patch Operation"""
 67        path = request.query_params.get("filter")
 68        if not path:
 69            return Q()
 70        attr_map = {}
 71        if self.model == User:
 72            attr_map = {
 73                ("userName", None, None): "user__username",
 74                ("active", None, None): "user__is_active",
 75                ("name", "familyName", None): "attributes__familyName",
 76            }
 77        elif self.model == Group:
 78            attr_map = {
 79                ("displayName", None, None): "group__name",
 80                ("members", None, None): "group__users",
 81            }
 82        return get_query(
 83            path,
 84            attr_map,
 85        )
 86
 87    def paginate_query(self, query: QuerySet) -> Page:
 88        per_page = int(self.request.tenant.pagination_default_page_size)
 89        start_index = 1
 90        try:
 91            start_index = int(self.request.query_params.get("startIndex", 1))
 92        except ValueError:
 93            pass
 94        paginator = Paginator(query, per_page=per_page)
 95        page = paginator.page(int(max(start_index / per_page, 1)))
 96        return page
 97
 98
 99class SCIMObjectView(SCIMView):
100    """Base SCIM View for object management"""
101
102    mapper: SourceMapper
103    manager: PropertyMappingManager
104
105    model: type[User | Group]
106
107    def initial(self, request: Request, *args, **kwargs) -> None:
108        super().initial(request, *args, **kwargs)
109        # This needs to happen after authentication has happened, because we don't have
110        # a source attribute before
111        self.mapper = SourceMapper(self.source)
112        self.manager = self.mapper.get_manager(self.model, ["data"])
113        for key, value in kwargs.items():
114            if key.endswith("_id"):
115                try:
116                    UUID(value)
117                except ValueError:
118                    raise SCIMNotFoundError("Invalid ID") from None
119
120    def build_object_properties(self, data: dict[str, Any]) -> dict[str, Any | dict[str, Any]]:
121        return self.mapper.build_object_properties(
122            object_type=self.model,
123            manager=self.manager,
124            user=None,
125            request=self.request,
126            data=data,
127        )
128
129
class SCIMRootView(SCIMView):
    """View for the SCIM base URL; answers with a static hint"""

    def dispatch(self, request: Request, *args, **kwargs) -> Response:
        """Return the same informational message for every request"""
        # NOTE(review): overrides dispatch without calling super(), so none of the
        # parent's dispatch hooks run for this view - confirm that is intended
        payload = {"message": "Use this base-URL with a SCIM-compatible system."}
        return Response(payload)
SCIM_CONTENT_TYPE = 'application/scim+json'
class SCIMParser(rest_framework.parsers.JSONParser):
class SCIMParser(JSONParser):
    """JSON parser registered for the SCIM media type

    Behaves like the JSON parser it derives from; only the media type it is
    selected for differs.
    """

    media_type = SCIM_CONTENT_TYPE

SCIM clients use a custom content type

media_type = 'application/scim+json'
class SCIMRenderer(rest_framework.renderers.JSONRenderer):
class SCIMRenderer(JSONRenderer):
    """JSON renderer that labels responses with the SCIM media type

    Behaves like the JSON renderer it derives from; only the advertised media
    type differs.
    """

    media_type = SCIM_CONTENT_TYPE

SCIM clients also expect a custom content type

media_type = 'application/scim+json'
class SCIMView(rest_framework.views.APIView):
class SCIMView(APIView):
    """Base class for SCIM Views

    Configures SCIM-aware parsing and rendering, authenticates through
    `SCIMTokenAuth` and provides helpers for attribute exclusion, filtering
    and pagination that the concrete SCIM endpoints share.
    """

    # Not assigned anywhere in this class; SCIMObjectView.initial notes that it is
    # only available after authentication has happened
    source: SCIMSource
    logger: BoundLogger

    permission_classes = [IsAuthenticated]
    # Plain JSON bodies are accepted in addition to application/scim+json
    parser_classes = [SCIMParser, JSONParser]
    renderer_classes = [SCIMRenderer]

    def setup(self, request: HttpRequest, *args: Any, **kwargs: Any) -> None:
        """Bind a logger to the view, then run the regular view setup"""
        self.logger = get_logger().bind()
        super().setup(request, *args, **kwargs)

    def get_authenticators(self):
        """Authenticate every request with a single `SCIMTokenAuth` bound to this view"""
        return [SCIMTokenAuth(self)]

    def remove_excluded_attributes(self, data: dict):
        """Remove attributes specified in excludedAttributes

        The query parameter is read as a comma-separated list of top-level keys.
        Keys that are not present in `data` are ignored. `data` is modified in
        place and also returned.
        """
        excluded: str = self.request.query_params.get("excludedAttributes", "")
        for key in excluded.split(","):
            data.pop(key.strip(), None)
        return data

    def filter_parse(self, request: Request):
        """Translate the `filter` query parameter into a Django Q object

        Returns an empty `Q()` when the request carries no filter. Otherwise the
        filter string is handed to `get_query` together with an attribute map
        chosen by `self.model`; for any model other than `User` or `Group` the
        map is empty.
        """
        path = request.query_params.get("filter")
        if not path:
            return Q()
        # self.model is not declared on this class; SCIMObjectView declares it, so
        # callers of this helper have to provide it
        attr_map = {}
        if self.model == User:
            attr_map = {
                ("userName", None, None): "user__username",
                ("active", None, None): "user__is_active",
                ("name", "familyName", None): "attributes__familyName",
            }
        elif self.model == Group:
            attr_map = {
                ("displayName", None, None): "group__name",
                ("members", None, None): "group__users",
            }
        return get_query(
            path,
            attr_map,
        )

    def paginate_query(self, query: QuerySet) -> Page:
        """Return the page of `query` that contains the requested `startIndex`

        The page size is the tenant's `pagination_default_page_size`.
        `startIndex` is treated as the 1-based index of the first requested
        result; a missing or non-integer value, or a value below 1, is treated
        as 1. Results are served in fixed pages, so an index in the middle of a
        page yields the whole page containing it.

        Whatever `Paginator.page` raises for an out-of-range page propagates.
        """
        per_page = int(self.request.tenant.pagination_default_page_size)
        try:
            start_index = int(self.request.query_params.get("startIndex", 1))
        except ValueError:
            start_index = 1
        start_index = max(start_index, 1)
        # startIndex counts results, not pages: indices 1..per_page are on page 1,
        # per_page+1..2*per_page on page 2, and so on
        page_number = (start_index - 1) // per_page + 1
        paginator = Paginator(query, per_page=per_page)
        return paginator.page(page_number)

Base class for SCIM Views

logger: structlog._generic.BoundLogger
permission_classes = [<class 'rest_framework.permissions.IsAuthenticated'>]
parser_classes = [<class 'SCIMParser'>, <class 'rest_framework.parsers.JSONParser'>]
renderer_classes = [<class 'SCIMRenderer'>]
def setup( self, request: django.http.request.HttpRequest, *args: Any, **kwargs: Any) -> None:
52    def setup(self, request: HttpRequest, *args: Any, **kwargs: Any) -> None:
53        self.logger = get_logger().bind()
54        super().setup(request, *args, **kwargs)

Initialize attributes shared by all view methods.

def get_authenticators(self):
56    def get_authenticators(self):
57        return [SCIMTokenAuth(self)]

Instantiates and returns the list of authenticators that this view can use.

def remove_excluded_attributes(self, data: dict):
59    def remove_excluded_attributes(self, data: dict):
60        """Remove attributes specified in excludedAttributes"""
61        excluded: str = self.request.query_params.get("excludedAttributes", "")
62        for key in excluded.split(","):
63            data.pop(key.strip(), None)
64        return data

Remove attributes specified in excludedAttributes

def filter_parse(self, request: rest_framework.request.Request):
66    def filter_parse(self, request: Request):
67        """Parse the path of a Patch Operation"""
68        path = request.query_params.get("filter")
69        if not path:
70            return Q()
71        attr_map = {}
72        if self.model == User:
73            attr_map = {
74                ("userName", None, None): "user__username",
75                ("active", None, None): "user__is_active",
76                ("name", "familyName", None): "attributes__familyName",
77            }
78        elif self.model == Group:
79            attr_map = {
80                ("displayName", None, None): "group__name",
81                ("members", None, None): "group__users",
82            }
83        return get_query(
84            path,
85            attr_map,
86        )

Parse the `filter` query parameter into a Django Q object (an empty Q when no filter is given)

def paginate_query( self, query: django.db.models.query.QuerySet) -> django.core.paginator.Page:
88    def paginate_query(self, query: QuerySet) -> Page:
89        per_page = int(self.request.tenant.pagination_default_page_size)
90        start_index = 1
91        try:
92            start_index = int(self.request.query_params.get("startIndex", 1))
93        except ValueError:
94            pass
95        paginator = Paginator(query, per_page=per_page)
96        page = paginator.page(int(max(start_index / per_page, 1)))
97        return page
class SCIMObjectView(SCIMView):
class SCIMObjectView(SCIMView):
    """SCIM view base for endpoints that manage one kind of object

    Subclasses provide `model`; the mapper and its manager are created per
    request in `initial`.
    """

    mapper: SourceMapper
    manager: PropertyMappingManager

    model: type[User | Group]

    def initial(self, request: Request, *args, **kwargs) -> None:
        """Create the mapper and manager, then validate ID-like URL kwargs

        Raises `SCIMNotFoundError` when a keyword argument whose name ends in
        `_id` cannot be parsed as a UUID.
        """
        super().initial(request, *args, **kwargs)
        # The source attribute does not exist before authentication has happened,
        # so the mapper cannot be built any earlier than this
        source_mapper = SourceMapper(self.source)
        self.mapper = source_mapper
        self.manager = source_mapper.get_manager(self.model, ["data"])
        for name, candidate in kwargs.items():
            if not name.endswith("_id"):
                continue
            try:
                UUID(candidate)
            except ValueError:
                raise SCIMNotFoundError("Invalid ID") from None

    def build_object_properties(self, data: dict[str, Any]) -> dict[str, Any | dict[str, Any]]:
        """Run `data` through the source mapper for this view's model

        Delegates to the mapper with this view's model, manager and request;
        no user is passed.
        """
        properties = self.mapper.build_object_properties(
            object_type=self.model,
            manager=self.manager,
            user=None,
            request=self.request,
            data=data,
        )
        return properties

Base SCIM View for object management

def initial(self, request: rest_framework.request.Request, *args, **kwargs) -> None:
108    def initial(self, request: Request, *args, **kwargs) -> None:
109        super().initial(request, *args, **kwargs)
110        # This needs to happen after authentication has happened, because we don't have
111        # a source attribute before
112        self.mapper = SourceMapper(self.source)
113        self.manager = self.mapper.get_manager(self.model, ["data"])
114        for key, value in kwargs.items():
115            if key.endswith("_id"):
116                try:
117                    UUID(value)
118                except ValueError:
119                    raise SCIMNotFoundError("Invalid ID") from None

Runs anything that needs to occur prior to calling the method handler.

def build_object_properties( self, data: dict[str, typing.Any]) -> dict[str, typing.Any | dict[str, typing.Any]]:
121    def build_object_properties(self, data: dict[str, Any]) -> dict[str, Any | dict[str, Any]]:
122        return self.mapper.build_object_properties(
123            object_type=self.model,
124            manager=self.manager,
125            user=None,
126            request=self.request,
127            data=data,
128        )
class SCIMRootView(SCIMView):
class SCIMRootView(SCIMView):
    """View for the SCIM base URL; answers with a static hint"""

    def dispatch(self, request: Request, *args, **kwargs) -> Response:
        """Return the same informational message for every request"""
        # NOTE(review): overrides dispatch without calling super(), so none of the
        # parent's dispatch hooks run for this view - confirm that is intended
        payload = {"message": "Use this base-URL with a SCIM-compatible system."}
        return Response(payload)

Root SCIM View

def dispatch( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
134    def dispatch(self, request: Request, *args, **kwargs) -> Response:
135        return Response({"message": "Use this base-URL with a SCIM-compatible system."})

.dispatch() is pretty much the same as Django's regular dispatch, but with extra hooks for startup, finalize, and exception handling.