authentik.sources.oauth.views.callback

OAuth Callback Views

  1"""OAuth Callback Views"""
  2
  3from datetime import timedelta
  4from json import JSONDecodeError
  5from typing import Any
  6
  7from django.conf import settings
  8from django.contrib import messages
  9from django.http import Http404, HttpRequest, HttpResponse
 10from django.shortcuts import redirect
 11from django.utils.timezone import now
 12from django.utils.translation import gettext as _
 13from django.views.generic import View
 14from structlog.stdlib import get_logger
 15
 16from authentik.core.sources.flow_manager import SourceFlowManager
 17from authentik.events.models import Event, EventAction
 18from authentik.sources.oauth.models import (
 19    GroupOAuthSourceConnection,
 20    OAuthSource,
 21    UserOAuthSourceConnection,
 22)
 23from authentik.sources.oauth.views.base import OAuthClientMixin
 24
 25LOGGER = get_logger()
 26
 27
 28class OAuthCallback(OAuthClientMixin, View):
 29    "Base OAuth callback view."
 30
 31    source: OAuthSource
 32    token: dict | None = None
 33
 34    def dispatch(self, request: HttpRequest, *_, **kwargs) -> HttpResponse:
 35        """View Get handler"""
 36        slug = kwargs.get("source_slug", "")
 37        try:
 38            self.source = OAuthSource.objects.get(slug=slug)
 39        except OAuthSource.DoesNotExist:
 40            raise Http404(f"Unknown OAuth source '{slug}'.") from None
 41
 42        if not self.source.enabled:
 43            raise Http404(f"Source {slug} is not enabled.")
 44        client = self.get_client(self.source, callback=self.get_callback_url(self.source))
 45        # Fetch access token
 46        self.token = client.get_access_token()
 47        if self.token is None:
 48            return self.handle_login_failure("Could not retrieve token.")
 49        if "error" in self.token:
 50            return self.handle_login_failure(self.token["error"])
 51        # Fetch profile info
 52        try:
 53            raw_info = client.get_profile_info(self.token)
 54            if raw_info is None:
 55                return self.handle_login_failure("Could not retrieve profile.")
 56        except JSONDecodeError as exc:
 57            Event.new(
 58                EventAction.CONFIGURATION_ERROR,
 59                message="Failed to JSON-decode profile.",
 60                raw_profile=exc.doc,
 61            ).from_http(self.request)
 62            return self.handle_login_failure("Could not retrieve profile.")
 63        identifier = self.get_user_id(info=raw_info)
 64        if identifier is None:
 65            return self.handle_login_failure("Could not determine id.")
 66        sfm = OAuthSourceFlowManager(
 67            source=self.source,
 68            request=self.request,
 69            identifier=identifier,
 70            user_info={
 71                "info": raw_info,
 72                "client": client,
 73                "token": self.token,
 74            },
 75            policy_context={
 76                "oauth_userinfo": raw_info,
 77            },
 78        )
 79        return sfm.get_flow(
 80            raw_info=raw_info,
 81            access_token=self.token.get("access_token"),
 82            refresh_token=self.token.get("refresh_token"),
 83            expires=self.token.get("expires_in"),
 84        )
 85
 86    def get_callback_url(self, source: OAuthSource) -> str:
 87        "Return callback url if different than the current url."
 88        return ""
 89
 90    def get_error_redirect(self, source: OAuthSource, reason: str) -> str:
 91        "Return url to redirect on login failure."
 92        return settings.LOGIN_URL
 93
 94    def get_user_id(self, info: dict[str, Any]) -> str | None:
 95        """Return unique identifier from the profile info."""
 96        if "id" in info:
 97            return info["id"]
 98        return None
 99
100    def handle_login_failure(self, reason: str) -> HttpResponse:
101        "Message user and redirect on error."
102        LOGGER.warning("Authentication Failure", reason=reason)
103        messages.error(
104            self.request,
105            _(
106                "Authentication failed: {reason}".format_map(
107                    {
108                        "reason": reason,
109                    }
110                )
111            ),
112        )
113        return redirect(self.get_error_redirect(self.source, reason))
114
115
class OAuthSourceFlowManager(SourceFlowManager):
    """Flow manager for oauth sources"""

    user_connection_type = UserOAuthSourceConnection
    group_connection_type = GroupOAuthSourceConnection

    def update_user_connection(
        self,
        connection: UserOAuthSourceConnection,
        access_token: str | None = None,
        refresh_token: str | None = None,
        expires_in: int | None = None,
        **kwargs,
    ) -> UserOAuthSourceConnection:
        """Store the tokens and the computed expiry on the connection.

        Args:
            connection: Connection object to update in place.
            access_token: Value assigned to ``connection.access_token``.
            refresh_token: Value assigned to ``connection.refresh_token``.
            expires_in: Token lifetime in seconds.
            **kwargs: Other keywords; only ``expires`` is read, as a fallback
                for ``expires_in``.

        Returns:
            The same ``connection`` object. ``connection.expires`` is set to
            ``now()`` plus the lifetime, or to ``now()`` when no usable
            lifetime was supplied.
        """
        connection.access_token = access_token
        connection.refresh_token = refresh_token

        # OAuthCallback.dispatch hands the lifetime to get_flow() as ``expires``;
        # presumably those keywords are forwarded here unchanged (TODO confirm),
        # in which case ``expires_in`` alone would never be populated.
        if expires_in is None:
            expires_in = kwargs.get("expires")

        lifetime = timedelta()
        if expires_in:
            try:
                # float() also accepts numeric strings such as "3600".
                lifetime = timedelta(seconds=float(expires_in))
            except (TypeError, ValueError, OverflowError):
                LOGGER.warning("Ignoring invalid token lifetime", expires_in=expires_in)
        connection.expires = now() + lifetime
        return connection
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class OAuthCallback(authentik.sources.oauth.views.base.OAuthClientMixin, django.views.generic.base.View):
 29class OAuthCallback(OAuthClientMixin, View):
 30    "Base OAuth callback view."
 31
 32    source: OAuthSource
 33    token: dict | None = None
 34
 35    def dispatch(self, request: HttpRequest, *_, **kwargs) -> HttpResponse:
 36        """View Get handler"""
 37        slug = kwargs.get("source_slug", "")
 38        try:
 39            self.source = OAuthSource.objects.get(slug=slug)
 40        except OAuthSource.DoesNotExist:
 41            raise Http404(f"Unknown OAuth source '{slug}'.") from None
 42
 43        if not self.source.enabled:
 44            raise Http404(f"Source {slug} is not enabled.")
 45        client = self.get_client(self.source, callback=self.get_callback_url(self.source))
 46        # Fetch access token
 47        self.token = client.get_access_token()
 48        if self.token is None:
 49            return self.handle_login_failure("Could not retrieve token.")
 50        if "error" in self.token:
 51            return self.handle_login_failure(self.token["error"])
 52        # Fetch profile info
 53        try:
 54            raw_info = client.get_profile_info(self.token)
 55            if raw_info is None:
 56                return self.handle_login_failure("Could not retrieve profile.")
 57        except JSONDecodeError as exc:
 58            Event.new(
 59                EventAction.CONFIGURATION_ERROR,
 60                message="Failed to JSON-decode profile.",
 61                raw_profile=exc.doc,
 62            ).from_http(self.request)
 63            return self.handle_login_failure("Could not retrieve profile.")
 64        identifier = self.get_user_id(info=raw_info)
 65        if identifier is None:
 66            return self.handle_login_failure("Could not determine id.")
 67        sfm = OAuthSourceFlowManager(
 68            source=self.source,
 69            request=self.request,
 70            identifier=identifier,
 71            user_info={
 72                "info": raw_info,
 73                "client": client,
 74                "token": self.token,
 75            },
 76            policy_context={
 77                "oauth_userinfo": raw_info,
 78            },
 79        )
 80        return sfm.get_flow(
 81            raw_info=raw_info,
 82            access_token=self.token.get("access_token"),
 83            refresh_token=self.token.get("refresh_token"),
 84            expires=self.token.get("expires_in"),
 85        )
 86
 87    def get_callback_url(self, source: OAuthSource) -> str:
 88        "Return callback url if different than the current url."
 89        return ""
 90
 91    def get_error_redirect(self, source: OAuthSource, reason: str) -> str:
 92        "Return url to redirect on login failure."
 93        return settings.LOGIN_URL
 94
 95    def get_user_id(self, info: dict[str, Any]) -> str | None:
 96        """Return unique identifier from the profile info."""
 97        if "id" in info:
 98            return info["id"]
 99        return None
100
101    def handle_login_failure(self, reason: str) -> HttpResponse:
102        "Message user and redirect on error."
103        LOGGER.warning("Authentication Failure", reason=reason)
104        messages.error(
105            self.request,
106            _(
107                "Authentication failed: {reason}".format_map(
108                    {
109                        "reason": reason,
110                    }
111                )
112            ),
113        )
114        return redirect(self.get_error_redirect(self.source, reason))

Base OAuth callback view.

token: dict | None = None
def dispatch( self, request: django.http.request.HttpRequest, *_, **kwargs) -> django.http.response.HttpResponse:
35    def dispatch(self, request: HttpRequest, *_, **kwargs) -> HttpResponse:
36        """View Get handler"""
37        slug = kwargs.get("source_slug", "")
38        try:
39            self.source = OAuthSource.objects.get(slug=slug)
40        except OAuthSource.DoesNotExist:
41            raise Http404(f"Unknown OAuth source '{slug}'.") from None
42
43        if not self.source.enabled:
44            raise Http404(f"Source {slug} is not enabled.")
45        client = self.get_client(self.source, callback=self.get_callback_url(self.source))
46        # Fetch access token
47        self.token = client.get_access_token()
48        if self.token is None:
49            return self.handle_login_failure("Could not retrieve token.")
50        if "error" in self.token:
51            return self.handle_login_failure(self.token["error"])
52        # Fetch profile info
53        try:
54            raw_info = client.get_profile_info(self.token)
55            if raw_info is None:
56                return self.handle_login_failure("Could not retrieve profile.")
57        except JSONDecodeError as exc:
58            Event.new(
59                EventAction.CONFIGURATION_ERROR,
60                message="Failed to JSON-decode profile.",
61                raw_profile=exc.doc,
62            ).from_http(self.request)
63            return self.handle_login_failure("Could not retrieve profile.")
64        identifier = self.get_user_id(info=raw_info)
65        if identifier is None:
66            return self.handle_login_failure("Could not determine id.")
67        sfm = OAuthSourceFlowManager(
68            source=self.source,
69            request=self.request,
70            identifier=identifier,
71            user_info={
72                "info": raw_info,
73                "client": client,
74                "token": self.token,
75            },
76            policy_context={
77                "oauth_userinfo": raw_info,
78            },
79        )
80        return sfm.get_flow(
81            raw_info=raw_info,
82            access_token=self.token.get("access_token"),
83            refresh_token=self.token.get("refresh_token"),
84            expires=self.token.get("expires_in"),
85        )

View Get handler

def get_callback_url(self, source: authentik.sources.oauth.models.OAuthSource) -> str:
87    def get_callback_url(self, source: OAuthSource) -> str:
88        "Return callback url if different than the current url."
89        return ""

Return callback url if different than the current url.

def get_error_redirect( self, source: authentik.sources.oauth.models.OAuthSource, reason: str) -> str:
91    def get_error_redirect(self, source: OAuthSource, reason: str) -> str:
92        "Return url to redirect on login failure."
93        return settings.LOGIN_URL

Return url to redirect on login failure.

def get_user_id(self, info: dict[str, typing.Any]) -> str | None:
95    def get_user_id(self, info: dict[str, Any]) -> str | None:
96        """Return unique identifier from the profile info."""
97        if "id" in info:
98            return info["id"]
99        return None

Return unique identifier from the profile info.

def handle_login_failure(self, reason: str) -> django.http.response.HttpResponse:
101    def handle_login_failure(self, reason: str) -> HttpResponse:
102        "Message user and redirect on error."
103        LOGGER.warning("Authentication Failure", reason=reason)
104        messages.error(
105            self.request,
106            _(
107                "Authentication failed: {reason}".format_map(
108                    {
109                        "reason": reason,
110                    }
111                )
112            ),
113        )
114        return redirect(self.get_error_redirect(self.source, reason))

Message user and redirect on error.

class OAuthSourceFlowManager(authentik.core.sources.flow_manager.SourceFlowManager):
class OAuthSourceFlowManager(SourceFlowManager):
    """Flow manager for oauth sources"""

    user_connection_type = UserOAuthSourceConnection
    group_connection_type = GroupOAuthSourceConnection

    def update_user_connection(
        self,
        connection: UserOAuthSourceConnection,
        access_token: str | None = None,
        refresh_token: str | None = None,
        expires_in: int | None = None,
        **kwargs,
    ) -> UserOAuthSourceConnection:
        """Store the tokens and the computed expiry on the connection.

        Args:
            connection: Connection object to update in place.
            access_token: Value assigned to ``connection.access_token``.
            refresh_token: Value assigned to ``connection.refresh_token``.
            expires_in: Token lifetime in seconds.
            **kwargs: Other keywords; only ``expires`` is read, as a fallback
                for ``expires_in``.

        Returns:
            The same ``connection`` object. ``connection.expires`` is set to
            ``now()`` plus the lifetime, or to ``now()`` when no usable
            lifetime was supplied.
        """
        connection.access_token = access_token
        connection.refresh_token = refresh_token

        # OAuthCallback.dispatch hands the lifetime to get_flow() as ``expires``;
        # presumably those keywords are forwarded here unchanged (TODO confirm),
        # in which case ``expires_in`` alone would never be populated.
        if expires_in is None:
            expires_in = kwargs.get("expires")

        lifetime = timedelta()
        if expires_in:
            try:
                # float() also accepts numeric strings such as "3600".
                lifetime = timedelta(seconds=float(expires_in))
            except (TypeError, ValueError, OverflowError):
                LOGGER.warning("Ignoring invalid token lifetime", expires_in=expires_in)
        connection.expires = now() + lifetime
        return connection

Flow manager for oauth sources

def update_user_connection( self, connection: authentik.sources.oauth.models.UserOAuthSourceConnection, access_token: str | None = None, refresh_token: str | None = None, expires_in: int | None = None, **_) -> authentik.sources.oauth.models.UserOAuthSourceConnection:
123    def update_user_connection(
124        self,
125        connection: UserOAuthSourceConnection,
126        access_token: str | None = None,
127        refresh_token: str | None = None,
128        expires_in: int | None = None,
129        **_,
130    ) -> UserOAuthSourceConnection:
131        """Set the access_token and refresh_token on the connection"""
132        connection.access_token = access_token
133        connection.refresh_token = refresh_token
134        connection.expires = now() + timedelta(seconds=expires_in) if expires_in else now()
135        return connection

Set the access_token and refresh_token on the connection