authentik.sources.oauth.views.callback
OAuth Callback Views
1"""OAuth Callback Views""" 2 3from datetime import timedelta 4from json import JSONDecodeError 5from typing import Any 6 7from django.conf import settings 8from django.contrib import messages 9from django.http import Http404, HttpRequest, HttpResponse 10from django.shortcuts import redirect 11from django.utils.timezone import now 12from django.utils.translation import gettext as _ 13from django.views.generic import View 14from structlog.stdlib import get_logger 15 16from authentik.core.sources.flow_manager import SourceFlowManager 17from authentik.events.models import Event, EventAction 18from authentik.sources.oauth.clients.base import BaseOAuthClient 19from authentik.sources.oauth.models import ( 20 GroupOAuthSourceConnection, 21 OAuthSource, 22 UserOAuthSourceConnection, 23) 24from authentik.sources.oauth.views.base import OAuthClientMixin 25 26LOGGER = get_logger() 27 28 29class OAuthCallback(OAuthClientMixin, View): 30 "Base OAuth callback view." 31 32 source: OAuthSource 33 token: dict[str, Any] | None = None 34 35 def dispatch(self, request: HttpRequest, *_, **kwargs) -> HttpResponse: 36 """View Get handler""" 37 slug = kwargs.get("source_slug", "") 38 try: 39 self.source = OAuthSource.objects.get(slug=slug) 40 except OAuthSource.DoesNotExist: 41 raise Http404(f"Unknown OAuth source '{slug}'.") from None 42 43 if not self.source.enabled: 44 raise Http404(f"Source {slug} is not enabled.") 45 client = self.get_client(self.source, callback=self.get_callback_url(self.source)) 46 # Fetch access token 47 self.token = client.get_access_token() 48 if self.token is None: 49 return self.handle_login_failure("Could not retrieve token.") 50 if "error" in self.token: 51 return self.handle_login_failure(self.token["error"]) 52 # Fetch profile info 53 try: 54 res = self.redirect_flow_manager(client) 55 except ValueError as exc: 56 # if we're authenticated and not in a source stage and this new flag is enabled, 57 # just continue 58 if self.request.user.is_authenticated: 59 pass 60 return self.handle_login_failure(exc.args[0]) 61 return res 62 63 def redirect_flow_manager(self, client: BaseOAuthClient) -> HttpResponse: 64 try: 65 raw_info = client.get_profile_info(self.token) 66 if raw_info is None: 67 raise ValueError("Could not retrieve profile.") 68 except JSONDecodeError as exc: 69 Event.new( 70 EventAction.CONFIGURATION_ERROR, 71 message="Failed to JSON-decode profile.", 72 raw_profile=exc.doc, 73 ).from_http(self.request) 74 raise ValueError("Could not retrieve profile.") from None 75 identifier = self.get_user_id(info=raw_info) 76 if identifier is None: 77 raise ValueError("Could not determine id.") 78 sfm = OAuthSourceFlowManager( 79 source=self.source, 80 request=self.request, 81 identifier=identifier, 82 user_info={ 83 "info": raw_info, 84 "client": client, 85 "token": self.token, 86 }, 87 policy_context={ 88 "oauth_userinfo": raw_info, 89 }, 90 ) 91 return sfm.get_flow( 92 raw_info=raw_info, 93 access_token=self.token.get("access_token"), 94 refresh_token=self.token.get("refresh_token"), 95 expires=self.token.get("expires_in"), 96 ) 97 98 def get_callback_url(self, source: OAuthSource) -> str: 99 "Return callback url if different than the current url." 100 return "" 101 102 def get_error_redirect(self, source: OAuthSource, reason: str) -> str: 103 "Return url to redirect on login failure." 104 return settings.LOGIN_URL 105 106 def get_user_id(self, info: dict[str, Any]) -> str | None: 107 """Return unique identifier from the profile info.""" 108 if "id" in info: 109 return str(info["id"]) 110 return None 111 112 def handle_login_failure(self, reason: str) -> HttpResponse: 113 "Message user and redirect on error." 114 LOGGER.warning("Authentication Failure", reason=reason) 115 messages.error( 116 self.request, 117 _( 118 "Authentication failed: {reason}".format_map( 119 { 120 "reason": reason, 121 } 122 ) 123 ), 124 ) 125 return redirect(self.get_error_redirect(self.source, reason)) 126 127 128class OAuthSourceFlowManager(SourceFlowManager): 129 """Flow manager for oauth sources""" 130 131 user_connection_type = UserOAuthSourceConnection 132 group_connection_type = GroupOAuthSourceConnection 133 134 def update_user_connection( 135 self, 136 connection: UserOAuthSourceConnection, 137 access_token: str | None = None, 138 refresh_token: str | None = None, 139 expires_in: int | None = None, 140 **_, 141 ) -> UserOAuthSourceConnection: 142 """Set the access_token and refresh_token on the connection""" 143 connection.access_token = access_token 144 connection.refresh_token = refresh_token 145 connection.expires = now() + timedelta(seconds=expires_in) if expires_in else now() 146 return connection
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class
OAuthCallback(authentik.sources.oauth.views.base.OAuthClientMixin, django.views.generic.base.View):
30class OAuthCallback(OAuthClientMixin, View): 31 "Base OAuth callback view." 32 33 source: OAuthSource 34 token: dict[str, Any] | None = None 35 36 def dispatch(self, request: HttpRequest, *_, **kwargs) -> HttpResponse: 37 """View Get handler""" 38 slug = kwargs.get("source_slug", "") 39 try: 40 self.source = OAuthSource.objects.get(slug=slug) 41 except OAuthSource.DoesNotExist: 42 raise Http404(f"Unknown OAuth source '{slug}'.") from None 43 44 if not self.source.enabled: 45 raise Http404(f"Source {slug} is not enabled.") 46 client = self.get_client(self.source, callback=self.get_callback_url(self.source)) 47 # Fetch access token 48 self.token = client.get_access_token() 49 if self.token is None: 50 return self.handle_login_failure("Could not retrieve token.") 51 if "error" in self.token: 52 return self.handle_login_failure(self.token["error"]) 53 # Fetch profile info 54 try: 55 res = self.redirect_flow_manager(client) 56 except ValueError as exc: 57 # if we're authenticated and not in a source stage and this new flag is enabled, 58 # just continue 59 if self.request.user.is_authenticated: 60 pass 61 return self.handle_login_failure(exc.args[0]) 62 return res 63 64 def redirect_flow_manager(self, client: BaseOAuthClient) -> HttpResponse: 65 try: 66 raw_info = client.get_profile_info(self.token) 67 if raw_info is None: 68 raise ValueError("Could not retrieve profile.") 69 except JSONDecodeError as exc: 70 Event.new( 71 EventAction.CONFIGURATION_ERROR, 72 message="Failed to JSON-decode profile.", 73 raw_profile=exc.doc, 74 ).from_http(self.request) 75 raise ValueError("Could not retrieve profile.") from None 76 identifier = self.get_user_id(info=raw_info) 77 if identifier is None: 78 raise ValueError("Could not determine id.") 79 sfm = OAuthSourceFlowManager( 80 source=self.source, 81 request=self.request, 82 identifier=identifier, 83 user_info={ 84 "info": raw_info, 85 "client": client, 86 "token": self.token, 87 }, 88 policy_context={ 89 "oauth_userinfo": raw_info, 90 }, 91 ) 92 return sfm.get_flow( 93 raw_info=raw_info, 94 access_token=self.token.get("access_token"), 95 refresh_token=self.token.get("refresh_token"), 96 expires=self.token.get("expires_in"), 97 ) 98 99 def get_callback_url(self, source: OAuthSource) -> str: 100 "Return callback url if different than the current url." 101 return "" 102 103 def get_error_redirect(self, source: OAuthSource, reason: str) -> str: 104 "Return url to redirect on login failure." 105 return settings.LOGIN_URL 106 107 def get_user_id(self, info: dict[str, Any]) -> str | None: 108 """Return unique identifier from the profile info.""" 109 if "id" in info: 110 return str(info["id"]) 111 return None 112 113 def handle_login_failure(self, reason: str) -> HttpResponse: 114 "Message user and redirect on error." 115 LOGGER.warning("Authentication Failure", reason=reason) 116 messages.error( 117 self.request, 118 _( 119 "Authentication failed: {reason}".format_map( 120 { 121 "reason": reason, 122 } 123 ) 124 ), 125 ) 126 return redirect(self.get_error_redirect(self.source, reason))
Base OAuth callback view.
def
dispatch( self, request: django.http.request.HttpRequest, *_, **kwargs) -> django.http.response.HttpResponse:
36 def dispatch(self, request: HttpRequest, *_, **kwargs) -> HttpResponse: 37 """View Get handler""" 38 slug = kwargs.get("source_slug", "") 39 try: 40 self.source = OAuthSource.objects.get(slug=slug) 41 except OAuthSource.DoesNotExist: 42 raise Http404(f"Unknown OAuth source '{slug}'.") from None 43 44 if not self.source.enabled: 45 raise Http404(f"Source {slug} is not enabled.") 46 client = self.get_client(self.source, callback=self.get_callback_url(self.source)) 47 # Fetch access token 48 self.token = client.get_access_token() 49 if self.token is None: 50 return self.handle_login_failure("Could not retrieve token.") 51 if "error" in self.token: 52 return self.handle_login_failure(self.token["error"]) 53 # Fetch profile info 54 try: 55 res = self.redirect_flow_manager(client) 56 except ValueError as exc: 57 # if we're authenticated and not in a source stage and this new flag is enabled, 58 # just continue 59 if self.request.user.is_authenticated: 60 pass 61 return self.handle_login_failure(exc.args[0]) 62 return res
View Get handler
def
redirect_flow_manager( self, client: authentik.sources.oauth.clients.base.BaseOAuthClient) -> django.http.response.HttpResponse:
64 def redirect_flow_manager(self, client: BaseOAuthClient) -> HttpResponse: 65 try: 66 raw_info = client.get_profile_info(self.token) 67 if raw_info is None: 68 raise ValueError("Could not retrieve profile.") 69 except JSONDecodeError as exc: 70 Event.new( 71 EventAction.CONFIGURATION_ERROR, 72 message="Failed to JSON-decode profile.", 73 raw_profile=exc.doc, 74 ).from_http(self.request) 75 raise ValueError("Could not retrieve profile.") from None 76 identifier = self.get_user_id(info=raw_info) 77 if identifier is None: 78 raise ValueError("Could not determine id.") 79 sfm = OAuthSourceFlowManager( 80 source=self.source, 81 request=self.request, 82 identifier=identifier, 83 user_info={ 84 "info": raw_info, 85 "client": client, 86 "token": self.token, 87 }, 88 policy_context={ 89 "oauth_userinfo": raw_info, 90 }, 91 ) 92 return sfm.get_flow( 93 raw_info=raw_info, 94 access_token=self.token.get("access_token"), 95 refresh_token=self.token.get("refresh_token"), 96 expires=self.token.get("expires_in"), 97 )
99 def get_callback_url(self, source: OAuthSource) -> str: 100 "Return callback url if different than the current url." 101 return ""
Return callback url if different than the current url.
def
get_error_redirect( self, source: authentik.sources.oauth.models.OAuthSource, reason: str) -> str:
103 def get_error_redirect(self, source: OAuthSource, reason: str) -> str: 104 "Return url to redirect on login failure." 105 return settings.LOGIN_URL
Return url to redirect on login failure.
def
get_user_id(self, info: dict[str, typing.Any]) -> str | None:
107 def get_user_id(self, info: dict[str, Any]) -> str | None: 108 """Return unique identifier from the profile info.""" 109 if "id" in info: 110 return str(info["id"]) 111 return None
Return unique identifier from the profile info.
def
handle_login_failure(self, reason: str) -> django.http.response.HttpResponse:
113 def handle_login_failure(self, reason: str) -> HttpResponse: 114 "Message user and redirect on error." 115 LOGGER.warning("Authentication Failure", reason=reason) 116 messages.error( 117 self.request, 118 _( 119 "Authentication failed: {reason}".format_map( 120 { 121 "reason": reason, 122 } 123 ) 124 ), 125 ) 126 return redirect(self.get_error_redirect(self.source, reason))
Message user and redirect on error.
Inherited Members
129class OAuthSourceFlowManager(SourceFlowManager): 130 """Flow manager for oauth sources""" 131 132 user_connection_type = UserOAuthSourceConnection 133 group_connection_type = GroupOAuthSourceConnection 134 135 def update_user_connection( 136 self, 137 connection: UserOAuthSourceConnection, 138 access_token: str | None = None, 139 refresh_token: str | None = None, 140 expires_in: int | None = None, 141 **_, 142 ) -> UserOAuthSourceConnection: 143 """Set the access_token and refresh_token on the connection""" 144 connection.access_token = access_token 145 connection.refresh_token = refresh_token 146 connection.expires = now() + timedelta(seconds=expires_in) if expires_in else now() 147 return connection
Flow manager for oauth sources
user_connection_type =
<class 'authentik.sources.oauth.models.UserOAuthSourceConnection'>
group_connection_type =
<class 'authentik.sources.oauth.models.GroupOAuthSourceConnection'>
def
update_user_connection( self, connection: authentik.sources.oauth.models.UserOAuthSourceConnection, access_token: str | None = None, refresh_token: str | None = None, expires_in: int | None = None, **_) -> authentik.sources.oauth.models.UserOAuthSourceConnection:
135 def update_user_connection( 136 self, 137 connection: UserOAuthSourceConnection, 138 access_token: str | None = None, 139 refresh_token: str | None = None, 140 expires_in: int | None = None, 141 **_, 142 ) -> UserOAuthSourceConnection: 143 """Set the access_token and refresh_token on the connection""" 144 connection.access_token = access_token 145 connection.refresh_token = refresh_token 146 connection.expires = now() + timedelta(seconds=expires_in) if expires_in else now() 147 return connection
Set the access_token and refresh_token on the connection