authentik.sources.oauth.types.registry

Source type manager

  1"""Source type manager"""
  2
  3from enum import Enum
  4from typing import Any
  5
  6from django.http.request import HttpRequest
  7from django.templatetags.static import static
  8from django.urls.base import reverse
  9from structlog.stdlib import get_logger
 10
 11from authentik.flows.challenge import Challenge, RedirectChallenge
 12from authentik.sources.oauth.models import AuthorizationCodeAuthMethod, OAuthSource, PKCEMethod
 13from authentik.sources.oauth.views.callback import OAuthCallback
 14from authentik.sources.oauth.views.redirect import OAuthRedirect
 15
 16LOGGER = get_logger()
 17
 18
 19class RequestKind(Enum):
 20    """Enum of OAuth Request types"""
 21
 22    CALLBACK = "callback"
 23    REDIRECT = "redirect"
 24
 25
 26class SourceType:
 27    """Source type, allows overriding of urls and views per type"""
 28
 29    callback_view = OAuthCallback
 30    redirect_view = OAuthRedirect
 31    name: str = "default"
 32    verbose_name: str = "Default source type"
 33
 34    urls_customizable = False
 35
 36    request_token_url: str | None = None
 37    authorization_url: str | None = None
 38    access_token_url: str | None = None
 39    profile_url: str | None = None
 40    oidc_well_known_url: str | None = None
 41    oidc_jwks_url: str | None = None
 42    pkce: PKCEMethod = PKCEMethod.NONE
 43
 44    authorization_code_auth_method: AuthorizationCodeAuthMethod = (
 45        AuthorizationCodeAuthMethod.BASIC_AUTH
 46    )
 47
 48    def icon_url(self) -> str:
 49        """Get Icon URL for login"""
 50        return static(f"authentik/sources/{self.name}.svg")
 51
 52    def login_challenge(self, source: OAuthSource, request: HttpRequest) -> Challenge:
 53        """Allow types to return custom challenges"""
 54        return RedirectChallenge(
 55            data={
 56                "to": reverse(
 57                    "authentik_sources_oauth:oauth-client-login",
 58                    kwargs={"source_slug": source.slug},
 59                ),
 60            }
 61        )
 62
 63    def get_base_user_properties(
 64        self, source: OAuthSource, info: dict[str, Any], **kwargs
 65    ) -> dict[str, Any | dict[str, Any]]:
 66        """Get base user properties for enrollment/update"""
 67        return info
 68
 69    def get_base_group_properties(
 70        self, source: OAuthSource, group_id: str, **kwargs
 71    ) -> dict[str, Any | dict[str, Any]]:
 72        """Get base group properties for creation/update"""
 73        return {
 74            "name": group_id,
 75        }
 76
 77
 78class SourceTypeRegistry:
 79    """Registry to hold all Source types."""
 80
 81    def __init__(self) -> None:
 82        self.__sources: list[type[SourceType]] = []
 83
 84    def register(self):
 85        """Class decorator to register classes inline."""
 86
 87        def inner_wrapper(cls):
 88            self.__sources.append(cls)
 89            return cls
 90
 91        return inner_wrapper
 92
 93    def get(self):
 94        """Get a list of all source types"""
 95        return self.__sources
 96
 97    def get_name_tuple(self):
 98        """Get list of tuples of all registered names"""
 99        return [(x.name, x.verbose_name) for x in self.__sources]
100
101    def find_type(self, type_name: str) -> type[SourceType]:
102        """Find type based on source"""
103        found_type = None
104        for src_type in self.__sources:
105            if src_type.name == type_name:
106                return src_type
107        if not found_type:
108            found_type = SourceType
109            LOGGER.warning(
110                "no matching type found, using default",
111                wanted=type_name,
112                have=[x.name for x in self.__sources],
113            )
114        return found_type
115
116    def find(self, type_name: str, kind: RequestKind) -> type[OAuthCallback | OAuthRedirect]:
117        """Find fitting Source Type"""
118        found_type = self.find_type(type_name)
119        if kind == RequestKind.CALLBACK:
120            return found_type.callback_view
121        if kind == RequestKind.REDIRECT:
122            return found_type.redirect_view
123        raise ValueError
124
125
126registry = SourceTypeRegistry()
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class RequestKind(enum.Enum):
20class RequestKind(Enum):
21    """Enum of OAuth Request types"""
22
23    CALLBACK = "callback"
24    REDIRECT = "redirect"

Enum of OAuth Request types

CALLBACK = <RequestKind.CALLBACK: 'callback'>
REDIRECT = <RequestKind.REDIRECT: 'redirect'>
class SourceType:
27class SourceType:
28    """Source type, allows overriding of urls and views per type"""
29
30    callback_view = OAuthCallback
31    redirect_view = OAuthRedirect
32    name: str = "default"
33    verbose_name: str = "Default source type"
34
35    urls_customizable = False
36
37    request_token_url: str | None = None
38    authorization_url: str | None = None
39    access_token_url: str | None = None
40    profile_url: str | None = None
41    oidc_well_known_url: str | None = None
42    oidc_jwks_url: str | None = None
43    pkce: PKCEMethod = PKCEMethod.NONE
44
45    authorization_code_auth_method: AuthorizationCodeAuthMethod = (
46        AuthorizationCodeAuthMethod.BASIC_AUTH
47    )
48
49    def icon_url(self) -> str:
50        """Get Icon URL for login"""
51        return static(f"authentik/sources/{self.name}.svg")
52
53    def login_challenge(self, source: OAuthSource, request: HttpRequest) -> Challenge:
54        """Allow types to return custom challenges"""
55        return RedirectChallenge(
56            data={
57                "to": reverse(
58                    "authentik_sources_oauth:oauth-client-login",
59                    kwargs={"source_slug": source.slug},
60                ),
61            }
62        )
63
64    def get_base_user_properties(
65        self, source: OAuthSource, info: dict[str, Any], **kwargs
66    ) -> dict[str, Any | dict[str, Any]]:
67        """Get base user properties for enrollment/update"""
68        return info
69
70    def get_base_group_properties(
71        self, source: OAuthSource, group_id: str, **kwargs
72    ) -> dict[str, Any | dict[str, Any]]:
73        """Get base group properties for creation/update"""
74        return {
75            "name": group_id,
76        }

Source type, allows overriding of urls and views per type

name: str = 'default'
verbose_name: str = 'Default source type'
urls_customizable = False
request_token_url: str | None = None
authorization_url: str | None = None
access_token_url: str | None = None
profile_url: str | None = None
oidc_well_known_url: str | None = None
oidc_jwks_url: str | None = None
authorization_code_auth_method: authentik.sources.oauth.models.AuthorizationCodeAuthMethod = AuthorizationCodeAuthMethod.BASIC_AUTH
def icon_url(self) -> str:
49    def icon_url(self) -> str:
50        """Get Icon URL for login"""
51        return static(f"authentik/sources/{self.name}.svg")

Get Icon URL for login

def login_challenge( self, source: authentik.sources.oauth.models.OAuthSource, request: django.http.request.HttpRequest) -> authentik.flows.challenge.Challenge:
53    def login_challenge(self, source: OAuthSource, request: HttpRequest) -> Challenge:
54        """Allow types to return custom challenges"""
55        return RedirectChallenge(
56            data={
57                "to": reverse(
58                    "authentik_sources_oauth:oauth-client-login",
59                    kwargs={"source_slug": source.slug},
60                ),
61            }
62        )

Allow types to return custom challenges

def get_base_user_properties( self, source: authentik.sources.oauth.models.OAuthSource, info: dict[str, typing.Any], **kwargs) -> dict[str, typing.Any | dict[str, typing.Any]]:
64    def get_base_user_properties(
65        self, source: OAuthSource, info: dict[str, Any], **kwargs
66    ) -> dict[str, Any | dict[str, Any]]:
67        """Get base user properties for enrollment/update"""
68        return info

Get base user properties for enrollment/update

def get_base_group_properties( self, source: authentik.sources.oauth.models.OAuthSource, group_id: str, **kwargs) -> dict[str, typing.Any | dict[str, typing.Any]]:
70    def get_base_group_properties(
71        self, source: OAuthSource, group_id: str, **kwargs
72    ) -> dict[str, Any | dict[str, Any]]:
73        """Get base group properties for creation/update"""
74        return {
75            "name": group_id,
76        }

Get base group properties for creation/update

class SourceTypeRegistry:
 79class SourceTypeRegistry:
 80    """Registry to hold all Source types."""
 81
 82    def __init__(self) -> None:
 83        self.__sources: list[type[SourceType]] = []
 84
 85    def register(self):
 86        """Class decorator to register classes inline."""
 87
 88        def inner_wrapper(cls):
 89            self.__sources.append(cls)
 90            return cls
 91
 92        return inner_wrapper
 93
 94    def get(self):
 95        """Get a list of all source types"""
 96        return self.__sources
 97
 98    def get_name_tuple(self):
 99        """Get list of tuples of all registered names"""
100        return [(x.name, x.verbose_name) for x in self.__sources]
101
102    def find_type(self, type_name: str) -> type[SourceType]:
103        """Find type based on source"""
104        found_type = None
105        for src_type in self.__sources:
106            if src_type.name == type_name:
107                return src_type
108        if not found_type:
109            found_type = SourceType
110            LOGGER.warning(
111                "no matching type found, using default",
112                wanted=type_name,
113                have=[x.name for x in self.__sources],
114            )
115        return found_type
116
117    def find(self, type_name: str, kind: RequestKind) -> type[OAuthCallback | OAuthRedirect]:
118        """Find fitting Source Type"""
119        found_type = self.find_type(type_name)
120        if kind == RequestKind.CALLBACK:
121            return found_type.callback_view
122        if kind == RequestKind.REDIRECT:
123            return found_type.redirect_view
124        raise ValueError

Registry to hold all Source types.

def register(self):
85    def register(self):
86        """Class decorator to register classes inline."""
87
88        def inner_wrapper(cls):
89            self.__sources.append(cls)
90            return cls
91
92        return inner_wrapper

Class decorator to register classes inline.

def get(self):
94    def get(self):
95        """Get a list of all source types"""
96        return self.__sources

Get a list of all source types

def get_name_tuple(self):
 98    def get_name_tuple(self):
 99        """Get list of tuples of all registered names"""
100        return [(x.name, x.verbose_name) for x in self.__sources]

Get list of tuples of all registered names

def find_type( self, type_name: str) -> type[SourceType]:
102    def find_type(self, type_name: str) -> type[SourceType]:
103        """Find type based on source"""
104        found_type = None
105        for src_type in self.__sources:
106            if src_type.name == type_name:
107                return src_type
108        if not found_type:
109            found_type = SourceType
110            LOGGER.warning(
111                "no matching type found, using default",
112                wanted=type_name,
113                have=[x.name for x in self.__sources],
114            )
115        return found_type

Find type based on source

117    def find(self, type_name: str, kind: RequestKind) -> type[OAuthCallback | OAuthRedirect]:
118        """Find fitting Source Type"""
119        found_type = self.find_type(type_name)
120        if kind == RequestKind.CALLBACK:
121            return found_type.callback_view
122        if kind == RequestKind.REDIRECT:
123            return found_type.redirect_view
124        raise ValueError

Find fitting Source Type

registry = <SourceTypeRegistry object>