authentik.providers.oauth2.views.userinfo
authentik OAuth2 OpenID Userinfo views
1"""authentik OAuth2 OpenID Userinfo views""" 2 3from typing import Any 4 5from deepmerge import always_merger 6from django.http import HttpRequest, HttpResponse 7from django.http.response import HttpResponseBadRequest 8from django.utils.decorators import method_decorator 9from django.utils.translation import gettext_lazy as _ 10from django.views import View 11from django.views.decorators.csrf import csrf_exempt 12from structlog.stdlib import get_logger 13 14from authentik.common.oauth.constants import ( 15 SCOPE_GITHUB_ORG_READ, 16 SCOPE_GITHUB_USER, 17 SCOPE_GITHUB_USER_EMAIL, 18 SCOPE_GITHUB_USER_READ, 19 SCOPE_OPENID, 20) 21from authentik.core.expression.exceptions import PropertyMappingExpressionException 22from authentik.events.models import Event, EventAction 23from authentik.flows.challenge import PermissionDict 24from authentik.providers.oauth2.models import ( 25 BaseGrantModel, 26 OAuth2Provider, 27 RefreshToken, 28 ScopeMapping, 29) 30from authentik.providers.oauth2.utils import TokenResponse, cors_allow, protected_resource_view 31 32LOGGER = get_logger() 33 34 35@method_decorator(csrf_exempt, name="dispatch") 36@method_decorator(protected_resource_view([SCOPE_OPENID]), name="dispatch") 37class UserInfoView(View): 38 """Create a dictionary with all the requested claims about the End-User. 39 See: http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse""" 40 41 token: RefreshToken | None 42 43 def get_scope_descriptions( 44 self, scopes: list[str], provider: OAuth2Provider 45 ) -> list[PermissionDict]: 46 """Get a list of all Scopes's descriptions""" 47 scope_descriptions = [] 48 for scope in ScopeMapping.objects.filter(scope_name__in=scopes, provider=provider).order_by( 49 "scope_name" 50 ): 51 scope_descriptions.append(PermissionDict(id=scope.scope_name, name=scope.description)) 52 # GitHub Compatibility Scopes are handled differently, since they required custom paths 53 # Hence they don't exist as Scope objects 54 special_scope_map = { 55 SCOPE_GITHUB_USER: _("GitHub Compatibility: Access your User Information"), 56 SCOPE_GITHUB_USER_READ: _("GitHub Compatibility: Access your User Information"), 57 SCOPE_GITHUB_USER_EMAIL: _("GitHub Compatibility: Access you Email addresses"), 58 SCOPE_GITHUB_ORG_READ: _("GitHub Compatibility: Access your Groups"), 59 } 60 for scope in scopes: 61 if scope in special_scope_map: 62 scope_descriptions.append( 63 PermissionDict(id=str(scope), name=str(special_scope_map[scope])) 64 ) 65 return scope_descriptions 66 67 def get_claims(self, provider: OAuth2Provider, token: BaseGrantModel) -> dict[str, Any]: 68 """Get a dictionary of claims from scopes that the token 69 requires and are assigned to the provider.""" 70 71 scopes_from_client = token.scope 72 final_claims = {} 73 for scope in ScopeMapping.objects.filter( 74 provider=provider, scope_name__in=scopes_from_client 75 ).order_by("scope_name"): 76 scope: ScopeMapping 77 value = None 78 try: 79 value = scope.evaluate( 80 user=token.user, 81 request=self.request, 82 provider=provider, 83 token=token, 84 ) 85 except PropertyMappingExpressionException as exc: 86 Event.new( 87 EventAction.CONFIGURATION_ERROR, 88 message=f"Failed to evaluate property-mapping: '{scope.name}'", 89 provider=provider, 90 mapping=scope, 91 ).from_http(self.request) 92 LOGGER.warning("Failed to evaluate property mapping", exc=exc) 93 if value is None: 94 continue 95 if not isinstance(value, dict): 96 LOGGER.warning( 97 "Scope returned a non-dict value, ignoring", 98 scope=scope, 99 value=value, 100 ) 101 continue 102 always_merger.merge(final_claims, value) 103 LOGGER.debug("updated scope", scope=scope) 104 return final_claims 105 106 def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: 107 self.token = kwargs.get("token", None) 108 response = super().dispatch(request, *args, **kwargs) 109 allowed_origins = [] 110 if self.token: 111 allowed_origins = [x.url for x in self.token.provider.redirect_uris] 112 cors_allow(self.request, response, *allowed_origins) 113 return response 114 115 def options(self, request: HttpRequest) -> HttpResponse: 116 return TokenResponse({}) 117 118 def get(self, request: HttpRequest, **kwargs) -> HttpResponse: 119 """Handle GET Requests for UserInfo""" 120 if not self.token: 121 return HttpResponseBadRequest() 122 claims = {} 123 claims.setdefault("sub", self.token.id_token.sub) 124 claims.update(self.get_claims(self.token.provider, self.token)) 125 if self.token.id_token.nonce: 126 claims["nonce"] = self.token.id_token.nonce 127 response = TokenResponse(claims) 128 return response 129 130 def post(self, request: HttpRequest, **kwargs) -> HttpResponse: 131 """POST Requests behave the same as GET Requests, so the get handler is called here""" 132 return self.get(request, **kwargs)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@method_decorator(csrf_exempt, name='dispatch')
@method_decorator(protected_resource_view([SCOPE_OPENID]), name='dispatch')
class
UserInfoView36@method_decorator(csrf_exempt, name="dispatch") 37@method_decorator(protected_resource_view([SCOPE_OPENID]), name="dispatch") 38class UserInfoView(View): 39 """Create a dictionary with all the requested claims about the End-User. 40 See: http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse""" 41 42 token: RefreshToken | None 43 44 def get_scope_descriptions( 45 self, scopes: list[str], provider: OAuth2Provider 46 ) -> list[PermissionDict]: 47 """Get a list of all Scopes's descriptions""" 48 scope_descriptions = [] 49 for scope in ScopeMapping.objects.filter(scope_name__in=scopes, provider=provider).order_by( 50 "scope_name" 51 ): 52 scope_descriptions.append(PermissionDict(id=scope.scope_name, name=scope.description)) 53 # GitHub Compatibility Scopes are handled differently, since they required custom paths 54 # Hence they don't exist as Scope objects 55 special_scope_map = { 56 SCOPE_GITHUB_USER: _("GitHub Compatibility: Access your User Information"), 57 SCOPE_GITHUB_USER_READ: _("GitHub Compatibility: Access your User Information"), 58 SCOPE_GITHUB_USER_EMAIL: _("GitHub Compatibility: Access you Email addresses"), 59 SCOPE_GITHUB_ORG_READ: _("GitHub Compatibility: Access your Groups"), 60 } 61 for scope in scopes: 62 if scope in special_scope_map: 63 scope_descriptions.append( 64 PermissionDict(id=str(scope), name=str(special_scope_map[scope])) 65 ) 66 return scope_descriptions 67 68 def get_claims(self, provider: OAuth2Provider, token: BaseGrantModel) -> dict[str, Any]: 69 """Get a dictionary of claims from scopes that the token 70 requires and are assigned to the provider.""" 71 72 scopes_from_client = token.scope 73 final_claims = {} 74 for scope in ScopeMapping.objects.filter( 75 provider=provider, scope_name__in=scopes_from_client 76 ).order_by("scope_name"): 77 scope: ScopeMapping 78 value = None 79 try: 80 value = scope.evaluate( 81 user=token.user, 82 request=self.request, 83 provider=provider, 84 token=token, 85 ) 86 except PropertyMappingExpressionException as exc: 87 Event.new( 88 EventAction.CONFIGURATION_ERROR, 89 message=f"Failed to evaluate property-mapping: '{scope.name}'", 90 provider=provider, 91 mapping=scope, 92 ).from_http(self.request) 93 LOGGER.warning("Failed to evaluate property mapping", exc=exc) 94 if value is None: 95 continue 96 if not isinstance(value, dict): 97 LOGGER.warning( 98 "Scope returned a non-dict value, ignoring", 99 scope=scope, 100 value=value, 101 ) 102 continue 103 always_merger.merge(final_claims, value) 104 LOGGER.debug("updated scope", scope=scope) 105 return final_claims 106 107 def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: 108 self.token = kwargs.get("token", None) 109 response = super().dispatch(request, *args, **kwargs) 110 allowed_origins = [] 111 if self.token: 112 allowed_origins = [x.url for x in self.token.provider.redirect_uris] 113 cors_allow(self.request, response, *allowed_origins) 114 return response 115 116 def options(self, request: HttpRequest) -> HttpResponse: 117 return TokenResponse({}) 118 119 def get(self, request: HttpRequest, **kwargs) -> HttpResponse: 120 """Handle GET Requests for UserInfo""" 121 if not self.token: 122 return HttpResponseBadRequest() 123 claims = {} 124 claims.setdefault("sub", self.token.id_token.sub) 125 claims.update(self.get_claims(self.token.provider, self.token)) 126 if self.token.id_token.nonce: 127 claims["nonce"] = self.token.id_token.nonce 128 response = TokenResponse(claims) 129 return response 130 131 def post(self, request: HttpRequest, **kwargs) -> HttpResponse: 132 """POST Requests behave the same as GET Requests, so the get handler is called here""" 133 return self.get(request, **kwargs)
Create a dictionary with all the requested claims about the End-User. See: http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse
token: authentik.providers.oauth2.models.RefreshToken | None
def
get_scope_descriptions( self, scopes: list[str], provider: authentik.providers.oauth2.models.OAuth2Provider) -> list[authentik.flows.challenge.PermissionDict]:
44 def get_scope_descriptions( 45 self, scopes: list[str], provider: OAuth2Provider 46 ) -> list[PermissionDict]: 47 """Get a list of all Scopes's descriptions""" 48 scope_descriptions = [] 49 for scope in ScopeMapping.objects.filter(scope_name__in=scopes, provider=provider).order_by( 50 "scope_name" 51 ): 52 scope_descriptions.append(PermissionDict(id=scope.scope_name, name=scope.description)) 53 # GitHub Compatibility Scopes are handled differently, since they required custom paths 54 # Hence they don't exist as Scope objects 55 special_scope_map = { 56 SCOPE_GITHUB_USER: _("GitHub Compatibility: Access your User Information"), 57 SCOPE_GITHUB_USER_READ: _("GitHub Compatibility: Access your User Information"), 58 SCOPE_GITHUB_USER_EMAIL: _("GitHub Compatibility: Access you Email addresses"), 59 SCOPE_GITHUB_ORG_READ: _("GitHub Compatibility: Access your Groups"), 60 } 61 for scope in scopes: 62 if scope in special_scope_map: 63 scope_descriptions.append( 64 PermissionDict(id=str(scope), name=str(special_scope_map[scope])) 65 ) 66 return scope_descriptions
Get a list of all Scopes's descriptions
def
get_claims( self, provider: authentik.providers.oauth2.models.OAuth2Provider, token: authentik.providers.oauth2.models.BaseGrantModel) -> dict[str, typing.Any]:
68 def get_claims(self, provider: OAuth2Provider, token: BaseGrantModel) -> dict[str, Any]: 69 """Get a dictionary of claims from scopes that the token 70 requires and are assigned to the provider.""" 71 72 scopes_from_client = token.scope 73 final_claims = {} 74 for scope in ScopeMapping.objects.filter( 75 provider=provider, scope_name__in=scopes_from_client 76 ).order_by("scope_name"): 77 scope: ScopeMapping 78 value = None 79 try: 80 value = scope.evaluate( 81 user=token.user, 82 request=self.request, 83 provider=provider, 84 token=token, 85 ) 86 except PropertyMappingExpressionException as exc: 87 Event.new( 88 EventAction.CONFIGURATION_ERROR, 89 message=f"Failed to evaluate property-mapping: '{scope.name}'", 90 provider=provider, 91 mapping=scope, 92 ).from_http(self.request) 93 LOGGER.warning("Failed to evaluate property mapping", exc=exc) 94 if value is None: 95 continue 96 if not isinstance(value, dict): 97 LOGGER.warning( 98 "Scope returned a non-dict value, ignoring", 99 scope=scope, 100 value=value, 101 ) 102 continue 103 always_merger.merge(final_claims, value) 104 LOGGER.debug("updated scope", scope=scope) 105 return final_claims
Get a dictionary of claims from scopes that the token requires and are assigned to the provider.
def
dispatch( self, request: django.http.request.HttpRequest, *args: Any, **kwargs: Any) -> django.http.response.HttpResponse:
107 def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: 108 self.token = kwargs.get("token", None) 109 response = super().dispatch(request, *args, **kwargs) 110 allowed_origins = [] 111 if self.token: 112 allowed_origins = [x.url for x in self.token.provider.redirect_uris] 113 cors_allow(self.request, response, *allowed_origins) 114 return response
def
options( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
Handle responding to requests for the OPTIONS HTTP verb.
def
get( self, request: django.http.request.HttpRequest, **kwargs) -> django.http.response.HttpResponse:
119 def get(self, request: HttpRequest, **kwargs) -> HttpResponse: 120 """Handle GET Requests for UserInfo""" 121 if not self.token: 122 return HttpResponseBadRequest() 123 claims = {} 124 claims.setdefault("sub", self.token.id_token.sub) 125 claims.update(self.get_claims(self.token.provider, self.token)) 126 if self.token.id_token.nonce: 127 claims["nonce"] = self.token.id_token.nonce 128 response = TokenResponse(claims) 129 return response
Handle GET Requests for UserInfo
def
post( self, request: django.http.request.HttpRequest, **kwargs) -> django.http.response.HttpResponse:
131 def post(self, request: HttpRequest, **kwargs) -> HttpResponse: 132 """POST Requests behave the same as GET Requests, so the get handler is called here""" 133 return self.get(request, **kwargs)
POST Requests behave the same as GET Requests, so the get handler is called here