authentik.sources.oauth.api.source

OAuth Source Serializer

  1"""OAuth Source Serializer"""
  2
  3from django.urls.base import reverse_lazy
  4from django_filters.filters import BooleanFilter
  5from django_filters.filterset import FilterSet
  6from drf_spectacular.types import OpenApiTypes
  7from drf_spectacular.utils import OpenApiParameter, extend_schema, extend_schema_field
  8from requests import RequestException
  9from rest_framework.decorators import action
 10from rest_framework.fields import BooleanField, CharField, ChoiceField, SerializerMethodField
 11from rest_framework.request import Request
 12from rest_framework.response import Response
 13from rest_framework.serializers import ValidationError
 14from rest_framework.viewsets import ModelViewSet
 15
 16from authentik.core.api.sources import SourceSerializer
 17from authentik.core.api.used_by import UsedByMixin
 18from authentik.core.api.utils import PassiveSerializer
 19from authentik.lib.utils.http import get_http_session
 20from authentik.sources.oauth.models import OAuthSource, PKCEMethod
 21from authentik.sources.oauth.types.registry import SourceType, registry
 22
 23
 24class SourceTypeSerializer(PassiveSerializer):
 25    """Serializer for SourceType"""
 26
 27    name = CharField(required=True)
 28    verbose_name = CharField(required=True)
 29    urls_customizable = BooleanField()
 30    request_token_url = CharField(read_only=True, allow_null=True)
 31    authorization_url = CharField(read_only=True, allow_null=True)
 32    access_token_url = CharField(read_only=True, allow_null=True)
 33    profile_url = CharField(read_only=True, allow_null=True)
 34    oidc_well_known_url = CharField(read_only=True, allow_null=True)
 35    oidc_jwks_url = CharField(read_only=True, allow_null=True)
 36
 37
 38class OAuthSourceSerializer(SourceSerializer):
 39    """OAuth Source Serializer"""
 40
 41    provider_type = ChoiceField(choices=registry.get_name_tuple())
 42    callback_url = SerializerMethodField()
 43    type = SerializerMethodField()
 44
 45    def get_callback_url(self, instance: OAuthSource) -> str:
 46        """Get OAuth Callback URL"""
 47        relative_url = reverse_lazy(
 48            "authentik_sources_oauth:oauth-client-callback",
 49            kwargs={"source_slug": instance.slug},
 50        )
 51        if "request" not in self.context:
 52            return relative_url
 53        return self.context["request"].build_absolute_uri(relative_url)
 54
 55    @extend_schema_field(SourceTypeSerializer)
 56    def get_type(self, instance: OAuthSource) -> SourceTypeSerializer:
 57        """Get source's type configuration"""
 58        return SourceTypeSerializer(instance.source_type).data
 59
 60    def validate(self, attrs: dict) -> dict:
 61        session = get_http_session()
 62        provider_type_name = attrs.get(
 63            "provider_type",
 64            self.instance.provider_type if self.instance else None,
 65        )
 66        source_type = registry.find_type(provider_type_name)
 67
 68        well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url
 69        inferred_oidc_jwks_url = None
 70
 71        if well_known and well_known != "":
 72            try:
 73                well_known_config = session.get(well_known)
 74                well_known_config.raise_for_status()
 75            except RequestException as exc:
 76                text = exc.response.text if exc.response is not None else str(exc)
 77                raise ValidationError({"oidc_well_known_url": text}) from None
 78            config = well_known_config.json()
 79            if "issuer" not in config:
 80                raise ValidationError({"oidc_well_known_url": "Invalid well-known configuration"})
 81            field_map = {
 82                # authentik field to oidc field
 83                "authorization_url": "authorization_endpoint",
 84                "access_token_url": "token_endpoint",
 85                "profile_url": "userinfo_endpoint",
 86            }
 87            for ak_key, oidc_key in field_map.items():
 88                # Don't overwrite user-set values
 89                if ak_key in attrs and attrs[ak_key]:
 90                    continue
 91                attrs[ak_key] = config.get(oidc_key, "")
 92            # code_challenge_methods_supported is a list per RFC 8414, not a
 93            # single method. Pick one (prefer S256, the RFC-recommended method)
 94            # rather than letting the list round-trip into the pkce TextField
 95            # and later str() into the authorize URL as "['plain', 'S256']".
 96            if not attrs.get("pkce"):
 97                supported_methods = config.get("code_challenge_methods_supported") or []
 98                attrs["pkce"] = PKCEMethod.NONE
 99                if isinstance(supported_methods, list):
100                    if PKCEMethod.S256 in supported_methods:
101                        attrs["pkce"] = PKCEMethod.S256
102                    elif PKCEMethod.PLAIN in supported_methods:
103                        attrs["pkce"] = PKCEMethod.PLAIN
104            inferred_oidc_jwks_url = config.get("jwks_uri", "")
105
106        # Prefer user-entered URL to inferred URL to default URL
107        jwks_url = attrs.get("oidc_jwks_url") or inferred_oidc_jwks_url or source_type.oidc_jwks_url
108        if jwks_url and jwks_url != "":
109            attrs["oidc_jwks_url"] = jwks_url
110            try:
111                jwks_config = session.get(jwks_url)
112                jwks_config.raise_for_status()
113            except RequestException as exc:
114                text = exc.response.text if exc.response is not None else str(exc)
115                raise ValidationError({"oidc_jwks_url": text}) from None
116            config = jwks_config.json()
117            attrs["oidc_jwks"] = config
118
119        for url in [
120            "authorization_url",
121            "access_token_url",
122            "profile_url",
123        ]:
124            if getattr(source_type, url, None) is None:
125                if url not in attrs:
126                    raise ValidationError(
127                        f"{url} is required for provider {source_type.verbose_name}"
128                    )
129        return attrs
130
131    class Meta:
132        model = OAuthSource
133        fields = SourceSerializer.Meta.fields + [
134            "group_matching_mode",
135            "provider_type",
136            "request_token_url",
137            "authorization_url",
138            "access_token_url",
139            "profile_url",
140            "pkce",
141            "consumer_key",
142            "consumer_secret",
143            "callback_url",
144            "additional_scopes",
145            "type",
146            "oidc_well_known_url",
147            "oidc_jwks_url",
148            "oidc_jwks",
149            "authorization_code_auth_method",
150        ]
151        extra_kwargs = {
152            "consumer_secret": {"write_only": True},
153            "request_token_url": {"allow_blank": True},
154            "authorization_url": {"allow_blank": True},
155            "access_token_url": {"allow_blank": True},
156            "profile_url": {"allow_blank": True},
157        }
158
159
160class OAuthSourceFilter(FilterSet):
161    """OAuth Source filter set"""
162
163    has_jwks = BooleanFilter(label="Only return sources with JWKS data", method="filter_has_jwks")
164
165    def filter_has_jwks(self, queryset, name, value):  # pragma: no cover
166        """Only return sources with JWKS data"""
167        return queryset.exclude(oidc_jwks__iexact="{}")
168
169    class Meta:
170        model = OAuthSource
171        fields = [
172            "pbm_uuid",
173            "name",
174            "slug",
175            "enabled",
176            "authentication_flow",
177            "enrollment_flow",
178            "policy_engine_mode",
179            "user_matching_mode",
180            "group_matching_mode",
181            "provider_type",
182            "request_token_url",
183            "authorization_url",
184            "access_token_url",
185            "profile_url",
186            "consumer_key",
187            "additional_scopes",
188        ]
189
190
191class OAuthSourceViewSet(UsedByMixin, ModelViewSet):
192    """Source Viewset"""
193
194    queryset = OAuthSource.objects.all()
195    serializer_class = OAuthSourceSerializer
196    lookup_field = "slug"
197    filterset_class = OAuthSourceFilter
198    search_fields = ["name", "slug"]
199    ordering = ["name"]
200
201    @extend_schema(
202        responses={200: SourceTypeSerializer(many=True)},
203        parameters=[
204            OpenApiParameter(
205                name="name",
206                location=OpenApiParameter.QUERY,
207                type=OpenApiTypes.STR,
208            )
209        ],
210    )
211    @action(detail=False, pagination_class=None, filter_backends=[])
212    def source_types(self, request: Request) -> Response:
213        """Get all creatable source types. If ?name is set, only returns the type for <name>.
214        If <name> isn't found, returns the default type."""
215        data = []
216        if "name" in request.query_params:
217            source_type = registry.find_type(request.query_params.get("name"))
218            if source_type.__class__ != SourceType:
219                data.append(SourceTypeSerializer(source_type).data)
220        else:
221            for source_type in registry.get():
222                data.append(SourceTypeSerializer(source_type).data)
223        return Response(data)
class SourceTypeSerializer(authentik.core.api.utils.PassiveSerializer):
25class SourceTypeSerializer(PassiveSerializer):
26    """Serializer for SourceType"""
27
28    name = CharField(required=True)
29    verbose_name = CharField(required=True)
30    urls_customizable = BooleanField()
31    request_token_url = CharField(read_only=True, allow_null=True)
32    authorization_url = CharField(read_only=True, allow_null=True)
33    access_token_url = CharField(read_only=True, allow_null=True)
34    profile_url = CharField(read_only=True, allow_null=True)
35    oidc_well_known_url = CharField(read_only=True, allow_null=True)
36    oidc_jwks_url = CharField(read_only=True, allow_null=True)

Serializer for SourceType

name
verbose_name
urls_customizable
request_token_url
authorization_url
access_token_url
profile_url
oidc_well_known_url
oidc_jwks_url
class OAuthSourceSerializer(authentik.core.api.sources.SourceSerializer):
 39class OAuthSourceSerializer(SourceSerializer):
 40    """OAuth Source Serializer"""
 41
 42    provider_type = ChoiceField(choices=registry.get_name_tuple())
 43    callback_url = SerializerMethodField()
 44    type = SerializerMethodField()
 45
 46    def get_callback_url(self, instance: OAuthSource) -> str:
 47        """Get OAuth Callback URL"""
 48        relative_url = reverse_lazy(
 49            "authentik_sources_oauth:oauth-client-callback",
 50            kwargs={"source_slug": instance.slug},
 51        )
 52        if "request" not in self.context:
 53            return relative_url
 54        return self.context["request"].build_absolute_uri(relative_url)
 55
 56    @extend_schema_field(SourceTypeSerializer)
 57    def get_type(self, instance: OAuthSource) -> SourceTypeSerializer:
 58        """Get source's type configuration"""
 59        return SourceTypeSerializer(instance.source_type).data
 60
 61    def validate(self, attrs: dict) -> dict:
 62        session = get_http_session()
 63        provider_type_name = attrs.get(
 64            "provider_type",
 65            self.instance.provider_type if self.instance else None,
 66        )
 67        source_type = registry.find_type(provider_type_name)
 68
 69        well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url
 70        inferred_oidc_jwks_url = None
 71
 72        if well_known and well_known != "":
 73            try:
 74                well_known_config = session.get(well_known)
 75                well_known_config.raise_for_status()
 76            except RequestException as exc:
 77                text = exc.response.text if exc.response is not None else str(exc)
 78                raise ValidationError({"oidc_well_known_url": text}) from None
 79            config = well_known_config.json()
 80            if "issuer" not in config:
 81                raise ValidationError({"oidc_well_known_url": "Invalid well-known configuration"})
 82            field_map = {
 83                # authentik field to oidc field
 84                "authorization_url": "authorization_endpoint",
 85                "access_token_url": "token_endpoint",
 86                "profile_url": "userinfo_endpoint",
 87            }
 88            for ak_key, oidc_key in field_map.items():
 89                # Don't overwrite user-set values
 90                if ak_key in attrs and attrs[ak_key]:
 91                    continue
 92                attrs[ak_key] = config.get(oidc_key, "")
 93            # code_challenge_methods_supported is a list per RFC 8414, not a
 94            # single method. Pick one (prefer S256, the RFC-recommended method)
 95            # rather than letting the list round-trip into the pkce TextField
 96            # and later str() into the authorize URL as "['plain', 'S256']".
 97            if not attrs.get("pkce"):
 98                supported_methods = config.get("code_challenge_methods_supported") or []
 99                attrs["pkce"] = PKCEMethod.NONE
100                if isinstance(supported_methods, list):
101                    if PKCEMethod.S256 in supported_methods:
102                        attrs["pkce"] = PKCEMethod.S256
103                    elif PKCEMethod.PLAIN in supported_methods:
104                        attrs["pkce"] = PKCEMethod.PLAIN
105            inferred_oidc_jwks_url = config.get("jwks_uri", "")
106
107        # Prefer user-entered URL to inferred URL to default URL
108        jwks_url = attrs.get("oidc_jwks_url") or inferred_oidc_jwks_url or source_type.oidc_jwks_url
109        if jwks_url and jwks_url != "":
110            attrs["oidc_jwks_url"] = jwks_url
111            try:
112                jwks_config = session.get(jwks_url)
113                jwks_config.raise_for_status()
114            except RequestException as exc:
115                text = exc.response.text if exc.response is not None else str(exc)
116                raise ValidationError({"oidc_jwks_url": text}) from None
117            config = jwks_config.json()
118            attrs["oidc_jwks"] = config
119
120        for url in [
121            "authorization_url",
122            "access_token_url",
123            "profile_url",
124        ]:
125            if getattr(source_type, url, None) is None:
126                if url not in attrs:
127                    raise ValidationError(
128                        f"{url} is required for provider {source_type.verbose_name}"
129                    )
130        return attrs
131
132    class Meta:
133        model = OAuthSource
134        fields = SourceSerializer.Meta.fields + [
135            "group_matching_mode",
136            "provider_type",
137            "request_token_url",
138            "authorization_url",
139            "access_token_url",
140            "profile_url",
141            "pkce",
142            "consumer_key",
143            "consumer_secret",
144            "callback_url",
145            "additional_scopes",
146            "type",
147            "oidc_well_known_url",
148            "oidc_jwks_url",
149            "oidc_jwks",
150            "authorization_code_auth_method",
151        ]
152        extra_kwargs = {
153            "consumer_secret": {"write_only": True},
154            "request_token_url": {"allow_blank": True},
155            "authorization_url": {"allow_blank": True},
156            "access_token_url": {"allow_blank": True},
157            "profile_url": {"allow_blank": True},
158        }

OAuth Source Serializer

provider_type
callback_url
type
def get_callback_url(self, instance: authentik.sources.oauth.models.OAuthSource) -> str:
46    def get_callback_url(self, instance: OAuthSource) -> str:
47        """Get OAuth Callback URL"""
48        relative_url = reverse_lazy(
49            "authentik_sources_oauth:oauth-client-callback",
50            kwargs={"source_slug": instance.slug},
51        )
52        if "request" not in self.context:
53            return relative_url
54        return self.context["request"].build_absolute_uri(relative_url)

Get OAuth Callback URL

@extend_schema_field(SourceTypeSerializer)
def get_type( self, instance: authentik.sources.oauth.models.OAuthSource) -> SourceTypeSerializer:
56    @extend_schema_field(SourceTypeSerializer)
57    def get_type(self, instance: OAuthSource) -> SourceTypeSerializer:
58        """Get source's type configuration"""
59        return SourceTypeSerializer(instance.source_type).data

Get source's type configuration

def validate(self, attrs: dict) -> dict:
 61    def validate(self, attrs: dict) -> dict:
 62        session = get_http_session()
 63        provider_type_name = attrs.get(
 64            "provider_type",
 65            self.instance.provider_type if self.instance else None,
 66        )
 67        source_type = registry.find_type(provider_type_name)
 68
 69        well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url
 70        inferred_oidc_jwks_url = None
 71
 72        if well_known and well_known != "":
 73            try:
 74                well_known_config = session.get(well_known)
 75                well_known_config.raise_for_status()
 76            except RequestException as exc:
 77                text = exc.response.text if exc.response is not None else str(exc)
 78                raise ValidationError({"oidc_well_known_url": text}) from None
 79            config = well_known_config.json()
 80            if "issuer" not in config:
 81                raise ValidationError({"oidc_well_known_url": "Invalid well-known configuration"})
 82            field_map = {
 83                # authentik field to oidc field
 84                "authorization_url": "authorization_endpoint",
 85                "access_token_url": "token_endpoint",
 86                "profile_url": "userinfo_endpoint",
 87            }
 88            for ak_key, oidc_key in field_map.items():
 89                # Don't overwrite user-set values
 90                if ak_key in attrs and attrs[ak_key]:
 91                    continue
 92                attrs[ak_key] = config.get(oidc_key, "")
 93            # code_challenge_methods_supported is a list per RFC 8414, not a
 94            # single method. Pick one (prefer S256, the RFC-recommended method)
 95            # rather than letting the list round-trip into the pkce TextField
 96            # and later str() into the authorize URL as "['plain', 'S256']".
 97            if not attrs.get("pkce"):
 98                supported_methods = config.get("code_challenge_methods_supported") or []
 99                attrs["pkce"] = PKCEMethod.NONE
100                if isinstance(supported_methods, list):
101                    if PKCEMethod.S256 in supported_methods:
102                        attrs["pkce"] = PKCEMethod.S256
103                    elif PKCEMethod.PLAIN in supported_methods:
104                        attrs["pkce"] = PKCEMethod.PLAIN
105            inferred_oidc_jwks_url = config.get("jwks_uri", "")
106
107        # Prefer user-entered URL to inferred URL to default URL
108        jwks_url = attrs.get("oidc_jwks_url") or inferred_oidc_jwks_url or source_type.oidc_jwks_url
109        if jwks_url and jwks_url != "":
110            attrs["oidc_jwks_url"] = jwks_url
111            try:
112                jwks_config = session.get(jwks_url)
113                jwks_config.raise_for_status()
114            except RequestException as exc:
115                text = exc.response.text if exc.response is not None else str(exc)
116                raise ValidationError({"oidc_jwks_url": text}) from None
117            config = jwks_config.json()
118            attrs["oidc_jwks"] = config
119
120        for url in [
121            "authorization_url",
122            "access_token_url",
123            "profile_url",
124        ]:
125            if getattr(source_type, url, None) is None:
126                if url not in attrs:
127                    raise ValidationError(
128                        f"{url} is required for provider {source_type.verbose_name}"
129                    )
130        return attrs
class OAuthSourceSerializer.Meta:
132    class Meta:
133        model = OAuthSource
134        fields = SourceSerializer.Meta.fields + [
135            "group_matching_mode",
136            "provider_type",
137            "request_token_url",
138            "authorization_url",
139            "access_token_url",
140            "profile_url",
141            "pkce",
142            "consumer_key",
143            "consumer_secret",
144            "callback_url",
145            "additional_scopes",
146            "type",
147            "oidc_well_known_url",
148            "oidc_jwks_url",
149            "oidc_jwks",
150            "authorization_code_auth_method",
151        ]
152        extra_kwargs = {
153            "consumer_secret": {"write_only": True},
154            "request_token_url": {"allow_blank": True},
155            "authorization_url": {"allow_blank": True},
156            "access_token_url": {"allow_blank": True},
157            "profile_url": {"allow_blank": True},
158        }
fields = ['pk', 'name', 'slug', 'enabled', 'promoted', 'authentication_flow', 'enrollment_flow', 'user_property_mappings', 'group_property_mappings', 'component', 'verbose_name', 'verbose_name_plural', 'meta_model_name', 'policy_engine_mode', 'user_matching_mode', 'managed', 'user_path_template', 'icon', 'icon_url', 'icon_themed_urls', 'group_matching_mode', 'provider_type', 'request_token_url', 'authorization_url', 'access_token_url', 'profile_url', 'pkce', 'consumer_key', 'consumer_secret', 'callback_url', 'additional_scopes', 'type', 'oidc_well_known_url', 'oidc_jwks_url', 'oidc_jwks', 'authorization_code_auth_method']
extra_kwargs = {'consumer_secret': {'write_only': True}, 'request_token_url': {'allow_blank': True}, 'authorization_url': {'allow_blank': True}, 'access_token_url': {'allow_blank': True}, 'profile_url': {'allow_blank': True}}
class OAuthSourceFilter(django_filters.filterset.FilterSet):
161class OAuthSourceFilter(FilterSet):
162    """OAuth Source filter set"""
163
164    has_jwks = BooleanFilter(label="Only return sources with JWKS data", method="filter_has_jwks")
165
166    def filter_has_jwks(self, queryset, name, value):  # pragma: no cover
167        """Only return sources with JWKS data"""
168        return queryset.exclude(oidc_jwks__iexact="{}")
169
170    class Meta:
171        model = OAuthSource
172        fields = [
173            "pbm_uuid",
174            "name",
175            "slug",
176            "enabled",
177            "authentication_flow",
178            "enrollment_flow",
179            "policy_engine_mode",
180            "user_matching_mode",
181            "group_matching_mode",
182            "provider_type",
183            "request_token_url",
184            "authorization_url",
185            "access_token_url",
186            "profile_url",
187            "consumer_key",
188            "additional_scopes",
189        ]

OAuth Source filter set

has_jwks
def filter_has_jwks(self, queryset, name, value):
166    def filter_has_jwks(self, queryset, name, value):  # pragma: no cover
167        """Only return sources with JWKS data"""
168        return queryset.exclude(oidc_jwks__iexact="{}")

Only return sources with JWKS data

declared_filters = OrderedDict({'has_jwks': <django_filters.filters.BooleanFilter object>})
base_filters = OrderedDict({'pbm_uuid': <django_filters.filters.UUIDFilter object>, 'name': <django_filters.filters.CharFilter object>, 'slug': <django_filters.filters.CharFilter object>, 'enabled': <django_filters.filters.BooleanFilter object>, 'authentication_flow': <django_filters.filters.ModelChoiceFilter object>, 'enrollment_flow': <django_filters.filters.ModelChoiceFilter object>, 'policy_engine_mode': <django_filters.filters.ChoiceFilter object>, 'user_matching_mode': <django_filters.filters.ChoiceFilter object>, 'group_matching_mode': <django_filters.filters.ChoiceFilter object>, 'provider_type': <django_filters.filters.CharFilter object>, 'request_token_url': <django_filters.filters.CharFilter object>, 'authorization_url': <django_filters.filters.CharFilter object>, 'access_token_url': <django_filters.filters.CharFilter object>, 'profile_url': <django_filters.filters.CharFilter object>, 'consumer_key': <django_filters.filters.CharFilter object>, 'additional_scopes': <django_filters.filters.CharFilter object>, 'has_jwks': <django_filters.filters.BooleanFilter object>})
class OAuthSourceFilter.Meta:
170    class Meta:
171        model = OAuthSource
172        fields = [
173            "pbm_uuid",
174            "name",
175            "slug",
176            "enabled",
177            "authentication_flow",
178            "enrollment_flow",
179            "policy_engine_mode",
180            "user_matching_mode",
181            "group_matching_mode",
182            "provider_type",
183            "request_token_url",
184            "authorization_url",
185            "access_token_url",
186            "profile_url",
187            "consumer_key",
188            "additional_scopes",
189        ]
fields = ['pbm_uuid', 'name', 'slug', 'enabled', 'authentication_flow', 'enrollment_flow', 'policy_engine_mode', 'user_matching_mode', 'group_matching_mode', 'provider_type', 'request_token_url', 'authorization_url', 'access_token_url', 'profile_url', 'consumer_key', 'additional_scopes']
class OAuthSourceViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
192class OAuthSourceViewSet(UsedByMixin, ModelViewSet):
193    """Source Viewset"""
194
195    queryset = OAuthSource.objects.all()
196    serializer_class = OAuthSourceSerializer
197    lookup_field = "slug"
198    filterset_class = OAuthSourceFilter
199    search_fields = ["name", "slug"]
200    ordering = ["name"]
201
202    @extend_schema(
203        responses={200: SourceTypeSerializer(many=True)},
204        parameters=[
205            OpenApiParameter(
206                name="name",
207                location=OpenApiParameter.QUERY,
208                type=OpenApiTypes.STR,
209            )
210        ],
211    )
212    @action(detail=False, pagination_class=None, filter_backends=[])
213    def source_types(self, request: Request) -> Response:
214        """Get all creatable source types. If ?name is set, only returns the type for <name>.
215        If <name> isn't found, returns the default type."""
216        data = []
217        if "name" in request.query_params:
218            source_type = registry.find_type(request.query_params.get("name"))
219            if source_type.__class__ != SourceType:
220                data.append(SourceTypeSerializer(source_type).data)
221        else:
222            for source_type in registry.get():
223                data.append(SourceTypeSerializer(source_type).data)
224        return Response(data)

Source Viewset

queryset = <InheritanceQuerySet []>
serializer_class = <class 'OAuthSourceSerializer'>
lookup_field = 'slug'
filterset_class = <class 'OAuthSourceFilter'>
search_fields = ['name', 'slug']
ordering = ['name']
@extend_schema(responses={200: SourceTypeSerializer(many=True)}, parameters=[OpenApiParameter(name='name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR)])
@action(detail=False, pagination_class=None, filter_backends=[])
def source_types( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
202    @extend_schema(
203        responses={200: SourceTypeSerializer(many=True)},
204        parameters=[
205            OpenApiParameter(
206                name="name",
207                location=OpenApiParameter.QUERY,
208                type=OpenApiTypes.STR,
209            )
210        ],
211    )
212    @action(detail=False, pagination_class=None, filter_backends=[])
213    def source_types(self, request: Request) -> Response:
214        """Get all creatable source types. If ?name is set, only returns the type for <name>.
215        If <name> isn't found, returns the default type."""
216        data = []
217        if "name" in request.query_params:
218            source_type = registry.find_type(request.query_params.get("name"))
219            if source_type.__class__ != SourceType:
220                data.append(SourceTypeSerializer(source_type).data)
221        else:
222            for source_type in registry.get():
223                data.append(SourceTypeSerializer(source_type).data)
224        return Response(data)

Get all creatable source types. If ?name is set, only returns the type for . If isn't found, returns the default type.

name = None
description = None
suffix = None
detail = None
basename = None