authentik.sources.oauth.clients.base

OAuth Clients

 1"""OAuth Clients"""
 2
 3from typing import Any
 4from urllib.parse import parse_qs, quote, urlencode, urlparse, urlunparse
 5
 6from django.http import HttpRequest
 7from requests import Session
 8from requests.exceptions import RequestException
 9from requests.models import Response
10from structlog.stdlib import get_logger
11
12from authentik.events.models import Event, EventAction
13from authentik.lib.utils.http import get_http_session
14from authentik.sources.oauth.models import OAuthSource
15
16
17class BaseOAuthClient:
18    """Base OAuth Client"""
19
20    session: Session
21
22    source: OAuthSource
23    request: HttpRequest
24
25    callback: str | None
26
27    def __init__(self, source: OAuthSource, request: HttpRequest, callback: str | None = None):
28        self.source = source
29        self.session = get_http_session()
30        self.request = request
31        self.callback = callback
32        self.logger = get_logger().bind(source=source.slug)
33
34    def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
35        """Fetch access token from callback request."""
36        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover
37
38    def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
39        """Fetch user profile information."""
40        profile_url = self.source.source_type.profile_url or ""
41        if self.source.source_type.urls_customizable and self.source.profile_url:
42            profile_url = self.source.profile_url
43        response = self.do_request("get", profile_url, token=token)
44        try:
45            response.raise_for_status()
46        except RequestException as exc:
47            self.logger.warning(
48                "Unable to fetch user profile",
49                exc=exc,
50                response=exc.response.text if exc.response is not None else str(exc),
51            )
52            return None
53        return response.json()
54
55    def get_redirect_args(self) -> dict[str, str]:
56        """Get request parameters for redirect url."""
57        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover
58
59    def get_redirect_url(self, parameters=None):
60        """Build authentication redirect url."""
61        authorization_url = self.source.source_type.authorization_url or ""
62        if self.source.source_type.urls_customizable and self.source.authorization_url:
63            authorization_url = self.source.authorization_url
64        if authorization_url == "":
65            Event.new(
66                EventAction.CONFIGURATION_ERROR,
67                source=self.source,
68                message="Source has an empty authorization URL.",
69            ).save()
70        parsed_url = urlparse(authorization_url)
71        parsed_args = parse_qs(parsed_url.query)
72        args = self.get_redirect_args()
73        args.update(parameters or {})
74        args.update(parsed_args)
75        # Special handling for scope, since it's set as array
76        # to make additional scopes easier
77        args["scope"] = " ".join(sorted(set(args["scope"])))
78        params = urlencode(args, quote_via=quote, doseq=True)
79        self.logger.info("redirect args", **args)
80        return urlunparse(parsed_url._replace(query=params))
81
82    def parse_raw_token(self, raw_token: str) -> dict[str, Any]:
83        """Parse token and secret from raw token response."""
84        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover
85
86    def do_request(self, method: str, url: str, **kwargs) -> Response:
87        """Wrapper around self.session.request, which can add special headers"""
88        return self.session.request(method, url, **kwargs)
89
90    @property
91    def session_key(self) -> str:
92        """Return Session Key"""
93        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover
class BaseOAuthClient:
18class BaseOAuthClient:
19    """Base OAuth Client"""
20
21    session: Session
22
23    source: OAuthSource
24    request: HttpRequest
25
26    callback: str | None
27
28    def __init__(self, source: OAuthSource, request: HttpRequest, callback: str | None = None):
29        self.source = source
30        self.session = get_http_session()
31        self.request = request
32        self.callback = callback
33        self.logger = get_logger().bind(source=source.slug)
34
35    def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
36        """Fetch access token from callback request."""
37        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover
38
39    def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
40        """Fetch user profile information."""
41        profile_url = self.source.source_type.profile_url or ""
42        if self.source.source_type.urls_customizable and self.source.profile_url:
43            profile_url = self.source.profile_url
44        response = self.do_request("get", profile_url, token=token)
45        try:
46            response.raise_for_status()
47        except RequestException as exc:
48            self.logger.warning(
49                "Unable to fetch user profile",
50                exc=exc,
51                response=exc.response.text if exc.response is not None else str(exc),
52            )
53            return None
54        return response.json()
55
56    def get_redirect_args(self) -> dict[str, str]:
57        """Get request parameters for redirect url."""
58        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover
59
60    def get_redirect_url(self, parameters=None):
61        """Build authentication redirect url."""
62        authorization_url = self.source.source_type.authorization_url or ""
63        if self.source.source_type.urls_customizable and self.source.authorization_url:
64            authorization_url = self.source.authorization_url
65        if authorization_url == "":
66            Event.new(
67                EventAction.CONFIGURATION_ERROR,
68                source=self.source,
69                message="Source has an empty authorization URL.",
70            ).save()
71        parsed_url = urlparse(authorization_url)
72        parsed_args = parse_qs(parsed_url.query)
73        args = self.get_redirect_args()
74        args.update(parameters or {})
75        args.update(parsed_args)
76        # Special handling for scope, since it's set as array
77        # to make additional scopes easier
78        args["scope"] = " ".join(sorted(set(args["scope"])))
79        params = urlencode(args, quote_via=quote, doseq=True)
80        self.logger.info("redirect args", **args)
81        return urlunparse(parsed_url._replace(query=params))
82
83    def parse_raw_token(self, raw_token: str) -> dict[str, Any]:
84        """Parse token and secret from raw token response."""
85        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover
86
87    def do_request(self, method: str, url: str, **kwargs) -> Response:
88        """Wrapper around self.session.request, which can add special headers"""
89        return self.session.request(method, url, **kwargs)
90
91    @property
92    def session_key(self) -> str:
93        """Return Session Key"""
94        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover

Base OAuth Client

BaseOAuthClient( source: authentik.sources.oauth.models.OAuthSource, request: django.http.request.HttpRequest, callback: str | None = None)
28    def __init__(self, source: OAuthSource, request: HttpRequest, callback: str | None = None):
29        self.source = source
30        self.session = get_http_session()
31        self.request = request
32        self.callback = callback
33        self.logger = get_logger().bind(source=source.slug)
session: requests.sessions.Session
request: django.http.request.HttpRequest
callback: str | None
logger
def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
35    def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
36        """Fetch access token from callback request."""
37        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover

Fetch access token from callback request.

def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
39    def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
40        """Fetch user profile information."""
41        profile_url = self.source.source_type.profile_url or ""
42        if self.source.source_type.urls_customizable and self.source.profile_url:
43            profile_url = self.source.profile_url
44        response = self.do_request("get", profile_url, token=token)
45        try:
46            response.raise_for_status()
47        except RequestException as exc:
48            self.logger.warning(
49                "Unable to fetch user profile",
50                exc=exc,
51                response=exc.response.text if exc.response is not None else str(exc),
52            )
53            return None
54        return response.json()

Fetch user profile information.

def get_redirect_args(self) -> dict[str, str]:
56    def get_redirect_args(self) -> dict[str, str]:
57        """Get request parameters for redirect url."""
58        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover

Get request parameters for redirect url.

def get_redirect_url(self, parameters=None):
60    def get_redirect_url(self, parameters=None):
61        """Build authentication redirect url."""
62        authorization_url = self.source.source_type.authorization_url or ""
63        if self.source.source_type.urls_customizable and self.source.authorization_url:
64            authorization_url = self.source.authorization_url
65        if authorization_url == "":
66            Event.new(
67                EventAction.CONFIGURATION_ERROR,
68                source=self.source,
69                message="Source has an empty authorization URL.",
70            ).save()
71        parsed_url = urlparse(authorization_url)
72        parsed_args = parse_qs(parsed_url.query)
73        args = self.get_redirect_args()
74        args.update(parameters or {})
75        args.update(parsed_args)
76        # Special handling for scope, since it's set as array
77        # to make additional scopes easier
78        args["scope"] = " ".join(sorted(set(args["scope"])))
79        params = urlencode(args, quote_via=quote, doseq=True)
80        self.logger.info("redirect args", **args)
81        return urlunparse(parsed_url._replace(query=params))

Build authentication redirect url.

def parse_raw_token(self, raw_token: str) -> dict[str, typing.Any]:
83    def parse_raw_token(self, raw_token: str) -> dict[str, Any]:
84        """Parse token and secret from raw token response."""
85        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover

Parse token and secret from raw token response.

def do_request(self, method: str, url: str, **kwargs) -> requests.models.Response:
87    def do_request(self, method: str, url: str, **kwargs) -> Response:
88        """Wrapper around self.session.request, which can add special headers"""
89        return self.session.request(method, url, **kwargs)

Wrapper around self.session.request, which can add special headers

session_key: str
91    @property
92    def session_key(self) -> str:
93        """Return Session Key"""
94        raise NotImplementedError("Defined in a sub-class")  # pragma: no cover

Return Session Key