authentik.sources.oauth.types.oidc
OpenID Connect OAuth Views
"""OpenID Connect OAuth Views"""

from typing import Any

from jwt import PyJWKSet, PyJWTError, decode, get_unverified_header
from requests.auth import AuthBase, HTTPBasicAuth

from authentik.sources.oauth.clients.oauth2 import UserprofileHeaderAuthClient
from authentik.sources.oauth.models import AuthorizationCodeAuthMethod, OAuthSource
from authentik.sources.oauth.types.registry import SourceType, registry
from authentik.sources.oauth.views.callback import OAuthCallback
from authentik.sources.oauth.views.redirect import OAuthRedirect


class OpenIDConnectOAuthRedirect(OAuthRedirect):
    """OpenIDConnect OAuth2 Redirect"""

    def get_additional_parameters(self, source: OAuthSource):  # pragma: no cover
        """Return the extra request parameters: the openid, email and profile scopes."""
        return {
            "scope": ["openid", "email", "profile"],
        }


class OpenIDConnectClient(UserprofileHeaderAuthClient):
    """OAuth client for OpenID Connect sources.

    Places the client credentials according to the source's
    ``authorization_code_auth_method`` and falls back to the ``id_token``
    when the parent client yields no profile.
    """

    def get_access_token_args(self, callback: str, code: str) -> dict[str, Any]:
        """Build the access-token request arguments.

        The client id and secret are put into the arguments only when the
        source uses ``POST_BODY``; for any other method they are removed.
        """
        args = super().get_access_token_args(callback, code)
        if self.source.authorization_code_auth_method == AuthorizationCodeAuthMethod.POST_BODY:
            args["client_id"] = self.get_client_id()
            args["client_secret"] = self.get_client_secret()
        else:
            args.pop("client_id", None)
            args.pop("client_secret", None)
        return args

    def get_access_token_auth(self) -> AuthBase | None:
        """Return HTTP Basic credentials when the source uses ``BASIC_AUTH``, else None."""
        if self.source.authorization_code_auth_method == AuthorizationCodeAuthMethod.BASIC_AUTH:
            return HTTPBasicAuth(self.get_client_id(), self.get_client_secret())
        return None

    def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
        """Fetch user profile information.

        The parent implementation's result is returned when it is truthy.
        Otherwise the ``id_token`` entry of ``token`` is decoded with the key
        from ``self.source.oidc_jwks`` whose key id matches the token header.

        Returns:
            The profile / decoded claims, or None (after logging a warning)
            when there is no ``id_token`` or it cannot be decoded.
        """
        profile = super().get_profile_info(token)
        if profile:
            return profile
        if "id_token" not in token:
            self.logger.warning("no id_token given")
            return None
        id_token = token["id_token"]
        try:
            # The header is read without verification only to pick key and algorithm.
            # NOTE(review): "alg" comes from the unverified header - confirm that
            # restricting it to an allow-list is not required here.
            raw = get_unverified_header(id_token)
            jwk = PyJWKSet.from_dict(self.source.oidc_jwks)
            # IndexError here means no configured key matches the header's kid.
            key = [key for key in jwk.keys if key.key_id == raw["kid"]][0]
            return decode(
                id_token,
                key=key,
                algorithms=[raw["alg"]],
                audience=self.get_client_id(),
                # Issuer verification is explicitly switched off.
                options={"verify_iss": False},
            )
        # KeyError: the header has no "kid" or "alg" entry; treat it like any
        # other undecodable token instead of letting it propagate.
        except (PyJWTError, IndexError, KeyError, ValueError) as exc:
            self.logger.warning("Failed to decode id_token", exc=exc)
            return None


class OpenIDConnectOAuth2Callback(OAuthCallback):
    """OpenIDConnect OAuth2 Callback"""

    client_class = OpenIDConnectClient

    def get_user_id(self, info: dict[str, str]) -> str:
        """Return the ``sub`` entry of ``info`` (None when it is missing)."""
        return info.get("sub", None)


@registry.register()
class OpenIDConnectType(SourceType):
    """OpenIDConnect Type definition"""

    callback_view = OpenIDConnectOAuth2Callback
    redirect_view = OpenIDConnectOAuthRedirect
    verbose_name = "OpenID Connect"
    name = "openidconnect"

    urls_customizable = True

    def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
        """Map ``info`` onto the username, email, name and groups user properties."""
        return {
            # preferred_username is only consulted when "nickname" is absent.
            "username": info.get("nickname", info.get("preferred_username")),
            "email": info.get("email"),
            "name": info.get("name"),
            "groups": info.get("groups", []),
        }
class OpenIDConnectOAuthRedirect(OAuthRedirect):
    """Redirect view used for OpenID Connect OAuth2 sources."""

    def get_additional_parameters(self, source: OAuthSource):  # pragma: no cover
        """Return the extra request parameters: the openid, email and profile scopes."""
        requested_scopes = ["openid", "email", "profile"]
        return {"scope": requested_scopes}
OpenIDConnect OAuth2 Redirect
class OpenIDConnectClient(UserprofileHeaderAuthClient):
    """OAuth client for OpenID Connect sources.

    Places the client credentials according to the source's
    ``authorization_code_auth_method`` and falls back to the ``id_token``
    when the parent client yields no profile.
    """

    def get_access_token_args(self, callback: str, code: str) -> dict[str, Any]:
        """Build the access-token request arguments.

        The client id and secret are put into the arguments only when the
        source uses ``POST_BODY``; for any other method they are removed.
        """
        args = super().get_access_token_args(callback, code)
        if self.source.authorization_code_auth_method == AuthorizationCodeAuthMethod.POST_BODY:
            args["client_id"] = self.get_client_id()
            args["client_secret"] = self.get_client_secret()
        else:
            args.pop("client_id", None)
            args.pop("client_secret", None)
        return args

    def get_access_token_auth(self) -> AuthBase | None:
        """Return HTTP Basic credentials when the source uses ``BASIC_AUTH``, else None."""
        if self.source.authorization_code_auth_method == AuthorizationCodeAuthMethod.BASIC_AUTH:
            return HTTPBasicAuth(self.get_client_id(), self.get_client_secret())
        return None

    def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
        """Fetch user profile information.

        The parent implementation's result is returned when it is truthy.
        Otherwise the ``id_token`` entry of ``token`` is decoded with the key
        from ``self.source.oidc_jwks`` whose key id matches the token header.

        Returns:
            The profile / decoded claims, or None (after logging a warning)
            when there is no ``id_token`` or it cannot be decoded.
        """
        profile = super().get_profile_info(token)
        if profile:
            return profile
        if "id_token" not in token:
            self.logger.warning("no id_token given")
            return None
        id_token = token["id_token"]
        try:
            # The header is read without verification only to pick key and algorithm.
            # NOTE(review): "alg" comes from the unverified header - confirm that
            # restricting it to an allow-list is not required here.
            raw = get_unverified_header(id_token)
            jwk = PyJWKSet.from_dict(self.source.oidc_jwks)
            # IndexError here means no configured key matches the header's kid.
            key = [key for key in jwk.keys if key.key_id == raw["kid"]][0]
            return decode(
                id_token,
                key=key,
                algorithms=[raw["alg"]],
                audience=self.get_client_id(),
                # Issuer verification is explicitly switched off.
                options={"verify_iss": False},
            )
        # KeyError: the header has no "kid" or "alg" entry; treat it like any
        # other undecodable token instead of letting it propagate.
        except (PyJWTError, IndexError, KeyError, ValueError) as exc:
            self.logger.warning("Failed to decode id_token", exc=exc)
            return None
OAuth client which only sends authentication via header, not querystring
def
get_access_token_args(self, callback: str, code: str) -> dict[str, typing.Any]:
def get_access_token_args(self, callback: str, code: str) -> dict[str, Any]:
    """Assemble the access-token request arguments.

    Starts from the parent's arguments. Client id and secret are written
    into them only when the source's auth method is ``POST_BODY``; for
    every other method both entries are dropped if present.
    """
    token_args = super().get_access_token_args(callback, code)
    auth_method = self.source.authorization_code_auth_method
    if auth_method != AuthorizationCodeAuthMethod.POST_BODY:
        # Credentials must not travel in the request arguments.
        for credential_field in ("client_id", "client_secret"):
            token_args.pop(credential_field, None)
        return token_args
    token_args["client_id"] = self.get_client_id()
    token_args["client_secret"] = self.get_client_secret()
    return token_args
def
get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
    """Fetch user profile information.

    The parent implementation's result is returned when it is truthy.
    Otherwise the ``id_token`` entry of ``token`` is decoded with the key
    from ``self.source.oidc_jwks`` whose key id matches the token header.

    Returns:
        The profile / decoded claims, or None (after logging a warning)
        when there is no ``id_token`` or it cannot be decoded.
    """
    profile = super().get_profile_info(token)
    if profile:
        return profile
    if "id_token" not in token:
        self.logger.warning("no id_token given")
        return None
    id_token = token["id_token"]
    try:
        # The header is read without verification only to pick key and algorithm.
        # NOTE(review): "alg" comes from the unverified header - confirm that
        # restricting it to an allow-list is not required here.
        raw = get_unverified_header(id_token)
        jwk = PyJWKSet.from_dict(self.source.oidc_jwks)
        # IndexError here means no configured key matches the header's kid.
        key = [key for key in jwk.keys if key.key_id == raw["kid"]][0]
        return decode(
            id_token,
            key=key,
            algorithms=[raw["alg"]],
            audience=self.get_client_id(),
            # Issuer verification is explicitly switched off.
            options={"verify_iss": False},
        )
    # KeyError: the header has no "kid" or "alg" entry; treat it like any
    # other undecodable token instead of letting it propagate.
    except (PyJWTError, IndexError, KeyError, ValueError) as exc:
        self.logger.warning("Failed to decode id_token", exc=exc)
        return None
Fetch user profile information.
Inherited Members
class OpenIDConnectOAuth2Callback(OAuthCallback):
    """Callback view for OpenID Connect OAuth2 sources."""

    # Client used by this callback.
    client_class = OpenIDConnectClient

    def get_user_id(self, info: dict[str, str]) -> str:
        """Return the ``sub`` entry of ``info`` (None when it is missing)."""
        subject = info.get("sub")
        return subject
OpenIDConnect OAuth2 Callback
client_class =
<class 'OpenIDConnectClient'>
@registry.register()
class OpenIDConnectType(SourceType):
    """Source type definition for OpenID Connect."""

    callback_view = OpenIDConnectOAuth2Callback
    redirect_view = OpenIDConnectOAuthRedirect
    verbose_name = "OpenID Connect"
    name = "openidconnect"

    urls_customizable = True

    def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
        """Map ``info`` onto the username, email, name and groups user properties."""
        # Only used when "nickname" is absent from info (not when it is None).
        fallback_username = info.get("preferred_username")
        properties: dict[str, Any] = {}
        properties["username"] = info.get("nickname", fallback_username)
        properties["email"] = info.get("email")
        properties["name"] = info.get("name")
        properties["groups"] = info.get("groups", [])
        return properties
OpenIDConnect Type definition
callback_view =
<class 'OpenIDConnectOAuth2Callback'>
redirect_view =
<class 'OpenIDConnectOAuthRedirect'>
def
get_base_user_properties(self, info: dict[str, typing.Any], **kwargs) -> dict[str, typing.Any]:
def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
    """Get base user properties for enrollment/update.

    Builds a dict with ``username``, ``email``, ``name`` and ``groups``
    taken from ``info``. The username is ``nickname`` when that key exists,
    otherwise ``preferred_username``; ``groups`` defaults to an empty list.
    """
    # Only used when "nickname" is absent from info (not when it is None).
    fallback_username = info.get("preferred_username")
    properties: dict[str, Any] = {}
    properties["username"] = info.get("nickname", fallback_username)
    properties["email"] = info.get("email")
    properties["name"] = info.get("name")
    properties["groups"] = info.get("groups", [])
    return properties
Get base user properties for enrollment/update