authentik.sources.oauth.types.apple
Apple OAuth Views
"""Apple OAuth Views"""

from time import time
from typing import Any

from django.http.request import HttpRequest
from django.urls.base import reverse
from jwt import decode, encode
from rest_framework.fields import CharField
from structlog.stdlib import get_logger

from authentik.flows.challenge import Challenge, ChallengeResponse
from authentik.sources.oauth.clients.oauth2 import OAuth2Client
from authentik.sources.oauth.models import AuthorizationCodeAuthMethod, OAuthSource
from authentik.sources.oauth.types.registry import SourceType, registry
from authentik.sources.oauth.views.callback import OAuthCallback
from authentik.sources.oauth.views.redirect import OAuthRedirect
from authentik.stages.identification.stage import LoginChallengeMixin

LOGGER = get_logger()
# Number of ";"-separated parts expected in the source's consumer key:
# services_id_identifier;apple_team_id;key_id
APPLE_CLIENT_ID_PARTS = 3


class AppleLoginChallenge(LoginChallengeMixin, Challenge):
    """Special challenge for apple-native authentication flow, which happens on the client."""

    client_id = CharField()
    component = CharField(default="ak-source-oauth-apple")
    scope = CharField()
    redirect_uri = CharField()
    state = CharField()


class AppleChallengeResponse(ChallengeResponse):
    """Pseudo class for apple response"""

    component = CharField(default="ak-source-oauth-apple")


class AppleOAuthClient(OAuth2Client):
    """Apple OAuth2 client"""

    def get_client_id(self) -> str:
        """Return the client id (first ";"-separated part of the consumer key).

        When the consumer key has fewer than ``APPLE_CLIENT_ID_PARTS`` parts it
        is returned unchanged.
        """
        parts: list[str] = self.source.consumer_key.split(";")
        if len(parts) < APPLE_CLIENT_ID_PARTS:
            return self.source.consumer_key
        return parts[0].strip()

    def get_client_secret(self) -> str:
        """Build the ES256-signed JWT that is used as the client secret.

        The consumer key must be formatted as
        ``services_id_identifier;apple_team_id;key_id``; the consumer secret is
        used as the signing key.

        Raises:
            ValueError: if the consumer key has fewer than three parts.
        """
        now = time()
        parts: list[str] = self.source.consumer_key.split(";")
        if len(parts) < APPLE_CLIENT_ID_PARTS:
            raise ValueError(
                "Apple Source client_id should be formatted like "
                "services_id_identifier;apple_team_id;key_id"
            )
        # Strip once so the logged values match what ends up in the token.
        client_id, team_id, key_id = (part.strip() for part in parts[:APPLE_CLIENT_ID_PARTS])
        LOGGER.debug("got values from client_id", team=team_id, kid=key_id)
        payload = {
            "iss": team_id,
            "iat": now,
            "exp": now + 86400 * 180,  # valid for 180 days
            "aud": "https://appleid.apple.com",
            "sub": client_id,
        }
        jwt = encode(
            payload,
            self.source.consumer_secret,
            algorithm="ES256",
            headers={"kid": key_id},
        )
        # The signed JWT is the client secret itself, so only the claims are logged.
        LOGGER.debug("signing payload as secret key", payload=payload)
        return jwt

    def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
        """Return the claims of the ``id_token`` in the token response.

        The token is decoded without signature verification. Returns ``None``
        when the response carries no ``id_token``.
        """
        id_token = token.get("id_token")
        if not id_token:
            return None
        return decode(id_token, options={"verify_signature": False})


class AppleOAuthRedirect(OAuthRedirect):
    """Apple OAuth2 Redirect"""

    client_class = AppleOAuthClient

    def get_additional_parameters(self, source: OAuthSource):  # pragma: no cover
        """Return the extra authorization request parameters used for Apple."""
        return {
            "scope": ["name", "email"],
            "response_mode": "form_post",
        }


class AppleOAuth2Callback(OAuthCallback):
    """Apple OAuth2 Callback"""

    client_class = AppleOAuthClient

    def get_user_id(self, info: dict[str, Any]) -> str | None:
        """Return the ``sub`` claim, or ``None`` when it is missing."""
        return info.get("sub")


@registry.register()
class AppleType(SourceType):
    """Apple Type definition"""

    callback_view = AppleOAuth2Callback
    redirect_view = AppleOAuthRedirect
    verbose_name = "Apple"
    name = "apple"

    authorization_url = "https://appleid.apple.com/auth/authorize"
    access_token_url = "https://appleid.apple.com/auth/token"  # nosec
    profile_url = ""

    authorization_code_auth_method = AuthorizationCodeAuthMethod.POST_BODY

    def login_challenge(self, source: OAuthSource, request: HttpRequest) -> Challenge:
        """Pre-generate all the things required for the JS SDK"""
        apple_client = AppleOAuthClient(
            source,
            request,
            callback=reverse(
                "authentik_sources_oauth:oauth-client-callback",
                kwargs={"source_slug": source.slug},
            ),
        )
        args = apple_client.get_redirect_args()
        return AppleLoginChallenge(
            data={
                "client_id": apple_client.get_client_id(),
                "scope": "name email",
                "redirect_uri": args["redirect_uri"],
                "state": args["state"],
            }
        )

    def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
        """Return the ``email`` and ``name`` entries of ``info`` (``None`` if absent)."""
        return {
            "email": info.get("email"),
            "name": info.get("name"),
        }
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
APPLE_CLIENT_ID_PARTS =
3
class
AppleLoginChallenge(authentik.stages.identification.stage.LoginChallengeMixin, authentik.flows.challenge.Challenge):
class AppleLoginChallenge(LoginChallengeMixin, Challenge):
    """Challenge for the Apple-native authentication flow.

    That flow happens on the client; this challenge carries the values it is
    given (client id, scope, redirect URI and state).
    """

    client_id = CharField()
    # Component identifier, defaulting to the Apple source component.
    component = CharField(default="ak-source-oauth-apple")
    scope = CharField()
    redirect_uri = CharField()
    state = CharField()
Special challenge for apple-native authentication flow, which happens on the client.
class AppleChallengeResponse(ChallengeResponse):
    """Placeholder response class for the Apple challenge."""

    # Component identifier, defaulting to the Apple source component.
    component = CharField(default="ak-source-oauth-apple")
Pseudo class for apple response
class AppleOAuthClient(OAuth2Client):
    """Apple OAuth2 client"""

    def get_client_id(self) -> str:
        """Return the client id (first ";"-separated part of the consumer key).

        When the consumer key has fewer than ``APPLE_CLIENT_ID_PARTS`` parts it
        is returned unchanged.
        """
        parts: list[str] = self.source.consumer_key.split(";")
        if len(parts) < APPLE_CLIENT_ID_PARTS:
            return self.source.consumer_key
        return parts[0].strip()

    def get_client_secret(self) -> str:
        """Build the ES256-signed JWT that is used as the client secret.

        The consumer key must be formatted as
        ``services_id_identifier;apple_team_id;key_id``; the consumer secret is
        used as the signing key.

        Raises:
            ValueError: if the consumer key has fewer than three parts.
        """
        now = time()
        parts: list[str] = self.source.consumer_key.split(";")
        if len(parts) < APPLE_CLIENT_ID_PARTS:
            raise ValueError(
                "Apple Source client_id should be formatted like "
                "services_id_identifier;apple_team_id;key_id"
            )
        # Strip once so the logged values match what ends up in the token.
        client_id, team_id, key_id = (part.strip() for part in parts[:APPLE_CLIENT_ID_PARTS])
        LOGGER.debug("got values from client_id", team=team_id, kid=key_id)
        payload = {
            "iss": team_id,
            "iat": now,
            "exp": now + 86400 * 180,  # valid for 180 days
            "aud": "https://appleid.apple.com",
            "sub": client_id,
        }
        jwt = encode(
            payload,
            self.source.consumer_secret,
            algorithm="ES256",
            headers={"kid": key_id},
        )
        # The signed JWT is the client secret itself, so only the claims are logged.
        LOGGER.debug("signing payload as secret key", payload=payload)
        return jwt

    def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
        """Return the claims of the ``id_token`` in the token response.

        The token is decoded without signature verification. Returns ``None``
        when the response carries no ``id_token``.
        """
        id_token = token.get("id_token")
        if not id_token:
            return None
        return decode(id_token, options={"verify_signature": False})
Apple OAuth2 client
def
get_client_id(self) -> str:
def get_client_id(self) -> str:
    """Return the client id taken from the source's consumer key.

    The consumer key is split on ";". With at least
    ``APPLE_CLIENT_ID_PARTS`` segments the first one is returned, stripped of
    surrounding whitespace; otherwise the consumer key is returned unchanged.
    """
    raw_key = self.source.consumer_key
    segments: list[str] = raw_key.split(";")
    if len(segments) >= APPLE_CLIENT_ID_PARTS:
        return segments[0].strip()
    return raw_key
Get client id
def
get_client_secret(self) -> str:
def get_client_secret(self) -> str:
    """Build the ES256-signed JWT that is used as the client secret.

    The consumer key must be formatted as
    ``services_id_identifier;apple_team_id;key_id``; the consumer secret is
    used as the signing key.

    Raises:
        ValueError: if the consumer key has fewer than three parts.
    """
    now = time()
    parts: list[str] = self.source.consumer_key.split(";")
    if len(parts) < APPLE_CLIENT_ID_PARTS:
        raise ValueError(
            "Apple Source client_id should be formatted like "
            "services_id_identifier;apple_team_id;key_id"
        )
    # Strip once so the logged values match what ends up in the token.
    client_id, team_id, key_id = (part.strip() for part in parts[:APPLE_CLIENT_ID_PARTS])
    LOGGER.debug("got values from client_id", team=team_id, kid=key_id)
    payload = {
        "iss": team_id,
        "iat": now,
        "exp": now + 86400 * 180,  # valid for 180 days
        "aud": "https://appleid.apple.com",
        "sub": client_id,
    }
    jwt = encode(
        payload,
        self.source.consumer_secret,
        algorithm="ES256",
        headers={"kid": key_id},
    )
    # The signed JWT is the client secret itself, so only the claims are logged.
    LOGGER.debug("signing payload as secret key", payload=payload)
    return jwt
Get client secret
def
get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None:
70 def get_profile_info(self, token: dict[str, str]) -> dict[str, Any] | None: 71 id_token = token.get("id_token") 72 return decode(id_token, options={"verify_signature": False})
Fetch user profile information.
Inherited Members
class AppleOAuthRedirect(OAuthRedirect):
    """Redirect view for the Apple OAuth2 source."""

    client_class = AppleOAuthClient

    def get_additional_parameters(self, source: OAuthSource):  # pragma: no cover
        """Return the extra parameters used for Apple: scope and response mode."""
        extra_params = {}
        extra_params["scope"] = ["name", "email"]
        extra_params["response_mode"] = "form_post"
        return extra_params
Apple OAuth2 Redirect
client_class =
<class 'AppleOAuthClient'>
class AppleOAuth2Callback(OAuthCallback):
    """Apple OAuth2 Callback"""

    client_class = AppleOAuthClient

    def get_user_id(self, info: dict[str, Any]) -> str | None:
        """Return the ``sub`` claim of the profile info.

        Returns ``None`` instead of raising ``KeyError`` when the claim is
        missing, which matches the declared return type.
        """
        return info.get("sub")
Apple OAuth2 Callback
client_class =
<class 'AppleOAuthClient'>
@registry.register()
class AppleType(SourceType):
    """Source type definition for Apple."""

    # Views handling the callback and the redirect
    callback_view = AppleOAuth2Callback
    redirect_view = AppleOAuthRedirect
    verbose_name = "Apple"
    name = "apple"

    # Apple endpoints; no profile URL is configured
    authorization_url = "https://appleid.apple.com/auth/authorize"
    access_token_url = "https://appleid.apple.com/auth/token"  # nosec
    profile_url = ""

    authorization_code_auth_method = AuthorizationCodeAuthMethod.POST_BODY

    def login_challenge(self, source: OAuthSource, request: HttpRequest) -> Challenge:
        """Pre-generate all the things required for the JS SDK"""
        callback_path = reverse(
            "authentik_sources_oauth:oauth-client-callback",
            kwargs={"source_slug": source.slug},
        )
        client = AppleOAuthClient(source, request, callback=callback_path)
        redirect_args = client.get_redirect_args()
        challenge_data = {
            "client_id": client.get_client_id(),
            "scope": "name email",
            "redirect_uri": redirect_args["redirect_uri"],
            "state": redirect_args["state"],
        }
        return AppleLoginChallenge(data=challenge_data)

    def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
        """Return the ``email`` and ``name`` entries of ``info`` (``None`` if absent)."""
        return {field: info.get(field) for field in ("email", "name")}
Apple Type definition
callback_view =
<class 'AppleOAuth2Callback'>
redirect_view =
<class 'AppleOAuthRedirect'>
access_token_url =
'https://appleid.apple.com/auth/token'
def
login_challenge( self, source: authentik.sources.oauth.models.OAuthSource, request: django.http.request.HttpRequest) -> authentik.flows.challenge.Challenge:
def login_challenge(self, source: OAuthSource, request: HttpRequest) -> Challenge:
    """Pre-generate all the things required for the JS SDK

    Builds an ``AppleOAuthClient`` for the source's callback URL and returns
    an ``AppleLoginChallenge`` holding the client id, scope, redirect URI and
    state.
    """
    callback_path = reverse(
        "authentik_sources_oauth:oauth-client-callback",
        kwargs={"source_slug": source.slug},
    )
    client = AppleOAuthClient(source, request, callback=callback_path)
    redirect_args = client.get_redirect_args()
    challenge_data = {
        "client_id": client.get_client_id(),
        "scope": "name email",
        "redirect_uri": redirect_args["redirect_uri"],
        "state": redirect_args["state"],
    }
    return AppleLoginChallenge(data=challenge_data)
Pre-generate all the things required for the JS SDK
def
get_base_user_properties(self, info: dict[str, typing.Any], **kwargs) -> dict[str, typing.Any]:
def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
    """Return the base user properties taken from the profile info.

    Only ``email`` and ``name`` are read from ``info``; a key that is absent
    maps to ``None``. Extra keyword arguments are accepted and ignored.
    """
    return {field: info.get(field) for field in ("email", "name")}
Get base user properties for enrollment/update