authentik.sources.oauth.types.registry
Source type manager
"""Source type manager"""

from enum import Enum
from typing import Any

from django.http.request import HttpRequest
from django.templatetags.static import static
from django.urls.base import reverse
from structlog.stdlib import get_logger

from authentik.flows.challenge import Challenge, RedirectChallenge
from authentik.sources.oauth.models import AuthorizationCodeAuthMethod, OAuthSource, PKCEMethod
from authentik.sources.oauth.views.callback import OAuthCallback
from authentik.sources.oauth.views.redirect import OAuthRedirect

LOGGER = get_logger()


class RequestKind(Enum):
    """Enum of OAuth Request types"""

    # Selects ``SourceType.callback_view`` in ``SourceTypeRegistry.find``.
    CALLBACK = "callback"
    # Selects ``SourceType.redirect_view`` in ``SourceTypeRegistry.find``.
    REDIRECT = "redirect"


class SourceType:
    """Source type, allows overriding of urls and views per type"""

    # View classes handed out by ``SourceTypeRegistry.find``.
    callback_view = OAuthCallback
    redirect_view = OAuthRedirect
    # ``name`` is the key matched by ``SourceTypeRegistry.find_type``;
    # ``verbose_name`` is paired with it in ``get_name_tuple``.
    name: str = "default"
    verbose_name: str = "Default source type"

    urls_customizable = False

    # Endpoint defaults; all unset on the base type.
    request_token_url: str | None = None
    authorization_url: str | None = None
    access_token_url: str | None = None
    profile_url: str | None = None
    oidc_well_known_url: str | None = None
    oidc_jwks_url: str | None = None
    pkce: PKCEMethod = PKCEMethod.NONE

    authorization_code_auth_method: AuthorizationCodeAuthMethod = (
        AuthorizationCodeAuthMethod.BASIC_AUTH
    )

    def icon_url(self) -> str:
        """Get Icon URL for login

        Resolves the static file ``authentik/sources/<name>.svg``.
        """
        return static(f"authentik/sources/{self.name}.svg")

    def login_challenge(self, source: OAuthSource, request: HttpRequest) -> Challenge:
        """Allow types to return custom challenges

        The default redirects to the ``oauth-client-login`` URL for the
        source's slug; ``request`` is not used by this implementation.
        """
        return RedirectChallenge(
            data={
                "to": reverse(
                    "authentik_sources_oauth:oauth-client-login",
                    kwargs={"source_slug": source.slug},
                ),
            }
        )

    def get_base_user_properties(
        self, source: OAuthSource, info: dict[str, Any], **kwargs
    ) -> dict[str, Any | dict[str, Any]]:
        """Get base user properties for enrollment/update

        The default returns ``info`` unchanged (the same object, not a copy).
        """
        return info

    def get_base_group_properties(
        self, source: OAuthSource, group_id: str, **kwargs
    ) -> dict[str, Any | dict[str, Any]]:
        """Get base group properties for creation/update

        The default maps ``group_id`` to the ``name`` key.
        """
        return {
            "name": group_id,
        }


class SourceTypeRegistry:
    """Registry to hold all Source types."""

    def __init__(self) -> None:
        self.__sources: list[type[SourceType]] = []

    def register(self):
        """Class decorator to register classes inline."""

        def inner_wrapper(cls):
            self.__sources.append(cls)
            return cls

        return inner_wrapper

    def get(self) -> list[type[SourceType]]:
        """Get a list of all source types

        Returns the registry's internal list itself, not a copy.
        """
        return self.__sources

    def get_name_tuple(self) -> list[tuple[str, str]]:
        """Get list of tuples of all registered names"""
        return [(x.name, x.verbose_name) for x in self.__sources]

    def find_type(self, type_name: str) -> type[SourceType]:
        """Find type based on source

        Returns the first registered type whose ``name`` equals
        ``type_name``. If none matches, logs a warning and returns the base
        ``SourceType``.
        """
        for src_type in self.__sources:
            if src_type.name == type_name:
                return src_type
        # No registered type matched: fall back to the default type
        # instead of raising.
        LOGGER.warning(
            "no matching type found, using default",
            wanted=type_name,
            have=[x.name for x in self.__sources],
        )
        return SourceType

    def find(self, type_name: str, kind: RequestKind) -> type[OAuthCallback | OAuthRedirect]:
        """Find fitting Source Type

        Resolves ``type_name`` via ``find_type`` and returns that type's
        callback or redirect view class depending on ``kind``.

        Raises:
            ValueError: if ``kind`` is neither ``RequestKind.CALLBACK`` nor
                ``RequestKind.REDIRECT``.
        """
        found_type = self.find_type(type_name)
        if kind == RequestKind.CALLBACK:
            return found_type.callback_view
        if kind == RequestKind.REDIRECT:
            return found_type.redirect_view
        raise ValueError(f"Unsupported request kind: {kind!r}")


registry = SourceTypeRegistry()
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class
RequestKind(enum.Enum):
class RequestKind(Enum):
    """Kinds of OAuth request a source type provides a view for."""

    # Request arriving at the callback view.
    CALLBACK = "callback"

    # Request handled by the redirect view.
    REDIRECT = "redirect"
Enum of OAuth Request types
CALLBACK =
<RequestKind.CALLBACK: 'callback'>
REDIRECT =
<RequestKind.REDIRECT: 'redirect'>
class
SourceType:
class SourceType:
    """Base OAuth source type; subclasses may override URLs and views."""

    # View classes used for the callback and redirect requests.
    callback_view = OAuthCallback
    redirect_view = OAuthRedirect

    # Identifier and human-readable label of this type.
    name: str = "default"
    verbose_name: str = "Default source type"

    urls_customizable = False

    # Endpoint defaults; every one is unset on the base type.
    request_token_url: str | None = None
    authorization_url: str | None = None
    access_token_url: str | None = None
    profile_url: str | None = None
    oidc_well_known_url: str | None = None
    oidc_jwks_url: str | None = None
    pkce: PKCEMethod = PKCEMethod.NONE

    authorization_code_auth_method: AuthorizationCodeAuthMethod = (
        AuthorizationCodeAuthMethod.BASIC_AUTH
    )

    def icon_url(self) -> str:
        """Return the static URL of this type's login icon.

        The icon is looked up as ``authentik/sources/<name>.svg``.
        """
        icon_path = f"authentik/sources/{self.name}.svg"
        return static(icon_path)

    def login_challenge(self, source: OAuthSource, request: HttpRequest) -> Challenge:
        """Build the challenge that starts a login with ``source``.

        The default is a redirect to the ``oauth-client-login`` URL for the
        source's slug; ``request`` is not used here.
        """
        login_url = reverse(
            "authentik_sources_oauth:oauth-client-login",
            kwargs={"source_slug": source.slug},
        )
        return RedirectChallenge(data={"to": login_url})

    def get_base_user_properties(
        self, source: OAuthSource, info: dict[str, Any], **kwargs
    ) -> dict[str, Any | dict[str, Any]]:
        """Return the base user properties used for enrollment/update.

        The default hands back ``info`` itself, unmodified.
        """
        return info

    def get_base_group_properties(
        self, source: OAuthSource, group_id: str, **kwargs
    ) -> dict[str, Any | dict[str, Any]]:
        """Return the base group properties used for creation/update.

        The default stores ``group_id`` under the ``name`` key.
        """
        group_properties = {"name": group_id}
        return group_properties
Source type, allows overriding of urls and views per type
callback_view =
<class 'authentik.sources.oauth.views.callback.OAuthCallback'>
redirect_view =
<class 'authentik.sources.oauth.views.redirect.OAuthRedirect'>
def
icon_url(self) -> str:
def icon_url(self) -> str:
    """Return the static URL of this type's login icon.

    The icon is looked up as ``authentik/sources/<name>.svg``.
    """
    icon_path = f"authentik/sources/{self.name}.svg"
    return static(icon_path)
Get Icon URL for login
def
login_challenge( self, source: authentik.sources.oauth.models.OAuthSource, request: django.http.request.HttpRequest) -> authentik.flows.challenge.Challenge:
def login_challenge(self, source: OAuthSource, request: HttpRequest) -> Challenge:
    """Build the challenge that starts a login with ``source``.

    The default is a redirect to the ``oauth-client-login`` URL for the
    source's slug; ``request`` is not used here.
    """
    login_url = reverse(
        "authentik_sources_oauth:oauth-client-login",
        kwargs={"source_slug": source.slug},
    )
    return RedirectChallenge(data={"to": login_url})
Allow types to return custom challenges
def
get_base_user_properties( self, source: authentik.sources.oauth.models.OAuthSource, info: dict[str, typing.Any], **kwargs) -> dict[str, typing.Any | dict[str, typing.Any]]:
def get_base_user_properties(
    self, source: OAuthSource, info: dict[str, Any], **kwargs
) -> dict[str, Any | dict[str, Any]]:
    """Return the base user properties used for enrollment/update.

    The default hands back ``info`` itself, unmodified; ``source`` and any
    extra keyword arguments are ignored.
    """
    return info
Get base user properties for enrollment/update
def
get_base_group_properties( self, source: authentik.sources.oauth.models.OAuthSource, group_id: str, **kwargs) -> dict[str, typing.Any | dict[str, typing.Any]]:
def get_base_group_properties(
    self, source: OAuthSource, group_id: str, **kwargs
) -> dict[str, Any | dict[str, Any]]:
    """Return the base group properties used for creation/update.

    The default stores ``group_id`` under the ``name`` key; ``source`` and
    any extra keyword arguments are ignored.
    """
    group_properties = {"name": group_id}
    return group_properties
Get base group properties for creation/update
class
SourceTypeRegistry:
class SourceTypeRegistry:
    """Registry to hold all Source types."""

    def __init__(self) -> None:
        self.__sources: list[type[SourceType]] = []

    def register(self):
        """Class decorator to register classes inline.

        Returns a decorator that appends the decorated class to the registry
        and hands the class back unchanged.
        """

        def inner_wrapper(cls):
            self.__sources.append(cls)
            return cls

        return inner_wrapper

    def get(self) -> list[type[SourceType]]:
        """Get a list of all source types

        Returns the registry's internal list itself, not a copy.
        """
        return self.__sources

    def get_name_tuple(self) -> list[tuple[str, str]]:
        """Get list of ``(name, verbose_name)`` tuples of all registered types"""
        return [(x.name, x.verbose_name) for x in self.__sources]

    def find_type(self, type_name: str) -> type[SourceType]:
        """Find type based on source

        Returns the first registered type whose ``name`` equals
        ``type_name``. If none matches, logs a warning and returns the base
        ``SourceType``.
        """
        for src_type in self.__sources:
            if src_type.name == type_name:
                return src_type
        # No registered type matched: fall back to the default type
        # instead of raising.
        LOGGER.warning(
            "no matching type found, using default",
            wanted=type_name,
            have=[x.name for x in self.__sources],
        )
        return SourceType

    def find(self, type_name: str, kind: RequestKind) -> type[OAuthCallback | OAuthRedirect]:
        """Find fitting Source Type

        Resolves ``type_name`` via ``find_type`` and returns that type's
        callback or redirect view class depending on ``kind``.

        Raises:
            ValueError: if ``kind`` is neither ``RequestKind.CALLBACK`` nor
                ``RequestKind.REDIRECT``.
        """
        found_type = self.find_type(type_name)
        if kind == RequestKind.CALLBACK:
            return found_type.callback_view
        if kind == RequestKind.REDIRECT:
            return found_type.redirect_view
        raise ValueError(f"Unsupported request kind: {kind!r}")
Registry to hold all Source types.
def
register(self):
def register(self):
    """Return a class decorator that adds the decorated class to the registry.

    The decorator appends the class to the registry's list of sources and
    returns the class unchanged.
    """

    def decorator(source_cls):
        self.__sources.append(source_cls)
        return source_cls

    return decorator
Class decorator to register classes inline.
def
get_name_tuple(self):
def get_name_tuple(self):
    """Return ``(name, verbose_name)`` pairs for every registered type.

    Pairs appear in registration order.
    """
    return [
        (source_type.name, source_type.verbose_name)
        for source_type in self.__sources
    ]
Get list of tuples of all registered names
def find_type(self, type_name: str) -> type[SourceType]:
    """Find type based on source

    Returns the first registered type whose ``name`` equals ``type_name``.
    If none matches, logs a warning and returns the base ``SourceType``.
    """
    for src_type in self.__sources:
        if src_type.name == type_name:
            return src_type
    # No registered type matched: fall back to the default type instead of
    # raising.
    LOGGER.warning(
        "no matching type found, using default",
        wanted=type_name,
        have=[x.name for x in self.__sources],
    )
    return SourceType
Find type based on source
def
find( self, type_name: str, kind: RequestKind) -> type[authentik.sources.oauth.views.callback.OAuthCallback | authentik.sources.oauth.views.redirect.OAuthRedirect]:
def find(self, type_name: str, kind: RequestKind) -> type[OAuthCallback | OAuthRedirect]:
    """Find fitting Source Type

    Resolves ``type_name`` via ``find_type`` and returns that type's
    callback or redirect view class depending on ``kind``.

    Raises:
        ValueError: if ``kind`` is neither ``RequestKind.CALLBACK`` nor
            ``RequestKind.REDIRECT``.
    """
    found_type = self.find_type(type_name)
    if kind == RequestKind.CALLBACK:
        return found_type.callback_view
    if kind == RequestKind.REDIRECT:
        return found_type.redirect_view
    # Name the offending value so a bad caller is easy to spot in a traceback.
    raise ValueError(f"Unsupported request kind: {kind!r}")
Find fitting Source Type
registry =
<SourceTypeRegistry object>