authentik.sources.scim.views.v2.schemas
Schema Views
"""Schema Views"""

from json import loads

from django.conf import settings
from django.urls import reverse
from rest_framework.request import Request
from rest_framework.response import Response

from authentik.sources.scim.views.v2.base import SCIMView
from authentik.sources.scim.views.v2.exceptions import SCIMNotFoundError

# The bundled schema definitions are parsed once, when this module is imported.
# They are shared by every request, so views must treat them as read-only.
with open(
    settings.BASE_DIR / "authentik" / "sources" / "scim" / "schemas" / "schema.json",
    encoding="utf-8",
) as SCHEMA_FILE:
    _raw_schemas = loads(SCHEMA_FILE.read())


class SchemaView(SCIMView):
    """Serve the SCIM schema definitions loaded from the bundled schema file.

    Reference: https://ldapwiki.com/wiki/SCIM%20Schemas%20Attribute
    """

    def get_schemas(self) -> list[dict]:
        """Return all schemas with ``meta.location`` set for the current request.

        The location is the absolute URI of the ``v2-schema`` route for the
        source slug in ``self.kwargs`` and the schema's own ``id``.
        """
        schemas = []
        for raw_schema in _raw_schemas:
            location = self.request.build_absolute_uri(
                reverse(
                    "authentik_sources_scim:v2-schema",
                    kwargs={
                        "source_slug": self.kwargs["source_slug"],
                        "schema_uri": raw_schema["id"],
                    },
                )
            )
            # Build a per-request copy instead of writing into the module-level
            # data: the location depends on the request (host and source slug),
            # and mutating the shared dicts would let one request overwrite the
            # value another request is about to return.
            meta = {**raw_schema["meta"], "location": location}
            schemas.append({**raw_schema, "meta": meta})
        return schemas

    # pylint: disable=unused-argument
    def get(self, request: Request, source_slug: str, schema_uri: str | None = None) -> Response:
        """Get schemas as SCIM response.

        With ``schema_uri`` set, respond with the single schema whose ``id``
        matches it, or raise ``SCIMNotFoundError`` when none matches. Otherwise
        respond with a ListResponse envelope containing every schema.
        """
        schemas = self.get_schemas()
        if schema_uri:
            schema = next((x for x in schemas if x.get("id") == schema_uri), None)
            if schema is None:
                raise SCIMNotFoundError("Schema not found.")
            return Response(schema)
        return Response(
            {
                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
                "totalResults": len(schemas),
                "itemsPerPage": len(schemas),
                "startIndex": 1,
                "Resources": schemas,
            }
        )
class SchemaView(SCIMView):
    """Serve the SCIM schema definitions held in the module-level ``_raw_schemas``.

    Reference: https://ldapwiki.com/wiki/SCIM%20Schemas%20Attribute
    """

    def get_schemas(self) -> list[dict]:
        """Return all schemas with ``meta.location`` set for the current request.

        The location is the absolute URI of the ``v2-schema`` route for the
        source slug in ``self.kwargs`` and the schema's own ``id``.
        """
        schemas = []
        for raw_schema in _raw_schemas:
            location = self.request.build_absolute_uri(
                reverse(
                    "authentik_sources_scim:v2-schema",
                    kwargs={
                        "source_slug": self.kwargs["source_slug"],
                        "schema_uri": raw_schema["id"],
                    },
                )
            )
            # Build a per-request copy instead of writing into the module-level
            # data: the location depends on the request (host and source slug),
            # and mutating the shared dicts would let one request overwrite the
            # value another request is about to return.
            meta = {**raw_schema["meta"], "location": location}
            schemas.append({**raw_schema, "meta": meta})
        return schemas

    # pylint: disable=unused-argument
    def get(self, request: Request, source_slug: str, schema_uri: str | None = None) -> Response:
        """Get schemas as SCIM response.

        With ``schema_uri`` set, respond with the single schema whose ``id``
        matches it, or raise ``SCIMNotFoundError`` when none matches. Otherwise
        respond with a ListResponse envelope containing every schema.
        """
        schemas = self.get_schemas()
        if schema_uri:
            schema = next((x for x in schemas if x.get("id") == schema_uri), None)
            if schema is None:
                raise SCIMNotFoundError("Schema not found.")
            return Response(schema)
        return Response(
            {
                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
                "totalResults": len(schemas),
                "itemsPerPage": len(schemas),
                "startIndex": 1,
                "Resources": schemas,
            }
        )
def
get_schemas(self):
def get_schemas(self) -> list[dict]:
    """Return all schemas with ``meta.location`` set for the current request.

    The location is the absolute URI of the ``v2-schema`` route for the
    source slug in ``self.kwargs`` and the schema's own ``id``.
    """
    schemas = []
    for raw_schema in _raw_schemas:
        location = self.request.build_absolute_uri(
            reverse(
                "authentik_sources_scim:v2-schema",
                kwargs={
                    "source_slug": self.kwargs["source_slug"],
                    "schema_uri": raw_schema["id"],
                },
            )
        )
        # Build a per-request copy instead of writing into the module-level
        # data: the location depends on the request (host and source slug),
        # and mutating the shared dicts would let one request overwrite the
        # value another request is about to return.
        meta = {**raw_schema["meta"], "location": location}
        schemas.append({**raw_schema, "meta": meta})
    return schemas
List of all schemas
def
get( self, request: rest_framework.request.Request, source_slug: str, schema_uri: str | None = None) -> rest_framework.response.Response:
def get(self, request: Request, source_slug: str, schema_uri: str | None = None) -> Response:
    """Answer a schema request with either one schema or the full list.

    Without ``schema_uri`` the reply is a ListResponse envelope wrapping every
    schema from ``get_schemas``. With ``schema_uri`` the reply is the schema
    whose ``id`` equals it; ``SCIMNotFoundError`` is raised if none matches.
    """
    available = self.get_schemas()

    # Collection request: no specific schema was asked for.
    if not schema_uri:
        total = len(available)
        envelope = {
            "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
            "totalResults": total,
            "itemsPerPage": total,
            "startIndex": 1,
            "Resources": available,
        }
        return Response(envelope)

    # Single-schema request: the first entry with a matching id wins.
    for candidate in available:
        if candidate.get("id") == schema_uri:
            return Response(candidate)
    raise SCIMNotFoundError("Schema not found.")
Get schemas as SCIM response