authentik.providers.oauth2.utils

OAuth2/OpenID Utils

  1"""OAuth2/OpenID Utils"""
  2
  3import re
  4import uuid
  5from base64 import b64decode, urlsafe_b64encode
  6from binascii import Error
  7from hashlib import sha256
  8from hmac import compare_digest
  9from typing import Any
 10from urllib.parse import unquote, urlparse
 11
 12from django.http import HttpRequest, HttpResponse, JsonResponse
 13from django.http.response import HttpResponseRedirect
 14from django.utils.cache import patch_vary_headers
 15from django.utils.timezone import now
 16from structlog.stdlib import get_logger
 17
 18from authentik.core.middleware import CTX_AUTH_VIA, KEY_USER
 19from authentik.events.models import Event, EventAction
 20from authentik.lib.utils.time import timedelta_from_string
 21from authentik.providers.oauth2.errors import BearerTokenError
 22from authentik.providers.oauth2.id_token import hash_session_key
 23from authentik.providers.oauth2.models import AccessToken, OAuth2Provider
 24
 25LOGGER = get_logger()
 26
 27
 28class TokenResponse(JsonResponse):
 29    """JSON Response with headers that it should never be cached
 30
 31    https://openid.net/specs/openid-connect-core-1_0.html#TokenResponse"""
 32
 33    def __init__(self, *args, **kwargs):
 34        super().__init__(*args, **kwargs)
 35        self["Cache-Control"] = "no-store"
 36        self["Pragma"] = "no-cache"
 37
 38
 39def cors_allow(request: HttpRequest, response: HttpResponse, *allowed_origins: str):
 40    """Add headers to permit CORS requests from allowed_origins, with or without credentials,
 41    with any headers."""
 42    origin = request.META.get("HTTP_ORIGIN")
 43    if not origin:
 44        return response
 45
 46    # OPTIONS requests don't have an authorization header -> hence
 47    # we can't extract the provider this request is for
 48    # so for options requests we allow the calling origin without checking
 49    allowed = request.method == "OPTIONS"
 50    received_origin = urlparse(origin)
 51    for allowed_origin in allowed_origins:
 52        url = urlparse(allowed_origin)
 53        if (
 54            received_origin.scheme == url.scheme
 55            and received_origin.hostname == url.hostname
 56            and received_origin.port == url.port
 57        ):
 58            allowed = True
 59    if not allowed:
 60        LOGGER.warning(
 61            "CORS: Origin is not an allowed origin",
 62            requested=received_origin,
 63            allowed=allowed_origins,
 64        )
 65        return response
 66
 67    # From the CORS spec: The string "*" cannot be used for a resource that supports credentials.
 68    response["Access-Control-Allow-Origin"] = origin
 69    patch_vary_headers(response, ["Origin"])
 70    response["Access-Control-Allow-Credentials"] = "true"
 71
 72    if request.method == "OPTIONS":
 73        if "HTTP_ACCESS_CONTROL_REQUEST_HEADERS" in request.META:
 74            response["Access-Control-Allow-Headers"] = request.META[
 75                "HTTP_ACCESS_CONTROL_REQUEST_HEADERS"
 76            ]
 77        response["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
 78
 79    return response
 80
 81
 82def pkce_s256_challenge(verifier: str) -> str:
 83    """Convert PKCE verifier to S256 challenge"""
 84    return (
 85        urlsafe_b64encode(sha256(verifier.encode("ascii")).digest())
 86        .decode("utf-8")
 87        .replace("=", "")
 88    )
 89
 90
 91def extract_access_token(request: HttpRequest) -> str | None:
 92    """
 93    Get the access token using Authorization Request Header Field method.
 94    Or try getting via GET.
 95    See: http://tools.ietf.org/html/rfc6750#section-2.1
 96
 97    Return a string.
 98    """
 99    auth_header = request.META.get("HTTP_AUTHORIZATION", "")
100
101    if re.compile(r"^[Bb]earer\s{1}.+$").match(auth_header):
102        return auth_header.split()[1]
103    if "access_token" in request.POST:
104        return request.POST.get("access_token")
105    if "access_token" in request.GET:
106        return request.GET.get("access_token")
107    return None
108
109
def extract_client_auth(request: HttpRequest) -> tuple[str, str]:
    """
    Get client credentials using HTTP Basic Authentication method.
    Or try getting parameters via POST.
    See: https://datatracker.ietf.org/doc/html/rfc6749#section-2.3.1

    A `Basic` header that cannot be decoded yields `("", "")`; the POST parameters are
    not consulted in that case.

    Return a tuple `(client_id, client_secret)`.
    """
    auth_header = request.META.get("HTTP_AUTHORIZATION", "")

    if re.match(r"^Basic\s{1}.+$", auth_header):
        try:
            # split() has no second element for whitespace-only credentials ("Basic   ");
            # keeping it inside the try turns that IndexError into empty credentials.
            b64_user_pass = auth_header.split()[1]
            user_pass = b64decode(b64_user_pass).decode("utf-8").partition(":")
            client_id, _, client_secret = user_pass
            # RFC 6749 requires client credentials in Basic auth to be form-encoded first.
            # We only percent-decode here so raw `+` characters keep their previous meaning.
            client_id = unquote(client_id)
            client_secret = unquote(client_secret)
        except (ValueError, Error, IndexError):
            # Parenthesised tuple: the bare `except A, B:` form only parses on Python 3.14+.
            client_id = client_secret = ""  # nosec
    else:
        client_id = request.POST.get("client_id", "")
        client_secret = request.POST.get("client_secret", "")

    return (client_id, client_secret)
136
137
def protected_resource_view(scopes: list[str]):
    """View decorator. The client accesses protected resources by presenting the
    access token to the resource server.

    https://datatracker.ietf.org/doc/html/rfc6749#section-7

    `OPTIONS` requests are passed to the view without any token check. For every other
    method the token returned by `extract_access_token` must exist, not be expired, not
    be revoked and carry every scope in `scopes`; otherwise a body-less response with the
    error's status and a `WWW-Authenticate` header is returned and the view is not called.

    This decorator also injects the token into `kwargs`"""
    # Local import keeps this block independent of the module-level import list.
    from functools import wraps

    def wrapper(view):
        # Preserve the wrapped view's name, docstring and attributes on the wrapper.
        @wraps(view)
        def view_wrapper(request: HttpRequest, *args, **kwargs):
            if request.method == "OPTIONS":
                return view(request, *args, **kwargs)
            try:
                access_token = extract_access_token(request)
                if not access_token:
                    LOGGER.debug("No token passed")
                    raise BearerTokenError("invalid_token")

                token = AccessToken.objects.filter(token=access_token).first()
                if not token:
                    LOGGER.debug("Token does not exist", access_token=access_token)
                    raise BearerTokenError("invalid_token")

                if token.is_expired:
                    LOGGER.debug("Token has expired", access_token=access_token)
                    raise BearerTokenError("invalid_token")

                if token.revoked:
                    LOGGER.warning("Revoked token was used", access_token=access_token)
                    # Use of a revoked token is recorded as a suspicious-request event.
                    Event.new(
                        action=EventAction.SUSPICIOUS_REQUEST,
                        message="Revoked access token was used",
                        token=token,
                        provider=token.provider,
                    ).from_http(request, user=token.user)
                    raise BearerTokenError("invalid_token")

                if not set(scopes).issubset(set(token.scope)):
                    LOGGER.warning(
                        "Scope mismatch.",
                        required=set(scopes),
                        token_has=set(token.scope),
                    )
                    raise BearerTokenError("insufficient_scope")
            except BearerTokenError as error:
                response = HttpResponse(status=error.status)
                response["WWW-Authenticate"] = (
                    f'error="{error.code}", error_description="{error.description}"'
                )
                return response
            kwargs["token"] = token
            CTX_AUTH_VIA.set("oauth_token")
            response = view(request, *args, **kwargs)
            # Attach the token owner's username to the response context.
            response.ak_context = {}
            response.ak_context[KEY_USER] = token.user.username
            return response

        return view_wrapper

    return wrapper
198
199
def provider_from_request(request: HttpRequest) -> tuple[OAuth2Provider | None, str, str]:
    """Look up the provider named by the request's client credentials.

    No authentication is performed: only ``client_id`` is used for the lookup. Returns
    ``(provider, client_id, client_secret)``; when both credentials are empty the result
    is ``(None, "", "")`` and no lookup is made.
    """
    client_id, client_secret = extract_client_auth(request)
    if client_id == "" and client_secret == "":
        return None, "", ""
    matching = OAuth2Provider.objects.filter(client_id=client_id)
    provider: OAuth2Provider | None = matching.first()
    return provider, client_id, client_secret
207
208
def authenticate_provider(request: HttpRequest) -> OAuth2Provider | None:
    """Attempt to authenticate via Basic auth of client_id:client_secret

    The provider and credentials come from `provider_from_request`. Returns the provider
    when both the client ID and the client secret match it, otherwise `None`. On success
    `CTX_AUTH_VIA` is set to `"oauth_client_secret"`.
    """
    provider, client_id, client_secret = provider_from_request(request)
    if not provider:
        return None
    # hmac.compare_digest raises TypeError for str arguments containing non-ASCII
    # characters. The credentials are supplied by the client, so compare their UTF-8
    # bytes instead; a non-ASCII secret is then rejected rather than crashing the request.
    credentials_match = compare_digest(
        client_id.encode("utf-8"), provider.client_id.encode("utf-8")
    ) and compare_digest(client_secret.encode("utf-8"), provider.client_secret.encode("utf-8"))
    if not credentials_match:
        LOGGER.debug("(basic) Provider for basic auth does not exist")
        return None
    CTX_AUTH_VIA.set("oauth_client_secret")
    return provider
221
222
class HttpResponseRedirectScheme(HttpResponseRedirect):
    """Redirect response whose target may use a non-http scheme."""

    def __init__(
        self,
        redirect_to: str,
        *args: Any,
        allowed_schemes: list[str] | None = None,
        **kwargs: Any,
    ) -> None:
        # A missing or empty list falls back to the default schemes. The attribute is
        # assigned before the parent initialiser runs.
        if not allowed_schemes:
            allowed_schemes = ["http", "https", "ftp"]
        self.allowed_schemes = allowed_schemes
        super().__init__(redirect_to, *args, **kwargs)
235
236
def create_logout_token(
    provider: OAuth2Provider,
    iss: str,
    sub: str | None = None,
    session_key: str | None = None,
) -> str:
    """Build and encode a logout token for Back-Channel Logout.

    As per https://openid.net/specs/openid-connect-backchannel-1_0.html

    The token's audience is the provider's client ID and its expiry is the current time
    plus the provider's ``access_token_validity``. ``sub`` is added when given, and
    ``sid`` (the hashed ``session_key``) when a session key is given. The result is
    whatever ``provider.encode`` returns for the payload.
    """

    LOGGER.debug("Creating logout token", provider=provider, sub=sub)

    issued_at = now()
    # Base claims of the logout token
    claims = {
        "iss": str(iss),
        "aud": provider.client_id,
        "iat": int(issued_at.timestamp()),
        "exp": int(
            (issued_at + timedelta_from_string(provider.access_token_validity)).timestamp()
        ),
        "jti": str(uuid.uuid4()),
        "events": {
            "http://schemas.openid.net/event/backchannel-logout": {},
        },
    }

    # Add either sub or sid (or both)
    if sub:
        claims["sub"] = sub
    if session_key:
        claims["sid"] = hash_session_key(session_key)

    return provider.encode(claims, jwt_type="logout+jwt")
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class TokenResponse(django.http.response.JsonResponse):
class TokenResponse(JsonResponse):
    """JSON response carrying headers that tell clients it must never be cached.

    https://openid.net/specs/openid-connect-core-1_0.html#TokenResponse
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Both headers are set unconditionally on every instance.
        for header, value in (("Cache-Control", "no-store"), ("Pragma", "no-cache")):
            self[header] = value

JSON Response with headers that it should never be cached

https://openid.net/specs/openid-connect-core-1_0.html#TokenResponse

TokenResponse(*args, **kwargs)
34    def __init__(self, *args, **kwargs):
35        super().__init__(*args, **kwargs)
36        self["Cache-Control"] = "no-store"
37        self["Pragma"] = "no-cache"
def cors_allow( request: django.http.request.HttpRequest, response: django.http.response.HttpResponse, *allowed_origins: str):
def cors_allow(request: HttpRequest, response: HttpResponse, *allowed_origins: str):
    """Attach CORS headers to ``response`` when the request's ``Origin`` is permitted.

    An origin is permitted when its scheme, hostname and port equal those of one of
    ``allowed_origins``, or when the request is an ``OPTIONS`` request. Permitted requests
    are allowed with credentials; for ``OPTIONS`` requests the requested headers are
    echoed back as allowed headers.

    The same ``response`` object is returned in every case.
    """
    origin = request.META.get("HTTP_ORIGIN")
    if not origin:
        return response

    # OPTIONS requests don't have an authorization header -> hence
    # we can't extract the provider this request is for
    # so for options requests we allow the calling origin without checking
    is_preflight = request.method == "OPTIONS"
    requested = urlparse(origin)

    def _same_origin(candidate: str) -> bool:
        parsed = urlparse(candidate)
        return (
            requested.scheme == parsed.scheme
            and requested.hostname == parsed.hostname
            and requested.port == parsed.port
        )

    # A list (not a generator) so that every configured origin is parsed and compared.
    matches = [_same_origin(candidate) for candidate in allowed_origins]
    if not (is_preflight or any(matches)):
        LOGGER.warning(
            "CORS: Origin is not an allowed origin",
            requested=requested,
            allowed=allowed_origins,
        )
        return response

    # From the CORS spec: The string "*" cannot be used for a resource that supports credentials.
    response["Access-Control-Allow-Origin"] = origin
    patch_vary_headers(response, ["Origin"])
    response["Access-Control-Allow-Credentials"] = "true"

    if is_preflight:
        header_key = "HTTP_ACCESS_CONTROL_REQUEST_HEADERS"
        if header_key in request.META:
            response["Access-Control-Allow-Headers"] = request.META[header_key]
        response["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"

    return response

Add headers to permit CORS requests from allowed_origins, with or without credentials, with any headers.

def pkce_s256_challenge(verifier: str) -> str:
def pkce_s256_challenge(verifier: str) -> str:
    """Derive the PKCE ``S256`` challenge for ``verifier``.

    The verifier is ASCII-encoded, hashed with SHA-256, encoded as URL-safe base64 and
    returned with every ``=`` removed.
    """
    digest = sha256(verifier.encode("ascii")).digest()
    encoded = urlsafe_b64encode(digest).decode("utf-8")
    return encoded.replace("=", "")

Convert PKCE verifier to S256 challenge

def extract_access_token(request: django.http.request.HttpRequest) -> str | None:
 92def extract_access_token(request: HttpRequest) -> str | None:
 93    """
 94    Get the access token using Authorization Request Header Field method.
 95    Or try getting via GET.
 96    See: http://tools.ietf.org/html/rfc6750#section-2.1
 97
 98    Return a string.
 99    """
100    auth_header = request.META.get("HTTP_AUTHORIZATION", "")
101
102    if re.compile(r"^[Bb]earer\s{1}.+$").match(auth_header):
103        return auth_header.split()[1]
104    if "access_token" in request.POST:
105        return request.POST.get("access_token")
106    if "access_token" in request.GET:
107        return request.GET.get("access_token")
108    return None

Get the access token using Authorization Request Header Field method. Or try getting via the access_token POST parameter, then the access_token GET parameter. See: http://tools.ietf.org/html/rfc6750#section-2.1

Return a string.

def extract_client_auth(request: django.http.request.HttpRequest) -> tuple[str, str]:
def extract_client_auth(request: HttpRequest) -> tuple[str, str]:
    """
    Get client credentials using HTTP Basic Authentication method.
    Or try getting parameters via POST.
    See: https://datatracker.ietf.org/doc/html/rfc6749#section-2.3.1

    A `Basic` header that cannot be decoded yields `("", "")`; the POST parameters are
    not consulted in that case.

    Return a tuple `(client_id, client_secret)`.
    """
    auth_header = request.META.get("HTTP_AUTHORIZATION", "")

    if re.match(r"^Basic\s{1}.+$", auth_header):
        try:
            # split() has no second element for whitespace-only credentials ("Basic   ");
            # keeping it inside the try turns that IndexError into empty credentials.
            b64_user_pass = auth_header.split()[1]
            user_pass = b64decode(b64_user_pass).decode("utf-8").partition(":")
            client_id, _, client_secret = user_pass
            # RFC 6749 requires client credentials in Basic auth to be form-encoded first.
            # We only percent-decode here so raw `+` characters keep their previous meaning.
            client_id = unquote(client_id)
            client_secret = unquote(client_secret)
        except (ValueError, Error, IndexError):
            # Parenthesised tuple: the bare `except A, B:` form only parses on Python 3.14+.
            client_id = client_secret = ""  # nosec
    else:
        client_id = request.POST.get("client_id", "")
        client_secret = request.POST.get("client_secret", "")

    return (client_id, client_secret)

Get client credentials using HTTP Basic Authentication method. Or try getting parameters via POST. See: https://datatracker.ietf.org/doc/html/rfc6749#section-2.3.1

Return a tuple (client_id, client_secret).

def protected_resource_view(scopes: list[str]):
def protected_resource_view(scopes: list[str]):
    """View decorator. The client accesses protected resources by presenting the
    access token to the resource server.

    https://datatracker.ietf.org/doc/html/rfc6749#section-7

    `OPTIONS` requests are passed to the view without any token check. For every other
    method the token returned by `extract_access_token` must exist, not be expired, not
    be revoked and carry every scope in `scopes`; otherwise a body-less response with the
    error's status and a `WWW-Authenticate` header is returned and the view is not called.

    This decorator also injects the token into `kwargs`"""
    # Local import keeps this block independent of the module-level import list.
    from functools import wraps

    def wrapper(view):
        # Preserve the wrapped view's name, docstring and attributes on the wrapper.
        @wraps(view)
        def view_wrapper(request: HttpRequest, *args, **kwargs):
            if request.method == "OPTIONS":
                return view(request, *args, **kwargs)
            try:
                access_token = extract_access_token(request)
                if not access_token:
                    LOGGER.debug("No token passed")
                    raise BearerTokenError("invalid_token")

                token = AccessToken.objects.filter(token=access_token).first()
                if not token:
                    LOGGER.debug("Token does not exist", access_token=access_token)
                    raise BearerTokenError("invalid_token")

                if token.is_expired:
                    LOGGER.debug("Token has expired", access_token=access_token)
                    raise BearerTokenError("invalid_token")

                if token.revoked:
                    LOGGER.warning("Revoked token was used", access_token=access_token)
                    # Use of a revoked token is recorded as a suspicious-request event.
                    Event.new(
                        action=EventAction.SUSPICIOUS_REQUEST,
                        message="Revoked access token was used",
                        token=token,
                        provider=token.provider,
                    ).from_http(request, user=token.user)
                    raise BearerTokenError("invalid_token")

                if not set(scopes).issubset(set(token.scope)):
                    LOGGER.warning(
                        "Scope mismatch.",
                        required=set(scopes),
                        token_has=set(token.scope),
                    )
                    raise BearerTokenError("insufficient_scope")
            except BearerTokenError as error:
                response = HttpResponse(status=error.status)
                response["WWW-Authenticate"] = (
                    f'error="{error.code}", error_description="{error.description}"'
                )
                return response
            kwargs["token"] = token
            CTX_AUTH_VIA.set("oauth_token")
            response = view(request, *args, **kwargs)
            # Attach the token owner's username to the response context.
            response.ak_context = {}
            response.ak_context[KEY_USER] = token.user.username
            return response

        return view_wrapper

    return wrapper

View decorator. The client accesses protected resources by presenting the access token to the resource server.

https://datatracker.ietf.org/doc/html/rfc6749#section-7

This decorator also injects the token into kwargs

def provider_from_request( request: django.http.request.HttpRequest) -> tuple[authentik.providers.oauth2.models.OAuth2Provider | None, str, str]:
def provider_from_request(request: HttpRequest) -> tuple[OAuth2Provider | None, str, str]:
    """Look up the provider named by the request's client credentials.

    No authentication is performed: only ``client_id`` is used for the lookup. Returns
    ``(provider, client_id, client_secret)``; when both credentials are empty the result
    is ``(None, "", "")`` and no lookup is made.
    """
    client_id, client_secret = extract_client_auth(request)
    if client_id == "" and client_secret == "":
        return None, "", ""
    matching = OAuth2Provider.objects.filter(client_id=client_id)
    provider: OAuth2Provider | None = matching.first()
    return provider, client_id, client_secret

Get provider from Basic auth of client_id:client_secret. Does not perform authentication

def authenticate_provider( request: django.http.request.HttpRequest) -> authentik.providers.oauth2.models.OAuth2Provider | None:
def authenticate_provider(request: HttpRequest) -> OAuth2Provider | None:
    """Attempt to authenticate via Basic auth of client_id:client_secret

    The provider and credentials come from `provider_from_request`. Returns the provider
    when both the client ID and the client secret match it, otherwise `None`. On success
    `CTX_AUTH_VIA` is set to `"oauth_client_secret"`.
    """
    provider, client_id, client_secret = provider_from_request(request)
    if not provider:
        return None
    # hmac.compare_digest raises TypeError for str arguments containing non-ASCII
    # characters. The credentials are supplied by the client, so compare their UTF-8
    # bytes instead; a non-ASCII secret is then rejected rather than crashing the request.
    credentials_match = compare_digest(
        client_id.encode("utf-8"), provider.client_id.encode("utf-8")
    ) and compare_digest(client_secret.encode("utf-8"), provider.client_secret.encode("utf-8"))
    if not credentials_match:
        LOGGER.debug("(basic) Provider for basic auth does not exist")
        return None
    CTX_AUTH_VIA.set("oauth_client_secret")
    return provider

Attempt to authenticate via Basic auth of client_id:client_secret

class HttpResponseRedirectScheme(django.http.response.HttpResponseRedirect):
class HttpResponseRedirectScheme(HttpResponseRedirect):
    """Redirect response whose target may use a non-http scheme."""

    def __init__(
        self,
        redirect_to: str,
        *args: Any,
        allowed_schemes: list[str] | None = None,
        **kwargs: Any,
    ) -> None:
        # A missing or empty list falls back to the default schemes. The attribute is
        # assigned before the parent initialiser runs.
        if not allowed_schemes:
            allowed_schemes = ["http", "https", "ftp"]
        self.allowed_schemes = allowed_schemes
        super().__init__(redirect_to, *args, **kwargs)

HTTP Response to redirect, can be to a non-http scheme

HttpResponseRedirectScheme( redirect_to: str, *args: Any, allowed_schemes: list[str] | None = None, **kwargs: Any)
227    def __init__(
228        self,
229        redirect_to: str,
230        *args: Any,
231        allowed_schemes: list[str] | None = None,
232        **kwargs: Any,
233    ) -> None:
234        self.allowed_schemes = allowed_schemes or ["http", "https", "ftp"]
235        super().__init__(redirect_to, *args, **kwargs)
allowed_schemes = ['http', 'https', 'ftp']
def create_logout_token( provider: authentik.providers.oauth2.models.OAuth2Provider, iss: str, sub: str | None = None, session_key: str | None = None) -> str:
def create_logout_token(
    provider: OAuth2Provider,
    iss: str,
    sub: str | None = None,
    session_key: str | None = None,
) -> str:
    """Build and encode a logout token for Back-Channel Logout.

    As per https://openid.net/specs/openid-connect-backchannel-1_0.html

    The token's audience is the provider's client ID and its expiry is the current time
    plus the provider's ``access_token_validity``. ``sub`` is added when given, and
    ``sid`` (the hashed ``session_key``) when a session key is given. The result is
    whatever ``provider.encode`` returns for the payload.
    """

    LOGGER.debug("Creating logout token", provider=provider, sub=sub)

    issued_at = now()
    # Base claims of the logout token
    claims = {
        "iss": str(iss),
        "aud": provider.client_id,
        "iat": int(issued_at.timestamp()),
        "exp": int(
            (issued_at + timedelta_from_string(provider.access_token_validity)).timestamp()
        ),
        "jti": str(uuid.uuid4()),
        "events": {
            "http://schemas.openid.net/event/backchannel-logout": {},
        },
    }

    # Add either sub or sid (or both)
    if sub:
        claims["sub"] = sub
    if session_key:
        claims["sid"] = hash_session_key(session_key)

    return provider.encode(claims, jwt_type="logout+jwt")

Create a logout token for Back-Channel Logout

As per https://openid.net/specs/openid-connect-backchannel-1_0.html