authentik.providers.oauth2.views.device_backchannel

Device flow views

 1"""Device flow views"""
 2
 3from urllib.parse import urlencode
 4
 5from django.http import HttpRequest, HttpResponse
 6from django.urls import reverse
 7from django.utils.decorators import method_decorator
 8from django.utils.timezone import now
 9from django.views import View
10from django.views.decorators.csrf import csrf_exempt
11from rest_framework.throttling import AnonRateThrottle
12from structlog.stdlib import get_logger
13
14from authentik.core.models import Application
15from authentik.lib.config import CONFIG
16from authentik.lib.utils.time import timedelta_from_string
17from authentik.providers.oauth2.errors import DeviceCodeError
18from authentik.providers.oauth2.models import DeviceToken, OAuth2Provider
19from authentik.providers.oauth2.utils import TokenResponse, extract_client_auth
20from authentik.providers.oauth2.views.device_init import QS_KEY_CODE
21
22LOGGER = get_logger()
23
24
@method_decorator(csrf_exempt, name="dispatch")
class DeviceView(View):
    """Device flow: devices request tokens, which users can then verify

    A POST identifies the calling client, creates a ``DeviceToken`` for the
    matching provider and answers with the device code, the user code and the
    verification URLs. CSRF checks are disabled for the whole view via
    ``csrf_exempt`` on ``dispatch``.
    """

    # Client ID resolved from the request by parse_request()
    client_id: str
    # Provider whose client_id matches the request, set by parse_request()
    provider: OAuth2Provider
    # Class-level default; parse_request() rebinds this per instance instead of
    # mutating the shared list
    scopes: list[str] = []

    def parse_request(self):
        """Parse incoming request

        Resolves the client ID via ``extract_client_auth``, looks up the
        provider with that client ID and stores the provider, the client ID and
        the requested scopes on the view.

        Raises:
            DeviceCodeError: ``invalid_client`` when no client ID is supplied,
                no provider matches it, or the provider has no application.
        """
        client_id, _ = extract_client_auth(self.request)
        if not client_id:
            raise DeviceCodeError("invalid_client")
        provider = OAuth2Provider.objects.filter(client_id=client_id).first()
        if not provider:
            raise DeviceCodeError("invalid_client")
        try:
            # Only the lookup matters here: it raises when the provider is not
            # attached to an application
            _ = provider.application
        except Application.DoesNotExist:
            raise DeviceCodeError("invalid_client") from None
        self.provider = provider
        self.client_id = client_id
        # Scopes are space-delimited. Drop empty entries so that a missing
        # parameter yields [] (not [""]) and repeated spaces do not create
        # blank scopes.
        raw_scope = self.request.POST.get("scope", "")
        self.scopes = [scope for scope in raw_scope.split(" ") if scope]

    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Rate-limit the request, then dispatch it as usual

        The rate is read from the ``throttle.providers.oauth2.device`` setting
        (default ``20/hour``). Throttled requests receive a ``slow_down`` error
        with HTTP status 429.
        """
        throttle = AnonRateThrottle()
        throttle.rate = CONFIG.get("throttle.providers.oauth2.device", "20/hour")
        # The rate is assigned after construction, so the derived limits have to
        # be recomputed from it explicitly
        throttle.num_requests, throttle.duration = throttle.parse_rate(throttle.rate)
        if not throttle.allow_request(request, self):
            return TokenResponse(DeviceCodeError("slow_down").create_dict(request), status=429)
        return super().dispatch(request, *args, **kwargs)

    def post(self, request: HttpRequest) -> HttpResponse:
        """Generate device token

        Returns a ``TokenResponse`` carrying the device code, user code,
        verification URLs, lifetime in whole seconds and the ``interval``
        value. If the request cannot be parsed, the ``DeviceCodeError`` is
        returned as a response with HTTP status 400.
        """
        try:
            self.parse_request()
        except DeviceCodeError as exc:
            return TokenResponse(exc.create_dict(request), status=400)
        # The token lives for as long as the provider's access code validity
        until = timedelta_from_string(self.provider.access_code_validity)
        token: DeviceToken = DeviceToken.objects.create(
            expires=now() + until, provider=self.provider, _scope=" ".join(self.scopes)
        )
        device_url = self.request.build_absolute_uri(
            reverse("authentik_providers_oauth2_root:device-login")
        )
        return TokenResponse(
            {
                "device_code": token.device_code,
                "verification_uri": device_url,
                # Same URL with the user code pre-filled as a query parameter
                "verification_uri_complete": (
                    device_url
                    + "?"
                    + urlencode(
                        {
                            QS_KEY_CODE: token.user_code,
                        }
                    )
                ),
                "user_code": token.user_code,
                "expires_in": int(until.total_seconds()),
                "interval": 5,
            }
        )
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@method_decorator(csrf_exempt, name='dispatch')
class DeviceView(django.views.generic.base.View):
@method_decorator(csrf_exempt, name="dispatch")
class DeviceView(View):
    """Device flow: devices request tokens, which users can then verify

    A POST identifies the calling client, creates a ``DeviceToken`` for the
    matching provider and answers with the device code, the user code and the
    verification URLs. CSRF checks are disabled for the whole view via
    ``csrf_exempt`` on ``dispatch``.
    """

    # Client ID resolved from the request by parse_request()
    client_id: str
    # Provider whose client_id matches the request, set by parse_request()
    provider: OAuth2Provider
    # Class-level default; parse_request() rebinds this per instance instead of
    # mutating the shared list
    scopes: list[str] = []

    def parse_request(self):
        """Parse incoming request

        Resolves the client ID via ``extract_client_auth``, looks up the
        provider with that client ID and stores the provider, the client ID and
        the requested scopes on the view.

        Raises:
            DeviceCodeError: ``invalid_client`` when no client ID is supplied,
                no provider matches it, or the provider has no application.
        """
        client_id, _ = extract_client_auth(self.request)
        if not client_id:
            raise DeviceCodeError("invalid_client")
        provider = OAuth2Provider.objects.filter(client_id=client_id).first()
        if not provider:
            raise DeviceCodeError("invalid_client")
        try:
            # Only the lookup matters here: it raises when the provider is not
            # attached to an application
            _ = provider.application
        except Application.DoesNotExist:
            raise DeviceCodeError("invalid_client") from None
        self.provider = provider
        self.client_id = client_id
        # Scopes are space-delimited. Drop empty entries so that a missing
        # parameter yields [] (not [""]) and repeated spaces do not create
        # blank scopes.
        raw_scope = self.request.POST.get("scope", "")
        self.scopes = [scope for scope in raw_scope.split(" ") if scope]

    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Rate-limit the request, then dispatch it as usual

        The rate is read from the ``throttle.providers.oauth2.device`` setting
        (default ``20/hour``). Throttled requests receive a ``slow_down`` error
        with HTTP status 429.
        """
        throttle = AnonRateThrottle()
        throttle.rate = CONFIG.get("throttle.providers.oauth2.device", "20/hour")
        # The rate is assigned after construction, so the derived limits have to
        # be recomputed from it explicitly
        throttle.num_requests, throttle.duration = throttle.parse_rate(throttle.rate)
        if not throttle.allow_request(request, self):
            return TokenResponse(DeviceCodeError("slow_down").create_dict(request), status=429)
        return super().dispatch(request, *args, **kwargs)

    def post(self, request: HttpRequest) -> HttpResponse:
        """Generate device token

        Returns a ``TokenResponse`` carrying the device code, user code,
        verification URLs, lifetime in whole seconds and the ``interval``
        value. If the request cannot be parsed, the ``DeviceCodeError`` is
        returned as a response with HTTP status 400.
        """
        try:
            self.parse_request()
        except DeviceCodeError as exc:
            return TokenResponse(exc.create_dict(request), status=400)
        # The token lives for as long as the provider's access code validity
        until = timedelta_from_string(self.provider.access_code_validity)
        token: DeviceToken = DeviceToken.objects.create(
            expires=now() + until, provider=self.provider, _scope=" ".join(self.scopes)
        )
        device_url = self.request.build_absolute_uri(
            reverse("authentik_providers_oauth2_root:device-login")
        )
        return TokenResponse(
            {
                "device_code": token.device_code,
                "verification_uri": device_url,
                # Same URL with the user code pre-filled as a query parameter
                "verification_uri_complete": (
                    device_url
                    + "?"
                    + urlencode(
                        {
                            QS_KEY_CODE: token.user_code,
                        }
                    )
                ),
                "user_code": token.user_code,
                "expires_in": int(until.total_seconds()),
                "interval": 5,
            }
        )

Device flow: devices request tokens, which users can then verify

client_id: str
scopes: list[str] = []
def parse_request(self):
34    def parse_request(self):
35        """Parse incoming request"""
36        client_id, _ = extract_client_auth(self.request)
37        if not client_id:
38            raise DeviceCodeError("invalid_client")
39        provider = OAuth2Provider.objects.filter(client_id=client_id).first()
40        if not provider:
41            raise DeviceCodeError("invalid_client")
42        try:
43            _ = provider.application
44        except Application.DoesNotExist:
45            raise DeviceCodeError("invalid_client") from None
46        self.provider = provider
47        self.client_id = client_id
48        self.scopes = self.request.POST.get("scope", "").split(" ")

Parse incoming request

def dispatch( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
50    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
51        throttle = AnonRateThrottle()
52        throttle.rate = CONFIG.get("throttle.providers.oauth2.device", "20/hour")
53        throttle.num_requests, throttle.duration = throttle.parse_rate(throttle.rate)
54        if not throttle.allow_request(request, self):
55            return TokenResponse(DeviceCodeError("slow_down").create_dict(request), status=429)
56        return super().dispatch(request, *args, **kwargs)
def post( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
58    def post(self, request: HttpRequest) -> HttpResponse:
59        """Generate device token"""
60        try:
61            self.parse_request()
62        except DeviceCodeError as exc:
63            return TokenResponse(exc.create_dict(request), status=400)
64        until = timedelta_from_string(self.provider.access_code_validity)
65        token: DeviceToken = DeviceToken.objects.create(
66            expires=now() + until, provider=self.provider, _scope=" ".join(self.scopes)
67        )
68        device_url = self.request.build_absolute_uri(
69            reverse("authentik_providers_oauth2_root:device-login")
70        )
71        return TokenResponse(
72            {
73                "device_code": token.device_code,
74                "verification_uri": device_url,
75                "verification_uri_complete": (
76                    device_url
77                    + "?"
78                    + urlencode(
79                        {
80                            QS_KEY_CODE: token.user_code,
81                        }
82                    )
83                ),
84                "user_code": token.user_code,
85                "expires_in": int(until.total_seconds()),
86                "interval": 5,
87            }
88        )

Generate device token