authentik.providers.oauth2.utils
OAuth2/OpenID Utils
"""OAuth2/OpenID Utils"""

import re
import uuid
from base64 import b64decode, urlsafe_b64encode
from binascii import Error
from functools import wraps
from hashlib import sha256
from hmac import compare_digest
from typing import Any
from urllib.parse import unquote, urlparse

from django.http import HttpRequest, HttpResponse, JsonResponse
from django.http.response import HttpResponseRedirect
from django.utils.cache import patch_vary_headers
from django.utils.timezone import now
from structlog.stdlib import get_logger

from authentik.core.middleware import CTX_AUTH_VIA, KEY_USER
from authentik.events.models import Event, EventAction
from authentik.lib.utils.time import timedelta_from_string
from authentik.providers.oauth2.errors import BearerTokenError
from authentik.providers.oauth2.id_token import hash_session_key
from authentik.providers.oauth2.models import AccessToken, OAuth2Provider

LOGGER = get_logger()


class TokenResponse(JsonResponse):
    """JSON response with headers indicating that it should never be cached

    https://openid.net/specs/openid-connect-core-1_0.html#TokenResponse"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self["Cache-Control"] = "no-store"
        self["Pragma"] = "no-cache"


def cors_allow(request: HttpRequest, response: HttpResponse, *allowed_origins: str):
    """Add headers to permit CORS requests from allowed_origins, with or without credentials,
    with any headers.

    The request's `Origin` is compared to each allowed origin by scheme, hostname and port.
    `response` is returned unchanged when the request has no `Origin` header, when the
    `Origin` header cannot be parsed, or when the origin is not allowed."""
    origin = request.META.get("HTTP_ORIGIN")
    if not origin:
        return response

    try:
        received_origin = urlparse(origin)
        # `.port` is parsed lazily and raises ValueError for non-numeric or out-of-range
        # ports; resolve it here so a malformed (untrusted) header can't crash the view
        received = (received_origin.scheme, received_origin.hostname, received_origin.port)
    except ValueError:
        LOGGER.warning("CORS: Origin is malformed", requested=origin)
        return response

    # OPTIONS requests don't have an authorization header -> hence
    # we can't extract the provider this request is for
    # so for options requests we allow the calling origin without checking
    allowed = request.method == "OPTIONS"
    for allowed_origin in allowed_origins:
        try:
            url = urlparse(allowed_origin)
            candidate = (url.scheme, url.hostname, url.port)
        except ValueError:
            # A malformed configured origin can never match; skip it instead of failing
            LOGGER.warning("CORS: Ignoring malformed allowed origin", origin=allowed_origin)
            continue
        if received == candidate:
            allowed = True
            break
    if not allowed:
        LOGGER.warning(
            "CORS: Origin is not an allowed origin",
            requested=received_origin,
            allowed=allowed_origins,
        )
        return response

    # From the CORS spec: The string "*" cannot be used for a resource that supports credentials.
    response["Access-Control-Allow-Origin"] = origin
    patch_vary_headers(response, ["Origin"])
    response["Access-Control-Allow-Credentials"] = "true"

    if request.method == "OPTIONS":
        if "HTTP_ACCESS_CONTROL_REQUEST_HEADERS" in request.META:
            response["Access-Control-Allow-Headers"] = request.META[
                "HTTP_ACCESS_CONTROL_REQUEST_HEADERS"
            ]
        response["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"

    return response


def pkce_s256_challenge(verifier: str) -> str:
    """Convert PKCE verifier to S256 challenge

    Returns the URL-safe base64 encoding of the SHA-256 digest of the ASCII-encoded
    verifier, with `=` padding removed."""
    return (
        urlsafe_b64encode(sha256(verifier.encode("ascii")).digest())
        .decode("utf-8")
        .replace("=", "")
    )


def extract_access_token(request: HttpRequest) -> str | None:
    """
    Get the access token using Authorization Request Header Field method.
    Or try getting it from the `access_token` POST parameter, then the GET parameter.
    See: https://datatracker.ietf.org/doc/html/rfc6750#section-2

    Return the token string, or `None` when the request carries no token.
    """
    auth_header = request.META.get("HTTP_AUTHORIZATION", "")

    if re.match(r"^[Bb]earer\s{1}.+$", auth_header):
        # The pattern also matches whitespace-only credentials (e.g. "Bearer  "), for which
        # split() yields a single element; treat that as "no token in the header"
        parts = auth_header.split()
        if len(parts) > 1:
            return parts[1]
    if "access_token" in request.POST:
        return request.POST.get("access_token")
    if "access_token" in request.GET:
        return request.GET.get("access_token")
    return None


def extract_client_auth(request: HttpRequest) -> tuple[str, str]:
    """
    Get client credentials using HTTP Basic Authentication method.
    Or try getting parameters via POST.
    See: https://datatracker.ietf.org/doc/html/rfc6749#section-2.3.1

    Return a tuple `(client_id, client_secret)`. Both values are empty strings when a
    Basic header is present but its credentials cannot be decoded.
    """
    auth_header = request.META.get("HTTP_AUTHORIZATION", "")

    if not re.match(r"^Basic\s{1}.+$", auth_header):
        return (request.POST.get("client_id", ""), request.POST.get("client_secret", ""))

    # The pattern also matches whitespace-only credentials (e.g. "Basic  "), for which
    # split() yields a single element
    parts = auth_header.split()
    if len(parts) < 2:  # noqa: PLR2004
        return ("", "")
    try:
        user_pass = b64decode(parts[1]).decode("utf-8")
    except (ValueError, Error):
        return ("", "")
    client_id, _, client_secret = user_pass.partition(":")
    # RFC 6749 requires client credentials in Basic auth to be form-encoded first.
    # We only percent-decode here so raw `+` characters keep their previous meaning.
    return (unquote(client_id), unquote(client_secret))


def protected_resource_view(scopes: list[str]):
    """View decorator. The client accesses protected resources by presenting the
    access token to the resource server.

    https://datatracker.ietf.org/doc/html/rfc6749#section-7

    Requests whose token is missing, unknown, expired, revoked or lacking one of `scopes`
    get an error response with a `WWW-Authenticate` header; OPTIONS requests are passed
    to the view without any token check.

    This decorator also injects the token into `kwargs`"""

    def wrapper(view):
        @wraps(view)
        def view_wrapper(request: HttpRequest, *args, **kwargs):
            if request.method == "OPTIONS":
                return view(request, *args, **kwargs)
            try:
                access_token = extract_access_token(request)
                if not access_token:
                    LOGGER.debug("No token passed")
                    raise BearerTokenError("invalid_token")

                token = AccessToken.objects.filter(token=access_token).first()
                if not token:
                    LOGGER.debug("Token does not exist", access_token=access_token)
                    raise BearerTokenError("invalid_token")

                if token.is_expired:
                    LOGGER.debug("Token has expired", access_token=access_token)
                    raise BearerTokenError("invalid_token")

                if token.revoked:
                    LOGGER.warning("Revoked token was used", access_token=access_token)
                    Event.new(
                        action=EventAction.SUSPICIOUS_REQUEST,
                        message="Revoked access token was used",
                        token=token,
                        provider=token.provider,
                    ).from_http(request, user=token.user)
                    raise BearerTokenError("invalid_token")

                if not set(scopes).issubset(set(token.scope)):
                    LOGGER.warning(
                        "Scope mismatch.",
                        required=set(scopes),
                        token_has=set(token.scope),
                    )
                    raise BearerTokenError("insufficient_scope")
            except BearerTokenError as error:
                response = HttpResponse(status=error.status)
                # NOTE(review): RFC 6750 section 3 shows this challenge prefixed with the
                # "Bearer" scheme; confirm whether clients rely on the current format
                response["WWW-Authenticate"] = (
                    f'error="{error.code}", error_description="{error.description}"'
                )
                return response
            kwargs["token"] = token
            CTX_AUTH_VIA.set("oauth_token")
            response = view(request, *args, **kwargs)
            response.ak_context = {}
            response.ak_context[KEY_USER] = token.user.username
            return response

        return view_wrapper

    return wrapper


def provider_from_request(request: HttpRequest) -> tuple[OAuth2Provider | None, str, str]:
    """Get provider from Basic auth of client_id:client_secret. Does not perform authentication

    Returns `(provider, client_id, client_secret)`; the provider is `None` when no
    credentials were sent or no provider has the given client_id."""
    client_id, client_secret = extract_client_auth(request)
    if client_id == client_secret == "":
        return None, "", ""
    provider: OAuth2Provider | None = OAuth2Provider.objects.filter(client_id=client_id).first()
    return provider, client_id, client_secret


def authenticate_provider(request: HttpRequest) -> OAuth2Provider | None:
    """Attempt to authenticate via Basic auth of client_id:client_secret

    Returns the provider when both the client_id and the client_secret match,
    otherwise `None`."""
    provider, client_id, client_secret = provider_from_request(request)
    if not provider:
        return None
    # compare_digest() raises TypeError for str arguments containing non-ASCII characters,
    # and the request-supplied values may contain them, so compare UTF-8 bytes instead
    if not compare_digest(
        client_id.encode("utf-8"), provider.client_id.encode("utf-8")
    ) or not compare_digest(
        client_secret.encode("utf-8"), provider.client_secret.encode("utf-8")
    ):
        LOGGER.debug("(basic) Provider for basic auth does not exist")
        return None
    CTX_AUTH_VIA.set("oauth_client_secret")
    return provider


class HttpResponseRedirectScheme(HttpResponseRedirect):
    """HTTP Response to redirect, can be to a non-http scheme"""

    def __init__(
        self,
        redirect_to: str,
        *args: Any,
        allowed_schemes: list[str] | None = None,
        **kwargs: Any,
    ) -> None:
        # Set before calling the parent initializer so the override is already in place
        # when the parent processes `redirect_to`
        self.allowed_schemes = allowed_schemes or ["http", "https", "ftp"]
        super().__init__(redirect_to, *args, **kwargs)


def create_logout_token(
    provider: OAuth2Provider,
    iss: str,
    sub: str | None = None,
    session_key: str | None = None,
) -> str:
    """Create a logout token for Back-Channel Logout

    As per https://openid.net/specs/openid-connect-backchannel-1_0.html
    """

    LOGGER.debug("Creating logout token", provider=provider, sub=sub)

    _now = now()
    # Create the logout token payload
    payload = {
        "iss": str(iss),
        "aud": provider.client_id,
        "iat": int(_now.timestamp()),
        "exp": int((_now + timedelta_from_string(provider.access_token_validity)).timestamp()),
        "jti": str(uuid.uuid4()),
        "events": {
            "http://schemas.openid.net/event/backchannel-logout": {},
        },
    }

    # Add either sub or sid (or both)
    if sub:
        payload["sub"] = sub
    if session_key:
        payload["sid"] = hash_session_key(session_key)
    # Encode the token
    return provider.encode(payload, jwt_type="logout+jwt")
class TokenResponse(JsonResponse):
    """JSON response that sets headers telling clients and proxies not to cache it.

    After the parent initializer runs, `Cache-Control: no-store` and `Pragma: no-cache`
    are set on the response.

    https://openid.net/specs/openid-connect-core-1_0.html#TokenResponse"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        no_cache_headers = (
            ("Cache-Control", "no-store"),
            ("Pragma", "no-cache"),
        )
        for header_name, header_value in no_cache_headers:
            self[header_name] = header_value
JSON response with headers indicating that it should never be cached
https://openid.net/specs/openid-connect-core-1_0.html#TokenResponse
def cors_allow(request: HttpRequest, response: HttpResponse, *allowed_origins: str):
    """Add headers to permit CORS requests from allowed_origins, with or without credentials,
    with any headers.

    The request's `Origin` is compared to each allowed origin by scheme, hostname and port.
    `response` is returned unchanged when the request has no `Origin` header, when the
    `Origin` header cannot be parsed, or when the origin is not allowed."""
    origin = request.META.get("HTTP_ORIGIN")
    if not origin:
        return response

    try:
        received_origin = urlparse(origin)
        # `.port` is parsed lazily and raises ValueError for non-numeric or out-of-range
        # ports; resolve it here so a malformed (untrusted) header can't crash the view
        received = (received_origin.scheme, received_origin.hostname, received_origin.port)
    except ValueError:
        LOGGER.warning("CORS: Origin is malformed", requested=origin)
        return response

    # OPTIONS requests don't have an authorization header -> hence
    # we can't extract the provider this request is for
    # so for options requests we allow the calling origin without checking
    allowed = request.method == "OPTIONS"
    for allowed_origin in allowed_origins:
        try:
            url = urlparse(allowed_origin)
            candidate = (url.scheme, url.hostname, url.port)
        except ValueError:
            # A malformed configured origin can never match; skip it instead of failing
            LOGGER.warning("CORS: Ignoring malformed allowed origin", origin=allowed_origin)
            continue
        if received == candidate:
            allowed = True
            break
    if not allowed:
        LOGGER.warning(
            "CORS: Origin is not an allowed origin",
            requested=received_origin,
            allowed=allowed_origins,
        )
        return response

    # From the CORS spec: The string "*" cannot be used for a resource that supports credentials.
    response["Access-Control-Allow-Origin"] = origin
    patch_vary_headers(response, ["Origin"])
    response["Access-Control-Allow-Credentials"] = "true"

    if request.method == "OPTIONS":
        if "HTTP_ACCESS_CONTROL_REQUEST_HEADERS" in request.META:
            response["Access-Control-Allow-Headers"] = request.META[
                "HTTP_ACCESS_CONTROL_REQUEST_HEADERS"
            ]
        response["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"

    return response
Add headers to permit CORS requests from allowed_origins, with or without credentials, with any headers.
def pkce_s256_challenge(verifier: str) -> str:
    """Derive the PKCE S256 code challenge for a code verifier.

    The verifier is encoded as ASCII, hashed with SHA-256, and the digest is encoded
    as URL-safe base64 with every `=` padding character removed."""
    digest = sha256(verifier.encode("ascii")).digest()
    encoded_digest = urlsafe_b64encode(digest).decode("utf-8")
    return encoded_digest.replace("=", "")
Convert PKCE verifier to S256 challenge
def extract_access_token(request: HttpRequest) -> str | None:
    """
    Get the access token using Authorization Request Header Field method.
    Or try getting it from the `access_token` POST parameter, then the GET parameter.
    See: https://datatracker.ietf.org/doc/html/rfc6750#section-2

    Return the token string, or `None` when the request carries no token.
    """
    auth_header = request.META.get("HTTP_AUTHORIZATION", "")

    if re.match(r"^[Bb]earer\s{1}.+$", auth_header):
        # The pattern also matches whitespace-only credentials (e.g. "Bearer  "), for which
        # split() yields a single element; treat that as "no token in the header"
        parts = auth_header.split()
        if len(parts) > 1:
            return parts[1]
    if "access_token" in request.POST:
        return request.POST.get("access_token")
    if "access_token" in request.GET:
        return request.GET.get("access_token")
    return None
Get the access token using the Authorization Request Header Field method, falling back to the access_token POST parameter and then the access_token GET parameter. See: http://tools.ietf.org/html/rfc6750#section-2.1
Return a string.
def extract_client_auth(request: HttpRequest) -> tuple[str, str]:
    """
    Get client credentials using HTTP Basic Authentication method.
    Or try getting parameters via POST.
    See: https://datatracker.ietf.org/doc/html/rfc6749#section-2.3.1

    Return a tuple `(client_id, client_secret)`. Both values are empty strings when a
    Basic header is present but its credentials cannot be decoded.
    """
    auth_header = request.META.get("HTTP_AUTHORIZATION", "")

    if not re.match(r"^Basic\s{1}.+$", auth_header):
        return (request.POST.get("client_id", ""), request.POST.get("client_secret", ""))

    # The pattern also matches whitespace-only credentials (e.g. "Basic  "), for which
    # split() yields a single element
    parts = auth_header.split()
    if len(parts) < 2:  # noqa: PLR2004
        return ("", "")
    try:
        user_pass = b64decode(parts[1]).decode("utf-8")
    except (ValueError, Error):
        return ("", "")
    client_id, _, client_secret = user_pass.partition(":")
    # RFC 6749 requires client credentials in Basic auth to be form-encoded first.
    # We only percent-decode here so raw `+` characters keep their previous meaning.
    return (unquote(client_id), unquote(client_secret))
Get client credentials using the HTTP Basic Authentication method, or fall back to the client_id and client_secret POST parameters. See: https://datatracker.ietf.org/doc/html/rfc6749#section-2.3.1
Return a tuple (client_id, client_secret).
def protected_resource_view(scopes: list[str]):
    """View decorator. The client accesses protected resources by presenting the
    access token to the resource server.

    https://datatracker.ietf.org/doc/html/rfc6749#section-7

    Requests whose token is missing, unknown, expired, revoked or lacking one of `scopes`
    get an error response with a `WWW-Authenticate` header; OPTIONS requests are passed
    to the view without any token check.

    This decorator also injects the token into `kwargs`"""
    from functools import wraps

    def wrapper(view):
        # Keep the wrapped view's name, docstring and `__wrapped__` reference intact
        @wraps(view)
        def view_wrapper(request: HttpRequest, *args, **kwargs):
            if request.method == "OPTIONS":
                return view(request, *args, **kwargs)
            try:
                access_token = extract_access_token(request)
                if not access_token:
                    LOGGER.debug("No token passed")
                    raise BearerTokenError("invalid_token")

                token = AccessToken.objects.filter(token=access_token).first()
                if not token:
                    LOGGER.debug("Token does not exist", access_token=access_token)
                    raise BearerTokenError("invalid_token")

                if token.is_expired:
                    LOGGER.debug("Token has expired", access_token=access_token)
                    raise BearerTokenError("invalid_token")

                if token.revoked:
                    LOGGER.warning("Revoked token was used", access_token=access_token)
                    # Record the use of a revoked token as a suspicious request
                    Event.new(
                        action=EventAction.SUSPICIOUS_REQUEST,
                        message="Revoked access token was used",
                        token=token,
                        provider=token.provider,
                    ).from_http(request, user=token.user)
                    raise BearerTokenError("invalid_token")

                if not set(scopes).issubset(set(token.scope)):
                    LOGGER.warning(
                        "Scope mismatch.",
                        required=set(scopes),
                        token_has=set(token.scope),
                    )
                    raise BearerTokenError("insufficient_scope")
            except BearerTokenError as error:
                response = HttpResponse(status=error.status)
                # NOTE(review): RFC 6750 section 3 shows this challenge prefixed with the
                # "Bearer" scheme; confirm whether clients rely on the current format
                response["WWW-Authenticate"] = (
                    f'error="{error.code}", error_description="{error.description}"'
                )
                return response
            kwargs["token"] = token
            CTX_AUTH_VIA.set("oauth_token")
            response = view(request, *args, **kwargs)
            response.ak_context = {}
            response.ak_context[KEY_USER] = token.user.username
            return response

        return view_wrapper

    return wrapper
View decorator. The client accesses protected resources by presenting the access token to the resource server.
https://datatracker.ietf.org/doc/html/rfc6749#section-7
This decorator also injects the token into kwargs
def provider_from_request(request: HttpRequest) -> tuple[OAuth2Provider | None, str, str]:
    """Look up the provider named by the request's client credentials.

    The credentials are read with `extract_client_auth`. No authentication is performed:
    the client_secret is returned to the caller but not checked here.

    Returns `(provider, client_id, client_secret)`. When both credentials are empty the
    result is `(None, "", "")`; when no provider has the given client_id, the provider
    element is `None`."""
    client_id, client_secret = extract_client_auth(request)
    if not (client_id or client_secret):
        return None, "", ""
    candidates = OAuth2Provider.objects.filter(client_id=client_id)
    provider: OAuth2Provider | None = candidates.first()
    return provider, client_id, client_secret
Get provider from Basic auth of client_id:client_secret. Does not perform authentication
def authenticate_provider(request: HttpRequest) -> OAuth2Provider | None:
    """Attempt to authenticate via Basic auth of client_id:client_secret

    The provider is looked up with `provider_from_request`, then both the client_id and
    the client_secret are checked with a constant-time comparison.

    Returns the provider when both values match, otherwise `None`."""
    provider, client_id, client_secret = provider_from_request(request)
    if not provider:
        return None
    # compare_digest() raises TypeError for str arguments containing non-ASCII characters,
    # and the request-supplied values may contain them, so compare UTF-8 bytes instead
    if not compare_digest(
        client_id.encode("utf-8"), provider.client_id.encode("utf-8")
    ) or not compare_digest(
        client_secret.encode("utf-8"), provider.client_secret.encode("utf-8")
    ):
        LOGGER.debug("(basic) Provider for basic auth does not exist")
        return None
    CTX_AUTH_VIA.set("oauth_client_secret")
    return provider
Attempt to authenticate via Basic auth of client_id:client_secret
class HttpResponseRedirectScheme(HttpResponseRedirect):
    """Redirect response whose set of permitted target schemes can be overridden,
    so the redirect can be to a non-http scheme"""

    def __init__(
        self,
        redirect_to: str,
        *args: Any,
        allowed_schemes: list[str] | None = None,
        **kwargs: Any,
    ) -> None:
        # Fall back to the default list when no (or an empty) list is given
        if not allowed_schemes:
            allowed_schemes = ["http", "https", "ftp"]
        # Assigned before the parent initializer runs, so the instance attribute is
        # already in place when the parent processes `redirect_to`
        self.allowed_schemes = allowed_schemes
        super().__init__(redirect_to, *args, **kwargs)
HTTP Response to redirect, can be to a non-http scheme
def create_logout_token(
    provider: OAuth2Provider,
    iss: str,
    sub: str | None = None,
    session_key: str | None = None,
) -> str:
    """Build and encode a logout token for Back-Channel Logout

    The token always carries the `iss`, `aud` (the provider's client_id), `iat`, `exp`,
    `jti` and `events` claims. `sub` is added when a subject is given, and `sid` (the
    hashed session key) when a session key is given.

    As per https://openid.net/specs/openid-connect-backchannel-1_0.html
    """

    LOGGER.debug("Creating logout token", provider=provider, sub=sub)

    issued_at = now()
    claims = {
        "iss": str(iss),
        "aud": provider.client_id,
        "iat": int(issued_at.timestamp()),
        # The expiry is derived from the provider's access token validity
        "exp": int(
            (issued_at + timedelta_from_string(provider.access_token_validity)).timestamp()
        ),
        "jti": str(uuid.uuid4()),
        "events": {"http://schemas.openid.net/event/backchannel-logout": {}},
    }

    # Subject and session identifier are both optional; include whichever was supplied
    if sub:
        claims["sub"] = sub
    if session_key:
        claims["sid"] = hash_session_key(session_key)

    return provider.encode(claims, jwt_type="logout+jwt")
Create a logout token for Back-Channel Logout
As per https://openid.net/specs/openid-connect-backchannel-1_0.html