authentik.sources.oauth.types.entra_id
EntraID OAuth2 Views
"""EntraID OAuth2 Views"""

from typing import Any

from requests import RequestException
from structlog.stdlib import get_logger

from authentik.sources.oauth.clients.oauth2 import UserprofileHeaderAuthClient
from authentik.sources.oauth.models import AuthorizationCodeAuthMethod
from authentik.sources.oauth.types.oidc import OpenIDConnectOAuth2Callback
from authentik.sources.oauth.types.registry import SourceType, registry
from authentik.sources.oauth.views.redirect import OAuthRedirect

LOGGER = get_logger()


class EntraIDOAuthRedirect(OAuthRedirect):
    """Entra ID OAuth2 Redirect"""

    def get_additional_parameters(self, source):  # pragma: no cover
        """Return the additional parameters, which carry the default scope list."""
        return {
            "scope": ["openid", "https://graph.microsoft.com/User.Read"],
        }


class EntraIDClient(UserprofileHeaderAuthClient):
    """Fetch EntraID group information"""

    def get_profile_info(self, token):
        """Fetch the user profile and, if the group scope is configured, the group data.

        The profile itself comes from the parent client. When the source's
        additional scopes contain ``https://graph.microsoft.com/GroupMember.Read.All``
        the JSON body of ``/v1.0/me/memberOf`` is stored under ``raw_groups``.

        Returns the profile data, or ``None`` when no profile is available or the
        group request fails.
        """
        profile_data = super().get_profile_info(token)
        if "https://graph.microsoft.com/GroupMember.Read.All" not in self.source.additional_scopes:
            return profile_data
        # NOTE(review): the parent presumably returns None on failure - confirm.
        # Without a profile there is nothing to attach the groups to.
        if profile_data is None:
            return None
        group_response = self.session.request(
            "get",
            "https://graph.microsoft.com/v1.0/me/memberOf",
            headers={"Authorization": f"{token['token_type']} {token['access_token']}"},
        )
        try:
            group_response.raise_for_status()
        except RequestException as exc:
            # A requests Response is falsy for 4xx/5xx status codes, so compare
            # against None; a truthiness test would never log the response body.
            LOGGER.warning(
                "Unable to fetch user profile",
                exc=exc,
                response=exc.response.text if exc.response is not None else str(exc),
            )
            return None
        profile_data["raw_groups"] = group_response.json()
        return profile_data


class EntraIDOAuthCallback(OpenIDConnectOAuth2Callback):
    """EntraID OAuth2 Callback"""

    client_class = EntraIDClient

    def get_user_id(self, info: dict[str, str]) -> str:
        """Return ``info["id"]`` when present, otherwise the parent's identifier."""
        # Default try to get `id` for the Graph API endpoint
        # fallback to OpenID logic in case the profile URL was changed
        return info.get("id", super().get_user_id(info))


@registry.register()
class EntraIDType(SourceType):
    """Entra ID Type definition"""

    callback_view = EntraIDOAuthCallback
    redirect_view = EntraIDOAuthRedirect
    verbose_name = "Entra ID"
    name = "entraid"

    urls_customizable = True

    authorization_url = "https://login.microsoftonline.com/common/oauth2/v2.0/authorize"
    access_token_url = "https://login.microsoftonline.com/common/oauth2/v2.0/token"  # nosec
    profile_url = "https://graph.microsoft.com/v1.0/me"
    oidc_jwks_url = "https://login.microsoftonline.com/common/discovery/keys"

    authorization_code_auth_method = AuthorizationCodeAuthMethod.POST_BODY

    def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
        """Map the profile in ``info`` to username, email, name and group ids.

        Side effect: ``info["raw_groups"]`` is replaced by a dict of group objects
        keyed by group id, which ``get_base_group_properties`` reads afterwards.
        Calling this again on the same ``info`` yields the same groups.
        """
        # `otherMails` may be missing, None or an empty list
        other_mails = info.get("otherMails") or [None]
        mail = info.get("mail") or other_mails[0]
        # Format group info
        raw_groups = info.get("raw_groups") or {}
        members = raw_groups.get("value")
        if not isinstance(members, list):
            # No `value` list: either there is no group data, or an earlier call
            # already re-keyed it by id, in which case the values are the groups.
            members = list(raw_groups.values())
        group_id_dict = {
            member["id"]: member
            for member in members
            if isinstance(member, dict) and member.get("@odata.type") == "#microsoft.graph.group"
        }
        info["raw_groups"] = group_id_dict
        return {
            "username": info.get("userPrincipalName"),
            "email": mail,
            "name": info.get("displayName"),
            "groups": list(group_id_dict),
        }

    def get_base_group_properties(self, source, group_id, **kwargs):
        """Return the group's name: its ``displayName`` if known, else ``group_id``.

        Reads ``kwargs["info"]["raw_groups"]``, the id-keyed dict written by
        ``get_base_user_properties``.
        """
        raw_groups = kwargs["info"]["raw_groups"]
        if group_id in raw_groups:
            name = raw_groups[group_id]["displayName"]
        else:
            name = group_id
        return {
            "name": name,
        }
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class EntraIDOAuthRedirect(OAuthRedirect):
    """Entra ID OAuth2 Redirect"""

    def get_additional_parameters(self, source):  # pragma: no cover
        """Return the additional parameters, which carry the default scope list."""
        default_scopes = ["openid", "https://graph.microsoft.com/User.Read"]
        return {"scope": default_scopes}
Entra ID OAuth2 Redirect
class EntraIDClient(UserprofileHeaderAuthClient):
    """Fetch EntraID group information"""

    def get_profile_info(self, token):
        """Fetch the user profile and, if the group scope is configured, the group data.

        The profile itself comes from the parent client. When the source's
        additional scopes contain ``https://graph.microsoft.com/GroupMember.Read.All``
        the JSON body of ``/v1.0/me/memberOf`` is stored under ``raw_groups``.

        Returns the profile data, or ``None`` when no profile is available or the
        group request fails.
        """
        profile_data = super().get_profile_info(token)
        if "https://graph.microsoft.com/GroupMember.Read.All" not in self.source.additional_scopes:
            return profile_data
        # NOTE(review): the parent presumably returns None on failure - confirm.
        # Without a profile there is nothing to attach the groups to.
        if profile_data is None:
            return None
        group_response = self.session.request(
            "get",
            "https://graph.microsoft.com/v1.0/me/memberOf",
            headers={"Authorization": f"{token['token_type']} {token['access_token']}"},
        )
        try:
            group_response.raise_for_status()
        except RequestException as exc:
            # A requests Response is falsy for 4xx/5xx status codes, so compare
            # against None; a truthiness test would never log the response body.
            LOGGER.warning(
                "Unable to fetch user profile",
                exc=exc,
                response=exc.response.text if exc.response is not None else str(exc),
            )
            return None
        profile_data["raw_groups"] = group_response.json()
        return profile_data
Fetch EntraID group information
def
get_profile_info(self, token):
def get_profile_info(self, token):
    """Fetch the user profile and, if the group scope is configured, the group data.

    The profile itself comes from the parent client. When the source's
    additional scopes contain ``https://graph.microsoft.com/GroupMember.Read.All``
    the JSON body of ``/v1.0/me/memberOf`` is stored under ``raw_groups``.

    Returns the profile data, or ``None`` when no profile is available or the
    group request fails.
    """
    profile_data = super().get_profile_info(token)
    if "https://graph.microsoft.com/GroupMember.Read.All" not in self.source.additional_scopes:
        return profile_data
    # NOTE(review): the parent presumably returns None on failure - confirm.
    # Without a profile there is nothing to attach the groups to.
    if profile_data is None:
        return None
    group_response = self.session.request(
        "get",
        "https://graph.microsoft.com/v1.0/me/memberOf",
        headers={"Authorization": f"{token['token_type']} {token['access_token']}"},
    )
    try:
        group_response.raise_for_status()
    except RequestException as exc:
        # A requests Response is falsy for 4xx/5xx status codes, so compare
        # against None; a truthiness test would never log the response body.
        LOGGER.warning(
            "Unable to fetch user profile",
            exc=exc,
            response=exc.response.text if exc.response is not None else str(exc),
        )
        return None
    profile_data["raw_groups"] = group_response.json()
    return profile_data
Fetch user profile information.
Inherited Members
class EntraIDOAuthCallback(OpenIDConnectOAuth2Callback):
    """EntraID OAuth2 Callback"""

    client_class = EntraIDClient

    def get_user_id(self, info: dict[str, str]) -> str:
        """Return ``info["id"]`` when present, otherwise the parent's identifier."""
        # The OpenID value is the fallback for when the profile URL was changed
        # and the response is not a Graph API object carrying `id`.
        oidc_identifier = super().get_user_id(info)
        return info.get("id", oidc_identifier)
EntraID OAuth2 Callback
client_class =
<class 'EntraIDClient'>
def
get_user_id(self, info: dict[str, str]) -> str:
def get_user_id(self, info: dict[str, str]) -> str:
    """Return ``info["id"]`` when present, otherwise the parent's identifier."""
    # The OpenID value is the fallback for when the profile URL was changed
    # and the response is not a Graph API object carrying `id`.
    oidc_identifier = super().get_user_id(info)
    return info.get("id", oidc_identifier)
Return unique identifier from the profile info.
@registry.register()
class EntraIDType(SourceType):
    """Entra ID Type definition"""

    callback_view = EntraIDOAuthCallback
    redirect_view = EntraIDOAuthRedirect
    verbose_name = "Entra ID"
    name = "entraid"

    urls_customizable = True

    authorization_url = "https://login.microsoftonline.com/common/oauth2/v2.0/authorize"
    access_token_url = "https://login.microsoftonline.com/common/oauth2/v2.0/token"  # nosec
    profile_url = "https://graph.microsoft.com/v1.0/me"
    oidc_jwks_url = "https://login.microsoftonline.com/common/discovery/keys"

    authorization_code_auth_method = AuthorizationCodeAuthMethod.POST_BODY

    def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
        """Map the profile in ``info`` to username, email, name and group ids.

        Side effect: ``info["raw_groups"]`` is replaced by a dict of group objects
        keyed by group id, which ``get_base_group_properties`` reads afterwards.
        Calling this again on the same ``info`` yields the same groups.
        """
        # `otherMails` may be missing, None or an empty list
        other_mails = info.get("otherMails") or [None]
        mail = info.get("mail") or other_mails[0]
        # Format group info
        raw_groups = info.get("raw_groups") or {}
        members = raw_groups.get("value")
        if not isinstance(members, list):
            # No `value` list: either there is no group data, or an earlier call
            # already re-keyed it by id, in which case the values are the groups.
            members = list(raw_groups.values())
        group_id_dict = {
            member["id"]: member
            for member in members
            if isinstance(member, dict) and member.get("@odata.type") == "#microsoft.graph.group"
        }
        info["raw_groups"] = group_id_dict
        return {
            "username": info.get("userPrincipalName"),
            "email": mail,
            "name": info.get("displayName"),
            "groups": list(group_id_dict),
        }

    def get_base_group_properties(self, source, group_id, **kwargs):
        """Return the group's name: its ``displayName`` if known, else ``group_id``.

        Reads ``kwargs["info"]["raw_groups"]``, the id-keyed dict written by
        ``get_base_user_properties``.
        """
        raw_groups = kwargs["info"]["raw_groups"]
        if group_id in raw_groups:
            name = raw_groups[group_id]["displayName"]
        else:
            name = group_id
        return {
            "name": name,
        }
Entra ID Type definition
callback_view =
<class 'EntraIDOAuthCallback'>
redirect_view =
<class 'EntraIDOAuthRedirect'>
def
get_base_user_properties(self, info: dict[str, typing.Any], **kwargs) -> dict[str, typing.Any]:
def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
    """Map the profile in ``info`` to username, email, name and group ids.

    The email is ``mail`` if set, otherwise the first entry of ``otherMails``
    (``None`` when neither is available). Only ``raw_groups`` entries whose
    ``@odata.type`` is ``#microsoft.graph.group`` count as groups.

    Side effect: ``info["raw_groups"]`` is replaced by a dict of group objects
    keyed by group id. Calling this again on the same ``info`` yields the same
    groups.
    """
    # `otherMails` may be missing, None or an empty list
    other_mails = info.get("otherMails") or [None]
    mail = info.get("mail") or other_mails[0]
    # Format group info
    raw_groups = info.get("raw_groups") or {}
    members = raw_groups.get("value")
    if not isinstance(members, list):
        # No `value` list: either there is no group data, or an earlier call
        # already re-keyed it by id, in which case the values are the groups.
        members = list(raw_groups.values())
    group_id_dict = {
        member["id"]: member
        for member in members
        if isinstance(member, dict) and member.get("@odata.type") == "#microsoft.graph.group"
    }
    info["raw_groups"] = group_id_dict
    return {
        "username": info.get("userPrincipalName"),
        "email": mail,
        "name": info.get("displayName"),
        "groups": list(group_id_dict),
    }
Get base user properties for enrollment/update
def
get_base_group_properties(self, source, group_id, **kwargs):
def get_base_group_properties(self, source, group_id, **kwargs):
    """Return the group's name: its ``displayName`` if known, else ``group_id``.

    The lookup uses ``kwargs["info"]["raw_groups"]``, a mapping keyed by group id.
    """
    known_groups = kwargs["info"]["raw_groups"]
    # Groups missing from the mapping are named after their id
    display = known_groups[group_id]["displayName"] if group_id in known_groups else group_id
    return {"name": display}
Get base group properties for creation/update