authentik.providers.oauth2.views.device_backchannel
Device flow views
"""Device flow views"""

from urllib.parse import urlencode

from django.http import HttpRequest, HttpResponse
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.timezone import now
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from rest_framework.throttling import AnonRateThrottle
from structlog.stdlib import get_logger

from authentik.core.models import Application
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import timedelta_from_string
from authentik.providers.oauth2.errors import DeviceCodeError
from authentik.providers.oauth2.models import DeviceToken, OAuth2Provider
from authentik.providers.oauth2.utils import TokenResponse, extract_client_auth
from authentik.providers.oauth2.views.device_init import QS_KEY_CODE

LOGGER = get_logger()


@method_decorator(csrf_exempt, name="dispatch")
class DeviceView(View):
    """Device flow, devices can request tokens which users can verify"""

    client_id: str
    provider: OAuth2Provider
    # Class-level default only; parse_request() rebinds this per instance
    # instead of mutating the shared list.
    scopes: list[str] = []

    def parse_request(self):
        """Parse incoming request

        Resolves the provider for the client ID found in the request and stores
        ``provider``, ``client_id`` and ``scopes`` on the view.

        Raises:
            DeviceCodeError: with ``invalid_client`` when the request carries no
                client ID, no provider matches it, or the provider has no
                application.
        """
        client_id, _ = extract_client_auth(self.request)
        if not client_id:
            raise DeviceCodeError("invalid_client")
        provider = OAuth2Provider.objects.filter(client_id=client_id).first()
        if not provider:
            raise DeviceCodeError("invalid_client")
        try:
            # Accessing the relation raises if no application is attached
            _ = provider.application
        except Application.DoesNotExist:
            raise DeviceCodeError("invalid_client") from None
        self.provider = provider
        self.client_id = client_id
        # split() without a separator drops empty entries, so a missing or
        # blank scope yields [] rather than [""]
        self.scopes = self.request.POST.get("scope", "").split()

    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Throttle the request, then hand it to the regular view dispatch

        The rate is read from the ``throttle.providers.oauth2.device`` config
        key (default ``20/hour``). Requests over the limit get a ``slow_down``
        error with HTTP status 429.
        """
        throttle = AnonRateThrottle()
        throttle.rate = CONFIG.get("throttle.providers.oauth2.device", "20/hour")
        throttle.num_requests, throttle.duration = throttle.parse_rate(throttle.rate)
        if not throttle.allow_request(request, self):
            return TokenResponse(DeviceCodeError("slow_down").create_dict(request), status=429)
        return super().dispatch(request, *args, **kwargs)

    def post(self, request: HttpRequest) -> HttpResponse:
        """Generate device token

        Creates a ``DeviceToken`` for the requesting provider and returns its
        device code, user code and verification URLs. A request that fails
        ``parse_request`` is answered with the error payload and HTTP 400.
        """
        try:
            self.parse_request()
        except DeviceCodeError as exc:
            return TokenResponse(exc.create_dict(request), status=400)
        # Token lifetime comes from the provider's access_code_validity
        until = timedelta_from_string(self.provider.access_code_validity)
        token: DeviceToken = DeviceToken.objects.create(
            expires=now() + until, provider=self.provider, _scope=" ".join(self.scopes)
        )
        device_url = self.request.build_absolute_uri(
            reverse("authentik_providers_oauth2_root:device-login")
        )
        return TokenResponse(
            {
                "device_code": token.device_code,
                "verification_uri": device_url,
                # Same URL with the user code pre-filled as a query parameter
                "verification_uri_complete": (
                    device_url
                    + "?"
                    + urlencode(
                        {
                            QS_KEY_CODE: token.user_code,
                        }
                    )
                ),
                "user_code": token.user_code,
                "expires_in": int(until.total_seconds()),
                "interval": 5,
            }
        )
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@method_decorator(csrf_exempt, name="dispatch")
class DeviceView(View):
    """Device flow, devices can request tokens which users can verify"""

    client_id: str
    provider: OAuth2Provider
    # Class-level default only; parse_request() rebinds this per instance
    # instead of mutating the shared list.
    scopes: list[str] = []

    def parse_request(self):
        """Parse incoming request

        Resolves the provider for the client ID found in the request and stores
        ``provider``, ``client_id`` and ``scopes`` on the view.

        Raises:
            DeviceCodeError: with ``invalid_client`` when the request carries no
                client ID, no provider matches it, or the provider has no
                application.
        """
        client_id, _ = extract_client_auth(self.request)
        if not client_id:
            raise DeviceCodeError("invalid_client")
        provider = OAuth2Provider.objects.filter(client_id=client_id).first()
        if not provider:
            raise DeviceCodeError("invalid_client")
        try:
            # Accessing the relation raises if no application is attached
            _ = provider.application
        except Application.DoesNotExist:
            raise DeviceCodeError("invalid_client") from None
        self.provider = provider
        self.client_id = client_id
        # split() without a separator drops empty entries, so a missing or
        # blank scope yields [] rather than [""]
        self.scopes = self.request.POST.get("scope", "").split()

    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Throttle the request, then hand it to the regular view dispatch

        The rate is read from the ``throttle.providers.oauth2.device`` config
        key (default ``20/hour``). Requests over the limit get a ``slow_down``
        error with HTTP status 429.
        """
        throttle = AnonRateThrottle()
        throttle.rate = CONFIG.get("throttle.providers.oauth2.device", "20/hour")
        throttle.num_requests, throttle.duration = throttle.parse_rate(throttle.rate)
        if not throttle.allow_request(request, self):
            return TokenResponse(DeviceCodeError("slow_down").create_dict(request), status=429)
        return super().dispatch(request, *args, **kwargs)

    def post(self, request: HttpRequest) -> HttpResponse:
        """Generate device token

        Creates a ``DeviceToken`` for the requesting provider and returns its
        device code, user code and verification URLs. A request that fails
        ``parse_request`` is answered with the error payload and HTTP 400.
        """
        try:
            self.parse_request()
        except DeviceCodeError as exc:
            return TokenResponse(exc.create_dict(request), status=400)
        # Token lifetime comes from the provider's access_code_validity
        until = timedelta_from_string(self.provider.access_code_validity)
        token: DeviceToken = DeviceToken.objects.create(
            expires=now() + until, provider=self.provider, _scope=" ".join(self.scopes)
        )
        device_url = self.request.build_absolute_uri(
            reverse("authentik_providers_oauth2_root:device-login")
        )
        return TokenResponse(
            {
                "device_code": token.device_code,
                "verification_uri": device_url,
                # Same URL with the user code pre-filled as a query parameter
                "verification_uri_complete": (
                    device_url
                    + "?"
                    + urlencode(
                        {
                            QS_KEY_CODE: token.user_code,
                        }
                    )
                ),
                "user_code": token.user_code,
                "expires_in": int(until.total_seconds()),
                "interval": 5,
            }
        )
Device flow, devices can request tokens which users can verify
def parse_request(self):
    """Parse incoming request

    Resolves the provider for the client ID found in the request and stores
    ``provider``, ``client_id`` and ``scopes`` on the view.

    Raises:
        DeviceCodeError: with ``invalid_client`` when the request carries no
            client ID, no provider matches it, or the provider has no
            application.
    """
    client_id, _ = extract_client_auth(self.request)
    if not client_id:
        raise DeviceCodeError("invalid_client")
    provider = OAuth2Provider.objects.filter(client_id=client_id).first()
    if not provider:
        raise DeviceCodeError("invalid_client")
    try:
        # Accessing the relation raises if no application is attached
        _ = provider.application
    except Application.DoesNotExist:
        raise DeviceCodeError("invalid_client") from None
    self.provider = provider
    self.client_id = client_id
    # split() without a separator drops empty entries, so a missing or
    # blank scope yields [] rather than [""]
    self.scopes = self.request.POST.get("scope", "").split()
Parse incoming request
def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Apply the device-endpoint rate limit before dispatching the request

    The limit is taken from the ``throttle.providers.oauth2.device`` config
    key, falling back to ``20/hour``. A request that is allowed is passed to
    the parent ``dispatch``; otherwise a ``slow_down`` error is returned with
    HTTP status 429.
    """
    configured_rate = CONFIG.get("throttle.providers.oauth2.device", "20/hour")
    limiter = AnonRateThrottle()
    limiter.rate = configured_rate
    # Re-derive the request budget and window from the configured rate
    limiter.num_requests, limiter.duration = limiter.parse_rate(limiter.rate)
    if limiter.allow_request(request, self):
        return super().dispatch(request, *args, **kwargs)
    slow_down = DeviceCodeError("slow_down")
    return TokenResponse(slow_down.create_dict(request), status=429)
def post(self, request: HttpRequest) -> HttpResponse:
    """Issue a new device token for the requesting client

    Returns a ``TokenResponse`` carrying the device code, the user code, the
    verification URL (plain and with the user code as a query parameter), the
    lifetime in seconds and the ``interval`` value. If ``parse_request``
    raises ``DeviceCodeError``, its error payload is returned with HTTP 400.
    """
    try:
        self.parse_request()
    except DeviceCodeError as exc:
        return TokenResponse(exc.create_dict(request), status=400)

    # Lifetime of the token, taken from the provider's access_code_validity
    validity = timedelta_from_string(self.provider.access_code_validity)
    device_token: DeviceToken = DeviceToken.objects.create(
        expires=now() + validity,
        provider=self.provider,
        _scope=" ".join(self.scopes),
    )

    login_path = reverse("authentik_providers_oauth2_root:device-login")
    verification_uri = self.request.build_absolute_uri(login_path)
    code_query = urlencode({QS_KEY_CODE: device_token.user_code})

    payload = {
        "device_code": device_token.device_code,
        "verification_uri": verification_uri,
        "verification_uri_complete": verification_uri + "?" + code_query,
        "user_code": device_token.user_code,
        "expires_in": int(validity.total_seconds()),
        "interval": 5,
    }
    return TokenResponse(payload)
Generate device token