authentik.sources.oauth.types.oidc

OpenID Connect OAuth Views

 1"""OpenID Connect OAuth Views"""
 2
 3from typing import Any
 4
 5from jwt import PyJWKSet, PyJWTError, decode, get_unverified_header
 6from requests.auth import AuthBase, HTTPBasicAuth
 7
 8from authentik.sources.oauth.clients.oauth2 import UserprofileHeaderAuthClient
 9from authentik.sources.oauth.models import AuthorizationCodeAuthMethod, OAuthSource
10from authentik.sources.oauth.types.registry import SourceType, registry
11from authentik.sources.oauth.views.callback import OAuthCallback
12from authentik.sources.oauth.views.redirect import OAuthRedirect
13
14
15class OpenIDConnectOAuthRedirect(OAuthRedirect):
16    """OpenIDConnect OAuth2 Redirect"""
17
18    def get_additional_parameters(self, source: OAuthSource):  # pragma: no cover
19        return {
20            "scope": ["openid", "email", "profile"],
21        }
22
23
24class OpenIDConnectClient(UserprofileHeaderAuthClient):
25    def get_access_token_args(self, callback: str, code: str) -> dict[str, Any]:
26        args = super().get_access_token_args(callback, code)
27        if self.source.authorization_code_auth_method == AuthorizationCodeAuthMethod.POST_BODY:
28            args["client_id"] = self.get_client_id()
29            args["client_secret"] = self.get_client_secret()
30        else:
31            args.pop("client_id", None)
32            args.pop("client_secret", None)
33        return args
34
35    def get_access_token_auth(self) -> AuthBase | None:
36        if self.source.authorization_code_auth_method == AuthorizationCodeAuthMethod.BASIC_AUTH:
37            return HTTPBasicAuth(self.get_client_id(), self.get_client_secret())
38        return None
39
40    def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
41        profile = super().get_profile_info(token)
42        if profile:
43            return profile
44        if "id_token" not in token:
45            self.logger.warning("no id_token given")
46            return None
47        id_token = token["id_token"]
48        try:
49            raw = get_unverified_header(id_token)
50            jwk = PyJWKSet.from_dict(self.source.oidc_jwks)
51            key = [key for key in jwk.keys if key.key_id == raw["kid"]][0]
52            return decode(
53                id_token,
54                key=key,
55                algorithms=[raw["alg"]],
56                audience=self.get_client_id(),
57                options={"verify_iss": False},
58            )
59        except (PyJWTError, IndexError, ValueError) as exc:
60            self.logger.warning("Failed to decode id_token", exc=exc)
61            return None
62
63
64class OpenIDConnectOAuth2Callback(OAuthCallback):
65    """OpenIDConnect OAuth2 Callback"""
66
67    client_class = OpenIDConnectClient
68
69    def get_user_id(self, info: dict[str, str]) -> str:
70        return info.get("sub", None)
71
72
73@registry.register()
74class OpenIDConnectType(SourceType):
75    """OpenIDConnect Type definition"""
76
77    callback_view = OpenIDConnectOAuth2Callback
78    redirect_view = OpenIDConnectOAuthRedirect
79    verbose_name = "OpenID Connect"
80    name = "openidconnect"
81
82    urls_customizable = True
83
84    def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
85        return {
86            "username": info.get("nickname", info.get("preferred_username")),
87            "email": info.get("email"),
88            "name": info.get("name"),
89            "groups": info.get("groups", []),
90        }
class OpenIDConnectOAuthRedirect(authentik.sources.oauth.views.redirect.OAuthRedirect):
16class OpenIDConnectOAuthRedirect(OAuthRedirect):
17    """OpenIDConnect OAuth2 Redirect"""
18
19    def get_additional_parameters(self, source: OAuthSource):  # pragma: no cover
20        return {
21            "scope": ["openid", "email", "profile"],
22        }

OpenIDConnect OAuth2 Redirect

def get_additional_parameters(self, source: authentik.sources.oauth.models.OAuthSource):
19    def get_additional_parameters(self, source: OAuthSource):  # pragma: no cover
20        return {
21            "scope": ["openid", "email", "profile"],
22        }

Return additional redirect parameters for this source.

25class OpenIDConnectClient(UserprofileHeaderAuthClient):
26    def get_access_token_args(self, callback: str, code: str) -> dict[str, Any]:
27        args = super().get_access_token_args(callback, code)
28        if self.source.authorization_code_auth_method == AuthorizationCodeAuthMethod.POST_BODY:
29            args["client_id"] = self.get_client_id()
30            args["client_secret"] = self.get_client_secret()
31        else:
32            args.pop("client_id", None)
33            args.pop("client_secret", None)
34        return args
35
36    def get_access_token_auth(self) -> AuthBase | None:
37        if self.source.authorization_code_auth_method == AuthorizationCodeAuthMethod.BASIC_AUTH:
38            return HTTPBasicAuth(self.get_client_id(), self.get_client_secret())
39        return None
40
41    def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
42        profile = super().get_profile_info(token)
43        if profile:
44            return profile
45        if "id_token" not in token:
46            self.logger.warning("no id_token given")
47            return None
48        id_token = token["id_token"]
49        try:
50            raw = get_unverified_header(id_token)
51            jwk = PyJWKSet.from_dict(self.source.oidc_jwks)
52            key = [key for key in jwk.keys if key.key_id == raw["kid"]][0]
53            return decode(
54                id_token,
55                key=key,
56                algorithms=[raw["alg"]],
57                audience=self.get_client_id(),
58                options={"verify_iss": False},
59            )
60        except (PyJWTError, IndexError, ValueError) as exc:
61            self.logger.warning("Failed to decode id_token", exc=exc)
62            return None

OAuth client which only sends authentication via header, not querystring

def get_access_token_args(self, callback: str, code: str) -> dict[str, typing.Any]:
26    def get_access_token_args(self, callback: str, code: str) -> dict[str, Any]:
27        args = super().get_access_token_args(callback, code)
28        if self.source.authorization_code_auth_method == AuthorizationCodeAuthMethod.POST_BODY:
29            args["client_id"] = self.get_client_id()
30            args["client_secret"] = self.get_client_secret()
31        else:
32            args.pop("client_id", None)
33            args.pop("client_secret", None)
34        return args
def get_access_token_auth(self) -> requests.auth.AuthBase | None:
36    def get_access_token_auth(self) -> AuthBase | None:
37        if self.source.authorization_code_auth_method == AuthorizationCodeAuthMethod.BASIC_AUTH:
38            return HTTPBasicAuth(self.get_client_id(), self.get_client_secret())
39        return None
def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
41    def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
42        profile = super().get_profile_info(token)
43        if profile:
44            return profile
45        if "id_token" not in token:
46            self.logger.warning("no id_token given")
47            return None
48        id_token = token["id_token"]
49        try:
50            raw = get_unverified_header(id_token)
51            jwk = PyJWKSet.from_dict(self.source.oidc_jwks)
52            key = [key for key in jwk.keys if key.key_id == raw["kid"]][0]
53            return decode(
54                id_token,
55                key=key,
56                algorithms=[raw["alg"]],
57                audience=self.get_client_id(),
58                options={"verify_iss": False},
59            )
60        except (PyJWTError, IndexError, ValueError) as exc:
61            self.logger.warning("Failed to decode id_token", exc=exc)
62            return None

Fetch user profile information.

class OpenIDConnectOAuth2Callback(authentik.sources.oauth.views.callback.OAuthCallback):
65class OpenIDConnectOAuth2Callback(OAuthCallback):
66    """OpenIDConnect OAuth2 Callback"""
67
68    client_class = OpenIDConnectClient
69
70    def get_user_id(self, info: dict[str, str]) -> str:
71        return info.get("sub", None)

OpenIDConnect OAuth2 Callback

client_class = <class 'OpenIDConnectClient'>
def get_user_id(self, info: dict[str, str]) -> str:
70    def get_user_id(self, info: dict[str, str]) -> str:
71        return info.get("sub", None)

Return unique identifier from the profile info.

@registry.register()
class OpenIDConnectType(authentik.sources.oauth.types.registry.SourceType):
74@registry.register()
75class OpenIDConnectType(SourceType):
76    """OpenIDConnect Type definition"""
77
78    callback_view = OpenIDConnectOAuth2Callback
79    redirect_view = OpenIDConnectOAuthRedirect
80    verbose_name = "OpenID Connect"
81    name = "openidconnect"
82
83    urls_customizable = True
84
85    def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
86        return {
87            "username": info.get("nickname", info.get("preferred_username")),
88            "email": info.get("email"),
89            "name": info.get("name"),
90            "groups": info.get("groups", []),
91        }

OpenIDConnect Type definition

callback_view = <class 'OpenIDConnectOAuth2Callback'>
redirect_view = <class 'OpenIDConnectOAuthRedirect'>
verbose_name = 'OpenID Connect'
name = 'openidconnect'
urls_customizable = True
def get_base_user_properties(self, info: dict[str, typing.Any], **kwargs) -> dict[str, typing.Any]:
85    def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
86        return {
87            "username": info.get("nickname", info.get("preferred_username")),
88            "email": info.get("email"),
89            "name": info.get("name"),
90            "groups": info.get("groups", []),
91        }

Get base user properties for enrollment/update