authentik.sources.oauth.api.source

OAuth Source Serializer

  1"""OAuth Source Serializer"""
  2
  3from django.urls.base import reverse_lazy
  4from django_filters.filters import BooleanFilter
  5from django_filters.filterset import FilterSet
  6from drf_spectacular.types import OpenApiTypes
  7from drf_spectacular.utils import OpenApiParameter, extend_schema, extend_schema_field
  8from requests import RequestException
  9from rest_framework.decorators import action
 10from rest_framework.fields import BooleanField, CharField, ChoiceField, SerializerMethodField
 11from rest_framework.request import Request
 12from rest_framework.response import Response
 13from rest_framework.serializers import ValidationError
 14from rest_framework.viewsets import ModelViewSet
 15
 16from authentik.core.api.sources import SourceSerializer
 17from authentik.core.api.used_by import UsedByMixin
 18from authentik.core.api.utils import PassiveSerializer
 19from authentik.lib.utils.http import get_http_session
 20from authentik.sources.oauth.models import OAuthSource
 21from authentik.sources.oauth.types.registry import SourceType, registry
 22
 23
class SourceTypeSerializer(PassiveSerializer):
    """Serializer for SourceType"""

    # Identifier and display label of the source type; both must be present.
    name = CharField(required=True)
    verbose_name = CharField(required=True)
    # NOTE(review): presumably signals whether the endpoint URLs below may be
    # overridden per source -- confirm against SourceType.
    urls_customizable = BooleanField()
    # Endpoint URLs carried by the type: output-only, and each may be null.
    request_token_url = CharField(read_only=True, allow_null=True)
    authorization_url = CharField(read_only=True, allow_null=True)
    access_token_url = CharField(read_only=True, allow_null=True)
    profile_url = CharField(read_only=True, allow_null=True)
    # OIDC discovery and JWKS URLs of the type: output-only, and each may be null.
    oidc_well_known_url = CharField(read_only=True, allow_null=True)
    oidc_jwks_url = CharField(read_only=True, allow_null=True)
 36
 37
 38class OAuthSourceSerializer(SourceSerializer):
 39    """OAuth Source Serializer"""
 40
 41    provider_type = ChoiceField(choices=registry.get_name_tuple())
 42    callback_url = SerializerMethodField()
 43    type = SerializerMethodField()
 44
 45    def get_callback_url(self, instance: OAuthSource) -> str:
 46        """Get OAuth Callback URL"""
 47        relative_url = reverse_lazy(
 48            "authentik_sources_oauth:oauth-client-callback",
 49            kwargs={"source_slug": instance.slug},
 50        )
 51        if "request" not in self.context:
 52            return relative_url
 53        return self.context["request"].build_absolute_uri(relative_url)
 54
 55    @extend_schema_field(SourceTypeSerializer)
 56    def get_type(self, instance: OAuthSource) -> SourceTypeSerializer:
 57        """Get source's type configuration"""
 58        return SourceTypeSerializer(instance.source_type).data
 59
 60    def validate(self, attrs: dict) -> dict:
 61        session = get_http_session()
 62        provider_type_name = attrs.get(
 63            "provider_type",
 64            self.instance.provider_type if self.instance else None,
 65        )
 66        source_type = registry.find_type(provider_type_name)
 67
 68        well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url
 69        inferred_oidc_jwks_url = None
 70
 71        if well_known and well_known != "":
 72            try:
 73                well_known_config = session.get(well_known)
 74                well_known_config.raise_for_status()
 75            except RequestException as exc:
 76                text = exc.response.text if exc.response else str(exc)
 77                raise ValidationError({"oidc_well_known_url": text}) from None
 78            config = well_known_config.json()
 79            if "issuer" not in config:
 80                raise ValidationError({"oidc_well_known_url": "Invalid well-known configuration"})
 81            field_map = {
 82                # authentik field to oidc field
 83                "authorization_url": "authorization_endpoint",
 84                "access_token_url": "token_endpoint",
 85                "profile_url": "userinfo_endpoint",
 86                "pkce": "code_challenge_methods_supported",
 87            }
 88            for ak_key, oidc_key in field_map.items():
 89                # Don't overwrite user-set values
 90                if ak_key in attrs and attrs[ak_key]:
 91                    continue
 92                attrs[ak_key] = config.get(oidc_key, "")
 93            inferred_oidc_jwks_url = config.get("jwks_uri", "")
 94
 95        # Prefer user-entered URL to inferred URL to default URL
 96        jwks_url = attrs.get("oidc_jwks_url") or inferred_oidc_jwks_url or source_type.oidc_jwks_url
 97        if jwks_url and jwks_url != "":
 98            attrs["oidc_jwks_url"] = jwks_url
 99            try:
100                jwks_config = session.get(jwks_url)
101                jwks_config.raise_for_status()
102            except RequestException as exc:
103                text = exc.response.text if exc.response else str(exc)
104                raise ValidationError({"oidc_jwks_url": text}) from None
105            config = jwks_config.json()
106            attrs["oidc_jwks"] = config
107
108        for url in [
109            "authorization_url",
110            "access_token_url",
111            "profile_url",
112        ]:
113            if getattr(source_type, url, None) is None:
114                if url not in attrs:
115                    raise ValidationError(
116                        f"{url} is required for provider {source_type.verbose_name}"
117                    )
118        return attrs
119
120    class Meta:
121        model = OAuthSource
122        fields = SourceSerializer.Meta.fields + [
123            "group_matching_mode",
124            "provider_type",
125            "request_token_url",
126            "authorization_url",
127            "access_token_url",
128            "profile_url",
129            "pkce",
130            "consumer_key",
131            "consumer_secret",
132            "callback_url",
133            "additional_scopes",
134            "type",
135            "oidc_well_known_url",
136            "oidc_jwks_url",
137            "oidc_jwks",
138            "authorization_code_auth_method",
139        ]
140        extra_kwargs = {
141            "consumer_secret": {"write_only": True},
142            "request_token_url": {"allow_blank": True},
143            "authorization_url": {"allow_blank": True},
144            "access_token_url": {"allow_blank": True},
145            "profile_url": {"allow_blank": True},
146        }
147
148
class OAuthSourceFilter(FilterSet):
    """OAuth Source filter set"""

    has_jwks = BooleanFilter(label="Only return sources with JWKS data", method="filter_has_jwks")

    def filter_has_jwks(self, queryset, name, value):  # pragma: no cover
        """Only return sources with JWKS data"""
        # ``value`` is the parsed boolean from the query string; the restriction
        # is applied only when it was actually requested, so ``has_jwks=false``
        # no longer behaves like ``has_jwks=true``.
        if not value:
            return queryset
        # Sources whose stored JWKS is the empty object "{}" are dropped.
        return queryset.exclude(oidc_jwks__iexact="{}")

    class Meta:
        model = OAuthSource
        fields = [
            "pbm_uuid",
            "name",
            "slug",
            "enabled",
            "authentication_flow",
            "enrollment_flow",
            "policy_engine_mode",
            "user_matching_mode",
            "group_matching_mode",
            "provider_type",
            "request_token_url",
            "authorization_url",
            "access_token_url",
            "profile_url",
            "consumer_key",
            "additional_scopes",
        ]
178
179
class OAuthSourceViewSet(UsedByMixin, ModelViewSet):
    """Source Viewset"""

    queryset = OAuthSource.objects.all()
    serializer_class = OAuthSourceSerializer
    lookup_field = "slug"
    filterset_class = OAuthSourceFilter
    search_fields = ["name", "slug"]
    ordering = ["name"]

    @extend_schema(
        responses={200: SourceTypeSerializer(many=True)},
        parameters=[
            OpenApiParameter(
                name="name",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.STR,
            )
        ],
    )
    @action(detail=False, pagination_class=None, filter_backends=[])
    def source_types(self, request: Request) -> Response:
        """Get all creatable source types. If ?name is set, only returns the type for <name>.
        If <name> isn't found, returns the default type."""
        # No ?name given: list every type known to the registry.
        if "name" not in request.query_params:
            return Response([SourceTypeSerializer(entry).data for entry in registry.get()])

        found = registry.find_type(request.query_params.get("name"))
        # NOTE(review): a result whose class is exactly SourceType is left out, so
        # the response is an empty list in that case -- the docstring's "returns
        # the default type" does not match; confirm which is intended.
        if found.__class__ != SourceType:
            return Response([SourceTypeSerializer(found).data])
        return Response([])
class SourceTypeSerializer(authentik.core.api.utils.PassiveSerializer):
class SourceTypeSerializer(PassiveSerializer):
    """Serializer for SourceType"""

    # Identifier and display label of the source type; both must be present.
    name = CharField(required=True)
    verbose_name = CharField(required=True)
    # NOTE(review): presumably signals whether the endpoint URLs below may be
    # overridden per source -- confirm against SourceType.
    urls_customizable = BooleanField()
    # Endpoint URLs carried by the type: output-only, and each may be null.
    request_token_url = CharField(read_only=True, allow_null=True)
    authorization_url = CharField(read_only=True, allow_null=True)
    access_token_url = CharField(read_only=True, allow_null=True)
    profile_url = CharField(read_only=True, allow_null=True)
    # OIDC discovery and JWKS URLs of the type: output-only, and each may be null.
    oidc_well_known_url = CharField(read_only=True, allow_null=True)
    oidc_jwks_url = CharField(read_only=True, allow_null=True)

Serializer for SourceType

name
verbose_name
urls_customizable
request_token_url
authorization_url
access_token_url
profile_url
oidc_well_known_url
oidc_jwks_url
class OAuthSourceSerializer(authentik.core.api.sources.SourceSerializer):
 39class OAuthSourceSerializer(SourceSerializer):
 40    """OAuth Source Serializer"""
 41
 42    provider_type = ChoiceField(choices=registry.get_name_tuple())
 43    callback_url = SerializerMethodField()
 44    type = SerializerMethodField()
 45
 46    def get_callback_url(self, instance: OAuthSource) -> str:
 47        """Get OAuth Callback URL"""
 48        relative_url = reverse_lazy(
 49            "authentik_sources_oauth:oauth-client-callback",
 50            kwargs={"source_slug": instance.slug},
 51        )
 52        if "request" not in self.context:
 53            return relative_url
 54        return self.context["request"].build_absolute_uri(relative_url)
 55
 56    @extend_schema_field(SourceTypeSerializer)
 57    def get_type(self, instance: OAuthSource) -> SourceTypeSerializer:
 58        """Get source's type configuration"""
 59        return SourceTypeSerializer(instance.source_type).data
 60
 61    def validate(self, attrs: dict) -> dict:
 62        session = get_http_session()
 63        provider_type_name = attrs.get(
 64            "provider_type",
 65            self.instance.provider_type if self.instance else None,
 66        )
 67        source_type = registry.find_type(provider_type_name)
 68
 69        well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url
 70        inferred_oidc_jwks_url = None
 71
 72        if well_known and well_known != "":
 73            try:
 74                well_known_config = session.get(well_known)
 75                well_known_config.raise_for_status()
 76            except RequestException as exc:
 77                text = exc.response.text if exc.response else str(exc)
 78                raise ValidationError({"oidc_well_known_url": text}) from None
 79            config = well_known_config.json()
 80            if "issuer" not in config:
 81                raise ValidationError({"oidc_well_known_url": "Invalid well-known configuration"})
 82            field_map = {
 83                # authentik field to oidc field
 84                "authorization_url": "authorization_endpoint",
 85                "access_token_url": "token_endpoint",
 86                "profile_url": "userinfo_endpoint",
 87                "pkce": "code_challenge_methods_supported",
 88            }
 89            for ak_key, oidc_key in field_map.items():
 90                # Don't overwrite user-set values
 91                if ak_key in attrs and attrs[ak_key]:
 92                    continue
 93                attrs[ak_key] = config.get(oidc_key, "")
 94            inferred_oidc_jwks_url = config.get("jwks_uri", "")
 95
 96        # Prefer user-entered URL to inferred URL to default URL
 97        jwks_url = attrs.get("oidc_jwks_url") or inferred_oidc_jwks_url or source_type.oidc_jwks_url
 98        if jwks_url and jwks_url != "":
 99            attrs["oidc_jwks_url"] = jwks_url
100            try:
101                jwks_config = session.get(jwks_url)
102                jwks_config.raise_for_status()
103            except RequestException as exc:
104                text = exc.response.text if exc.response else str(exc)
105                raise ValidationError({"oidc_jwks_url": text}) from None
106            config = jwks_config.json()
107            attrs["oidc_jwks"] = config
108
109        for url in [
110            "authorization_url",
111            "access_token_url",
112            "profile_url",
113        ]:
114            if getattr(source_type, url, None) is None:
115                if url not in attrs:
116                    raise ValidationError(
117                        f"{url} is required for provider {source_type.verbose_name}"
118                    )
119        return attrs
120
121    class Meta:
122        model = OAuthSource
123        fields = SourceSerializer.Meta.fields + [
124            "group_matching_mode",
125            "provider_type",
126            "request_token_url",
127            "authorization_url",
128            "access_token_url",
129            "profile_url",
130            "pkce",
131            "consumer_key",
132            "consumer_secret",
133            "callback_url",
134            "additional_scopes",
135            "type",
136            "oidc_well_known_url",
137            "oidc_jwks_url",
138            "oidc_jwks",
139            "authorization_code_auth_method",
140        ]
141        extra_kwargs = {
142            "consumer_secret": {"write_only": True},
143            "request_token_url": {"allow_blank": True},
144            "authorization_url": {"allow_blank": True},
145            "access_token_url": {"allow_blank": True},
146            "profile_url": {"allow_blank": True},
147        }

OAuth Source Serializer

provider_type
callback_url
type
def get_callback_url(self, instance: authentik.sources.oauth.models.OAuthSource) -> str:
46    def get_callback_url(self, instance: OAuthSource) -> str:
47        """Get OAuth Callback URL"""
48        relative_url = reverse_lazy(
49            "authentik_sources_oauth:oauth-client-callback",
50            kwargs={"source_slug": instance.slug},
51        )
52        if "request" not in self.context:
53            return relative_url
54        return self.context["request"].build_absolute_uri(relative_url)

Get OAuth Callback URL

@extend_schema_field(SourceTypeSerializer)
def get_type( self, instance: authentik.sources.oauth.models.OAuthSource) -> SourceTypeSerializer:
56    @extend_schema_field(SourceTypeSerializer)
57    def get_type(self, instance: OAuthSource) -> SourceTypeSerializer:
58        """Get source's type configuration"""
59        return SourceTypeSerializer(instance.source_type).data

Get source's type configuration

def validate(self, attrs: dict) -> dict:
 61    def validate(self, attrs: dict) -> dict:
 62        session = get_http_session()
 63        provider_type_name = attrs.get(
 64            "provider_type",
 65            self.instance.provider_type if self.instance else None,
 66        )
 67        source_type = registry.find_type(provider_type_name)
 68
 69        well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url
 70        inferred_oidc_jwks_url = None
 71
 72        if well_known and well_known != "":
 73            try:
 74                well_known_config = session.get(well_known)
 75                well_known_config.raise_for_status()
 76            except RequestException as exc:
 77                text = exc.response.text if exc.response else str(exc)
 78                raise ValidationError({"oidc_well_known_url": text}) from None
 79            config = well_known_config.json()
 80            if "issuer" not in config:
 81                raise ValidationError({"oidc_well_known_url": "Invalid well-known configuration"})
 82            field_map = {
 83                # authentik field to oidc field
 84                "authorization_url": "authorization_endpoint",
 85                "access_token_url": "token_endpoint",
 86                "profile_url": "userinfo_endpoint",
 87                "pkce": "code_challenge_methods_supported",
 88            }
 89            for ak_key, oidc_key in field_map.items():
 90                # Don't overwrite user-set values
 91                if ak_key in attrs and attrs[ak_key]:
 92                    continue
 93                attrs[ak_key] = config.get(oidc_key, "")
 94            inferred_oidc_jwks_url = config.get("jwks_uri", "")
 95
 96        # Prefer user-entered URL to inferred URL to default URL
 97        jwks_url = attrs.get("oidc_jwks_url") or inferred_oidc_jwks_url or source_type.oidc_jwks_url
 98        if jwks_url and jwks_url != "":
 99            attrs["oidc_jwks_url"] = jwks_url
100            try:
101                jwks_config = session.get(jwks_url)
102                jwks_config.raise_for_status()
103            except RequestException as exc:
104                text = exc.response.text if exc.response else str(exc)
105                raise ValidationError({"oidc_jwks_url": text}) from None
106            config = jwks_config.json()
107            attrs["oidc_jwks"] = config
108
109        for url in [
110            "authorization_url",
111            "access_token_url",
112            "profile_url",
113        ]:
114            if getattr(source_type, url, None) is None:
115                if url not in attrs:
116                    raise ValidationError(
117                        f"{url} is required for provider {source_type.verbose_name}"
118                    )
119        return attrs
class OAuthSourceSerializer.Meta:
    class Meta:
        # Serializer options: backing model, exposed fields and per-field overrides.
        model = OAuthSource
        # OAuth-specific fields, appended to the ones SourceSerializer declares.
        fields = SourceSerializer.Meta.fields + [
            "group_matching_mode",
            "provider_type",
            "request_token_url",
            "authorization_url",
            "access_token_url",
            "profile_url",
            "pkce",
            "consumer_key",
            "consumer_secret",
            "callback_url",
            "additional_scopes",
            "type",
            "oidc_well_known_url",
            "oidc_jwks_url",
            "oidc_jwks",
            "authorization_code_auth_method",
        ]
        extra_kwargs = {
            # Accepted on input but never included in serialized output.
            "consumer_secret": {"write_only": True},
            # The endpoint URLs may be submitted as empty strings.
            "request_token_url": {"allow_blank": True},
            "authorization_url": {"allow_blank": True},
            "access_token_url": {"allow_blank": True},
            "profile_url": {"allow_blank": True},
        }
fields = ['pk', 'name', 'slug', 'enabled', 'promoted', 'authentication_flow', 'enrollment_flow', 'user_property_mappings', 'group_property_mappings', 'component', 'verbose_name', 'verbose_name_plural', 'meta_model_name', 'policy_engine_mode', 'user_matching_mode', 'managed', 'user_path_template', 'icon', 'icon_url', 'icon_themed_urls', 'group_matching_mode', 'provider_type', 'request_token_url', 'authorization_url', 'access_token_url', 'profile_url', 'pkce', 'consumer_key', 'consumer_secret', 'callback_url', 'additional_scopes', 'type', 'oidc_well_known_url', 'oidc_jwks_url', 'oidc_jwks', 'authorization_code_auth_method']
extra_kwargs = {'consumer_secret': {'write_only': True}, 'request_token_url': {'allow_blank': True}, 'authorization_url': {'allow_blank': True}, 'access_token_url': {'allow_blank': True}, 'profile_url': {'allow_blank': True}}
class OAuthSourceFilter(django_filters.filterset.FilterSet):
class OAuthSourceFilter(FilterSet):
    """OAuth Source filter set"""

    has_jwks = BooleanFilter(label="Only return sources with JWKS data", method="filter_has_jwks")

    def filter_has_jwks(self, queryset, name, value):  # pragma: no cover
        """Only return sources with JWKS data"""
        # ``value`` is the parsed boolean from the query string; the restriction
        # is applied only when it was actually requested, so ``has_jwks=false``
        # no longer behaves like ``has_jwks=true``.
        if not value:
            return queryset
        # Sources whose stored JWKS is the empty object "{}" are dropped.
        return queryset.exclude(oidc_jwks__iexact="{}")

    class Meta:
        model = OAuthSource
        fields = [
            "pbm_uuid",
            "name",
            "slug",
            "enabled",
            "authentication_flow",
            "enrollment_flow",
            "policy_engine_mode",
            "user_matching_mode",
            "group_matching_mode",
            "provider_type",
            "request_token_url",
            "authorization_url",
            "access_token_url",
            "profile_url",
            "consumer_key",
            "additional_scopes",
        ]

OAuth Source filter set

has_jwks
def filter_has_jwks(self, queryset, name, value):
155    def filter_has_jwks(self, queryset, name, value):  # pragma: no cover
156        """Only return sources with JWKS data"""
157        return queryset.exclude(oidc_jwks__iexact="{}")

Only return sources with JWKS data

declared_filters = OrderedDict({'has_jwks': <django_filters.filters.BooleanFilter object>})
base_filters = OrderedDict({'pbm_uuid': <django_filters.filters.UUIDFilter object>, 'name': <django_filters.filters.CharFilter object>, 'slug': <django_filters.filters.CharFilter object>, 'enabled': <django_filters.filters.BooleanFilter object>, 'authentication_flow': <django_filters.filters.ModelChoiceFilter object>, 'enrollment_flow': <django_filters.filters.ModelChoiceFilter object>, 'policy_engine_mode': <django_filters.filters.ChoiceFilter object>, 'user_matching_mode': <django_filters.filters.ChoiceFilter object>, 'group_matching_mode': <django_filters.filters.ChoiceFilter object>, 'provider_type': <django_filters.filters.CharFilter object>, 'request_token_url': <django_filters.filters.CharFilter object>, 'authorization_url': <django_filters.filters.CharFilter object>, 'access_token_url': <django_filters.filters.CharFilter object>, 'profile_url': <django_filters.filters.CharFilter object>, 'consumer_key': <django_filters.filters.CharFilter object>, 'additional_scopes': <django_filters.filters.CharFilter object>, 'has_jwks': <django_filters.filters.BooleanFilter object>})
class OAuthSourceFilter.Meta:
    class Meta:
        # Filter set options: the model to query and the model fields for which
        # filters are generated automatically (has_jwks is declared separately
        # on the filter set itself).
        model = OAuthSource
        fields = [
            "pbm_uuid",
            "name",
            "slug",
            "enabled",
            "authentication_flow",
            "enrollment_flow",
            "policy_engine_mode",
            "user_matching_mode",
            "group_matching_mode",
            "provider_type",
            "request_token_url",
            "authorization_url",
            "access_token_url",
            "profile_url",
            "consumer_key",
            "additional_scopes",
        ]
fields = ['pbm_uuid', 'name', 'slug', 'enabled', 'authentication_flow', 'enrollment_flow', 'policy_engine_mode', 'user_matching_mode', 'group_matching_mode', 'provider_type', 'request_token_url', 'authorization_url', 'access_token_url', 'profile_url', 'consumer_key', 'additional_scopes']
class OAuthSourceViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class OAuthSourceViewSet(UsedByMixin, ModelViewSet):
    """Source Viewset"""

    queryset = OAuthSource.objects.all()
    serializer_class = OAuthSourceSerializer
    lookup_field = "slug"
    filterset_class = OAuthSourceFilter
    search_fields = ["name", "slug"]
    ordering = ["name"]

    @extend_schema(
        responses={200: SourceTypeSerializer(many=True)},
        parameters=[
            OpenApiParameter(
                name="name",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.STR,
            )
        ],
    )
    @action(detail=False, pagination_class=None, filter_backends=[])
    def source_types(self, request: Request) -> Response:
        """Get all creatable source types. If ?name is set, only returns the type for <name>.
        If <name> isn't found, returns the default type."""
        # No ?name given: list every type known to the registry.
        if "name" not in request.query_params:
            return Response([SourceTypeSerializer(entry).data for entry in registry.get()])

        found = registry.find_type(request.query_params.get("name"))
        # NOTE(review): a result whose class is exactly SourceType is left out, so
        # the response is an empty list in that case -- the docstring's "returns
        # the default type" does not match; confirm which is intended.
        if found.__class__ != SourceType:
            return Response([SourceTypeSerializer(found).data])
        return Response([])

Source Viewset

queryset = <InheritanceQuerySet []>
serializer_class = <class 'OAuthSourceSerializer'>
lookup_field = 'slug'
filterset_class = <class 'OAuthSourceFilter'>
search_fields = ['name', 'slug']
ordering = ['name']
@extend_schema(responses={200: SourceTypeSerializer(many=True)}, parameters=[OpenApiParameter(name='name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR)])
@action(detail=False, pagination_class=None, filter_backends=[])
def source_types( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
191    @extend_schema(
192        responses={200: SourceTypeSerializer(many=True)},
193        parameters=[
194            OpenApiParameter(
195                name="name",
196                location=OpenApiParameter.QUERY,
197                type=OpenApiTypes.STR,
198            )
199        ],
200    )
201    @action(detail=False, pagination_class=None, filter_backends=[])
202    def source_types(self, request: Request) -> Response:
203        """Get all creatable source types. If ?name is set, only returns the type for <name>.
204        If <name> isn't found, returns the default type."""
205        data = []
206        if "name" in request.query_params:
207            source_type = registry.find_type(request.query_params.get("name"))
208            if source_type.__class__ != SourceType:
209                data.append(SourceTypeSerializer(source_type).data)
210        else:
211            for source_type in registry.get():
212                data.append(SourceTypeSerializer(source_type).data)
213        return Response(data)

Get all creatable source types. If ?name is set, only returns the type for <name>. If <name> isn't found, returns the default type.

name = None
description = None
suffix = None
detail = None
basename = None