authentik.sources.oauth.api.source
OAuth Source Serializer
1"""OAuth Source Serializer""" 2 3from django.urls.base import reverse_lazy 4from django_filters.filters import BooleanFilter 5from django_filters.filterset import FilterSet 6from drf_spectacular.types import OpenApiTypes 7from drf_spectacular.utils import OpenApiParameter, extend_schema, extend_schema_field 8from requests import RequestException 9from rest_framework.decorators import action 10from rest_framework.fields import BooleanField, CharField, ChoiceField, SerializerMethodField 11from rest_framework.request import Request 12from rest_framework.response import Response 13from rest_framework.serializers import ValidationError 14from rest_framework.viewsets import ModelViewSet 15 16from authentik.core.api.sources import SourceSerializer 17from authentik.core.api.used_by import UsedByMixin 18from authentik.core.api.utils import PassiveSerializer 19from authentik.lib.utils.http import get_http_session 20from authentik.sources.oauth.models import OAuthSource, PKCEMethod 21from authentik.sources.oauth.types.registry import SourceType, registry 22 23 24class SourceTypeSerializer(PassiveSerializer): 25 """Serializer for SourceType""" 26 27 name = CharField(required=True) 28 verbose_name = CharField(required=True) 29 urls_customizable = BooleanField() 30 request_token_url = CharField(read_only=True, allow_null=True) 31 authorization_url = CharField(read_only=True, allow_null=True) 32 access_token_url = CharField(read_only=True, allow_null=True) 33 profile_url = CharField(read_only=True, allow_null=True) 34 oidc_well_known_url = CharField(read_only=True, allow_null=True) 35 oidc_jwks_url = CharField(read_only=True, allow_null=True) 36 37 38class OAuthSourceSerializer(SourceSerializer): 39 """OAuth Source Serializer""" 40 41 provider_type = ChoiceField(choices=registry.get_name_tuple()) 42 callback_url = SerializerMethodField() 43 type = SerializerMethodField() 44 45 def get_callback_url(self, instance: OAuthSource) -> str: 46 """Get OAuth Callback URL""" 47 
relative_url = reverse_lazy( 48 "authentik_sources_oauth:oauth-client-callback", 49 kwargs={"source_slug": instance.slug}, 50 ) 51 if "request" not in self.context: 52 return relative_url 53 return self.context["request"].build_absolute_uri(relative_url) 54 55 @extend_schema_field(SourceTypeSerializer) 56 def get_type(self, instance: OAuthSource) -> SourceTypeSerializer: 57 """Get source's type configuration""" 58 return SourceTypeSerializer(instance.source_type).data 59 60 def validate(self, attrs: dict) -> dict: 61 session = get_http_session() 62 provider_type_name = attrs.get( 63 "provider_type", 64 self.instance.provider_type if self.instance else None, 65 ) 66 source_type = registry.find_type(provider_type_name) 67 68 well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url 69 inferred_oidc_jwks_url = None 70 71 if well_known and well_known != "": 72 try: 73 well_known_config = session.get(well_known) 74 well_known_config.raise_for_status() 75 except RequestException as exc: 76 text = exc.response.text if exc.response is not None else str(exc) 77 raise ValidationError({"oidc_well_known_url": text}) from None 78 config = well_known_config.json() 79 if "issuer" not in config: 80 raise ValidationError({"oidc_well_known_url": "Invalid well-known configuration"}) 81 field_map = { 82 # authentik field to oidc field 83 "authorization_url": "authorization_endpoint", 84 "access_token_url": "token_endpoint", 85 "profile_url": "userinfo_endpoint", 86 } 87 for ak_key, oidc_key in field_map.items(): 88 # Don't overwrite user-set values 89 if ak_key in attrs and attrs[ak_key]: 90 continue 91 attrs[ak_key] = config.get(oidc_key, "") 92 # code_challenge_methods_supported is a list per RFC 8414, not a 93 # single method. Pick one (prefer S256, the RFC-recommended method) 94 # rather than letting the list round-trip into the pkce TextField 95 # and later str() into the authorize URL as "['plain', 'S256']". 
96 if not attrs.get("pkce"): 97 supported_methods = config.get("code_challenge_methods_supported") or [] 98 attrs["pkce"] = PKCEMethod.NONE 99 if isinstance(supported_methods, list): 100 if PKCEMethod.S256 in supported_methods: 101 attrs["pkce"] = PKCEMethod.S256 102 elif PKCEMethod.PLAIN in supported_methods: 103 attrs["pkce"] = PKCEMethod.PLAIN 104 inferred_oidc_jwks_url = config.get("jwks_uri", "") 105 106 # Prefer user-entered URL to inferred URL to default URL 107 jwks_url = attrs.get("oidc_jwks_url") or inferred_oidc_jwks_url or source_type.oidc_jwks_url 108 if jwks_url and jwks_url != "": 109 attrs["oidc_jwks_url"] = jwks_url 110 try: 111 jwks_config = session.get(jwks_url) 112 jwks_config.raise_for_status() 113 except RequestException as exc: 114 text = exc.response.text if exc.response is not None else str(exc) 115 raise ValidationError({"oidc_jwks_url": text}) from None 116 config = jwks_config.json() 117 attrs["oidc_jwks"] = config 118 119 for url in [ 120 "authorization_url", 121 "access_token_url", 122 "profile_url", 123 ]: 124 if getattr(source_type, url, None) is None: 125 if url not in attrs: 126 raise ValidationError( 127 f"{url} is required for provider {source_type.verbose_name}" 128 ) 129 return attrs 130 131 class Meta: 132 model = OAuthSource 133 fields = SourceSerializer.Meta.fields + [ 134 "group_matching_mode", 135 "provider_type", 136 "request_token_url", 137 "authorization_url", 138 "access_token_url", 139 "profile_url", 140 "pkce", 141 "consumer_key", 142 "consumer_secret", 143 "callback_url", 144 "additional_scopes", 145 "type", 146 "oidc_well_known_url", 147 "oidc_jwks_url", 148 "oidc_jwks", 149 "authorization_code_auth_method", 150 ] 151 extra_kwargs = { 152 "consumer_secret": {"write_only": True}, 153 "request_token_url": {"allow_blank": True}, 154 "authorization_url": {"allow_blank": True}, 155 "access_token_url": {"allow_blank": True}, 156 "profile_url": {"allow_blank": True}, 157 } 158 159 160class OAuthSourceFilter(FilterSet): 
161 """OAuth Source filter set""" 162 163 has_jwks = BooleanFilter(label="Only return sources with JWKS data", method="filter_has_jwks") 164 165 def filter_has_jwks(self, queryset, name, value): # pragma: no cover 166 """Only return sources with JWKS data""" 167 return queryset.exclude(oidc_jwks__iexact="{}") 168 169 class Meta: 170 model = OAuthSource 171 fields = [ 172 "pbm_uuid", 173 "name", 174 "slug", 175 "enabled", 176 "authentication_flow", 177 "enrollment_flow", 178 "policy_engine_mode", 179 "user_matching_mode", 180 "group_matching_mode", 181 "provider_type", 182 "request_token_url", 183 "authorization_url", 184 "access_token_url", 185 "profile_url", 186 "consumer_key", 187 "additional_scopes", 188 ] 189 190 191class OAuthSourceViewSet(UsedByMixin, ModelViewSet): 192 """Source Viewset""" 193 194 queryset = OAuthSource.objects.all() 195 serializer_class = OAuthSourceSerializer 196 lookup_field = "slug" 197 filterset_class = OAuthSourceFilter 198 search_fields = ["name", "slug"] 199 ordering = ["name"] 200 201 @extend_schema( 202 responses={200: SourceTypeSerializer(many=True)}, 203 parameters=[ 204 OpenApiParameter( 205 name="name", 206 location=OpenApiParameter.QUERY, 207 type=OpenApiTypes.STR, 208 ) 209 ], 210 ) 211 @action(detail=False, pagination_class=None, filter_backends=[]) 212 def source_types(self, request: Request) -> Response: 213 """Get all creatable source types. If ?name is set, only returns the type for <name>. 214 If <name> isn't found, returns the default type.""" 215 data = [] 216 if "name" in request.query_params: 217 source_type = registry.find_type(request.query_params.get("name")) 218 if source_type.__class__ != SourceType: 219 data.append(SourceTypeSerializer(source_type).data) 220 else: 221 for source_type in registry.get(): 222 data.append(SourceTypeSerializer(source_type).data) 223 return Response(data)
class SourceTypeSerializer(PassiveSerializer):
    """Serializer for SourceType"""

    # Both naming fields are mandatory.
    name = CharField(required=True)
    verbose_name = CharField(required=True)
    # NOTE(review): presumably tells clients whether the URLs below may be
    # overridden per source -- confirm against SourceType.
    urls_customizable = BooleanField()
    # Endpoint URLs carried by the source type. All of them are output-only
    # (read_only) and may be null when the type does not define them.
    request_token_url = CharField(read_only=True, allow_null=True)
    authorization_url = CharField(read_only=True, allow_null=True)
    access_token_url = CharField(read_only=True, allow_null=True)
    profile_url = CharField(read_only=True, allow_null=True)
    oidc_well_known_url = CharField(read_only=True, allow_null=True)
    oidc_jwks_url = CharField(read_only=True, allow_null=True)
Serializer for SourceType
Inherited Members
class OAuthSourceSerializer(SourceSerializer):
    """OAuth Source Serializer"""

    provider_type = ChoiceField(choices=registry.get_name_tuple())
    callback_url = SerializerMethodField()
    type = SerializerMethodField()

    def get_callback_url(self, instance: OAuthSource) -> str:
        """Get OAuth Callback URL

        Returns the absolute callback URL when the serializer context carries
        a request, and the site-relative path otherwise.
        """
        relative_url = reverse_lazy(
            "authentik_sources_oauth:oauth-client-callback",
            kwargs={"source_slug": instance.slug},
        )
        request = self.context.get("request")
        if request is None:
            # reverse_lazy yields a lazy proxy; resolve it so the declared
            # ``str`` return type actually holds.
            return str(relative_url)
        return request.build_absolute_uri(relative_url)

    @extend_schema_field(SourceTypeSerializer)
    def get_type(self, instance: OAuthSource) -> dict:
        """Get source's type configuration"""
        # ``.data`` is the serialized mapping, not the serializer itself.
        return SourceTypeSerializer(instance.source_type).data

    @staticmethod
    def _fetch_json(session, url: str, field: str):
        """Fetch `url` and return its decoded JSON body.

        Transport failures, error status codes and bodies that are not valid
        JSON are all reported as a ValidationError keyed on `field`.
        """
        try:
            response = session.get(url)
            response.raise_for_status()
            return response.json()
        except (RequestException, ValueError) as exc:
            # Prefer the remote's response body when one is attached to the
            # exception, otherwise fall back to the exception text.
            failed = getattr(exc, "response", None)
            text = failed.text if failed is not None else str(exc)
            raise ValidationError({field: text}) from None

    def validate(self, attrs: dict) -> dict:
        """Validate the source and fill in values discovered via OIDC.

        When a well-known URL is available (submitted or defined by the
        provider type) it is fetched and used to fill any endpoint URL and
        PKCE method the user left empty. The JWKS document is then fetched
        from the submitted, discovered or default JWKS URL and stored in
        ``oidc_jwks``.

        Raises ValidationError if a document cannot be fetched or decoded, if
        the well-known document has no ``issuer``, or if an endpoint URL the
        provider type does not define is missing from `attrs`.
        """
        session = get_http_session()
        provider_type_name = attrs.get(
            "provider_type",
            self.instance.provider_type if self.instance else None,
        )
        source_type = registry.find_type(provider_type_name)

        well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url
        inferred_oidc_jwks_url = None

        if well_known:
            config = self._fetch_json(session, well_known, "oidc_well_known_url")
            # Anything other than a JSON object cannot be a discovery document.
            if not isinstance(config, dict) or "issuer" not in config:
                raise ValidationError({"oidc_well_known_url": "Invalid well-known configuration"})
            field_map = {
                # authentik field to oidc field
                "authorization_url": "authorization_endpoint",
                "access_token_url": "token_endpoint",
                "profile_url": "userinfo_endpoint",
            }
            for ak_key, oidc_key in field_map.items():
                # Don't overwrite user-set values
                if attrs.get(ak_key):
                    continue
                attrs[ak_key] = config.get(oidc_key, "")
            # code_challenge_methods_supported is a list per RFC 8414, not a
            # single method. Pick one (prefer S256, the RFC-recommended method)
            # rather than letting the list round-trip into the pkce TextField
            # and later str() into the authorize URL as "['plain', 'S256']".
            if not attrs.get("pkce"):
                supported_methods = config.get("code_challenge_methods_supported") or []
                attrs["pkce"] = PKCEMethod.NONE
                if isinstance(supported_methods, list):
                    if PKCEMethod.S256 in supported_methods:
                        attrs["pkce"] = PKCEMethod.S256
                    elif PKCEMethod.PLAIN in supported_methods:
                        attrs["pkce"] = PKCEMethod.PLAIN
            inferred_oidc_jwks_url = config.get("jwks_uri", "")

        # Prefer user-entered URL to inferred URL to default URL
        jwks_url = attrs.get("oidc_jwks_url") or inferred_oidc_jwks_url or source_type.oidc_jwks_url
        if jwks_url:
            attrs["oidc_jwks_url"] = jwks_url
            attrs["oidc_jwks"] = self._fetch_json(session, jwks_url, "oidc_jwks_url")

        for url in ("authorization_url", "access_token_url", "profile_url"):
            # Only URLs the provider type leaves undefined must be supplied.
            if getattr(source_type, url, None) is None and url not in attrs:
                raise ValidationError(f"{url} is required for provider {source_type.verbose_name}")
        return attrs

    class Meta:
        model = OAuthSource
        fields = SourceSerializer.Meta.fields + [
            "group_matching_mode",
            "provider_type",
            "request_token_url",
            "authorization_url",
            "access_token_url",
            "profile_url",
            "pkce",
            "consumer_key",
            "consumer_secret",
            "callback_url",
            "additional_scopes",
            "type",
            "oidc_well_known_url",
            "oidc_jwks_url",
            "oidc_jwks",
            "authorization_code_auth_method",
        ]
        extra_kwargs = {
            "consumer_secret": {"write_only": True},
            "request_token_url": {"allow_blank": True},
            "authorization_url": {"allow_blank": True},
            "access_token_url": {"allow_blank": True},
            "profile_url": {"allow_blank": True},
        }
OAuth Source Serializer
def get_callback_url(self, instance: OAuthSource) -> str:
    """Get OAuth Callback URL

    Returns the absolute callback URL when the serializer context carries a
    request, and the site-relative path otherwise.
    """
    relative_url = reverse_lazy(
        "authentik_sources_oauth:oauth-client-callback",
        kwargs={"source_slug": instance.slug},
    )
    request = self.context.get("request")
    if request is None:
        # reverse_lazy yields a lazy proxy; resolve it so the declared
        # ``str`` return type actually holds.
        return str(relative_url)
    return request.build_absolute_uri(relative_url)
Get OAuth Callback URL
@extend_schema_field(SourceTypeSerializer)
def
get_type( self, instance: authentik.sources.oauth.models.OAuthSource) -> SourceTypeSerializer:
@extend_schema_field(SourceTypeSerializer)
def get_type(self, instance: OAuthSource) -> dict:
    """Get source's type configuration"""
    # ``.data`` is the serialized mapping, not the serializer itself, so the
    # return annotation is a dict; the schema type comes from the decorator.
    return SourceTypeSerializer(instance.source_type).data
Get source's type configuration
def
validate(self, attrs: dict) -> dict:
@staticmethod
def _fetch_json(session, url: str, field: str):
    """Fetch `url` and return its decoded JSON body.

    Transport failures, error status codes and bodies that are not valid JSON
    are all reported as a ValidationError keyed on `field`.
    """
    try:
        response = session.get(url)
        response.raise_for_status()
        return response.json()
    except (RequestException, ValueError) as exc:
        # Prefer the remote's response body when one is attached to the
        # exception, otherwise fall back to the exception text.
        failed = getattr(exc, "response", None)
        text = failed.text if failed is not None else str(exc)
        raise ValidationError({field: text}) from None

def validate(self, attrs: dict) -> dict:
    """Validate the source and fill in values discovered via OIDC.

    When a well-known URL is available (submitted or defined by the provider
    type) it is fetched and used to fill any endpoint URL and PKCE method the
    user left empty. The JWKS document is then fetched from the submitted,
    discovered or default JWKS URL and stored in ``oidc_jwks``.

    Raises ValidationError if a document cannot be fetched or decoded, if the
    well-known document has no ``issuer``, or if an endpoint URL the provider
    type does not define is missing from `attrs`.
    """
    session = get_http_session()
    provider_type_name = attrs.get(
        "provider_type",
        self.instance.provider_type if self.instance else None,
    )
    source_type = registry.find_type(provider_type_name)

    well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url
    inferred_oidc_jwks_url = None

    if well_known:
        config = self._fetch_json(session, well_known, "oidc_well_known_url")
        # Anything other than a JSON object cannot be a discovery document.
        if not isinstance(config, dict) or "issuer" not in config:
            raise ValidationError({"oidc_well_known_url": "Invalid well-known configuration"})
        field_map = {
            # authentik field to oidc field
            "authorization_url": "authorization_endpoint",
            "access_token_url": "token_endpoint",
            "profile_url": "userinfo_endpoint",
        }
        for ak_key, oidc_key in field_map.items():
            # Don't overwrite user-set values
            if attrs.get(ak_key):
                continue
            attrs[ak_key] = config.get(oidc_key, "")
        # code_challenge_methods_supported is a list per RFC 8414, not a
        # single method. Pick one (prefer S256, the RFC-recommended method)
        # rather than letting the list round-trip into the pkce TextField
        # and later str() into the authorize URL as "['plain', 'S256']".
        if not attrs.get("pkce"):
            supported_methods = config.get("code_challenge_methods_supported") or []
            attrs["pkce"] = PKCEMethod.NONE
            if isinstance(supported_methods, list):
                if PKCEMethod.S256 in supported_methods:
                    attrs["pkce"] = PKCEMethod.S256
                elif PKCEMethod.PLAIN in supported_methods:
                    attrs["pkce"] = PKCEMethod.PLAIN
        inferred_oidc_jwks_url = config.get("jwks_uri", "")

    # Prefer user-entered URL to inferred URL to default URL
    jwks_url = attrs.get("oidc_jwks_url") or inferred_oidc_jwks_url or source_type.oidc_jwks_url
    if jwks_url:
        attrs["oidc_jwks_url"] = jwks_url
        attrs["oidc_jwks"] = self._fetch_json(session, jwks_url, "oidc_jwks_url")

    for url in ("authorization_url", "access_token_url", "profile_url"):
        # Only URLs the provider type leaves undefined must be supplied.
        if getattr(source_type, url, None) is None and url not in attrs:
            raise ValidationError(f"{url} is required for provider {source_type.verbose_name}")
    return attrs
Inherited Members
class
OAuthSourceSerializer.Meta:
class Meta:
    model = OAuthSource
    # Everything the base source serializer exposes, followed by the
    # OAuth-specific fields.
    fields = SourceSerializer.Meta.fields + [
        "group_matching_mode",
        "provider_type",
        "request_token_url",
        "authorization_url",
        "access_token_url",
        "profile_url",
        "pkce",
        "consumer_key",
        "consumer_secret",
        "callback_url",
        "additional_scopes",
        "type",
        "oidc_well_known_url",
        "oidc_jwks_url",
        "oidc_jwks",
        "authorization_code_auth_method",
    ]
    extra_kwargs = {
        # The client secret is accepted on input but never serialized back.
        "consumer_secret": {"write_only": True},
        # Endpoint URLs may be submitted as empty strings.
        "request_token_url": {"allow_blank": True},
        "authorization_url": {"allow_blank": True},
        "access_token_url": {"allow_blank": True},
        "profile_url": {"allow_blank": True},
    }
model =
<class 'authentik.sources.oauth.models.OAuthSource'>
fields =
['pk', 'name', 'slug', 'enabled', 'promoted', 'authentication_flow', 'enrollment_flow', 'user_property_mappings', 'group_property_mappings', 'component', 'verbose_name', 'verbose_name_plural', 'meta_model_name', 'policy_engine_mode', 'user_matching_mode', 'managed', 'user_path_template', 'icon', 'icon_url', 'icon_themed_urls', 'group_matching_mode', 'provider_type', 'request_token_url', 'authorization_url', 'access_token_url', 'profile_url', 'pkce', 'consumer_key', 'consumer_secret', 'callback_url', 'additional_scopes', 'type', 'oidc_well_known_url', 'oidc_jwks_url', 'oidc_jwks', 'authorization_code_auth_method']
class
OAuthSourceFilter(django_filters.filterset.FilterSet):
class OAuthSourceFilter(FilterSet):
    """OAuth Source filter set"""

    has_jwks = BooleanFilter(label="Only return sources with JWKS data", method="filter_has_jwks")

    def filter_has_jwks(self, queryset, name, value):  # pragma: no cover
        """Filter sources by whether they hold JWKS data.

        A true `value` keeps only sources whose ``oidc_jwks`` is not the
        empty object; a false `value` keeps only those where it is.
        """
        if value:
            return queryset.exclude(oidc_jwks__iexact="{}")
        # Honour an explicit ``has_jwks=false`` instead of treating it as true.
        return queryset.filter(oidc_jwks__iexact="{}")

    class Meta:
        model = OAuthSource
        fields = [
            "pbm_uuid",
            "name",
            "slug",
            "enabled",
            "authentication_flow",
            "enrollment_flow",
            "policy_engine_mode",
            "user_matching_mode",
            "group_matching_mode",
            "provider_type",
            "request_token_url",
            "authorization_url",
            "access_token_url",
            "profile_url",
            "consumer_key",
            "additional_scopes",
        ]
OAuth Source filter set
def
filter_has_jwks(self, queryset, name, value):
def filter_has_jwks(self, queryset, name, value):  # pragma: no cover
    """Filter sources by whether they hold JWKS data.

    A true `value` keeps only sources whose ``oidc_jwks`` is not the empty
    object; a false `value` keeps only those where it is.
    """
    if value:
        return queryset.exclude(oidc_jwks__iexact="{}")
    # Honour an explicit ``has_jwks=false`` instead of treating it as true.
    return queryset.filter(oidc_jwks__iexact="{}")
Only return sources with JWKS data
base_filters =
OrderedDict({'pbm_uuid': <django_filters.filters.UUIDFilter object>, 'name': <django_filters.filters.CharFilter object>, 'slug': <django_filters.filters.CharFilter object>, 'enabled': <django_filters.filters.BooleanFilter object>, 'authentication_flow': <django_filters.filters.ModelChoiceFilter object>, 'enrollment_flow': <django_filters.filters.ModelChoiceFilter object>, 'policy_engine_mode': <django_filters.filters.ChoiceFilter object>, 'user_matching_mode': <django_filters.filters.ChoiceFilter object>, 'group_matching_mode': <django_filters.filters.ChoiceFilter object>, 'provider_type': <django_filters.filters.CharFilter object>, 'request_token_url': <django_filters.filters.CharFilter object>, 'authorization_url': <django_filters.filters.CharFilter object>, 'access_token_url': <django_filters.filters.CharFilter object>, 'profile_url': <django_filters.filters.CharFilter object>, 'consumer_key': <django_filters.filters.CharFilter object>, 'additional_scopes': <django_filters.filters.CharFilter object>, 'has_jwks': <django_filters.filters.BooleanFilter object>})
class
OAuthSourceFilter.Meta:
class Meta:
    model = OAuthSource
    # Model fields for which filters are generated automatically.
    fields = [
        "pbm_uuid",
        "name",
        "slug",
        "enabled",
        "authentication_flow",
        "enrollment_flow",
        "policy_engine_mode",
        "user_matching_mode",
        "group_matching_mode",
        "provider_type",
        "request_token_url",
        "authorization_url",
        "access_token_url",
        "profile_url",
        "consumer_key",
        "additional_scopes",
    ]
model =
<class 'authentik.sources.oauth.models.OAuthSource'>
class
OAuthSourceViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class OAuthSourceViewSet(UsedByMixin, ModelViewSet):
    """Source Viewset"""

    queryset = OAuthSource.objects.all()
    serializer_class = OAuthSourceSerializer
    lookup_field = "slug"
    filterset_class = OAuthSourceFilter
    search_fields = ["name", "slug"]
    ordering = ["name"]

    @extend_schema(
        responses={200: SourceTypeSerializer(many=True)},
        parameters=[
            OpenApiParameter(
                name="name",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.STR,
            )
        ],
    )
    @action(detail=False, pagination_class=None, filter_backends=[])
    def source_types(self, request: Request) -> Response:
        """Get all creatable source types. If ?name is set, only the type
        registered for <name> is returned; if the lookup resolves to the plain
        SourceType base, the list is empty."""
        params = request.query_params
        if "name" not in params:
            # No filter requested: serialize every registered type.
            return Response([SourceTypeSerializer(entry).data for entry in registry.get()])

        serialized = []
        match = registry.find_type(params.get("name"))
        # A result whose class is exactly SourceType is not reported.
        if match.__class__ != SourceType:
            serialized.append(SourceTypeSerializer(match).data)
        return Response(serialized)
Source Viewset
serializer_class =
<class 'OAuthSourceSerializer'>
filterset_class =
<class 'OAuthSourceFilter'>
@extend_schema(responses={200: SourceTypeSerializer(many=True)}, parameters=[OpenApiParameter(name='name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR)])
@action(detail=False, pagination_class=None, filter_backends=[])
def
source_types( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
@extend_schema(
    responses={200: SourceTypeSerializer(many=True)},
    parameters=[
        OpenApiParameter(
            name="name",
            location=OpenApiParameter.QUERY,
            type=OpenApiTypes.STR,
        )
    ],
)
@action(detail=False, pagination_class=None, filter_backends=[])
def source_types(self, request: Request) -> Response:
    """Get all creatable source types. If ?name is set, only the type
    registered for <name> is returned; if the lookup resolves to the plain
    SourceType base, the list is empty."""
    params = request.query_params
    if "name" not in params:
        # No filter requested: serialize every registered type.
        return Response([SourceTypeSerializer(entry).data for entry in registry.get()])

    serialized = []
    match = registry.find_type(params.get("name"))
    # A result whose class is exactly SourceType is not reported.
    if match.__class__ != SourceType:
        serialized.append(SourceTypeSerializer(match).data)
    return Response(serialized)
Get all creatable source types. If ?name is set, only returns the type for