authentik.sources.oauth.api.source
OAuth Source Serializer
"""OAuth Source Serializer"""

from django.urls.base import reverse_lazy
from django_filters.filters import BooleanFilter
from django_filters.filterset import FilterSet
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter, extend_schema, extend_schema_field
from requests import RequestException
from rest_framework.decorators import action
from rest_framework.fields import BooleanField, CharField, ChoiceField, SerializerMethodField
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.serializers import ValidationError
from rest_framework.viewsets import ModelViewSet

from authentik.core.api.sources import SourceSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import PassiveSerializer
from authentik.lib.utils.http import get_http_session
from authentik.sources.oauth.models import OAuthSource
from authentik.sources.oauth.types.registry import SourceType, registry


class SourceTypeSerializer(PassiveSerializer):
    """Serializer for SourceType"""

    name = CharField(required=True)
    verbose_name = CharField(required=True)
    urls_customizable = BooleanField()
    # Endpoint URLs carried by the source type; output-only and nullable.
    request_token_url = CharField(read_only=True, allow_null=True)
    authorization_url = CharField(read_only=True, allow_null=True)
    access_token_url = CharField(read_only=True, allow_null=True)
    profile_url = CharField(read_only=True, allow_null=True)
    oidc_well_known_url = CharField(read_only=True, allow_null=True)
    oidc_jwks_url = CharField(read_only=True, allow_null=True)


class OAuthSourceSerializer(SourceSerializer):
    """OAuth Source Serializer"""

    provider_type = ChoiceField(choices=registry.get_name_tuple())
    callback_url = SerializerMethodField()
    type = SerializerMethodField()

    def get_callback_url(self, instance: OAuthSource) -> str:
        """Get OAuth Callback URL"""
        relative_url = reverse_lazy(
            "authentik_sources_oauth:oauth-client-callback",
            kwargs={"source_slug": instance.slug},
        )
        # Without a request in the serializer context only the relative URL is available.
        if "request" not in self.context:
            return relative_url
        return self.context["request"].build_absolute_uri(relative_url)

    @extend_schema_field(SourceTypeSerializer)
    def get_type(self, instance: OAuthSource) -> SourceTypeSerializer:
        """Get source's type configuration"""
        return SourceTypeSerializer(instance.source_type).data

    def validate(self, attrs: dict) -> dict:
        """Resolve OIDC discovery/JWKS data and check the provider's required URLs.

        If a well-known URL is available (submitted, or the provider type's
        default), the discovery document is fetched and used to fill in any of
        ``authorization_url``, ``access_token_url``, ``profile_url`` and
        ``pkce`` that were left empty. The JWKS URL is then chosen (submitted
        value, else the discovered ``jwks_uri``, else the provider default),
        fetched, and its JSON stored in ``attrs["oidc_jwks"]``.

        Raises:
            ValidationError: keyed by ``oidc_well_known_url`` / ``oidc_jwks_url``
                when that document cannot be fetched or decoded; un-keyed when
                a URL the provider type does not predefine is absent from
                ``attrs``.
        """
        session = get_http_session()

        def fetch_json(url: str, field: str, invalid_message: str):
            """GET ``url`` and return its decoded JSON, reporting failures against ``field``."""
            try:
                response = session.get(url)
                response.raise_for_status()
            except RequestException as exc:
                # Compare against None: a requests Response is falsy for any
                # 4xx/5xx status, so a truthiness test always discards the
                # error body sent by the server.
                body = exc.response.text if exc.response is not None else ""
                raise ValidationError({field: body or str(exc)}) from None
            try:
                return response.json()
            except ValueError:
                # The body was not JSON; report it instead of crashing.
                raise ValidationError({field: invalid_message}) from None

        provider_type_name = attrs.get(
            "provider_type",
            self.instance.provider_type if self.instance else None,
        )
        source_type = registry.find_type(provider_type_name)

        well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url
        inferred_oidc_jwks_url = None

        if well_known:
            config = fetch_json(
                well_known, "oidc_well_known_url", "Invalid well-known configuration"
            )
            # A discovery document must be a JSON object that names its issuer.
            if not isinstance(config, dict) or "issuer" not in config:
                raise ValidationError({"oidc_well_known_url": "Invalid well-known configuration"})
            field_map = {
                # authentik field to oidc field
                "authorization_url": "authorization_endpoint",
                "access_token_url": "token_endpoint",
                "profile_url": "userinfo_endpoint",
                "pkce": "code_challenge_methods_supported",
            }
            for ak_key, oidc_key in field_map.items():
                # Don't overwrite user-set values
                if attrs.get(ak_key):
                    continue
                attrs[ak_key] = config.get(oidc_key, "")
            inferred_oidc_jwks_url = config.get("jwks_uri", "")

        # Prefer user-entered URL to inferred URL to default URL
        jwks_url = attrs.get("oidc_jwks_url") or inferred_oidc_jwks_url or source_type.oidc_jwks_url
        if jwks_url:
            attrs["oidc_jwks_url"] = jwks_url
            attrs["oidc_jwks"] = fetch_json(jwks_url, "oidc_jwks_url", "Invalid JWKS configuration")

        for url in ("authorization_url", "access_token_url", "profile_url"):
            # Only URLs the provider type does not predefine must be supplied.
            if getattr(source_type, url, None) is None and url not in attrs:
                raise ValidationError(f"{url} is required for provider {source_type.verbose_name}")
        return attrs

    class Meta:
        model = OAuthSource
        fields = SourceSerializer.Meta.fields + [
            "group_matching_mode",
            "provider_type",
            "request_token_url",
            "authorization_url",
            "access_token_url",
            "profile_url",
            "pkce",
            "consumer_key",
            "consumer_secret",
            "callback_url",
            "additional_scopes",
            "type",
            "oidc_well_known_url",
            "oidc_jwks_url",
            "oidc_jwks",
            "authorization_code_auth_method",
        ]
        extra_kwargs = {
            "consumer_secret": {"write_only": True},
            "request_token_url": {"allow_blank": True},
            "authorization_url": {"allow_blank": True},
            "access_token_url": {"allow_blank": True},
            "profile_url": {"allow_blank": True},
        }


class OAuthSourceFilter(FilterSet):
    """OAuth Source filter set"""

    has_jwks = BooleanFilter(label="Only return sources with JWKS data", method="filter_has_jwks")

    def filter_has_jwks(self, queryset, name, value):  # pragma: no cover
        """Filter sources by whether they hold JWKS data.

        A truthy ``value`` drops sources whose ``oidc_jwks`` matches ``{}``;
        a falsy ``value`` keeps only those sources.
        """
        if value:
            return queryset.exclude(oidc_jwks__iexact="{}")
        return queryset.filter(oidc_jwks__iexact="{}")

    class Meta:
        model = OAuthSource
        fields = [
            "pbm_uuid",
            "name",
            "slug",
            "enabled",
            "authentication_flow",
            "enrollment_flow",
            "policy_engine_mode",
            "user_matching_mode",
            "group_matching_mode",
            "provider_type",
            "request_token_url",
            "authorization_url",
            "access_token_url",
            "profile_url",
            "consumer_key",
            "additional_scopes",
        ]


class OAuthSourceViewSet(UsedByMixin, ModelViewSet):
    """Source Viewset"""

    queryset = OAuthSource.objects.all()
    serializer_class = OAuthSourceSerializer
    lookup_field = "slug"
    filterset_class = OAuthSourceFilter
    search_fields = ["name", "slug"]
    ordering = ["name"]

    @extend_schema(
        responses={200: SourceTypeSerializer(many=True)},
        parameters=[
            OpenApiParameter(
                name="name",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.STR,
            )
        ],
    )
    @action(detail=False, pagination_class=None, filter_backends=[])
    def source_types(self, request: Request) -> Response:
        """Get all creatable source types. If ?name is set, only returns the type for <name>.
        If <name> isn't found, returns the default type."""
        data = []
        if "name" in request.query_params:
            source_type = registry.find_type(request.query_params.get("name"))
            # A result whose class is exactly the base SourceType is left out.
            if source_type.__class__ != SourceType:
                data.append(SourceTypeSerializer(source_type).data)
        else:
            for source_type in registry.get():
                data.append(SourceTypeSerializer(source_type).data)
        return Response(data)
class SourceTypeSerializer(PassiveSerializer):
    """Serializer exposing a SourceType's names, customization flag and endpoint URLs"""

    # Identifying names of the source type.
    name = CharField(required=True)
    verbose_name = CharField(required=True)

    urls_customizable = BooleanField()

    # Endpoint URLs carried by the source type; output-only, and each may be null.
    request_token_url = CharField(allow_null=True, read_only=True)
    authorization_url = CharField(allow_null=True, read_only=True)
    access_token_url = CharField(allow_null=True, read_only=True)
    profile_url = CharField(allow_null=True, read_only=True)
    oidc_well_known_url = CharField(allow_null=True, read_only=True)
    oidc_jwks_url = CharField(allow_null=True, read_only=True)
Serializer for SourceType
Inherited Members
class OAuthSourceSerializer(SourceSerializer):
    """OAuth Source Serializer"""

    provider_type = ChoiceField(choices=registry.get_name_tuple())
    callback_url = SerializerMethodField()
    type = SerializerMethodField()

    def get_callback_url(self, instance: OAuthSource) -> str:
        """Get OAuth Callback URL"""
        relative_url = reverse_lazy(
            "authentik_sources_oauth:oauth-client-callback",
            kwargs={"source_slug": instance.slug},
        )
        # Without a request in the serializer context only the relative URL is available.
        if "request" not in self.context:
            return relative_url
        return self.context["request"].build_absolute_uri(relative_url)

    @extend_schema_field(SourceTypeSerializer)
    def get_type(self, instance: OAuthSource) -> SourceTypeSerializer:
        """Get source's type configuration"""
        return SourceTypeSerializer(instance.source_type).data

    def validate(self, attrs: dict) -> dict:
        """Resolve OIDC discovery/JWKS data and check the provider's required URLs.

        If a well-known URL is available (submitted, or the provider type's
        default), the discovery document is fetched and used to fill in any of
        ``authorization_url``, ``access_token_url``, ``profile_url`` and
        ``pkce`` that were left empty. The JWKS URL is then chosen (submitted
        value, else the discovered ``jwks_uri``, else the provider default),
        fetched, and its JSON stored in ``attrs["oidc_jwks"]``.

        Raises:
            ValidationError: keyed by ``oidc_well_known_url`` / ``oidc_jwks_url``
                when that document cannot be fetched or decoded; un-keyed when
                a URL the provider type does not predefine is absent from
                ``attrs``.
        """
        session = get_http_session()

        def fetch_json(url: str, field: str, invalid_message: str):
            """GET ``url`` and return its decoded JSON, reporting failures against ``field``."""
            try:
                response = session.get(url)
                response.raise_for_status()
            except RequestException as exc:
                # Compare against None: a requests Response is falsy for any
                # 4xx/5xx status, so a truthiness test always discards the
                # error body sent by the server.
                body = exc.response.text if exc.response is not None else ""
                raise ValidationError({field: body or str(exc)}) from None
            try:
                return response.json()
            except ValueError:
                # The body was not JSON; report it instead of crashing.
                raise ValidationError({field: invalid_message}) from None

        provider_type_name = attrs.get(
            "provider_type",
            self.instance.provider_type if self.instance else None,
        )
        source_type = registry.find_type(provider_type_name)

        well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url
        inferred_oidc_jwks_url = None

        if well_known:
            config = fetch_json(
                well_known, "oidc_well_known_url", "Invalid well-known configuration"
            )
            # A discovery document must be a JSON object that names its issuer.
            if not isinstance(config, dict) or "issuer" not in config:
                raise ValidationError({"oidc_well_known_url": "Invalid well-known configuration"})
            field_map = {
                # authentik field to oidc field
                "authorization_url": "authorization_endpoint",
                "access_token_url": "token_endpoint",
                "profile_url": "userinfo_endpoint",
                "pkce": "code_challenge_methods_supported",
            }
            for ak_key, oidc_key in field_map.items():
                # Don't overwrite user-set values
                if attrs.get(ak_key):
                    continue
                attrs[ak_key] = config.get(oidc_key, "")
            inferred_oidc_jwks_url = config.get("jwks_uri", "")

        # Prefer user-entered URL to inferred URL to default URL
        jwks_url = attrs.get("oidc_jwks_url") or inferred_oidc_jwks_url or source_type.oidc_jwks_url
        if jwks_url:
            attrs["oidc_jwks_url"] = jwks_url
            attrs["oidc_jwks"] = fetch_json(jwks_url, "oidc_jwks_url", "Invalid JWKS configuration")

        for url in ("authorization_url", "access_token_url", "profile_url"):
            # Only URLs the provider type does not predefine must be supplied.
            if getattr(source_type, url, None) is None and url not in attrs:
                raise ValidationError(f"{url} is required for provider {source_type.verbose_name}")
        return attrs

    class Meta:
        model = OAuthSource
        fields = SourceSerializer.Meta.fields + [
            "group_matching_mode",
            "provider_type",
            "request_token_url",
            "authorization_url",
            "access_token_url",
            "profile_url",
            "pkce",
            "consumer_key",
            "consumer_secret",
            "callback_url",
            "additional_scopes",
            "type",
            "oidc_well_known_url",
            "oidc_jwks_url",
            "oidc_jwks",
            "authorization_code_auth_method",
        ]
        extra_kwargs = {
            "consumer_secret": {"write_only": True},
            "request_token_url": {"allow_blank": True},
            "authorization_url": {"allow_blank": True},
            "access_token_url": {"allow_blank": True},
            "profile_url": {"allow_blank": True},
        }
OAuth Source Serializer
def get_callback_url(self, instance: OAuthSource) -> str:
    """Build the OAuth callback URL for the given source.

    The URL is reversed from the source's slug. It is made absolute when the
    serializer context carries a request, and returned relative otherwise.
    """
    callback_path = reverse_lazy(
        "authentik_sources_oauth:oauth-client-callback",
        kwargs={"source_slug": instance.slug},
    )
    if "request" in self.context:
        return self.context["request"].build_absolute_uri(callback_path)
    return callback_path
Get OAuth Callback URL
@extend_schema_field(SourceTypeSerializer)
def
get_type( self, instance: authentik.sources.oauth.models.OAuthSource) -> SourceTypeSerializer:
@extend_schema_field(SourceTypeSerializer)
def get_type(self, instance: OAuthSource) -> SourceTypeSerializer:
    """Return the serialized type configuration of the given source."""
    type_serializer = SourceTypeSerializer(instance.source_type)
    return type_serializer.data
Get source's type configuration
def
validate(self, attrs: dict) -> dict:
def validate(self, attrs: dict) -> dict:
    """Resolve OIDC discovery/JWKS data and check the provider's required URLs.

    If a well-known URL is available (submitted, or the provider type's
    default), the discovery document is fetched and used to fill in any of
    ``authorization_url``, ``access_token_url``, ``profile_url`` and ``pkce``
    that were left empty. The JWKS URL is then chosen (submitted value, else
    the discovered ``jwks_uri``, else the provider default), fetched, and its
    JSON stored in ``attrs["oidc_jwks"]``.

    Raises:
        ValidationError: keyed by ``oidc_well_known_url`` / ``oidc_jwks_url``
            when that document cannot be fetched or decoded; un-keyed when a
            URL the provider type does not predefine is absent from ``attrs``.
    """
    session = get_http_session()

    def fetch_json(url: str, field: str, invalid_message: str):
        """GET ``url`` and return its decoded JSON, reporting failures against ``field``."""
        try:
            response = session.get(url)
            response.raise_for_status()
        except RequestException as exc:
            # Compare against None: a requests Response is falsy for any
            # 4xx/5xx status, so a truthiness test always discards the error
            # body sent by the server.
            body = exc.response.text if exc.response is not None else ""
            raise ValidationError({field: body or str(exc)}) from None
        try:
            return response.json()
        except ValueError:
            # The body was not JSON; report it instead of crashing.
            raise ValidationError({field: invalid_message}) from None

    provider_type_name = attrs.get(
        "provider_type",
        self.instance.provider_type if self.instance else None,
    )
    source_type = registry.find_type(provider_type_name)

    well_known = attrs.get("oidc_well_known_url") or source_type.oidc_well_known_url
    inferred_oidc_jwks_url = None

    if well_known:
        config = fetch_json(well_known, "oidc_well_known_url", "Invalid well-known configuration")
        # A discovery document must be a JSON object that names its issuer.
        if not isinstance(config, dict) or "issuer" not in config:
            raise ValidationError({"oidc_well_known_url": "Invalid well-known configuration"})
        field_map = {
            # authentik field to oidc field
            "authorization_url": "authorization_endpoint",
            "access_token_url": "token_endpoint",
            "profile_url": "userinfo_endpoint",
            "pkce": "code_challenge_methods_supported",
        }
        for ak_key, oidc_key in field_map.items():
            # Don't overwrite user-set values
            if attrs.get(ak_key):
                continue
            attrs[ak_key] = config.get(oidc_key, "")
        inferred_oidc_jwks_url = config.get("jwks_uri", "")

    # Prefer user-entered URL to inferred URL to default URL
    jwks_url = attrs.get("oidc_jwks_url") or inferred_oidc_jwks_url or source_type.oidc_jwks_url
    if jwks_url:
        attrs["oidc_jwks_url"] = jwks_url
        attrs["oidc_jwks"] = fetch_json(jwks_url, "oidc_jwks_url", "Invalid JWKS configuration")

    for url in ("authorization_url", "access_token_url", "profile_url"):
        # Only URLs the provider type does not predefine must be supplied.
        if getattr(source_type, url, None) is None and url not in attrs:
            raise ValidationError(f"{url} is required for provider {source_type.verbose_name}")
    return attrs
Inherited Members
class
OAuthSourceSerializer.Meta:
class Meta:
    model = OAuthSource
    # Base source fields first, followed by the OAuth-specific ones.
    fields = [
        *SourceSerializer.Meta.fields,
        "group_matching_mode",
        "provider_type",
        "request_token_url",
        "authorization_url",
        "access_token_url",
        "profile_url",
        "pkce",
        "consumer_key",
        "consumer_secret",
        "callback_url",
        "additional_scopes",
        "type",
        "oidc_well_known_url",
        "oidc_jwks_url",
        "oidc_jwks",
        "authorization_code_auth_method",
    ]
    extra_kwargs = {
        # The secret is accepted on input but not included in output.
        "consumer_secret": {"write_only": True},
        # The endpoint URLs may be submitted as empty strings.
        "request_token_url": {"allow_blank": True},
        "authorization_url": {"allow_blank": True},
        "access_token_url": {"allow_blank": True},
        "profile_url": {"allow_blank": True},
    }
model =
<class 'authentik.sources.oauth.models.OAuthSource'>
fields =
['pk', 'name', 'slug', 'enabled', 'promoted', 'authentication_flow', 'enrollment_flow', 'user_property_mappings', 'group_property_mappings', 'component', 'verbose_name', 'verbose_name_plural', 'meta_model_name', 'policy_engine_mode', 'user_matching_mode', 'managed', 'user_path_template', 'icon', 'icon_url', 'icon_themed_urls', 'group_matching_mode', 'provider_type', 'request_token_url', 'authorization_url', 'access_token_url', 'profile_url', 'pkce', 'consumer_key', 'consumer_secret', 'callback_url', 'additional_scopes', 'type', 'oidc_well_known_url', 'oidc_jwks_url', 'oidc_jwks', 'authorization_code_auth_method']
class
OAuthSourceFilter(django_filters.filterset.FilterSet):
class OAuthSourceFilter(FilterSet):
    """OAuth Source filter set"""

    has_jwks = BooleanFilter(label="Only return sources with JWKS data", method="filter_has_jwks")

    def filter_has_jwks(self, queryset, name, value):  # pragma: no cover
        """Filter sources by whether they hold JWKS data.

        A truthy ``value`` drops sources whose ``oidc_jwks`` matches ``{}``;
        a falsy ``value`` keeps only those sources. Previously ``value`` was
        ignored, so ``has_jwks=false`` behaved exactly like ``has_jwks=true``.
        """
        if value:
            return queryset.exclude(oidc_jwks__iexact="{}")
        return queryset.filter(oidc_jwks__iexact="{}")

    class Meta:
        model = OAuthSource
        fields = [
            "pbm_uuid",
            "name",
            "slug",
            "enabled",
            "authentication_flow",
            "enrollment_flow",
            "policy_engine_mode",
            "user_matching_mode",
            "group_matching_mode",
            "provider_type",
            "request_token_url",
            "authorization_url",
            "access_token_url",
            "profile_url",
            "consumer_key",
            "additional_scopes",
        ]
OAuth Source filter set
def
filter_has_jwks(self, queryset, name, value):
def filter_has_jwks(self, queryset, name, value):  # pragma: no cover
    """Filter sources by whether they hold JWKS data.

    A truthy ``value`` drops sources whose ``oidc_jwks`` matches ``{}``; a
    falsy ``value`` keeps only those sources. Previously ``value`` was
    ignored, so ``has_jwks=false`` behaved exactly like ``has_jwks=true``.
    """
    if value:
        return queryset.exclude(oidc_jwks__iexact="{}")
    return queryset.filter(oidc_jwks__iexact="{}")
Only return sources with JWKS data
base_filters =
OrderedDict({'pbm_uuid': <django_filters.filters.UUIDFilter object>, 'name': <django_filters.filters.CharFilter object>, 'slug': <django_filters.filters.CharFilter object>, 'enabled': <django_filters.filters.BooleanFilter object>, 'authentication_flow': <django_filters.filters.ModelChoiceFilter object>, 'enrollment_flow': <django_filters.filters.ModelChoiceFilter object>, 'policy_engine_mode': <django_filters.filters.ChoiceFilter object>, 'user_matching_mode': <django_filters.filters.ChoiceFilter object>, 'group_matching_mode': <django_filters.filters.ChoiceFilter object>, 'provider_type': <django_filters.filters.CharFilter object>, 'request_token_url': <django_filters.filters.CharFilter object>, 'authorization_url': <django_filters.filters.CharFilter object>, 'access_token_url': <django_filters.filters.CharFilter object>, 'profile_url': <django_filters.filters.CharFilter object>, 'consumer_key': <django_filters.filters.CharFilter object>, 'additional_scopes': <django_filters.filters.CharFilter object>, 'has_jwks': <django_filters.filters.BooleanFilter object>})
class
OAuthSourceFilter.Meta:
class Meta:
    model = OAuthSource
    # Model fields that can be filtered on directly.
    fields = [
        "pbm_uuid",
        "name",
        "slug",
        "enabled",
        # Flow and matching configuration
        "authentication_flow",
        "enrollment_flow",
        "policy_engine_mode",
        "user_matching_mode",
        "group_matching_mode",
        # OAuth provider settings
        "provider_type",
        "request_token_url",
        "authorization_url",
        "access_token_url",
        "profile_url",
        "consumer_key",
        "additional_scopes",
    ]
model =
<class 'authentik.sources.oauth.models.OAuthSource'>
class
OAuthSourceViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class OAuthSourceViewSet(UsedByMixin, ModelViewSet):
    """Source Viewset"""

    queryset = OAuthSource.objects.all()
    serializer_class = OAuthSourceSerializer
    # Individual sources are addressed by slug.
    lookup_field = "slug"
    filterset_class = OAuthSourceFilter
    search_fields = ["name", "slug"]
    ordering = ["name"]

    @extend_schema(
        responses={200: SourceTypeSerializer(many=True)},
        parameters=[
            OpenApiParameter(
                name="name",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.STR,
            )
        ],
    )
    @action(detail=False, pagination_class=None, filter_backends=[])
    def source_types(self, request: Request) -> Response:
        """Get all creatable source types. If ?name is set, only returns the type for <name>.
        If <name> isn't found, returns the default type."""
        params = request.query_params
        if "name" not in params:
            # No name given: serialize every type the registry returns.
            return Response([SourceTypeSerializer(entry).data for entry in registry.get()])
        found = registry.find_type(params.get("name"))
        payload = []
        # NOTE(review): the docstring says the default type is returned for an
        # unknown name, but a result whose class is exactly SourceType is
        # dropped here, giving an empty list. Confirm which is intended.
        if found.__class__ != SourceType:
            payload.append(SourceTypeSerializer(found).data)
        return Response(payload)
Source Viewset
serializer_class =
<class 'OAuthSourceSerializer'>
filterset_class =
<class 'OAuthSourceFilter'>
@extend_schema(responses={200: SourceTypeSerializer(many=True)}, parameters=[OpenApiParameter(name='name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR)])
@action(detail=False, pagination_class=None, filter_backends=[])
def
source_types( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
@extend_schema(
    responses={200: SourceTypeSerializer(many=True)},
    parameters=[
        OpenApiParameter(
            name="name",
            location=OpenApiParameter.QUERY,
            type=OpenApiTypes.STR,
        )
    ],
)
@action(detail=False, pagination_class=None, filter_backends=[])
def source_types(self, request: Request) -> Response:
    """Get all creatable source types. If ?name is set, only returns the type for <name>.
    If <name> isn't found, returns the default type."""
    params = request.query_params
    if "name" not in params:
        # No name given: serialize every type the registry returns.
        return Response([SourceTypeSerializer(entry).data for entry in registry.get()])
    found = registry.find_type(params.get("name"))
    payload = []
    # NOTE(review): the docstring says the default type is returned for an
    # unknown name, but a result whose class is exactly SourceType is dropped
    # here, giving an empty list. Confirm which is intended.
    if found.__class__ != SourceType:
        payload.append(SourceTypeSerializer(found).data)
    return Response(payload)
Get all creatable source types. If ?name is set, only returns the type for