authentik.sources.oauth.clients.oauth1
OAuth 1 Clients
"""OAuth 1 Clients"""

from typing import Any
from urllib.parse import parse_qsl

from requests.exceptions import RequestException
from requests.models import Response
from requests_oauthlib import OAuth1
from structlog.stdlib import get_logger

from authentik.sources.oauth.clients.base import BaseOAuthClient
from authentik.sources.oauth.exceptions import OAuthSourceException

LOGGER = get_logger()


class OAuthClient(BaseOAuthClient):
    """OAuth1 Client

    Implements the request-token / access-token exchange on top of
    ``BaseOAuthClient`` by attaching an ``OAuth1`` auth object to every
    outgoing request.
    """

    # Headers sent with both the request-token and the access-token call.
    _default_headers = {
        "Accept": "application/json",
    }

    def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
        """Fetch access token from callback request.

        Combines the raw request token stored in the session under
        ``session_key`` with the ``oauth_verifier`` query parameter and POSTs
        them to the access token URL. ``request_kwargs`` is accepted but not
        used by this implementation.

        Returns the parsed token response, or None when the session token or
        the verifier is missing, when the stored token is incomplete, or when
        the HTTP request fails.
        """
        raw_token = self.request.session.get(self.session_key, None)
        verifier = self.request.GET.get("oauth_verifier", None)
        if raw_token is None or verifier is None:
            return None

        token = self.parse_raw_token(raw_token)
        # do_request indexes both keys; without this check an incomplete
        # session token would escape as an uncaught KeyError.
        if "oauth_token" not in token or "oauth_token_secret" not in token:
            LOGGER.warning("Stored request token is incomplete", keys=sorted(token))
            return None

        callback = self.request.build_absolute_uri(self.callback)
        # The source-level URL only wins when the source type allows overrides.
        access_token_url = self.source.source_type.access_token_url or ""
        if self.source.source_type.urls_customizable and self.source.access_token_url:
            access_token_url = self.source.access_token_url
        try:
            response = self.do_request(
                "post",
                access_token_url,
                token=token,
                headers=self._default_headers,
                oauth_verifier=verifier,
                oauth_callback=callback,
            )
            response.raise_for_status()
        except RequestException as exc:
            LOGGER.warning(
                "Unable to fetch access token",
                exc=exc,
                response=exc.response.text if exc.response is not None else str(exc),
            )
            return None
        return self.parse_raw_token(response.text)

    def get_request_token(self) -> str:
        """Fetch the OAuth request token. Only required for OAuth 1.0.

        Returns the raw response body of the request-token call.

        Raises OAuthSourceException (chained to the original error) when the
        request fails or returns an error status.
        """
        callback = self.request.build_absolute_uri(self.callback)
        try:
            # The source-level URL only wins when the source type allows overrides.
            request_token_url = self.source.source_type.request_token_url or ""
            if self.source.source_type.urls_customizable and self.source.request_token_url:
                request_token_url = self.source.request_token_url
            response = self.do_request(
                "post",
                request_token_url,
                headers=self._default_headers,
                oauth_callback=callback,
            )
            response.raise_for_status()
        except RequestException as exc:
            raise OAuthSourceException(
                exc.response.text if exc.response is not None else str(exc),
            ) from exc
        return response.text

    def get_redirect_args(self) -> dict[str, Any]:
        """Get request parameters for redirect url.

        Fetches a fresh request token, stores its raw form in the session
        under ``session_key`` and returns the ``oauth_token`` and
        ``oauth_callback`` parameters.

        Raises OAuthSourceException when the request-token response does not
        contain an ``oauth_token``.
        """
        callback = self.request.build_absolute_uri(self.callback)
        raw_token = self.get_request_token()
        token = self.parse_raw_token(raw_token)
        # Validate before touching the session so an unusable response is
        # never persisted and the failure surfaces as a source error.
        if "oauth_token" not in token:
            raise OAuthSourceException("Request token response did not contain an oauth_token")
        self.request.session[self.session_key] = raw_token
        return {
            "oauth_token": token["oauth_token"],
            "oauth_callback": callback,
        }

    def parse_raw_token(self, raw_token: str) -> dict[str, Any]:
        """Parse token and secret from raw token response.

        The raw token is treated as a query string; when a key is repeated,
        the last value wins.
        """
        return dict(parse_qsl(raw_token))

    def do_request(self, method: str, url: str, **kwargs) -> Response:
        """Build remote url request. Constructs necessary auth.

        The optional ``token``, ``oauth_callback`` and ``oauth_verifier``
        keyword arguments are removed from ``kwargs`` and used to build the
        ``OAuth1`` auth object; everything else is forwarded to the parent
        implementation.
        """
        resource_owner_key = None
        resource_owner_secret = None
        if "token" in kwargs:
            user_token: dict[str, Any] = kwargs.pop("token")
            resource_owner_key = user_token["oauth_token"]
            resource_owner_secret = user_token["oauth_token_secret"]

        callback = kwargs.pop("oauth_callback", None)
        verifier = kwargs.pop("oauth_verifier", None)
        oauth = OAuth1(
            resource_owner_key=resource_owner_key,
            resource_owner_secret=resource_owner_secret,
            client_key=self.source.consumer_key,
            client_secret=self.source.consumer_secret,
            verifier=verifier,
            callback_uri=callback,
        )
        kwargs["auth"] = oauth
        return super().do_request(method, url, **kwargs)

    @property
    def session_key(self) -> str:
        """Session key under which the raw request token is stored."""
        return f"oauth-client-{self.source.name}-request-token"
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class OAuthClient(BaseOAuthClient):
    """OAuth1 Client

    Implements the request-token / access-token exchange on top of
    ``BaseOAuthClient`` by attaching an ``OAuth1`` auth object to every
    outgoing request.
    """

    # Headers sent with both the request-token and the access-token call.
    _default_headers = {
        "Accept": "application/json",
    }

    def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
        """Fetch access token from callback request.

        Combines the raw request token stored in the session under
        ``session_key`` with the ``oauth_verifier`` query parameter and POSTs
        them to the access token URL. ``request_kwargs`` is accepted but not
        used by this implementation.

        Returns the parsed token response, or None when the session token or
        the verifier is missing, when the stored token is incomplete, or when
        the HTTP request fails.
        """
        raw_token = self.request.session.get(self.session_key, None)
        verifier = self.request.GET.get("oauth_verifier", None)
        if raw_token is None or verifier is None:
            return None

        token = self.parse_raw_token(raw_token)
        # do_request indexes both keys; without this check an incomplete
        # session token would escape as an uncaught KeyError.
        if "oauth_token" not in token or "oauth_token_secret" not in token:
            LOGGER.warning("Stored request token is incomplete", keys=sorted(token))
            return None

        callback = self.request.build_absolute_uri(self.callback)
        # The source-level URL only wins when the source type allows overrides.
        access_token_url = self.source.source_type.access_token_url or ""
        if self.source.source_type.urls_customizable and self.source.access_token_url:
            access_token_url = self.source.access_token_url
        try:
            response = self.do_request(
                "post",
                access_token_url,
                token=token,
                headers=self._default_headers,
                oauth_verifier=verifier,
                oauth_callback=callback,
            )
            response.raise_for_status()
        except RequestException as exc:
            LOGGER.warning(
                "Unable to fetch access token",
                exc=exc,
                response=exc.response.text if exc.response is not None else str(exc),
            )
            return None
        return self.parse_raw_token(response.text)

    def get_request_token(self) -> str:
        """Fetch the OAuth request token. Only required for OAuth 1.0.

        Returns the raw response body of the request-token call.

        Raises OAuthSourceException (chained to the original error) when the
        request fails or returns an error status.
        """
        callback = self.request.build_absolute_uri(self.callback)
        try:
            # The source-level URL only wins when the source type allows overrides.
            request_token_url = self.source.source_type.request_token_url or ""
            if self.source.source_type.urls_customizable and self.source.request_token_url:
                request_token_url = self.source.request_token_url
            response = self.do_request(
                "post",
                request_token_url,
                headers=self._default_headers,
                oauth_callback=callback,
            )
            response.raise_for_status()
        except RequestException as exc:
            raise OAuthSourceException(
                exc.response.text if exc.response is not None else str(exc),
            ) from exc
        return response.text

    def get_redirect_args(self) -> dict[str, Any]:
        """Get request parameters for redirect url.

        Fetches a fresh request token, stores its raw form in the session
        under ``session_key`` and returns the ``oauth_token`` and
        ``oauth_callback`` parameters.

        Raises OAuthSourceException when the request-token response does not
        contain an ``oauth_token``.
        """
        callback = self.request.build_absolute_uri(self.callback)
        raw_token = self.get_request_token()
        token = self.parse_raw_token(raw_token)
        # Validate before touching the session so an unusable response is
        # never persisted and the failure surfaces as a source error.
        if "oauth_token" not in token:
            raise OAuthSourceException("Request token response did not contain an oauth_token")
        self.request.session[self.session_key] = raw_token
        return {
            "oauth_token": token["oauth_token"],
            "oauth_callback": callback,
        }

    def parse_raw_token(self, raw_token: str) -> dict[str, Any]:
        """Parse token and secret from raw token response.

        The raw token is treated as a query string; when a key is repeated,
        the last value wins.
        """
        return dict(parse_qsl(raw_token))

    def do_request(self, method: str, url: str, **kwargs) -> Response:
        """Build remote url request. Constructs necessary auth.

        The optional ``token``, ``oauth_callback`` and ``oauth_verifier``
        keyword arguments are removed from ``kwargs`` and used to build the
        ``OAuth1`` auth object; everything else is forwarded to the parent
        implementation.
        """
        resource_owner_key = None
        resource_owner_secret = None
        if "token" in kwargs:
            user_token: dict[str, Any] = kwargs.pop("token")
            resource_owner_key = user_token["oauth_token"]
            resource_owner_secret = user_token["oauth_token_secret"]

        callback = kwargs.pop("oauth_callback", None)
        verifier = kwargs.pop("oauth_verifier", None)
        oauth = OAuth1(
            resource_owner_key=resource_owner_key,
            resource_owner_secret=resource_owner_secret,
            client_key=self.source.consumer_key,
            client_secret=self.source.consumer_secret,
            verifier=verifier,
            callback_uri=callback,
        )
        kwargs["auth"] = oauth
        return super().do_request(method, url, **kwargs)

    @property
    def session_key(self) -> str:
        """Session key under which the raw request token is stored."""
        return f"oauth-client-{self.source.name}-request-token"
OAuth1 Client
def
get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
25 def get_access_token(self, **request_kwargs) -> dict[str, Any] | None: 26 """Fetch access token from callback request.""" 27 raw_token = self.request.session.get(self.session_key, None) 28 verifier = self.request.GET.get("oauth_verifier", None) 29 callback = self.request.build_absolute_uri(self.callback) 30 if raw_token is not None and verifier is not None: 31 token = self.parse_raw_token(raw_token) 32 try: 33 access_token_url = self.source.source_type.access_token_url or "" 34 if self.source.source_type.urls_customizable and self.source.access_token_url: 35 access_token_url = self.source.access_token_url 36 response = self.do_request( 37 "post", 38 access_token_url, 39 token=token, 40 headers=self._default_headers, 41 oauth_verifier=verifier, 42 oauth_callback=callback, 43 ) 44 response.raise_for_status() 45 except RequestException as exc: 46 LOGGER.warning( 47 "Unable to fetch access token", 48 exc=exc, 49 response=exc.response.text if exc.response is not None else str(exc), 50 ) 51 return None 52 return self.parse_raw_token(response.text) 53 return None
Fetch access token from callback request.
def
get_request_token(self) -> str:
def get_request_token(self) -> str:
    """Fetch the OAuth request token. Only required for OAuth 1.0.

    POSTs to the request-token URL with the absolute callback URI and
    returns the raw response body.

    Raises OAuthSourceException (chained to the original error) when the
    request fails or returns an error status.
    """
    redirect_target = self.request.build_absolute_uri(self.callback)
    try:
        # Start from the source type's URL; the source-level URL replaces it
        # only when the source type allows customised URLs.
        endpoint = self.source.source_type.request_token_url or ""
        allow_override = self.source.source_type.urls_customizable
        if allow_override and self.source.request_token_url:
            endpoint = self.source.request_token_url
        reply = self.do_request(
            "post",
            endpoint,
            headers=self._default_headers,
            oauth_callback=redirect_target,
        )
        reply.raise_for_status()
    except RequestException as exc:
        # Prefer the provider's response body as the error detail.
        if exc.response is not None:
            detail = exc.response.text
        else:
            detail = str(exc)
        raise OAuthSourceException(detail) from exc
    return reply.text
Fetch the OAuth request token. Only required for OAuth 1.0.
def
get_redirect_args(self) -> dict[str, typing.Any]:
def get_redirect_args(self) -> dict[str, Any]:
    """Get request parameters for redirect url.

    Fetches a fresh request token, stores its raw form in the session
    under ``session_key`` and returns the ``oauth_token`` and
    ``oauth_callback`` parameters.

    Raises OAuthSourceException when the request-token response does not
    contain an ``oauth_token``.
    """
    callback = self.request.build_absolute_uri(self.callback)
    raw_token = self.get_request_token()
    token = self.parse_raw_token(raw_token)
    # Validate before touching the session so an unusable response is
    # never persisted and the failure surfaces as a source error instead
    # of a bare KeyError.
    if "oauth_token" not in token:
        raise OAuthSourceException("Request token response did not contain an oauth_token")
    self.request.session[self.session_key] = raw_token
    return {
        "oauth_token": token["oauth_token"],
        "oauth_callback": callback,
    }
Get request parameters for redirect url.
def
parse_raw_token(self, raw_token: str) -> dict[str, typing.Any]:
def parse_raw_token(self, raw_token: str) -> dict[str, Any]:
    """Parse token and secret from raw token response.

    The raw token is treated as a query string; when a key is repeated,
    the last value wins.
    """
    return {name: value for name, value in parse_qsl(raw_token)}
Parse token and secret from raw token response.
def
do_request(self, method: str, url: str, **kwargs) -> requests.models.Response:
def do_request(self, method: str, url: str, **kwargs) -> Response:
    """Build remote url request. Constructs necessary auth.

    The optional ``token``, ``oauth_callback`` and ``oauth_verifier``
    keyword arguments are removed from ``kwargs`` and used to build the
    ``OAuth1`` auth object; everything else is forwarded to the parent
    implementation.
    """
    owner_key = owner_secret = None
    if "token" in kwargs:
        credentials: dict[str, Any] = kwargs.pop("token")
        owner_key = credentials["oauth_token"]
        owner_secret = credentials["oauth_token_secret"]

    callback_uri = kwargs.pop("oauth_callback", None)
    oauth_verifier = kwargs.pop("oauth_verifier", None)
    # Hand the auth object to the parent through the regular ``auth`` kwarg.
    kwargs["auth"] = OAuth1(
        client_key=self.source.consumer_key,
        client_secret=self.source.consumer_secret,
        resource_owner_key=owner_key,
        resource_owner_secret=owner_secret,
        callback_uri=callback_uri,
        verifier=oauth_verifier,
    )
    return super().do_request(method, url, **kwargs)
Build remote url request. Constructs necessary auth.