authentik.sources.oauth.clients.base
OAuth Clients
"""OAuth Clients"""

from typing import Any
from urllib.parse import parse_qs, quote, urlencode, urlparse, urlunparse

from django.http import HttpRequest
from requests import Session
from requests.exceptions import RequestException
from requests.models import Response
from structlog.stdlib import get_logger

from authentik.events.models import Event, EventAction
from authentik.lib.utils.http import get_http_session
from authentik.sources.oauth.models import OAuthSource


class BaseOAuthClient:
    """Base OAuth Client

    Holds the source, the current request, an HTTP session and a logger bound
    to the source slug. Token retrieval, token parsing, redirect arguments and
    the session key are left to sub-classes.
    """

    session: Session

    source: OAuthSource
    request: HttpRequest

    callback: str | None

    def __init__(self, source: OAuthSource, request: HttpRequest, callback: str | None = None):
        self.source = source
        self.session = get_http_session()
        self.request = request
        self.callback = callback
        self.logger = get_logger().bind(source=source.slug)

    def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
        """Fetch access token from callback request."""
        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover

    def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
        """Fetch user profile information.

        The source type's profile URL is used unless the source type allows
        customized URLs and the source defines its own.

        Returns the decoded JSON body, or None when the request fails, the
        server answers with an error status, or the body is not valid JSON.
        """
        profile_url = self.source.source_type.profile_url or ""
        if self.source.source_type.urls_customizable and self.source.profile_url:
            profile_url = self.source.profile_url
        # NOTE(review): `token` is forwarded as a keyword argument; the base
        # do_request hands all kwargs to session.request unchanged, so
        # presumably sub-classes consume it there -- confirm.
        try:
            # The request itself is inside the try so that transport errors
            # (connection refused, timeout, ...) are reported like HTTP errors.
            response = self.do_request("get", profile_url, token=token)
            response.raise_for_status()
        except RequestException as exc:
            self.logger.warning(
                "Unable to fetch user profile",
                exc=exc,
                response=exc.response.text if exc.response is not None else str(exc),
            )
            return None
        try:
            return response.json()
        except ValueError as exc:
            # JSON decoding errors are ValueError subclasses.
            self.logger.warning("Unable to parse user profile", exc=exc)
            return None

    def get_redirect_args(self) -> dict[str, str]:
        """Get request parameters for redirect url."""
        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover

    def get_redirect_url(self, parameters=None):
        """Build authentication redirect url.

        Query arguments are merged from three places, later ones winning:
        get_redirect_args(), then `parameters`, then the query string already
        present in the configured authorization URL.
        """
        authorization_url = self.source.source_type.authorization_url or ""
        if self.source.source_type.urls_customizable and self.source.authorization_url:
            authorization_url = self.source.authorization_url
        if authorization_url == "":
            # Only recorded as an event; URL building still continues below.
            Event.new(
                EventAction.CONFIGURATION_ERROR,
                source=self.source,
                message="Source has an empty authorization URL.",
            ).save()
        parsed_url = urlparse(authorization_url)
        parsed_args = parse_qs(parsed_url.query)
        # Merge into a fresh dict so the mapping returned by
        # get_redirect_args() is never mutated.
        args = {**self.get_redirect_args(), **(parameters or {}), **parsed_args}
        # Special handling for scope, since it's set as array
        # to make additional scopes easier
        scope = args.get("scope")
        if scope is not None:
            if isinstance(scope, str):
                # A plain string would otherwise be split into characters.
                scope = scope.split()
            args["scope"] = " ".join(sorted(set(scope)))
        params = urlencode(args, quote_via=quote, doseq=True)
        self.logger.info("redirect args", **args)
        return urlunparse(parsed_url._replace(query=params))

    def parse_raw_token(self, raw_token: str) -> dict[str, Any]:
        """Parse token and secret from raw token response."""
        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover

    def do_request(self, method: str, url: str, **kwargs) -> Response:
        """Wrapper around self.session.request, which can add special headers"""
        return self.session.request(method, url, **kwargs)

    @property
    def session_key(self) -> str:
        """Return Session Key"""
        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover
class
BaseOAuthClient:
class BaseOAuthClient:
    """Base OAuth Client

    Holds the source, the current request, an HTTP session and a logger bound
    to the source slug. Token retrieval, token parsing, redirect arguments and
    the session key are left to sub-classes.
    """

    session: Session

    source: OAuthSource
    request: HttpRequest

    callback: str | None

    def __init__(self, source: OAuthSource, request: HttpRequest, callback: str | None = None):
        self.source = source
        self.session = get_http_session()
        self.request = request
        self.callback = callback
        self.logger = get_logger().bind(source=source.slug)

    def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
        """Fetch access token from callback request."""
        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover

    def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
        """Fetch user profile information.

        The source type's profile URL is used unless the source type allows
        customized URLs and the source defines its own.

        Returns the decoded JSON body, or None when the request fails, the
        server answers with an error status, or the body is not valid JSON.
        """
        profile_url = self.source.source_type.profile_url or ""
        if self.source.source_type.urls_customizable and self.source.profile_url:
            profile_url = self.source.profile_url
        # NOTE(review): `token` is forwarded as a keyword argument; the base
        # do_request hands all kwargs to session.request unchanged, so
        # presumably sub-classes consume it there -- confirm.
        try:
            # The request itself is inside the try so that transport errors
            # (connection refused, timeout, ...) are reported like HTTP errors.
            response = self.do_request("get", profile_url, token=token)
            response.raise_for_status()
        except RequestException as exc:
            self.logger.warning(
                "Unable to fetch user profile",
                exc=exc,
                response=exc.response.text if exc.response is not None else str(exc),
            )
            return None
        try:
            return response.json()
        except ValueError as exc:
            # JSON decoding errors are ValueError subclasses.
            self.logger.warning("Unable to parse user profile", exc=exc)
            return None

    def get_redirect_args(self) -> dict[str, str]:
        """Get request parameters for redirect url."""
        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover

    def get_redirect_url(self, parameters=None):
        """Build authentication redirect url.

        Query arguments are merged from three places, later ones winning:
        get_redirect_args(), then `parameters`, then the query string already
        present in the configured authorization URL.
        """
        authorization_url = self.source.source_type.authorization_url or ""
        if self.source.source_type.urls_customizable and self.source.authorization_url:
            authorization_url = self.source.authorization_url
        if authorization_url == "":
            # Only recorded as an event; URL building still continues below.
            Event.new(
                EventAction.CONFIGURATION_ERROR,
                source=self.source,
                message="Source has an empty authorization URL.",
            ).save()
        parsed_url = urlparse(authorization_url)
        parsed_args = parse_qs(parsed_url.query)
        # Merge into a fresh dict so the mapping returned by
        # get_redirect_args() is never mutated.
        args = {**self.get_redirect_args(), **(parameters or {}), **parsed_args}
        # Special handling for scope, since it's set as array
        # to make additional scopes easier
        scope = args.get("scope")
        if scope is not None:
            if isinstance(scope, str):
                # A plain string would otherwise be split into characters.
                scope = scope.split()
            args["scope"] = " ".join(sorted(set(scope)))
        params = urlencode(args, quote_via=quote, doseq=True)
        self.logger.info("redirect args", **args)
        return urlunparse(parsed_url._replace(query=params))

    def parse_raw_token(self, raw_token: str) -> dict[str, Any]:
        """Parse token and secret from raw token response."""
        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover

    def do_request(self, method: str, url: str, **kwargs) -> Response:
        """Wrapper around self.session.request, which can add special headers"""
        return self.session.request(method, url, **kwargs)

    @property
    def session_key(self) -> str:
        """Return Session Key"""
        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover
Base OAuth Client
BaseOAuthClient( source: authentik.sources.oauth.models.OAuthSource, request: django.http.request.HttpRequest, callback: str | None = None)
def
get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
35 def get_access_token(self, **request_kwargs) -> dict[str, Any] | None: 36 """Fetch access token from callback request.""" 37 raise NotImplementedError("Defined in a sub-class") # pragma: no cover
Fetch access token from callback request.
def
get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
39 def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None: 40 """Fetch user profile information.""" 41 profile_url = self.source.source_type.profile_url or "" 42 if self.source.source_type.urls_customizable and self.source.profile_url: 43 profile_url = self.source.profile_url 44 response = self.do_request("get", profile_url, token=token) 45 try: 46 response.raise_for_status() 47 except RequestException as exc: 48 self.logger.warning( 49 "Unable to fetch user profile", 50 exc=exc, 51 response=exc.response.text if exc.response is not None else str(exc), 52 ) 53 return None 54 return response.json()
Fetch user profile information.
def
get_redirect_args(self) -> dict[str, str]:
def get_redirect_args(self) -> dict[str, str]:
    """Return the request parameters for the redirect URL.

    Abstract hook: this base implementation always raises
    NotImplementedError.
    """
    raise NotImplementedError("Defined in a sub-class")  # pragma: no cover
Get request parameters for redirect url.
def
get_redirect_url(self, parameters=None):
def get_redirect_url(self, parameters=None):
    """Build authentication redirect url.

    Query arguments are merged from three places, later ones winning:
    get_redirect_args(), then `parameters`, then the query string already
    present in the configured authorization URL. A "scope" value, when
    present, is de-duplicated, sorted and joined with spaces.
    """
    authorization_url = self.source.source_type.authorization_url or ""
    if self.source.source_type.urls_customizable and self.source.authorization_url:
        authorization_url = self.source.authorization_url
    if authorization_url == "":
        # Only recorded as an event; URL building still continues below.
        Event.new(
            EventAction.CONFIGURATION_ERROR,
            source=self.source,
            message="Source has an empty authorization URL.",
        ).save()
    parsed_url = urlparse(authorization_url)
    parsed_args = parse_qs(parsed_url.query)
    # Merge into a fresh dict so the mapping returned by
    # get_redirect_args() is never mutated.
    args = {**self.get_redirect_args(), **(parameters or {}), **parsed_args}
    # Special handling for scope, since it's set as array
    # to make additional scopes easier
    scope = args.get("scope")
    if scope is not None:
        if isinstance(scope, str):
            # A plain string would otherwise be split into characters.
            scope = scope.split()
        args["scope"] = " ".join(sorted(set(scope)))
    params = urlencode(args, quote_via=quote, doseq=True)
    self.logger.info("redirect args", **args)
    return urlunparse(parsed_url._replace(query=params))
Build authentication redirect url.
def
parse_raw_token(self, raw_token: str) -> dict[str, typing.Any]:
def parse_raw_token(self, raw_token: str) -> dict[str, Any]:
    """Parse the token and secret out of a raw token response.

    Abstract hook: this base implementation always raises
    NotImplementedError.
    """
    raise NotImplementedError("Defined in a sub-class")  # pragma: no cover
Parse token and secret from raw token response.
def
do_request(self, method: str, url: str, **kwargs) -> requests.models.Response:
def do_request(self, method: str, url: str, **kwargs) -> Response:
    """Send a request through this client's HTTP session.

    Thin wrapper around self.session.request; `method`, `url` and every
    keyword argument are passed through unchanged. It exists as a single
    hook where special headers can be added.
    """
    http_session = self.session
    return http_session.request(method, url, **kwargs)
Wrapper around self.session.request, which can add special headers