authentik.sources.oauth.clients.oauth1

OAuth 1 Clients

  1"""OAuth 1 Clients"""
  2
  3from typing import Any
  4from urllib.parse import parse_qsl
  5
  6from requests.exceptions import RequestException
  7from requests.models import Response
  8from requests_oauthlib import OAuth1
  9from structlog.stdlib import get_logger
 10
 11from authentik.sources.oauth.clients.base import BaseOAuthClient
 12from authentik.sources.oauth.exceptions import OAuthSourceException
 13
# Module-level structlog logger; used below to report failed access-token requests.
LOGGER = get_logger()
 15
 16
 17class OAuthClient(BaseOAuthClient):
 18    """OAuth1 Client"""
 19
 20    _default_headers = {
 21        "Accept": "application/json",
 22    }
 23
 24    def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
 25        """Fetch access token from callback request."""
 26        raw_token = self.request.session.get(self.session_key, None)
 27        verifier = self.request.GET.get("oauth_verifier", None)
 28        callback = self.request.build_absolute_uri(self.callback)
 29        if raw_token is not None and verifier is not None:
 30            token = self.parse_raw_token(raw_token)
 31            try:
 32                access_token_url = self.source.source_type.access_token_url or ""
 33                if self.source.source_type.urls_customizable and self.source.access_token_url:
 34                    access_token_url = self.source.access_token_url
 35                response = self.do_request(
 36                    "post",
 37                    access_token_url,
 38                    token=token,
 39                    headers=self._default_headers,
 40                    oauth_verifier=verifier,
 41                    oauth_callback=callback,
 42                )
 43                response.raise_for_status()
 44            except RequestException as exc:
 45                LOGGER.warning(
 46                    "Unable to fetch access token",
 47                    exc=exc,
 48                    response=exc.response.text if exc.response is not None else str(exc),
 49                )
 50                return None
 51            return self.parse_raw_token(response.text)
 52        return None
 53
 54    def get_request_token(self) -> str:
 55        """Fetch the OAuth request token. Only required for OAuth 1.0."""
 56        callback = self.request.build_absolute_uri(self.callback)
 57        try:
 58            request_token_url = self.source.source_type.request_token_url or ""
 59            if self.source.source_type.urls_customizable and self.source.request_token_url:
 60                request_token_url = self.source.request_token_url
 61            response = self.do_request(
 62                "post",
 63                request_token_url,
 64                headers=self._default_headers,
 65                oauth_callback=callback,
 66            )
 67            response.raise_for_status()
 68        except RequestException as exc:
 69            raise OAuthSourceException(
 70                exc.response.text if exc.response is not None else str(exc),
 71            ) from exc
 72        return response.text
 73
 74    def get_redirect_args(self) -> dict[str, Any]:
 75        """Get request parameters for redirect url."""
 76        callback = self.request.build_absolute_uri(self.callback)
 77        raw_token = self.get_request_token()
 78        token = self.parse_raw_token(raw_token)
 79        self.request.session[self.session_key] = raw_token
 80        return {
 81            "oauth_token": token["oauth_token"],
 82            "oauth_callback": callback,
 83        }
 84
 85    def parse_raw_token(self, raw_token: str) -> dict[str, Any]:
 86        """Parse token and secret from raw token response."""
 87        return dict(parse_qsl(raw_token))
 88
 89    def do_request(self, method: str, url: str, **kwargs) -> Response:
 90        """Build remote url request. Constructs necessary auth."""
 91        resource_owner_key = None
 92        resource_owner_secret = None
 93        if "token" in kwargs:
 94            user_token: dict[str, Any] = kwargs.pop("token")
 95            resource_owner_key = user_token["oauth_token"]
 96            resource_owner_secret = user_token["oauth_token_secret"]
 97
 98        callback = kwargs.pop("oauth_callback", None)
 99        verifier = kwargs.pop("oauth_verifier", None)
100        oauth = OAuth1(
101            resource_owner_key=resource_owner_key,
102            resource_owner_secret=resource_owner_secret,
103            client_key=self.source.consumer_key,
104            client_secret=self.source.consumer_secret,
105            verifier=verifier,
106            callback_uri=callback,
107        )
108        kwargs["auth"] = oauth
109        return super().do_request(method, url, **kwargs)
110
111    @property
112    def session_key(self) -> str:
113        return f"oauth-client-{self.source.name}-request-token"
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
 18class OAuthClient(BaseOAuthClient):
 19    """OAuth1 Client"""
 20
 21    _default_headers = {
 22        "Accept": "application/json",
 23    }
 24
 25    def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
 26        """Fetch access token from callback request."""
 27        raw_token = self.request.session.get(self.session_key, None)
 28        verifier = self.request.GET.get("oauth_verifier", None)
 29        callback = self.request.build_absolute_uri(self.callback)
 30        if raw_token is not None and verifier is not None:
 31            token = self.parse_raw_token(raw_token)
 32            try:
 33                access_token_url = self.source.source_type.access_token_url or ""
 34                if self.source.source_type.urls_customizable and self.source.access_token_url:
 35                    access_token_url = self.source.access_token_url
 36                response = self.do_request(
 37                    "post",
 38                    access_token_url,
 39                    token=token,
 40                    headers=self._default_headers,
 41                    oauth_verifier=verifier,
 42                    oauth_callback=callback,
 43                )
 44                response.raise_for_status()
 45            except RequestException as exc:
 46                LOGGER.warning(
 47                    "Unable to fetch access token",
 48                    exc=exc,
 49                    response=exc.response.text if exc.response is not None else str(exc),
 50                )
 51                return None
 52            return self.parse_raw_token(response.text)
 53        return None
 54
 55    def get_request_token(self) -> str:
 56        """Fetch the OAuth request token. Only required for OAuth 1.0."""
 57        callback = self.request.build_absolute_uri(self.callback)
 58        try:
 59            request_token_url = self.source.source_type.request_token_url or ""
 60            if self.source.source_type.urls_customizable and self.source.request_token_url:
 61                request_token_url = self.source.request_token_url
 62            response = self.do_request(
 63                "post",
 64                request_token_url,
 65                headers=self._default_headers,
 66                oauth_callback=callback,
 67            )
 68            response.raise_for_status()
 69        except RequestException as exc:
 70            raise OAuthSourceException(
 71                exc.response.text if exc.response is not None else str(exc),
 72            ) from exc
 73        return response.text
 74
 75    def get_redirect_args(self) -> dict[str, Any]:
 76        """Get request parameters for redirect url."""
 77        callback = self.request.build_absolute_uri(self.callback)
 78        raw_token = self.get_request_token()
 79        token = self.parse_raw_token(raw_token)
 80        self.request.session[self.session_key] = raw_token
 81        return {
 82            "oauth_token": token["oauth_token"],
 83            "oauth_callback": callback,
 84        }
 85
 86    def parse_raw_token(self, raw_token: str) -> dict[str, Any]:
 87        """Parse token and secret from raw token response."""
 88        return dict(parse_qsl(raw_token))
 89
 90    def do_request(self, method: str, url: str, **kwargs) -> Response:
 91        """Build remote url request. Constructs necessary auth."""
 92        resource_owner_key = None
 93        resource_owner_secret = None
 94        if "token" in kwargs:
 95            user_token: dict[str, Any] = kwargs.pop("token")
 96            resource_owner_key = user_token["oauth_token"]
 97            resource_owner_secret = user_token["oauth_token_secret"]
 98
 99        callback = kwargs.pop("oauth_callback", None)
100        verifier = kwargs.pop("oauth_verifier", None)
101        oauth = OAuth1(
102            resource_owner_key=resource_owner_key,
103            resource_owner_secret=resource_owner_secret,
104            client_key=self.source.consumer_key,
105            client_secret=self.source.consumer_secret,
106            verifier=verifier,
107            callback_uri=callback,
108        )
109        kwargs["auth"] = oauth
110        return super().do_request(method, url, **kwargs)
111
112    @property
113    def session_key(self) -> str:
114        return f"oauth-client-{self.source.name}-request-token"

OAuth1 Client

def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
25    def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
26        """Fetch access token from callback request."""
27        raw_token = self.request.session.get(self.session_key, None)
28        verifier = self.request.GET.get("oauth_verifier", None)
29        callback = self.request.build_absolute_uri(self.callback)
30        if raw_token is not None and verifier is not None:
31            token = self.parse_raw_token(raw_token)
32            try:
33                access_token_url = self.source.source_type.access_token_url or ""
34                if self.source.source_type.urls_customizable and self.source.access_token_url:
35                    access_token_url = self.source.access_token_url
36                response = self.do_request(
37                    "post",
38                    access_token_url,
39                    token=token,
40                    headers=self._default_headers,
41                    oauth_verifier=verifier,
42                    oauth_callback=callback,
43                )
44                response.raise_for_status()
45            except RequestException as exc:
46                LOGGER.warning(
47                    "Unable to fetch access token",
48                    exc=exc,
49                    response=exc.response.text if exc.response is not None else str(exc),
50                )
51                return None
52            return self.parse_raw_token(response.text)
53        return None

Fetch access token from callback request.

def get_request_token(self) -> str:
55    def get_request_token(self) -> str:
56        """Fetch the OAuth request token. Only required for OAuth 1.0."""
57        callback = self.request.build_absolute_uri(self.callback)
58        try:
59            request_token_url = self.source.source_type.request_token_url or ""
60            if self.source.source_type.urls_customizable and self.source.request_token_url:
61                request_token_url = self.source.request_token_url
62            response = self.do_request(
63                "post",
64                request_token_url,
65                headers=self._default_headers,
66                oauth_callback=callback,
67            )
68            response.raise_for_status()
69        except RequestException as exc:
70            raise OAuthSourceException(
71                exc.response.text if exc.response is not None else str(exc),
72            ) from exc
73        return response.text

Fetch the OAuth request token. Only required for OAuth 1.0.

def get_redirect_args(self) -> dict[str, typing.Any]:
75    def get_redirect_args(self) -> dict[str, Any]:
76        """Get request parameters for redirect url."""
77        callback = self.request.build_absolute_uri(self.callback)
78        raw_token = self.get_request_token()
79        token = self.parse_raw_token(raw_token)
80        self.request.session[self.session_key] = raw_token
81        return {
82            "oauth_token": token["oauth_token"],
83            "oauth_callback": callback,
84        }

Get request parameters for redirect url.

def parse_raw_token(self, raw_token: str) -> dict[str, typing.Any]:
86    def parse_raw_token(self, raw_token: str) -> dict[str, Any]:
87        """Parse token and secret from raw token response."""
88        return dict(parse_qsl(raw_token))

Parse token and secret from raw token response.

def do_request(self, method: str, url: str, **kwargs) -> requests.models.Response:
 90    def do_request(self, method: str, url: str, **kwargs) -> Response:
 91        """Build remote url request. Constructs necessary auth."""
 92        resource_owner_key = None
 93        resource_owner_secret = None
 94        if "token" in kwargs:
 95            user_token: dict[str, Any] = kwargs.pop("token")
 96            resource_owner_key = user_token["oauth_token"]
 97            resource_owner_secret = user_token["oauth_token_secret"]
 98
 99        callback = kwargs.pop("oauth_callback", None)
100        verifier = kwargs.pop("oauth_verifier", None)
101        oauth = OAuth1(
102            resource_owner_key=resource_owner_key,
103            resource_owner_secret=resource_owner_secret,
104            client_key=self.source.consumer_key,
105            client_secret=self.source.consumer_secret,
106            verifier=verifier,
107            callback_uri=callback,
108        )
109        kwargs["auth"] = oauth
110        return super().do_request(method, url, **kwargs)

Build remote url request. Constructs necessary auth.

session_key: str
112    @property
113    def session_key(self) -> str:
114        return f"oauth-client-{self.source.name}-request-token"

Return Session Key