authentik.providers.oauth2.views.device_backchannel
Device flow views
"""Device flow views"""

from urllib.parse import urlencode

from django.http import HttpRequest, HttpResponse
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.timezone import now
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from rest_framework.throttling import AnonRateThrottle
from structlog.stdlib import get_logger

from authentik.core.models import Application
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import timedelta_from_string
from authentik.providers.oauth2.errors import DeviceCodeError
from authentik.providers.oauth2.models import DeviceToken, GrantType, OAuth2Provider, ScopeMapping
from authentik.providers.oauth2.utils import TokenResponse, extract_client_auth
from authentik.providers.oauth2.views.device_init import QS_KEY_CODE

LOGGER = get_logger()


@method_decorator(csrf_exempt, name="dispatch")
class DeviceView(View):
    """Device flow: devices can request tokens, which users can then verify.

    A POST creates a ``DeviceToken`` for the requesting client and returns the
    device code, the user code and the verification URLs. Every request is rate
    limited in ``dispatch`` before it reaches the handler.
    """

    client_id: str
    provider: OAuth2Provider
    # Class-level placeholder only: parse_request() rebinds this on the instance
    # and never mutates it in place, so the shared empty set is never modified.
    scopes: set[str] = set()

    def parse_request(self):
        """Parse incoming request.

        Resolves the provider from the client ID found in the request and stores
        ``provider``, ``client_id`` and ``scopes`` on the view. The requested
        scopes (whitespace-separated ``scope`` POST field) are reduced to the
        scope names of the ``ScopeMapping`` objects matched for the provider.

        Raises:
            DeviceCodeError: ``invalid_client`` when no client ID is supplied, no
                provider has that client ID, the provider has no application, or
                the device code grant type is not in the provider's grant types.
        """
        client_id, _ = extract_client_auth(self.request)
        if not client_id:
            raise DeviceCodeError("invalid_client")
        provider = OAuth2Provider.objects.filter(client_id=client_id).first()
        if not provider:
            raise DeviceCodeError("invalid_client")
        try:
            # Accessing the attribute is the check: a provider without an
            # application raises Application.DoesNotExist here.
            _ = provider.application
        except Application.DoesNotExist:
            raise DeviceCodeError("invalid_client") from None
        if GrantType.DEVICE_CODE not in provider.grant_types:
            raise DeviceCodeError("invalid_client")
        self.provider = provider
        self.client_id = client_id

        scopes_to_check = set(self.request.POST.get("scope", "").split())
        default_scope_names = set(
            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
                "scope_name", flat=True
            )
        )
        self.scopes = scopes_to_check
        if not scopes_to_check.issubset(default_scope_names):
            # Unknown scopes are dropped rather than rejected; log what was asked
            # for and what is allowed before narrowing.
            LOGGER.info(
                "Application requested scopes not configured, setting to overlap",
                scope_allowed=default_scope_names,
                scope_given=self.scopes,
            )
            self.scopes = self.scopes.intersection(default_scope_names)

    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Rate limit the request, then hand it to the regular view dispatch.

        The rate is read from the ``throttle.providers.oauth2.device`` setting
        (default ``20/hour``). A throttled request gets a ``slow_down`` error
        response with status 429 and never reaches the handler.
        """
        throttle = AnonRateThrottle()
        throttle.rate = CONFIG.get("throttle.providers.oauth2.device", "20/hour")
        # The rate is assigned after construction, so the derived values have to
        # be recomputed from it explicitly.
        throttle.num_requests, throttle.duration = throttle.parse_rate(throttle.rate)
        if not throttle.allow_request(request, self):
            return TokenResponse(DeviceCodeError("slow_down").create_dict(request), status=429)
        return super().dispatch(request, *args, **kwargs)

    def post(self, request: HttpRequest) -> HttpResponse:
        """Generate device token.

        Returns a status 400 response carrying the error dict when
        ``parse_request`` raises ``DeviceCodeError``. Otherwise creates a
        ``DeviceToken`` that expires after the provider's ``access_code_validity``
        and returns its codes together with the verification URLs.
        """
        try:
            self.parse_request()
        except DeviceCodeError as exc:
            return TokenResponse(exc.create_dict(request), status=400)
        until = timedelta_from_string(self.provider.access_code_validity)
        token: DeviceToken = DeviceToken.objects.create(
            expires=now() + until, provider=self.provider, _scope=" ".join(self.scopes)
        )
        device_url = self.request.build_absolute_uri(
            reverse("authentik_providers_oauth2_root:device-login")
        )
        return TokenResponse(
            {
                "device_code": token.device_code,
                "verification_uri": device_url,
                # Same URL with the user code pre-filled as a query parameter.
                "verification_uri_complete": (
                    device_url
                    + "?"
                    + urlencode(
                        {
                            QS_KEY_CODE: token.user_code,
                        }
                    )
                ),
                "user_code": token.user_code,
                "expires_in": int(until.total_seconds()),
                "interval": 5,
            }
        )
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@method_decorator(csrf_exempt, name='dispatch')
class
@method_decorator(csrf_exempt, name="dispatch")
class DeviceView(View):
    """Device flow: devices can request tokens, which users can then verify.

    A POST creates a ``DeviceToken`` for the requesting client and returns the
    device code, the user code and the verification URLs. Every request is rate
    limited in ``dispatch`` before it reaches the handler.
    """

    client_id: str
    provider: OAuth2Provider
    # Class-level placeholder only: parse_request() rebinds this on the instance
    # and never mutates it in place, so the shared empty set is never modified.
    scopes: set[str] = set()

    def parse_request(self):
        """Parse incoming request.

        Resolves the provider from the client ID found in the request and stores
        ``provider``, ``client_id`` and ``scopes`` on the view. The requested
        scopes (whitespace-separated ``scope`` POST field) are reduced to the
        scope names of the ``ScopeMapping`` objects matched for the provider.

        Raises:
            DeviceCodeError: ``invalid_client`` when no client ID is supplied, no
                provider has that client ID, the provider has no application, or
                the device code grant type is not in the provider's grant types.
        """
        client_id, _ = extract_client_auth(self.request)
        if not client_id:
            raise DeviceCodeError("invalid_client")
        provider = OAuth2Provider.objects.filter(client_id=client_id).first()
        if not provider:
            raise DeviceCodeError("invalid_client")
        try:
            # Accessing the attribute is the check: a provider without an
            # application raises Application.DoesNotExist here.
            _ = provider.application
        except Application.DoesNotExist:
            raise DeviceCodeError("invalid_client") from None
        if GrantType.DEVICE_CODE not in provider.grant_types:
            raise DeviceCodeError("invalid_client")
        self.provider = provider
        self.client_id = client_id

        scopes_to_check = set(self.request.POST.get("scope", "").split())
        default_scope_names = set(
            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
                "scope_name", flat=True
            )
        )
        self.scopes = scopes_to_check
        if not scopes_to_check.issubset(default_scope_names):
            # Unknown scopes are dropped rather than rejected; log what was asked
            # for and what is allowed before narrowing.
            LOGGER.info(
                "Application requested scopes not configured, setting to overlap",
                scope_allowed=default_scope_names,
                scope_given=self.scopes,
            )
            self.scopes = self.scopes.intersection(default_scope_names)

    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Rate limit the request, then hand it to the regular view dispatch.

        The rate is read from the ``throttle.providers.oauth2.device`` setting
        (default ``20/hour``). A throttled request gets a ``slow_down`` error
        response with status 429 and never reaches the handler.
        """
        throttle = AnonRateThrottle()
        throttle.rate = CONFIG.get("throttle.providers.oauth2.device", "20/hour")
        # The rate is assigned after construction, so the derived values have to
        # be recomputed from it explicitly.
        throttle.num_requests, throttle.duration = throttle.parse_rate(throttle.rate)
        if not throttle.allow_request(request, self):
            return TokenResponse(DeviceCodeError("slow_down").create_dict(request), status=429)
        return super().dispatch(request, *args, **kwargs)

    def post(self, request: HttpRequest) -> HttpResponse:
        """Generate device token.

        Returns a status 400 response carrying the error dict when
        ``parse_request`` raises ``DeviceCodeError``. Otherwise creates a
        ``DeviceToken`` that expires after the provider's ``access_code_validity``
        and returns its codes together with the verification URLs.
        """
        try:
            self.parse_request()
        except DeviceCodeError as exc:
            return TokenResponse(exc.create_dict(request), status=400)
        until = timedelta_from_string(self.provider.access_code_validity)
        token: DeviceToken = DeviceToken.objects.create(
            expires=now() + until, provider=self.provider, _scope=" ".join(self.scopes)
        )
        device_url = self.request.build_absolute_uri(
            reverse("authentik_providers_oauth2_root:device-login")
        )
        return TokenResponse(
            {
                "device_code": token.device_code,
                "verification_uri": device_url,
                # Same URL with the user code pre-filled as a query parameter.
                "verification_uri_complete": (
                    device_url
                    + "?"
                    + urlencode(
                        {
                            QS_KEY_CODE: token.user_code,
                        }
                    )
                ),
                "user_code": token.user_code,
                "expires_in": int(until.total_seconds()),
                "interval": 5,
            }
        )
Device flow: devices can request tokens, which users can then verify.
def
parse_request(self):
def parse_request(self):
    """Validate the calling client and work out which scopes it may use.

    On success the view carries ``provider``, ``client_id`` and ``scopes``.
    Requested scopes come from the whitespace-separated ``scope`` POST field;
    any that are not among the provider's scope mapping names are dropped.

    Raises:
        DeviceCodeError: ``invalid_client`` for a missing client ID, an unknown
            client ID, a provider without an application, or a provider whose
            grant types do not include the device code grant.
    """
    client_id, _ = extract_client_auth(self.request)
    if not client_id:
        raise DeviceCodeError("invalid_client")

    matched = OAuth2Provider.objects.filter(client_id=client_id).first()
    if not matched:
        raise DeviceCodeError("invalid_client")

    # Touching the attribute is the existence check for the application.
    try:
        _ = matched.application
    except Application.DoesNotExist:
        raise DeviceCodeError("invalid_client") from None

    if GrantType.DEVICE_CODE not in matched.grant_types:
        raise DeviceCodeError("invalid_client")

    self.provider = matched
    self.client_id = client_id

    requested = set(self.request.POST.get("scope", "").split())
    mapping_names = ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
        "scope_name", flat=True
    )
    configured = set(mapping_names)

    self.scopes = requested
    if requested <= configured:
        # Everything asked for is configured; nothing to narrow.
        return

    LOGGER.info(
        "Application requested scopes not configured, setting to overlap",
        scope_allowed=configured,
        scope_given=self.scopes,
    )
    self.scopes = self.scopes & configured
Parse incoming request
def
dispatch( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Apply the device-endpoint rate limit before normal dispatching.

    The limit comes from the ``throttle.providers.oauth2.device`` setting and
    falls back to ``20/hour``. Requests over the limit receive a ``slow_down``
    error response with status 429.
    """
    limiter = AnonRateThrottle()
    limiter.rate = CONFIG.get("throttle.providers.oauth2.device", "20/hour")
    # Recompute the derived values from the rate that was just assigned.
    parsed_rate = limiter.parse_rate(limiter.rate)
    limiter.num_requests, limiter.duration = parsed_rate

    if limiter.allow_request(request, self):
        return super().dispatch(request, *args, **kwargs)

    slow_down = DeviceCodeError("slow_down")
    return TokenResponse(slow_down.create_dict(request), status=429)
def
post( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
def post(self, request: HttpRequest) -> HttpResponse:
    """Create a device token and describe it to the requesting device.

    A ``DeviceCodeError`` raised while parsing the request is turned into a
    status 400 response. Otherwise the response carries the device code, the
    user code, the verification URL (plain and with the user code pre-filled),
    the validity in seconds and the ``interval`` value.
    """
    try:
        self.parse_request()
    except DeviceCodeError as exc:
        return TokenResponse(exc.create_dict(request), status=400)

    validity = timedelta_from_string(self.provider.access_code_validity)
    device_token: DeviceToken = DeviceToken.objects.create(
        expires=now() + validity,
        provider=self.provider,
        _scope=" ".join(self.scopes),
    )

    login_path = reverse("authentik_providers_oauth2_root:device-login")
    login_url = self.request.build_absolute_uri(login_path)
    code_query = urlencode({QS_KEY_CODE: device_token.user_code})

    payload = {
        "device_code": device_token.device_code,
        "verification_uri": login_url,
        "verification_uri_complete": login_url + "?" + code_query,
        "user_code": device_token.user_code,
        "expires_in": int(validity.total_seconds()),
        "interval": 5,
    }
    return TokenResponse(payload)
Generate device token