authentik.providers.oauth2.views.device_backchannel

Device flow views

  1"""Device flow views"""
  2
  3from urllib.parse import urlencode
  4
  5from django.http import HttpRequest, HttpResponse
  6from django.urls import reverse
  7from django.utils.decorators import method_decorator
  8from django.utils.timezone import now
  9from django.views import View
 10from django.views.decorators.csrf import csrf_exempt
 11from rest_framework.throttling import AnonRateThrottle
 12from structlog.stdlib import get_logger
 13
 14from authentik.core.models import Application
 15from authentik.lib.config import CONFIG
 16from authentik.lib.utils.time import timedelta_from_string
 17from authentik.providers.oauth2.errors import DeviceCodeError
 18from authentik.providers.oauth2.models import DeviceToken, GrantType, OAuth2Provider, ScopeMapping
 19from authentik.providers.oauth2.utils import TokenResponse, extract_client_auth
 20from authentik.providers.oauth2.views.device_init import QS_KEY_CODE
 21
 22LOGGER = get_logger()
 23
 24
 25@method_decorator(csrf_exempt, name="dispatch")
 26class DeviceView(View):
 27    """Device flow, devices can request tokens which users can verify"""
 28
 29    client_id: str
 30    provider: OAuth2Provider
 31    scopes: set[str] = []
 32
 33    def parse_request(self):
 34        """Parse incoming request"""
 35        client_id, _ = extract_client_auth(self.request)
 36        if not client_id:
 37            raise DeviceCodeError("invalid_client")
 38        provider = OAuth2Provider.objects.filter(client_id=client_id).first()
 39        if not provider:
 40            raise DeviceCodeError("invalid_client")
 41        try:
 42            _ = provider.application
 43        except Application.DoesNotExist:
 44            raise DeviceCodeError("invalid_client") from None
 45        if GrantType.DEVICE_CODE not in provider.grant_types:
 46            raise DeviceCodeError("invalid_client")
 47        self.provider = provider
 48        self.client_id = client_id
 49
 50        scopes_to_check = set(self.request.POST.get("scope", "").split())
 51        default_scope_names = set(
 52            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
 53                "scope_name", flat=True
 54            )
 55        )
 56        self.scopes = scopes_to_check
 57        if not scopes_to_check.issubset(default_scope_names):
 58            LOGGER.info(
 59                "Application requested scopes not configured, setting to overlap",
 60                scope_allowed=default_scope_names,
 61                scope_given=self.scopes,
 62            )
 63            self.scopes = self.scopes.intersection(default_scope_names)
 64
 65    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 66        throttle = AnonRateThrottle()
 67        throttle.rate = CONFIG.get("throttle.providers.oauth2.device", "20/hour")
 68        throttle.num_requests, throttle.duration = throttle.parse_rate(throttle.rate)
 69        if not throttle.allow_request(request, self):
 70            return TokenResponse(DeviceCodeError("slow_down").create_dict(request), status=429)
 71        return super().dispatch(request, *args, **kwargs)
 72
 73    def post(self, request: HttpRequest) -> HttpResponse:
 74        """Generate device token"""
 75        try:
 76            self.parse_request()
 77        except DeviceCodeError as exc:
 78            return TokenResponse(exc.create_dict(request), status=400)
 79        until = timedelta_from_string(self.provider.access_code_validity)
 80        token: DeviceToken = DeviceToken.objects.create(
 81            expires=now() + until, provider=self.provider, _scope=" ".join(self.scopes)
 82        )
 83        device_url = self.request.build_absolute_uri(
 84            reverse("authentik_providers_oauth2_root:device-login")
 85        )
 86        return TokenResponse(
 87            {
 88                "device_code": token.device_code,
 89                "verification_uri": device_url,
 90                "verification_uri_complete": (
 91                    device_url
 92                    + "?"
 93                    + urlencode(
 94                        {
 95                            QS_KEY_CODE: token.user_code,
 96                        }
 97                    )
 98                ),
 99                "user_code": token.user_code,
100                "expires_in": int(until.total_seconds()),
101                "interval": 5,
102            }
103        )
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@method_decorator(csrf_exempt, name='dispatch')
class DeviceView(django.views.generic.base.View):
 26@method_decorator(csrf_exempt, name="dispatch")
 27class DeviceView(View):
 28    """Device flow, devices can request tokens which users can verify"""
 29
 30    client_id: str
 31    provider: OAuth2Provider
 32    scopes: set[str] = []
 33
 34    def parse_request(self):
 35        """Parse incoming request"""
 36        client_id, _ = extract_client_auth(self.request)
 37        if not client_id:
 38            raise DeviceCodeError("invalid_client")
 39        provider = OAuth2Provider.objects.filter(client_id=client_id).first()
 40        if not provider:
 41            raise DeviceCodeError("invalid_client")
 42        try:
 43            _ = provider.application
 44        except Application.DoesNotExist:
 45            raise DeviceCodeError("invalid_client") from None
 46        if GrantType.DEVICE_CODE not in provider.grant_types:
 47            raise DeviceCodeError("invalid_client")
 48        self.provider = provider
 49        self.client_id = client_id
 50
 51        scopes_to_check = set(self.request.POST.get("scope", "").split())
 52        default_scope_names = set(
 53            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
 54                "scope_name", flat=True
 55            )
 56        )
 57        self.scopes = scopes_to_check
 58        if not scopes_to_check.issubset(default_scope_names):
 59            LOGGER.info(
 60                "Application requested scopes not configured, setting to overlap",
 61                scope_allowed=default_scope_names,
 62                scope_given=self.scopes,
 63            )
 64            self.scopes = self.scopes.intersection(default_scope_names)
 65
 66    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 67        throttle = AnonRateThrottle()
 68        throttle.rate = CONFIG.get("throttle.providers.oauth2.device", "20/hour")
 69        throttle.num_requests, throttle.duration = throttle.parse_rate(throttle.rate)
 70        if not throttle.allow_request(request, self):
 71            return TokenResponse(DeviceCodeError("slow_down").create_dict(request), status=429)
 72        return super().dispatch(request, *args, **kwargs)
 73
 74    def post(self, request: HttpRequest) -> HttpResponse:
 75        """Generate device token"""
 76        try:
 77            self.parse_request()
 78        except DeviceCodeError as exc:
 79            return TokenResponse(exc.create_dict(request), status=400)
 80        until = timedelta_from_string(self.provider.access_code_validity)
 81        token: DeviceToken = DeviceToken.objects.create(
 82            expires=now() + until, provider=self.provider, _scope=" ".join(self.scopes)
 83        )
 84        device_url = self.request.build_absolute_uri(
 85            reverse("authentik_providers_oauth2_root:device-login")
 86        )
 87        return TokenResponse(
 88            {
 89                "device_code": token.device_code,
 90                "verification_uri": device_url,
 91                "verification_uri_complete": (
 92                    device_url
 93                    + "?"
 94                    + urlencode(
 95                        {
 96                            QS_KEY_CODE: token.user_code,
 97                        }
 98                    )
 99                ),
100                "user_code": token.user_code,
101                "expires_in": int(until.total_seconds()),
102                "interval": 5,
103            }
104        )

Device flow, devices can request tokens which users can verify

client_id: str
scopes: set[str] = []
def parse_request(self):
34    def parse_request(self):
35        """Parse incoming request"""
36        client_id, _ = extract_client_auth(self.request)
37        if not client_id:
38            raise DeviceCodeError("invalid_client")
39        provider = OAuth2Provider.objects.filter(client_id=client_id).first()
40        if not provider:
41            raise DeviceCodeError("invalid_client")
42        try:
43            _ = provider.application
44        except Application.DoesNotExist:
45            raise DeviceCodeError("invalid_client") from None
46        if GrantType.DEVICE_CODE not in provider.grant_types:
47            raise DeviceCodeError("invalid_client")
48        self.provider = provider
49        self.client_id = client_id
50
51        scopes_to_check = set(self.request.POST.get("scope", "").split())
52        default_scope_names = set(
53            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
54                "scope_name", flat=True
55            )
56        )
57        self.scopes = scopes_to_check
58        if not scopes_to_check.issubset(default_scope_names):
59            LOGGER.info(
60                "Application requested scopes not configured, setting to overlap",
61                scope_allowed=default_scope_names,
62                scope_given=self.scopes,
63            )
64            self.scopes = self.scopes.intersection(default_scope_names)

Parse incoming request

def dispatch( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
66    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
67        throttle = AnonRateThrottle()
68        throttle.rate = CONFIG.get("throttle.providers.oauth2.device", "20/hour")
69        throttle.num_requests, throttle.duration = throttle.parse_rate(throttle.rate)
70        if not throttle.allow_request(request, self):
71            return TokenResponse(DeviceCodeError("slow_down").create_dict(request), status=429)
72        return super().dispatch(request, *args, **kwargs)
def post( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
 74    def post(self, request: HttpRequest) -> HttpResponse:
 75        """Generate device token"""
 76        try:
 77            self.parse_request()
 78        except DeviceCodeError as exc:
 79            return TokenResponse(exc.create_dict(request), status=400)
 80        until = timedelta_from_string(self.provider.access_code_validity)
 81        token: DeviceToken = DeviceToken.objects.create(
 82            expires=now() + until, provider=self.provider, _scope=" ".join(self.scopes)
 83        )
 84        device_url = self.request.build_absolute_uri(
 85            reverse("authentik_providers_oauth2_root:device-login")
 86        )
 87        return TokenResponse(
 88            {
 89                "device_code": token.device_code,
 90                "verification_uri": device_url,
 91                "verification_uri_complete": (
 92                    device_url
 93                    + "?"
 94                    + urlencode(
 95                        {
 96                            QS_KEY_CODE: token.user_code,
 97                        }
 98                    )
 99                ),
100                "user_code": token.user_code,
101                "expires_in": int(until.total_seconds()),
102                "interval": 5,
103            }
104        )

Generate device token