authentik.sources.oauth.types.entra_id

EntraID OAuth2 Views

  1"""EntraID OAuth2 Views"""
  2
  3from typing import Any
  4
  5from requests import RequestException
  6from structlog.stdlib import get_logger
  7
  8from authentik.sources.oauth.clients.oauth2 import UserprofileHeaderAuthClient
  9from authentik.sources.oauth.models import AuthorizationCodeAuthMethod
 10from authentik.sources.oauth.types.oidc import OpenIDConnectOAuth2Callback
 11from authentik.sources.oauth.types.registry import SourceType, registry
 12from authentik.sources.oauth.views.redirect import OAuthRedirect
 13
 14LOGGER = get_logger()
 15
 16
 17class EntraIDOAuthRedirect(OAuthRedirect):
 18    """Entra ID OAuth2 Redirect"""
 19
 20    def get_additional_parameters(self, source):  # pragma: no cover
 21        return {
 22            "scope": ["openid", "https://graph.microsoft.com/User.Read"],
 23        }
 24
 25
 26class EntraIDClient(UserprofileHeaderAuthClient):
 27    """Fetch EntraID group information"""
 28
 29    def get_profile_info(self, token):
 30        profile_data = super().get_profile_info(token)
 31        if "https://graph.microsoft.com/GroupMember.Read.All" not in self.source.additional_scopes:
 32            return profile_data
 33        group_response = self.session.request(
 34            "get",
 35            "https://graph.microsoft.com/v1.0/me/memberOf",
 36            headers={"Authorization": f"{token['token_type']} {token['access_token']}"},
 37        )
 38        try:
 39            group_response.raise_for_status()
 40        except RequestException as exc:
 41            LOGGER.warning(
 42                "Unable to fetch user profile",
 43                exc=exc,
 44                response=exc.response.text if exc.response else str(exc),
 45            )
 46            return None
 47        profile_data["raw_groups"] = group_response.json()
 48        return profile_data
 49
 50
 51class EntraIDOAuthCallback(OpenIDConnectOAuth2Callback):
 52    """EntraID OAuth2 Callback"""
 53
 54    client_class = EntraIDClient
 55
 56    def get_user_id(self, info: dict[str, str]) -> str:
 57        # Default try to get `id` for the Graph API endpoint
 58        # fallback to OpenID logic in case the profile URL was changed
 59        return info.get("id", super().get_user_id(info))
 60
 61
 62@registry.register()
 63class EntraIDType(SourceType):
 64    """Entra ID Type definition"""
 65
 66    callback_view = EntraIDOAuthCallback
 67    redirect_view = EntraIDOAuthRedirect
 68    verbose_name = "Entra ID"
 69    name = "entraid"
 70
 71    urls_customizable = True
 72
 73    authorization_url = "https://login.microsoftonline.com/common/oauth2/v2.0/authorize"
 74    access_token_url = "https://login.microsoftonline.com/common/oauth2/v2.0/token"  # nosec
 75    profile_url = "https://graph.microsoft.com/v1.0/me"
 76    oidc_jwks_url = "https://login.microsoftonline.com/common/discovery/keys"
 77
 78    authorization_code_auth_method = AuthorizationCodeAuthMethod.POST_BODY
 79
 80    def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
 81        mail = info.get("mail", None) or info.get("otherMails", [None])[0]
 82        # Format group info
 83        groups = []
 84        group_id_dict = {}
 85        for group in info.get("raw_groups", {}).get("value", []):
 86            if group["@odata.type"] != "#microsoft.graph.group":
 87                continue
 88            groups.append(group["id"])
 89            group_id_dict[group["id"]] = group
 90        info["raw_groups"] = group_id_dict
 91        return {
 92            "username": info.get("userPrincipalName"),
 93            "email": mail,
 94            "name": info.get("displayName"),
 95            "groups": groups,
 96        }
 97
 98    def get_base_group_properties(self, source, group_id, **kwargs):
 99        raw_groups = kwargs["info"]["raw_groups"]
100        if group_id in raw_groups:
101            name = raw_groups[group_id]["displayName"]
102        else:
103            name = group_id
104        return {
105            "name": name,
106        }
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class EntraIDOAuthRedirect(authentik.sources.oauth.views.redirect.OAuthRedirect):
18class EntraIDOAuthRedirect(OAuthRedirect):
19    """Entra ID OAuth2 Redirect"""
20
21    def get_additional_parameters(self, source):  # pragma: no cover
22        return {
23            "scope": ["openid", "https://graph.microsoft.com/User.Read"],
24        }

Entra ID OAuth2 Redirect

def get_additional_parameters(self, source):
21    def get_additional_parameters(self, source):  # pragma: no cover
22        return {
23            "scope": ["openid", "https://graph.microsoft.com/User.Read"],
24        }

Return additional redirect parameters for this source.

27class EntraIDClient(UserprofileHeaderAuthClient):
28    """Fetch EntraID group information"""
29
30    def get_profile_info(self, token):
31        profile_data = super().get_profile_info(token)
32        if "https://graph.microsoft.com/GroupMember.Read.All" not in self.source.additional_scopes:
33            return profile_data
34        group_response = self.session.request(
35            "get",
36            "https://graph.microsoft.com/v1.0/me/memberOf",
37            headers={"Authorization": f"{token['token_type']} {token['access_token']}"},
38        )
39        try:
40            group_response.raise_for_status()
41        except RequestException as exc:
42            LOGGER.warning(
43                "Unable to fetch user profile",
44                exc=exc,
45                response=exc.response.text if exc.response else str(exc),
46            )
47            return None
48        profile_data["raw_groups"] = group_response.json()
49        return profile_data

Fetch EntraID group information

def get_profile_info(self, token):
30    def get_profile_info(self, token):
31        profile_data = super().get_profile_info(token)
32        if "https://graph.microsoft.com/GroupMember.Read.All" not in self.source.additional_scopes:
33            return profile_data
34        group_response = self.session.request(
35            "get",
36            "https://graph.microsoft.com/v1.0/me/memberOf",
37            headers={"Authorization": f"{token['token_type']} {token['access_token']}"},
38        )
39        try:
40            group_response.raise_for_status()
41        except RequestException as exc:
42            LOGGER.warning(
43                "Unable to fetch user profile",
44                exc=exc,
45                response=exc.response.text if exc.response else str(exc),
46            )
47            return None
48        profile_data["raw_groups"] = group_response.json()
49        return profile_data

Fetch user profile information.

class EntraIDOAuthCallback(authentik.sources.oauth.types.oidc.OpenIDConnectOAuth2Callback):
52class EntraIDOAuthCallback(OpenIDConnectOAuth2Callback):
53    """EntraID OAuth2 Callback"""
54
55    client_class = EntraIDClient
56
57    def get_user_id(self, info: dict[str, str]) -> str:
58        # Default try to get `id` for the Graph API endpoint
59        # fallback to OpenID logic in case the profile URL was changed
60        return info.get("id", super().get_user_id(info))

EntraID OAuth2 Callback

client_class = <class 'EntraIDClient'>
def get_user_id(self, info: dict[str, str]) -> str:
57    def get_user_id(self, info: dict[str, str]) -> str:
58        # Default try to get `id` for the Graph API endpoint
59        # fallback to OpenID logic in case the profile URL was changed
60        return info.get("id", super().get_user_id(info))

Return unique identifier from the profile info.

@registry.register()
class EntraIDType(authentik.sources.oauth.types.registry.SourceType):
 63@registry.register()
 64class EntraIDType(SourceType):
 65    """Entra ID Type definition"""
 66
 67    callback_view = EntraIDOAuthCallback
 68    redirect_view = EntraIDOAuthRedirect
 69    verbose_name = "Entra ID"
 70    name = "entraid"
 71
 72    urls_customizable = True
 73
 74    authorization_url = "https://login.microsoftonline.com/common/oauth2/v2.0/authorize"
 75    access_token_url = "https://login.microsoftonline.com/common/oauth2/v2.0/token"  # nosec
 76    profile_url = "https://graph.microsoft.com/v1.0/me"
 77    oidc_jwks_url = "https://login.microsoftonline.com/common/discovery/keys"
 78
 79    authorization_code_auth_method = AuthorizationCodeAuthMethod.POST_BODY
 80
 81    def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
 82        mail = info.get("mail", None) or info.get("otherMails", [None])[0]
 83        # Format group info
 84        groups = []
 85        group_id_dict = {}
 86        for group in info.get("raw_groups", {}).get("value", []):
 87            if group["@odata.type"] != "#microsoft.graph.group":
 88                continue
 89            groups.append(group["id"])
 90            group_id_dict[group["id"]] = group
 91        info["raw_groups"] = group_id_dict
 92        return {
 93            "username": info.get("userPrincipalName"),
 94            "email": mail,
 95            "name": info.get("displayName"),
 96            "groups": groups,
 97        }
 98
 99    def get_base_group_properties(self, source, group_id, **kwargs):
100        raw_groups = kwargs["info"]["raw_groups"]
101        if group_id in raw_groups:
102            name = raw_groups[group_id]["displayName"]
103        else:
104            name = group_id
105        return {
106            "name": name,
107        }

Entra ID Type definition

callback_view = <class 'EntraIDOAuthCallback'>
redirect_view = <class 'EntraIDOAuthRedirect'>
verbose_name = 'Entra ID'
name = 'entraid'
urls_customizable = True
authorization_url = 'https://login.microsoftonline.com/common/oauth2/v2.0/authorize'
access_token_url = 'https://login.microsoftonline.com/common/oauth2/v2.0/token'
profile_url = 'https://graph.microsoft.com/v1.0/me'
oidc_jwks_url = 'https://login.microsoftonline.com/common/discovery/keys'
authorization_code_auth_method = AuthorizationCodeAuthMethod.POST_BODY
def get_base_user_properties(self, info: dict[str, typing.Any], **kwargs) -> dict[str, typing.Any]:
81    def get_base_user_properties(self, info: dict[str, Any], **kwargs) -> dict[str, Any]:
82        mail = info.get("mail", None) or info.get("otherMails", [None])[0]
83        # Format group info
84        groups = []
85        group_id_dict = {}
86        for group in info.get("raw_groups", {}).get("value", []):
87            if group["@odata.type"] != "#microsoft.graph.group":
88                continue
89            groups.append(group["id"])
90            group_id_dict[group["id"]] = group
91        info["raw_groups"] = group_id_dict
92        return {
93            "username": info.get("userPrincipalName"),
94            "email": mail,
95            "name": info.get("displayName"),
96            "groups": groups,
97        }

Get base user properties for enrollment/update

def get_base_group_properties(self, source, group_id, **kwargs):
 99    def get_base_group_properties(self, source, group_id, **kwargs):
100        raw_groups = kwargs["info"]["raw_groups"]
101        if group_id in raw_groups:
102            name = raw_groups[group_id]["displayName"]
103        else:
104            name = group_id
105        return {
106            "name": name,
107        }

Get base group properties for creation/update