authentik.providers.oauth2.views.userinfo

authentik OAuth2 OpenID Userinfo views

  1"""authentik OAuth2 OpenID Userinfo views"""
  2
  3from typing import Any
  4
  5from deepmerge import always_merger
  6from django.http import HttpRequest, HttpResponse
  7from django.http.response import HttpResponseBadRequest
  8from django.utils.decorators import method_decorator
  9from django.utils.translation import gettext_lazy as _
 10from django.views import View
 11from django.views.decorators.csrf import csrf_exempt
 12from structlog.stdlib import get_logger
 13
 14from authentik.common.oauth.constants import (
 15    SCOPE_GITHUB_ORG_READ,
 16    SCOPE_GITHUB_USER,
 17    SCOPE_GITHUB_USER_EMAIL,
 18    SCOPE_GITHUB_USER_READ,
 19    SCOPE_OPENID,
 20)
 21from authentik.core.expression.exceptions import PropertyMappingExpressionException
 22from authentik.events.models import Event, EventAction
 23from authentik.flows.challenge import PermissionDict
 24from authentik.providers.oauth2.models import (
 25    BaseGrantModel,
 26    OAuth2Provider,
 27    RefreshToken,
 28    ScopeMapping,
 29)
 30from authentik.providers.oauth2.utils import TokenResponse, cors_allow, protected_resource_view
 31
 32LOGGER = get_logger()
 33
 34
 35@method_decorator(csrf_exempt, name="dispatch")
 36@method_decorator(protected_resource_view([SCOPE_OPENID]), name="dispatch")
 37class UserInfoView(View):
 38    """Create a dictionary with all the requested claims about the End-User.
 39    See: http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse"""
 40
 41    token: RefreshToken | None
 42
 43    def get_scope_descriptions(
 44        self, scopes: list[str], provider: OAuth2Provider
 45    ) -> list[PermissionDict]:
 46        """Get a list of all Scopes's descriptions"""
 47        scope_descriptions = []
 48        for scope in ScopeMapping.objects.filter(scope_name__in=scopes, provider=provider).order_by(
 49            "scope_name"
 50        ):
 51            scope_descriptions.append(PermissionDict(id=scope.scope_name, name=scope.description))
 52        # GitHub Compatibility Scopes are handled differently, since they required custom paths
 53        # Hence they don't exist as Scope objects
 54        special_scope_map = {
 55            SCOPE_GITHUB_USER: _("GitHub Compatibility: Access your User Information"),
 56            SCOPE_GITHUB_USER_READ: _("GitHub Compatibility: Access your User Information"),
 57            SCOPE_GITHUB_USER_EMAIL: _("GitHub Compatibility: Access you Email addresses"),
 58            SCOPE_GITHUB_ORG_READ: _("GitHub Compatibility: Access your Groups"),
 59        }
 60        for scope in scopes:
 61            if scope in special_scope_map:
 62                scope_descriptions.append(
 63                    PermissionDict(id=str(scope), name=str(special_scope_map[scope]))
 64                )
 65        return scope_descriptions
 66
 67    def get_claims(self, provider: OAuth2Provider, token: BaseGrantModel) -> dict[str, Any]:
 68        """Get a dictionary of claims from scopes that the token
 69        requires and are assigned to the provider."""
 70
 71        scopes_from_client = token.scope
 72        final_claims = {}
 73        for scope in ScopeMapping.objects.filter(
 74            provider=provider, scope_name__in=scopes_from_client
 75        ).order_by("scope_name"):
 76            scope: ScopeMapping
 77            value = None
 78            try:
 79                value = scope.evaluate(
 80                    user=token.user,
 81                    request=self.request,
 82                    provider=provider,
 83                    token=token,
 84                )
 85            except PropertyMappingExpressionException as exc:
 86                Event.new(
 87                    EventAction.CONFIGURATION_ERROR,
 88                    message=f"Failed to evaluate property-mapping: '{scope.name}'",
 89                    provider=provider,
 90                    mapping=scope,
 91                ).from_http(self.request)
 92                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
 93            if value is None:
 94                continue
 95            if not isinstance(value, dict):
 96                LOGGER.warning(
 97                    "Scope returned a non-dict value, ignoring",
 98                    scope=scope,
 99                    value=value,
100                )
101                continue
102            always_merger.merge(final_claims, value)
103            LOGGER.debug("updated scope", scope=scope)
104        return final_claims
105
106    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
107        self.token = kwargs.get("token", None)
108        response = super().dispatch(request, *args, **kwargs)
109        allowed_origins = []
110        if self.token:
111            allowed_origins = [x.url for x in self.token.provider.redirect_uris]
112        cors_allow(self.request, response, *allowed_origins)
113        return response
114
115    def options(self, request: HttpRequest) -> HttpResponse:
116        return TokenResponse({})
117
118    def get(self, request: HttpRequest, **kwargs) -> HttpResponse:
119        """Handle GET Requests for UserInfo"""
120        if not self.token:
121            return HttpResponseBadRequest()
122        claims = {}
123        claims.setdefault("sub", self.token.id_token.sub)
124        claims.update(self.get_claims(self.token.provider, self.token))
125        if self.token.id_token.nonce:
126            claims["nonce"] = self.token.id_token.nonce
127        response = TokenResponse(claims)
128        return response
129
130    def post(self, request: HttpRequest, **kwargs) -> HttpResponse:
131        """POST Requests behave the same as GET Requests, so the get handler is called here"""
132        return self.get(request, **kwargs)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@method_decorator(csrf_exempt, name='dispatch')
@method_decorator(protected_resource_view([SCOPE_OPENID]), name='dispatch')
class UserInfoView(django.views.generic.base.View):
 36@method_decorator(csrf_exempt, name="dispatch")
 37@method_decorator(protected_resource_view([SCOPE_OPENID]), name="dispatch")
 38class UserInfoView(View):
 39    """Create a dictionary with all the requested claims about the End-User.
 40    See: http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse"""
 41
 42    token: RefreshToken | None
 43
 44    def get_scope_descriptions(
 45        self, scopes: list[str], provider: OAuth2Provider
 46    ) -> list[PermissionDict]:
 47        """Get a list of all Scopes's descriptions"""
 48        scope_descriptions = []
 49        for scope in ScopeMapping.objects.filter(scope_name__in=scopes, provider=provider).order_by(
 50            "scope_name"
 51        ):
 52            scope_descriptions.append(PermissionDict(id=scope.scope_name, name=scope.description))
 53        # GitHub Compatibility Scopes are handled differently, since they required custom paths
 54        # Hence they don't exist as Scope objects
 55        special_scope_map = {
 56            SCOPE_GITHUB_USER: _("GitHub Compatibility: Access your User Information"),
 57            SCOPE_GITHUB_USER_READ: _("GitHub Compatibility: Access your User Information"),
 58            SCOPE_GITHUB_USER_EMAIL: _("GitHub Compatibility: Access you Email addresses"),
 59            SCOPE_GITHUB_ORG_READ: _("GitHub Compatibility: Access your Groups"),
 60        }
 61        for scope in scopes:
 62            if scope in special_scope_map:
 63                scope_descriptions.append(
 64                    PermissionDict(id=str(scope), name=str(special_scope_map[scope]))
 65                )
 66        return scope_descriptions
 67
 68    def get_claims(self, provider: OAuth2Provider, token: BaseGrantModel) -> dict[str, Any]:
 69        """Get a dictionary of claims from scopes that the token
 70        requires and are assigned to the provider."""
 71
 72        scopes_from_client = token.scope
 73        final_claims = {}
 74        for scope in ScopeMapping.objects.filter(
 75            provider=provider, scope_name__in=scopes_from_client
 76        ).order_by("scope_name"):
 77            scope: ScopeMapping
 78            value = None
 79            try:
 80                value = scope.evaluate(
 81                    user=token.user,
 82                    request=self.request,
 83                    provider=provider,
 84                    token=token,
 85                )
 86            except PropertyMappingExpressionException as exc:
 87                Event.new(
 88                    EventAction.CONFIGURATION_ERROR,
 89                    message=f"Failed to evaluate property-mapping: '{scope.name}'",
 90                    provider=provider,
 91                    mapping=scope,
 92                ).from_http(self.request)
 93                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
 94            if value is None:
 95                continue
 96            if not isinstance(value, dict):
 97                LOGGER.warning(
 98                    "Scope returned a non-dict value, ignoring",
 99                    scope=scope,
100                    value=value,
101                )
102                continue
103            always_merger.merge(final_claims, value)
104            LOGGER.debug("updated scope", scope=scope)
105        return final_claims
106
107    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
108        self.token = kwargs.get("token", None)
109        response = super().dispatch(request, *args, **kwargs)
110        allowed_origins = []
111        if self.token:
112            allowed_origins = [x.url for x in self.token.provider.redirect_uris]
113        cors_allow(self.request, response, *allowed_origins)
114        return response
115
116    def options(self, request: HttpRequest) -> HttpResponse:
117        return TokenResponse({})
118
119    def get(self, request: HttpRequest, **kwargs) -> HttpResponse:
120        """Handle GET Requests for UserInfo"""
121        if not self.token:
122            return HttpResponseBadRequest()
123        claims = {}
124        claims.setdefault("sub", self.token.id_token.sub)
125        claims.update(self.get_claims(self.token.provider, self.token))
126        if self.token.id_token.nonce:
127            claims["nonce"] = self.token.id_token.nonce
128        response = TokenResponse(claims)
129        return response
130
131    def post(self, request: HttpRequest, **kwargs) -> HttpResponse:
132        """POST Requests behave the same as GET Requests, so the get handler is called here"""
133        return self.get(request, **kwargs)

Create a dictionary with all the requested claims about the End-User. See: http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse

def get_scope_descriptions( self, scopes: list[str], provider: authentik.providers.oauth2.models.OAuth2Provider) -> list[authentik.flows.challenge.PermissionDict]:
44    def get_scope_descriptions(
45        self, scopes: list[str], provider: OAuth2Provider
46    ) -> list[PermissionDict]:
47        """Get a list of all Scopes's descriptions"""
48        scope_descriptions = []
49        for scope in ScopeMapping.objects.filter(scope_name__in=scopes, provider=provider).order_by(
50            "scope_name"
51        ):
52            scope_descriptions.append(PermissionDict(id=scope.scope_name, name=scope.description))
53        # GitHub Compatibility Scopes are handled differently, since they required custom paths
54        # Hence they don't exist as Scope objects
55        special_scope_map = {
56            SCOPE_GITHUB_USER: _("GitHub Compatibility: Access your User Information"),
57            SCOPE_GITHUB_USER_READ: _("GitHub Compatibility: Access your User Information"),
58            SCOPE_GITHUB_USER_EMAIL: _("GitHub Compatibility: Access you Email addresses"),
59            SCOPE_GITHUB_ORG_READ: _("GitHub Compatibility: Access your Groups"),
60        }
61        for scope in scopes:
62            if scope in special_scope_map:
63                scope_descriptions.append(
64                    PermissionDict(id=str(scope), name=str(special_scope_map[scope]))
65                )
66        return scope_descriptions

Get a list of all Scopes' descriptions

def get_claims( self, provider: authentik.providers.oauth2.models.OAuth2Provider, token: authentik.providers.oauth2.models.BaseGrantModel) -> dict[str, typing.Any]:
 68    def get_claims(self, provider: OAuth2Provider, token: BaseGrantModel) -> dict[str, Any]:
 69        """Get a dictionary of claims from scopes that the token
 70        requires and are assigned to the provider."""
 71
 72        scopes_from_client = token.scope
 73        final_claims = {}
 74        for scope in ScopeMapping.objects.filter(
 75            provider=provider, scope_name__in=scopes_from_client
 76        ).order_by("scope_name"):
 77            scope: ScopeMapping
 78            value = None
 79            try:
 80                value = scope.evaluate(
 81                    user=token.user,
 82                    request=self.request,
 83                    provider=provider,
 84                    token=token,
 85                )
 86            except PropertyMappingExpressionException as exc:
 87                Event.new(
 88                    EventAction.CONFIGURATION_ERROR,
 89                    message=f"Failed to evaluate property-mapping: '{scope.name}'",
 90                    provider=provider,
 91                    mapping=scope,
 92                ).from_http(self.request)
 93                LOGGER.warning("Failed to evaluate property mapping", exc=exc)
 94            if value is None:
 95                continue
 96            if not isinstance(value, dict):
 97                LOGGER.warning(
 98                    "Scope returned a non-dict value, ignoring",
 99                    scope=scope,
100                    value=value,
101                )
102                continue
103            always_merger.merge(final_claims, value)
104            LOGGER.debug("updated scope", scope=scope)
105        return final_claims

Get a dictionary of claims from scopes that the token requires and are assigned to the provider.

def dispatch( self, request: django.http.request.HttpRequest, *args: Any, **kwargs: Any) -> django.http.response.HttpResponse:
107    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
108        self.token = kwargs.get("token", None)
109        response = super().dispatch(request, *args, **kwargs)
110        allowed_origins = []
111        if self.token:
112            allowed_origins = [x.url for x in self.token.provider.redirect_uris]
113        cors_allow(self.request, response, *allowed_origins)
114        return response
def options( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
116    def options(self, request: HttpRequest) -> HttpResponse:
117        return TokenResponse({})

Handle responding to requests for the OPTIONS HTTP verb.

def get( self, request: django.http.request.HttpRequest, **kwargs) -> django.http.response.HttpResponse:
119    def get(self, request: HttpRequest, **kwargs) -> HttpResponse:
120        """Handle GET Requests for UserInfo"""
121        if not self.token:
122            return HttpResponseBadRequest()
123        claims = {}
124        claims.setdefault("sub", self.token.id_token.sub)
125        claims.update(self.get_claims(self.token.provider, self.token))
126        if self.token.id_token.nonce:
127            claims["nonce"] = self.token.id_token.nonce
128        response = TokenResponse(claims)
129        return response

Handle GET Requests for UserInfo

def post( self, request: django.http.request.HttpRequest, **kwargs) -> django.http.response.HttpResponse:
131    def post(self, request: HttpRequest, **kwargs) -> HttpResponse:
132        """POST Requests behave the same as GET Requests, so the get handler is called here"""
133        return self.get(request, **kwargs)

POST Requests behave the same as GET Requests, so the get handler is called here