authentik.sources.oauth.views.callback
OAuth Callback Views
"""OAuth Callback Views"""

from datetime import timedelta
from json import JSONDecodeError
from typing import Any

from django.conf import settings
from django.contrib import messages
from django.http import Http404, HttpRequest, HttpResponse
from django.shortcuts import redirect
from django.utils.timezone import now
from django.utils.translation import gettext as _
from django.views.generic import View
from structlog.stdlib import get_logger

from authentik.core.sources.flow_manager import SourceFlowManager
from authentik.events.models import Event, EventAction
from authentik.sources.oauth.models import (
    GroupOAuthSourceConnection,
    OAuthSource,
    UserOAuthSourceConnection,
)
from authentik.sources.oauth.views.base import OAuthClientMixin

LOGGER = get_logger()


class OAuthCallback(OAuthClientMixin, View):
    """Base OAuth callback view.

    Resolves the source from the URL, obtains an access token and the user's
    profile through the OAuth client, and hands the result to
    ``OAuthSourceFlowManager``.
    """

    source: OAuthSource
    # Token response from the provider; stays None until dispatch() fetched it.
    token: dict | None = None

    def dispatch(self, request: HttpRequest, *_, **kwargs) -> HttpResponse:
        """Handle the callback request for the source named in the URL.

        Args:
            request: Incoming request (the method body uses ``self.request``).
            **kwargs: URL keyword arguments; ``source_slug`` selects the source.

        Returns:
            The response produced by the source flow manager, or the redirect
            from ``handle_login_failure`` when any step fails.

        Raises:
            Http404: If the source does not exist or is not enabled.
        """
        slug = kwargs.get("source_slug", "")
        try:
            self.source = OAuthSource.objects.get(slug=slug)
        except OAuthSource.DoesNotExist:
            raise Http404(f"Unknown OAuth source '{slug}'.") from None

        if not self.source.enabled:
            raise Http404(f"Source {slug} is not enabled.")
        client = self.get_client(self.source, callback=self.get_callback_url(self.source))
        # Fetch access token
        self.token = client.get_access_token()
        if self.token is None:
            return self.handle_login_failure("Could not retrieve token.")
        if "error" in self.token:
            return self.handle_login_failure(self.token["error"])
        # Fetch profile info
        try:
            raw_info = client.get_profile_info(self.token)
            if raw_info is None:
                return self.handle_login_failure("Could not retrieve profile.")
        except JSONDecodeError as exc:
            # Keep the undecodable payload in a configuration-error event.
            Event.new(
                EventAction.CONFIGURATION_ERROR,
                message="Failed to JSON-decode profile.",
                raw_profile=exc.doc,
            ).from_http(self.request)
            return self.handle_login_failure("Could not retrieve profile.")
        identifier = self.get_user_id(info=raw_info)
        if identifier is None:
            return self.handle_login_failure("Could not determine id.")
        sfm = OAuthSourceFlowManager(
            source=self.source,
            request=self.request,
            identifier=identifier,
            user_info={
                "info": raw_info,
                "client": client,
                "token": self.token,
            },
            policy_context={
                "oauth_userinfo": raw_info,
            },
        )
        # NOTE(review): the lifetime is sent as ``expires``; presumably get_flow
        # forwards these keyword arguments to update_user_connection, which
        # accepts that spelling as an alias of ``expires_in`` - confirm.
        return sfm.get_flow(
            raw_info=raw_info,
            access_token=self.token.get("access_token"),
            refresh_token=self.token.get("refresh_token"),
            expires=self.token.get("expires_in"),
        )

    def get_callback_url(self, source: OAuthSource) -> str:
        """Return callback url if different than the current url.

        The base implementation always returns an empty string.
        """
        return ""

    def get_error_redirect(self, source: OAuthSource, reason: str) -> str:
        """Return url to redirect on login failure (``settings.LOGIN_URL``)."""
        return settings.LOGIN_URL

    def get_user_id(self, info: dict[str, Any]) -> str | None:
        """Return the ``id`` entry of the profile info, or None if absent."""
        if "id" in info:
            return info["id"]
        return None

    def handle_login_failure(self, reason: str) -> HttpResponse:
        """Message user and redirect on error.

        Args:
            reason: Failure description; logged and shown to the user.

        Returns:
            A redirect to the URL given by ``get_error_redirect``.
        """
        LOGGER.warning("Authentication Failure", reason=reason)
        # Translate the constant template first and interpolate afterwards;
        # looking up the already-formatted text can never match a msgid.
        messages.error(
            self.request,
            _("Authentication failed: {reason}").format(reason=reason),
        )
        return redirect(self.get_error_redirect(self.source, reason))


class OAuthSourceFlowManager(SourceFlowManager):
    """Flow manager for oauth sources"""

    user_connection_type = UserOAuthSourceConnection
    group_connection_type = GroupOAuthSourceConnection

    def update_user_connection(
        self,
        connection: UserOAuthSourceConnection,
        access_token: str | None = None,
        refresh_token: str | None = None,
        expires_in: int | None = None,
        *,
        expires: int | None = None,
        **_,
    ) -> UserOAuthSourceConnection:
        """Set the access_token, refresh_token and expiry on the connection.

        Args:
            connection: Connection to update; it is modified in place and not
                saved by this method.
            access_token: Access token to store.
            refresh_token: Refresh token to store.
            expires_in: Token lifetime in seconds.
            expires: Alias of ``expires_in``, used when ``expires_in`` is None.
                ``OAuthCallback.dispatch`` sends the lifetime under this name.
            **_: Any other keyword arguments are ignored.

        Returns:
            The connection that was passed in.
        """
        connection.access_token = access_token
        connection.refresh_token = refresh_token
        lifetime = expires if expires_in is None else expires_in
        # A missing or zero lifetime leaves the expiry at the current time.
        connection.expires = now() + timedelta(seconds=lifetime) if lifetime else now()
        return connection
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class
OAuthCallback(authentik.sources.oauth.views.base.OAuthClientMixin, django.views.generic.base.View):
class OAuthCallback(OAuthClientMixin, View):
    """Base OAuth callback view.

    Resolves the source from the URL, obtains an access token and the user's
    profile through the OAuth client, and hands the result to
    ``OAuthSourceFlowManager``.
    """

    source: OAuthSource
    # Token response from the provider; stays None until dispatch() fetched it.
    token: dict | None = None

    def dispatch(self, request: HttpRequest, *_, **kwargs) -> HttpResponse:
        """Handle the callback request for the source named in the URL.

        Args:
            request: Incoming request (the method body uses ``self.request``).
            **kwargs: URL keyword arguments; ``source_slug`` selects the source.

        Returns:
            The response produced by the source flow manager, or the redirect
            from ``handle_login_failure`` when any step fails.

        Raises:
            Http404: If the source does not exist or is not enabled.
        """
        slug = kwargs.get("source_slug", "")
        try:
            self.source = OAuthSource.objects.get(slug=slug)
        except OAuthSource.DoesNotExist:
            raise Http404(f"Unknown OAuth source '{slug}'.") from None

        if not self.source.enabled:
            raise Http404(f"Source {slug} is not enabled.")
        client = self.get_client(self.source, callback=self.get_callback_url(self.source))
        # Fetch access token
        self.token = client.get_access_token()
        if self.token is None:
            return self.handle_login_failure("Could not retrieve token.")
        if "error" in self.token:
            return self.handle_login_failure(self.token["error"])
        # Fetch profile info
        try:
            raw_info = client.get_profile_info(self.token)
            if raw_info is None:
                return self.handle_login_failure("Could not retrieve profile.")
        except JSONDecodeError as exc:
            # Keep the undecodable payload in a configuration-error event.
            Event.new(
                EventAction.CONFIGURATION_ERROR,
                message="Failed to JSON-decode profile.",
                raw_profile=exc.doc,
            ).from_http(self.request)
            return self.handle_login_failure("Could not retrieve profile.")
        identifier = self.get_user_id(info=raw_info)
        if identifier is None:
            return self.handle_login_failure("Could not determine id.")
        sfm = OAuthSourceFlowManager(
            source=self.source,
            request=self.request,
            identifier=identifier,
            user_info={
                "info": raw_info,
                "client": client,
                "token": self.token,
            },
            policy_context={
                "oauth_userinfo": raw_info,
            },
        )
        return sfm.get_flow(
            raw_info=raw_info,
            access_token=self.token.get("access_token"),
            refresh_token=self.token.get("refresh_token"),
            expires=self.token.get("expires_in"),
        )

    def get_callback_url(self, source: OAuthSource) -> str:
        """Return callback url if different than the current url.

        The base implementation always returns an empty string.
        """
        return ""

    def get_error_redirect(self, source: OAuthSource, reason: str) -> str:
        """Return url to redirect on login failure (``settings.LOGIN_URL``)."""
        return settings.LOGIN_URL

    def get_user_id(self, info: dict[str, Any]) -> str | None:
        """Return the ``id`` entry of the profile info, or None if absent."""
        if "id" in info:
            return info["id"]
        return None

    def handle_login_failure(self, reason: str) -> HttpResponse:
        """Message user and redirect on error.

        Args:
            reason: Failure description; logged and shown to the user.

        Returns:
            A redirect to the URL given by ``get_error_redirect``.
        """
        LOGGER.warning("Authentication Failure", reason=reason)
        # Translate the constant template first and interpolate afterwards;
        # looking up the already-formatted text can never match a msgid.
        messages.error(
            self.request,
            _("Authentication failed: {reason}").format(reason=reason),
        )
        return redirect(self.get_error_redirect(self.source, reason))
Base OAuth callback view.
def
dispatch( self, request: django.http.request.HttpRequest, *_, **kwargs) -> django.http.response.HttpResponse:
def dispatch(self, request: HttpRequest, *_, **kwargs) -> HttpResponse:
    """Process the OAuth callback for the source identified by ``source_slug``.

    Looks up the source, asks the OAuth client for an access token and the
    profile information, derives the user identifier and finally delegates
    to ``OAuthSourceFlowManager.get_flow``. Every recoverable failure ends in
    ``handle_login_failure``.

    Raises:
        Http404: When no source has the given slug, or it is not enabled.
    """
    source_slug = kwargs.get("source_slug", "")
    try:
        self.source = OAuthSource.objects.get(slug=source_slug)
    except OAuthSource.DoesNotExist:
        raise Http404(f"Unknown OAuth source '{source_slug}'.") from None
    if not self.source.enabled:
        raise Http404(f"Source {source_slug} is not enabled.")

    callback_url = self.get_callback_url(self.source)
    oauth_client = self.get_client(self.source, callback=callback_url)

    # Step 1: access token
    self.token = oauth_client.get_access_token()
    if self.token is None:
        return self.handle_login_failure("Could not retrieve token.")
    if "error" in self.token:
        return self.handle_login_failure(self.token["error"])

    # Step 2: profile information
    try:
        profile = oauth_client.get_profile_info(self.token)
        if profile is None:
            return self.handle_login_failure("Could not retrieve profile.")
    except JSONDecodeError as exc:
        # Keep the undecodable document in a configuration-error event.
        decode_event = Event.new(
            EventAction.CONFIGURATION_ERROR,
            message="Failed to JSON-decode profile.",
            raw_profile=exc.doc,
        )
        decode_event.from_http(self.request)
        return self.handle_login_failure("Could not retrieve profile.")

    # Step 3: identifier
    identifier = self.get_user_id(info=profile)
    if identifier is None:
        return self.handle_login_failure("Could not determine id.")

    # Step 4: hand over to the flow manager
    user_info = {"info": profile, "client": oauth_client, "token": self.token}
    policy_context = {"oauth_userinfo": profile}
    flow_manager = OAuthSourceFlowManager(
        source=self.source,
        request=self.request,
        identifier=identifier,
        user_info=user_info,
        policy_context=policy_context,
    )
    return flow_manager.get_flow(
        raw_info=profile,
        access_token=self.token.get("access_token"),
        refresh_token=self.token.get("refresh_token"),
        expires=self.token.get("expires_in"),
    )
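Handle the OAuth callback request: look up the source, fetch the access token and profile, then hand off to the source flow manager.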
def get_callback_url(self, source: OAuthSource) -> str:
    """Return the callback url, needed only if it differs from the current url.

    This base implementation ignores ``source`` and always yields an
    empty string.
    """
    callback_url = ""
    return callback_url
Return callback url if different than the current url.
def
get_error_redirect( self, source: authentik.sources.oauth.models.OAuthSource, reason: str) -> str:
def get_error_redirect(self, source: OAuthSource, reason: str) -> str:
    """Return the url a failed login is redirected to.

    This implementation ignores both arguments and returns
    ``settings.LOGIN_URL``.
    """
    login_url = settings.LOGIN_URL
    return login_url
Return url to redirect on login failure.
def
get_user_id(self, info: dict[str, typing.Any]) -> str | None:
95 def get_user_id(self, info: dict[str, Any]) -> str | None: 96 """Return unique identifier from the profile info.""" 97 if "id" in info: 98 return info["id"] 99 return None
Return unique identifier from the profile info.
def
handle_login_failure(self, reason: str) -> django.http.response.HttpResponse:
def handle_login_failure(self, reason: str) -> HttpResponse:
    """Message user and redirect on error.

    Logs a warning with the reason, queues an error message on the current
    request and redirects to the URL given by ``get_error_redirect``.

    Args:
        reason: Failure description; logged and shown to the user.

    Returns:
        The redirect response.
    """
    LOGGER.warning("Authentication Failure", reason=reason)
    # Translate the constant template first and interpolate afterwards;
    # looking up the already-formatted text can never match a msgid.
    messages.error(
        self.request,
        _("Authentication failed: {reason}").format(reason=reason),
    )
    return redirect(self.get_error_redirect(self.source, reason))
Message user and redirect on error.
Inherited Members
class OAuthSourceFlowManager(SourceFlowManager):
    """Flow manager for oauth sources"""

    user_connection_type = UserOAuthSourceConnection
    group_connection_type = GroupOAuthSourceConnection

    def update_user_connection(
        self,
        connection: UserOAuthSourceConnection,
        access_token: str | None = None,
        refresh_token: str | None = None,
        expires_in: int | None = None,
        *,
        expires: int | None = None,
        **_,
    ) -> UserOAuthSourceConnection:
        """Set the access_token, refresh_token and expiry on the connection.

        Args:
            connection: Connection to update; it is modified in place and not
                saved by this method.
            access_token: Access token to store.
            refresh_token: Refresh token to store.
            expires_in: Token lifetime in seconds.
            expires: Alias of ``expires_in``, used when ``expires_in`` is None.
                ``OAuthCallback.dispatch`` sends the lifetime under this name.
            **_: Any other keyword arguments are ignored.

        Returns:
            The connection that was passed in.
        """
        connection.access_token = access_token
        connection.refresh_token = refresh_token
        lifetime = expires if expires_in is None else expires_in
        # A missing or zero lifetime leaves the expiry at the current time.
        connection.expires = now() + timedelta(seconds=lifetime) if lifetime else now()
        return connection
Flow manager for oauth sources
user_connection_type =
<class 'authentik.sources.oauth.models.UserOAuthSourceConnection'>
group_connection_type =
<class 'authentik.sources.oauth.models.GroupOAuthSourceConnection'>
def
update_user_connection( self, connection: authentik.sources.oauth.models.UserOAuthSourceConnection, access_token: str | None = None, refresh_token: str | None = None, expires_in: int | None = None, **_) -> authentik.sources.oauth.models.UserOAuthSourceConnection:
def update_user_connection(
    self,
    connection: UserOAuthSourceConnection,
    access_token: str | None = None,
    refresh_token: str | None = None,
    expires_in: int | None = None,
    *,
    expires: int | None = None,
    **_,
) -> UserOAuthSourceConnection:
    """Set the access_token, refresh_token and expiry on the connection.

    Args:
        connection: Connection to update; it is modified in place and not
            saved by this method.
        access_token: Access token to store.
        refresh_token: Refresh token to store.
        expires_in: Token lifetime in seconds.
        expires: Alias of ``expires_in``, used when ``expires_in`` is None.
            ``OAuthCallback.dispatch`` sends the lifetime under this name.
        **_: Any other keyword arguments are ignored.

    Returns:
        The connection that was passed in.
    """
    connection.access_token = access_token
    connection.refresh_token = refresh_token
    lifetime = expires if expires_in is None else expires_in
    # A missing or zero lifetime leaves the expiry at the current time.
    connection.expires = now() + timedelta(seconds=lifetime) if lifetime else now()
    return connection
Set the access_token, refresh_token and expiry time on the connection