authentik.sources.oauth.views.callback

OAuth Callback Views

  1"""OAuth Callback Views"""
  2
  3from datetime import timedelta
  4from json import JSONDecodeError
  5from typing import Any
  6
  7from django.conf import settings
  8from django.contrib import messages
  9from django.http import Http404, HttpRequest, HttpResponse
 10from django.shortcuts import redirect
 11from django.utils.timezone import now
 12from django.utils.translation import gettext as _
 13from django.views.generic import View
 14from structlog.stdlib import get_logger
 15
 16from authentik.core.sources.flow_manager import SourceFlowManager
 17from authentik.events.models import Event, EventAction
 18from authentik.sources.oauth.clients.base import BaseOAuthClient
 19from authentik.sources.oauth.models import (
 20    GroupOAuthSourceConnection,
 21    OAuthSource,
 22    UserOAuthSourceConnection,
 23)
 24from authentik.sources.oauth.views.base import OAuthClientMixin
 25
 26LOGGER = get_logger()
 27
 28
 29class OAuthCallback(OAuthClientMixin, View):
 30    "Base OAuth callback view."
 31
 32    source: OAuthSource
 33    token: dict[str, Any] | None = None
 34
 35    def dispatch(self, request: HttpRequest, *_, **kwargs) -> HttpResponse:
 36        """View Get handler"""
 37        slug = kwargs.get("source_slug", "")
 38        try:
 39            self.source = OAuthSource.objects.get(slug=slug)
 40        except OAuthSource.DoesNotExist:
 41            raise Http404(f"Unknown OAuth source '{slug}'.") from None
 42
 43        if not self.source.enabled:
 44            raise Http404(f"Source {slug} is not enabled.")
 45        client = self.get_client(self.source, callback=self.get_callback_url(self.source))
 46        # Fetch access token
 47        self.token = client.get_access_token()
 48        if self.token is None:
 49            return self.handle_login_failure("Could not retrieve token.")
 50        if "error" in self.token:
 51            return self.handle_login_failure(self.token["error"])
 52        # Fetch profile info
 53        try:
 54            res = self.redirect_flow_manager(client)
 55        except ValueError as exc:
 56            # if we're authenticated and not in a source stage and this new flag is enabled,
 57            # just continue
 58            if self.request.user.is_authenticated:
 59                pass
 60            return self.handle_login_failure(exc.args[0])
 61        return res
 62
 63    def redirect_flow_manager(self, client: BaseOAuthClient) -> HttpResponse:
 64        try:
 65            raw_info = client.get_profile_info(self.token)
 66            if raw_info is None:
 67                raise ValueError("Could not retrieve profile.")
 68        except JSONDecodeError as exc:
 69            Event.new(
 70                EventAction.CONFIGURATION_ERROR,
 71                message="Failed to JSON-decode profile.",
 72                raw_profile=exc.doc,
 73            ).from_http(self.request)
 74            raise ValueError("Could not retrieve profile.") from None
 75        identifier = self.get_user_id(info=raw_info)
 76        if identifier is None:
 77            raise ValueError("Could not determine id.")
 78        sfm = OAuthSourceFlowManager(
 79            source=self.source,
 80            request=self.request,
 81            identifier=identifier,
 82            user_info={
 83                "info": raw_info,
 84                "client": client,
 85                "token": self.token,
 86            },
 87            policy_context={
 88                "oauth_userinfo": raw_info,
 89            },
 90        )
 91        return sfm.get_flow(
 92            raw_info=raw_info,
 93            access_token=self.token.get("access_token"),
 94            refresh_token=self.token.get("refresh_token"),
 95            expires=self.token.get("expires_in"),
 96        )
 97
 98    def get_callback_url(self, source: OAuthSource) -> str:
 99        "Return callback url if different than the current url."
100        return ""
101
102    def get_error_redirect(self, source: OAuthSource, reason: str) -> str:
103        "Return url to redirect on login failure."
104        return settings.LOGIN_URL
105
106    def get_user_id(self, info: dict[str, Any]) -> str | None:
107        """Return unique identifier from the profile info."""
108        if "id" in info:
109            return str(info["id"])
110        return None
111
112    def handle_login_failure(self, reason: str) -> HttpResponse:
113        "Message user and redirect on error."
114        LOGGER.warning("Authentication Failure", reason=reason)
115        messages.error(
116            self.request,
117            _(
118                "Authentication failed: {reason}".format_map(
119                    {
120                        "reason": reason,
121                    }
122                )
123            ),
124        )
125        return redirect(self.get_error_redirect(self.source, reason))
126
127
class OAuthSourceFlowManager(SourceFlowManager):
    """Flow manager for oauth sources"""

    user_connection_type = UserOAuthSourceConnection
    group_connection_type = GroupOAuthSourceConnection

    def update_user_connection(
        self,
        connection: UserOAuthSourceConnection,
        access_token: str | None = None,
        refresh_token: str | None = None,
        expires_in: int | None = None,
        expires: int | None = None,
        **_,
    ) -> UserOAuthSourceConnection:
        """Set the access_token, refresh_token and expiry on the connection.

        Args:
            connection: connection object to update in place.
            access_token: access token to store.
            refresh_token: refresh token to store.
            expires_in: token lifetime in seconds.
            expires: alias for ``expires_in``, used only when ``expires_in``
                is ``None``. The callback view in this module passes the
                lifetime under this name (presumably forwarded here by
                ``get_flow`` - confirm); without the alias the value was
                swallowed by ``**_`` and the expiry was always ``now()``.

        Returns:
            The same ``connection`` object. When no usable lifetime is given
            the expiry is set to the current time.
        """
        connection.access_token = access_token
        connection.refresh_token = refresh_token
        lifetime = expires_in if expires_in is not None else expires
        if isinstance(lifetime, str):
            # expires_in is a JSON number per RFC 6749, but tolerate a numeric
            # string rather than raising TypeError inside timedelta().
            try:
                lifetime = int(lifetime)
            except ValueError:
                lifetime = None
        connection.expires = now() + timedelta(seconds=lifetime) if lifetime else now()
        return connection
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class OAuthCallback(authentik.sources.oauth.views.base.OAuthClientMixin, django.views.generic.base.View):
 30class OAuthCallback(OAuthClientMixin, View):
 31    "Base OAuth callback view."
 32
 33    source: OAuthSource
 34    token: dict[str, Any] | None = None
 35
 36    def dispatch(self, request: HttpRequest, *_, **kwargs) -> HttpResponse:
 37        """View Get handler"""
 38        slug = kwargs.get("source_slug", "")
 39        try:
 40            self.source = OAuthSource.objects.get(slug=slug)
 41        except OAuthSource.DoesNotExist:
 42            raise Http404(f"Unknown OAuth source '{slug}'.") from None
 43
 44        if not self.source.enabled:
 45            raise Http404(f"Source {slug} is not enabled.")
 46        client = self.get_client(self.source, callback=self.get_callback_url(self.source))
 47        # Fetch access token
 48        self.token = client.get_access_token()
 49        if self.token is None:
 50            return self.handle_login_failure("Could not retrieve token.")
 51        if "error" in self.token:
 52            return self.handle_login_failure(self.token["error"])
 53        # Fetch profile info
 54        try:
 55            res = self.redirect_flow_manager(client)
 56        except ValueError as exc:
 57            # if we're authenticated and not in a source stage and this new flag is enabled,
 58            # just continue
 59            if self.request.user.is_authenticated:
 60                pass
 61            return self.handle_login_failure(exc.args[0])
 62        return res
 63
 64    def redirect_flow_manager(self, client: BaseOAuthClient) -> HttpResponse:
 65        try:
 66            raw_info = client.get_profile_info(self.token)
 67            if raw_info is None:
 68                raise ValueError("Could not retrieve profile.")
 69        except JSONDecodeError as exc:
 70            Event.new(
 71                EventAction.CONFIGURATION_ERROR,
 72                message="Failed to JSON-decode profile.",
 73                raw_profile=exc.doc,
 74            ).from_http(self.request)
 75            raise ValueError("Could not retrieve profile.") from None
 76        identifier = self.get_user_id(info=raw_info)
 77        if identifier is None:
 78            raise ValueError("Could not determine id.")
 79        sfm = OAuthSourceFlowManager(
 80            source=self.source,
 81            request=self.request,
 82            identifier=identifier,
 83            user_info={
 84                "info": raw_info,
 85                "client": client,
 86                "token": self.token,
 87            },
 88            policy_context={
 89                "oauth_userinfo": raw_info,
 90            },
 91        )
 92        return sfm.get_flow(
 93            raw_info=raw_info,
 94            access_token=self.token.get("access_token"),
 95            refresh_token=self.token.get("refresh_token"),
 96            expires=self.token.get("expires_in"),
 97        )
 98
 99    def get_callback_url(self, source: OAuthSource) -> str:
100        "Return callback url if different than the current url."
101        return ""
102
103    def get_error_redirect(self, source: OAuthSource, reason: str) -> str:
104        "Return url to redirect on login failure."
105        return settings.LOGIN_URL
106
107    def get_user_id(self, info: dict[str, Any]) -> str | None:
108        """Return unique identifier from the profile info."""
109        if "id" in info:
110            return str(info["id"])
111        return None
112
113    def handle_login_failure(self, reason: str) -> HttpResponse:
114        "Message user and redirect on error."
115        LOGGER.warning("Authentication Failure", reason=reason)
116        messages.error(
117            self.request,
118            _(
119                "Authentication failed: {reason}".format_map(
120                    {
121                        "reason": reason,
122                    }
123                )
124            ),
125        )
126        return redirect(self.get_error_redirect(self.source, reason))

Base OAuth callback view.

token: dict[str, Any] | None = None
def dispatch( self, request: django.http.request.HttpRequest, *_, **kwargs) -> django.http.response.HttpResponse:
36    def dispatch(self, request: HttpRequest, *_, **kwargs) -> HttpResponse:
37        """View Get handler"""
38        slug = kwargs.get("source_slug", "")
39        try:
40            self.source = OAuthSource.objects.get(slug=slug)
41        except OAuthSource.DoesNotExist:
42            raise Http404(f"Unknown OAuth source '{slug}'.") from None
43
44        if not self.source.enabled:
45            raise Http404(f"Source {slug} is not enabled.")
46        client = self.get_client(self.source, callback=self.get_callback_url(self.source))
47        # Fetch access token
48        self.token = client.get_access_token()
49        if self.token is None:
50            return self.handle_login_failure("Could not retrieve token.")
51        if "error" in self.token:
52            return self.handle_login_failure(self.token["error"])
53        # Fetch profile info
54        try:
55            res = self.redirect_flow_manager(client)
56        except ValueError as exc:
57            # if we're authenticated and not in a source stage and this new flag is enabled,
58            # just continue
59            if self.request.user.is_authenticated:
60                pass
61            return self.handle_login_failure(exc.args[0])
62        return res

Handle the OAuth callback request: look up the source, fetch the access token and hand off to the flow manager.

def redirect_flow_manager( self, client: authentik.sources.oauth.clients.base.BaseOAuthClient) -> django.http.response.HttpResponse:
64    def redirect_flow_manager(self, client: BaseOAuthClient) -> HttpResponse:
65        try:
66            raw_info = client.get_profile_info(self.token)
67            if raw_info is None:
68                raise ValueError("Could not retrieve profile.")
69        except JSONDecodeError as exc:
70            Event.new(
71                EventAction.CONFIGURATION_ERROR,
72                message="Failed to JSON-decode profile.",
73                raw_profile=exc.doc,
74            ).from_http(self.request)
75            raise ValueError("Could not retrieve profile.") from None
76        identifier = self.get_user_id(info=raw_info)
77        if identifier is None:
78            raise ValueError("Could not determine id.")
79        sfm = OAuthSourceFlowManager(
80            source=self.source,
81            request=self.request,
82            identifier=identifier,
83            user_info={
84                "info": raw_info,
85                "client": client,
86                "token": self.token,
87            },
88            policy_context={
89                "oauth_userinfo": raw_info,
90            },
91        )
92        return sfm.get_flow(
93            raw_info=raw_info,
94            access_token=self.token.get("access_token"),
95            refresh_token=self.token.get("refresh_token"),
96            expires=self.token.get("expires_in"),
97        )
def get_callback_url(self, source: authentik.sources.oauth.models.OAuthSource) -> str:
 99    def get_callback_url(self, source: OAuthSource) -> str:
100        "Return callback url if different than the current url."
101        return ""

Return callback url if different than the current url.

def get_error_redirect( self, source: authentik.sources.oauth.models.OAuthSource, reason: str) -> str:
103    def get_error_redirect(self, source: OAuthSource, reason: str) -> str:
104        "Return url to redirect on login failure."
105        return settings.LOGIN_URL

Return url to redirect on login failure.

def get_user_id(self, info: dict[str, typing.Any]) -> str | None:
107    def get_user_id(self, info: dict[str, Any]) -> str | None:
108        """Return unique identifier from the profile info."""
109        if "id" in info:
110            return str(info["id"])
111        return None

Return unique identifier from the profile info.

def handle_login_failure(self, reason: str) -> django.http.response.HttpResponse:
113    def handle_login_failure(self, reason: str) -> HttpResponse:
114        "Message user and redirect on error."
115        LOGGER.warning("Authentication Failure", reason=reason)
116        messages.error(
117            self.request,
118            _(
119                "Authentication failed: {reason}".format_map(
120                    {
121                        "reason": reason,
122                    }
123                )
124            ),
125        )
126        return redirect(self.get_error_redirect(self.source, reason))

Message user and redirect on error.

class OAuthSourceFlowManager(authentik.core.sources.flow_manager.SourceFlowManager):
class OAuthSourceFlowManager(SourceFlowManager):
    """Flow manager for oauth sources"""

    user_connection_type = UserOAuthSourceConnection
    group_connection_type = GroupOAuthSourceConnection

    def update_user_connection(
        self,
        connection: UserOAuthSourceConnection,
        access_token: str | None = None,
        refresh_token: str | None = None,
        expires_in: int | None = None,
        expires: int | None = None,
        **_,
    ) -> UserOAuthSourceConnection:
        """Set the access_token, refresh_token and expiry on the connection.

        Args:
            connection: connection object to update in place.
            access_token: access token to store.
            refresh_token: refresh token to store.
            expires_in: token lifetime in seconds.
            expires: alias for ``expires_in``, used only when ``expires_in``
                is ``None``. The callback view in this module passes the
                lifetime under this name (presumably forwarded here by
                ``get_flow`` - confirm); without the alias the value was
                swallowed by ``**_`` and the expiry was always ``now()``.

        Returns:
            The same ``connection`` object. When no usable lifetime is given
            the expiry is set to the current time.
        """
        connection.access_token = access_token
        connection.refresh_token = refresh_token
        lifetime = expires_in if expires_in is not None else expires
        if isinstance(lifetime, str):
            # expires_in is a JSON number per RFC 6749, but tolerate a numeric
            # string rather than raising TypeError inside timedelta().
            try:
                lifetime = int(lifetime)
            except ValueError:
                lifetime = None
        connection.expires = now() + timedelta(seconds=lifetime) if lifetime else now()
        return connection

Flow manager for oauth sources

def update_user_connection( self, connection: authentik.sources.oauth.models.UserOAuthSourceConnection, access_token: str | None = None, refresh_token: str | None = None, expires_in: int | None = None, **_) -> authentik.sources.oauth.models.UserOAuthSourceConnection:
135    def update_user_connection(
136        self,
137        connection: UserOAuthSourceConnection,
138        access_token: str | None = None,
139        refresh_token: str | None = None,
140        expires_in: int | None = None,
141        **_,
142    ) -> UserOAuthSourceConnection:
143        """Set the access_token and refresh_token on the connection"""
144        connection.access_token = access_token
145        connection.refresh_token = refresh_token
146        connection.expires = now() + timedelta(seconds=expires_in) if expires_in else now()
147        return connection

Set the access_token, refresh_token and expiry time on the connection