authentik.providers.oauth2.views.device_init
Device flow views
"""Device flow views"""

from typing import Any

from django.http import HttpRequest, HttpResponse
from django.utils.translation import gettext as _
from rest_framework.exceptions import ValidationError
from rest_framework.fields import CharField
from structlog.stdlib import get_logger

from authentik.brands.models import Brand
from authentik.core.models import Application
from authentik.flows.challenge import Challenge, ChallengeResponse
from authentik.flows.exceptions import FlowNonApplicableException
from authentik.flows.models import in_memory_stage
from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO, FlowPlanner
from authentik.flows.stage import ChallengeStageView
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.policies.views import PolicyAccessView
from authentik.providers.oauth2.models import DeviceToken
from authentik.providers.oauth2.views.device_finish import (
    PLAN_CONTEXT_DEVICE,
    OAuthDeviceCodeFinishStage,
)
from authentik.providers.oauth2.views.userinfo import UserInfoView
from authentik.stages.consent.stage import (
    PLAN_CONTEXT_CONSENT_HEADER,
    PLAN_CONTEXT_CONSENT_PERMISSIONS,
)

LOGGER = get_logger()
# Name of the query-string parameter that may carry the user code
QS_KEY_CODE = "code"  # nosec


class CodeValidatorView(PolicyAccessView):
    """Helper to validate frontside token

    Looks up the DeviceToken matching a user-entered code and, when found,
    plans the provider's authorization flow for it."""

    def __init__(self, code: str, **kwargs: Any) -> None:
        """Remember the user-entered code; other keyword arguments go to the parent view"""
        super().__init__(**kwargs)
        self.code = code

    def resolve_provider_application(self):
        """Set self.token, self.provider and self.application from the user code.

        Raises Application.DoesNotExist when no DeviceToken has this user code."""
        self.token = DeviceToken.objects.filter(user_code=self.code).first()
        if not self.token:
            raise Application.DoesNotExist
        self.provider = self.token.provider
        self.application = self.token.provider.application

    def post(self, request: HttpRequest, *args, **kwargs):
        """POST is handled exactly like GET"""
        return self.get(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs):
        """Plan the provider's authorization flow for the resolved token.

        Returns the redirect produced by the plan, or None when the flow is
        not applicable to the current user."""
        scope_descriptions = UserInfoView().get_scope_descriptions(self.token.scope, self.provider)
        planner = FlowPlanner(self.provider.authorization_flow)
        planner.allow_empty_flows = True
        planner.use_cache = False
        try:
            plan = planner.plan(
                request,
                {
                    PLAN_CONTEXT_SSO: True,
                    PLAN_CONTEXT_APPLICATION: self.application,
                    # OAuth2 related params
                    PLAN_CONTEXT_DEVICE: self.token,
                    # Consent related params
                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
                    % {"application": self.application.name},
                    PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
                },
            )
        except FlowNonApplicableException:
            LOGGER.warning("Flow not applicable to user")
            # Callers treat a falsy result as "validation failed"
            return None
        plan.append_stage(in_memory_stage(OAuthDeviceCodeFinishStage))
        return plan.to_redirect(self.request, self.token.provider.authorization_flow)


class DeviceEntryView(PolicyAccessView):
    """View used to initiate the device-code flow, url entered by endusers"""

    def dispatch(self, request: HttpRequest) -> HttpResponse:
        """Start the brand's device-code flow, or short-circuit with a code from the query string.

        Responds with 404 when the brand has no device-code flow configured or
        when that flow is not applicable to the user."""
        brand: Brand = request.brand
        device_flow = brand.flow_device_code
        if not device_flow:
            LOGGER.info("Brand has no device code flow configured", brand=brand)
            return HttpResponse(status=404)
        if QS_KEY_CODE in request.GET:
            validation = CodeValidatorView(request.GET[QS_KEY_CODE], request=request).dispatch(
                request
            )
            if validation:
                return validation
            LOGGER.info("Got code from query parameter but no matching token found")

        # Regardless, we start the planner and return to it
        planner = FlowPlanner(device_flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(self.request)
        except FlowNonApplicableException:
            LOGGER.warning("Flow not applicable to user")
            return HttpResponse(status=404)
        # Final stage asks the user to type in the device code
        plan.append_stage(in_memory_stage(OAuthDeviceCodeStage))

        self.request.session[SESSION_KEY_PLAN] = plan
        return plan.to_redirect(self.request, device_flow)


class OAuthDeviceCodeChallenge(Challenge):
    """OAuth Device code challenge"""

    component = CharField(default="ak-provider-oauth2-device-code")


class OAuthDeviceCodeChallengeResponse(ChallengeResponse):
    """Response that includes the user-entered device code"""

    code = CharField()
    component = CharField(default="ak-provider-oauth2-device-code")

    def validate_code(self, code: str) -> HttpResponse | None:
        """Validate code and save the returned http response

        `code` is the string produced by the `code` CharField. The value
        returned here is what OAuthDeviceCodeStage.challenge_valid later reads
        back from validated_data["code"].

        Raises ValidationError when the validator view yields no response."""
        response = CodeValidatorView(code, request=self.stage.request).dispatch(self.stage.request)
        if not response:
            raise ValidationError(_("Invalid code"), "invalid")
        return response


class OAuthDeviceCodeStage(ChallengeStageView):
    """Flow challenge for users to enter device code"""

    response_class = OAuthDeviceCodeChallengeResponse

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Return the challenge asking the client to render the device-code prompt"""
        return OAuthDeviceCodeChallenge(
            data={
                "component": "ak-provider-oauth2-device-code",
            }
        )

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Return the value stored under "code", i.e. what validate_code returned"""
        return response.validated_data["code"]
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
QS_KEY_CODE =
'code'
class CodeValidatorView(PolicyAccessView):
    """Resolve a device token from a user-entered code and plan its authorization flow"""

    def __init__(self, code: str, **kwargs: Any) -> None:
        """Keep the user-entered code; remaining keyword arguments go to the parent view"""
        super().__init__(**kwargs)
        self.code = code

    def resolve_provider_application(self):
        """Populate self.token, self.provider and self.application.

        Raises Application.DoesNotExist if no DeviceToken carries this user code."""
        matching_tokens = DeviceToken.objects.filter(user_code=self.code)
        self.token = matching_tokens.first()
        if not self.token:
            raise Application.DoesNotExist
        self.provider = self.token.provider
        self.application = self.token.provider.application

    def post(self, request: HttpRequest, *args, **kwargs):
        """Treat POST the same way as GET"""
        return self.get(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs):
        """Plan the provider's authorization flow with device and consent context.

        Returns the plan's redirect, or None if the flow is not applicable."""
        scope_descriptions = UserInfoView().get_scope_descriptions(self.token.scope, self.provider)

        planner = FlowPlanner(self.provider.authorization_flow)
        planner.allow_empty_flows = True
        planner.use_cache = False

        consent_header = _("You're about to sign into %(application)s.") % {
            "application": self.application.name
        }
        plan_context = {
            PLAN_CONTEXT_SSO: True,
            PLAN_CONTEXT_APPLICATION: self.application,
            # OAuth2 related params
            PLAN_CONTEXT_DEVICE: self.token,
            # Consent related params
            PLAN_CONTEXT_CONSENT_HEADER: consent_header,
            PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
        }
        try:
            plan = planner.plan(request, plan_context)
        except FlowNonApplicableException:
            LOGGER.warning("Flow not applicable to user")
            return None

        finish_stage = in_memory_stage(OAuthDeviceCodeFinishStage)
        plan.append_stage(finish_stage)
        return plan.to_redirect(self.request, self.token.provider.authorization_flow)
Helper to validate frontside token
CodeValidatorView(code: str, **kwargs: Any)
def __init__(self, code: str, **kwargs: Any) -> None:
    """Keep the user-entered code; remaining keyword arguments go to the parent view"""
    super().__init__(**kwargs)
    # Read later by resolve_provider_application to look up the device token
    self.code = code
Constructor. Called in the URLconf; can contain helpful extra keyword arguments, and other things.
def
resolve_provider_application(self):
def resolve_provider_application(self):
    """Populate self.token, self.provider and self.application.

    Raises Application.DoesNotExist if no DeviceToken carries this user code."""
    matching_tokens = DeviceToken.objects.filter(user_code=self.code)
    self.token = matching_tokens.first()
    if not self.token:
        raise Application.DoesNotExist
    self.provider = self.token.provider
    self.application = self.token.provider.application
Resolve self.provider and self.application. *.DoesNotExist exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught and will be returned directly.
def
get(self, request: django.http.request.HttpRequest, *args, **kwargs):
def get(self, request: HttpRequest, *args, **kwargs):
    """Plan the provider's authorization flow with device and consent context.

    Returns the plan's redirect, or None if the flow is not applicable."""
    scope_descriptions = UserInfoView().get_scope_descriptions(self.token.scope, self.provider)

    planner = FlowPlanner(self.provider.authorization_flow)
    planner.allow_empty_flows = True
    planner.use_cache = False

    consent_header = _("You're about to sign into %(application)s.") % {
        "application": self.application.name
    }
    plan_context = {
        PLAN_CONTEXT_SSO: True,
        PLAN_CONTEXT_APPLICATION: self.application,
        # OAuth2 related params
        PLAN_CONTEXT_DEVICE: self.token,
        # Consent related params
        PLAN_CONTEXT_CONSENT_HEADER: consent_header,
        PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
    }
    try:
        plan = planner.plan(request, plan_context)
    except FlowNonApplicableException:
        LOGGER.warning("Flow not applicable to user")
        return None

    finish_stage = in_memory_stage(OAuthDeviceCodeFinishStage)
    plan.append_stage(finish_stage)
    return plan.to_redirect(self.request, self.token.provider.authorization_flow)
class DeviceEntryView(PolicyAccessView):
    """Entry point of the device-code flow; end users open this URL"""

    def dispatch(self, request: HttpRequest) -> HttpResponse:
        """Start the brand's device-code flow, optionally pre-validating a code.

        A code given in the query string is handed to CodeValidatorView first
        and a truthy result is returned as-is. Otherwise the brand's device-code
        flow is planned with a code-entry stage appended. Responds with 404 when
        the brand has no such flow or the flow is not applicable."""
        brand: Brand = request.brand
        device_flow = brand.flow_device_code
        if not device_flow:
            LOGGER.info("Brand has no device code flow configured", brand=brand)
            return HttpResponse(status=404)

        if QS_KEY_CODE in request.GET:
            user_code = request.GET[QS_KEY_CODE]
            validator = CodeValidatorView(user_code, request=request)
            code_response = validator.dispatch(request)
            if code_response:
                return code_response
            LOGGER.info("Got code from query parameter but no matching token found")

        # Regardless, we start the planner and return to it
        planner = FlowPlanner(device_flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(self.request)
        except FlowNonApplicableException:
            LOGGER.warning("Flow not applicable to user")
            return HttpResponse(status=404)

        code_stage = in_memory_stage(OAuthDeviceCodeStage)
        plan.append_stage(code_stage)

        self.request.session[SESSION_KEY_PLAN] = plan
        return plan.to_redirect(self.request, device_flow)
View used to initiate the device-code flow; this is the URL entered by end users.
def
dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
def dispatch(self, request: HttpRequest) -> HttpResponse:
    """Start the brand's device-code flow, optionally pre-validating a code.

    A code given in the query string is handed to CodeValidatorView first
    and a truthy result is returned as-is. Otherwise the brand's device-code
    flow is planned with a code-entry stage appended. Responds with 404 when
    the brand has no such flow or the flow is not applicable."""
    brand: Brand = request.brand
    device_flow = brand.flow_device_code
    if not device_flow:
        LOGGER.info("Brand has no device code flow configured", brand=brand)
        return HttpResponse(status=404)

    if QS_KEY_CODE in request.GET:
        user_code = request.GET[QS_KEY_CODE]
        validator = CodeValidatorView(user_code, request=request)
        code_response = validator.dispatch(request)
        if code_response:
            return code_response
        LOGGER.info("Got code from query parameter but no matching token found")

    # Regardless, we start the planner and return to it
    planner = FlowPlanner(device_flow)
    planner.allow_empty_flows = True
    try:
        plan = planner.plan(self.request)
    except FlowNonApplicableException:
        LOGGER.warning("Flow not applicable to user")
        return HttpResponse(status=404)

    code_stage = in_memory_stage(OAuthDeviceCodeStage)
    plan.append_stage(code_stage)

    self.request.session[SESSION_KEY_PLAN] = plan
    return plan.to_redirect(self.request, device_flow)
class OAuthDeviceCodeChallenge(Challenge):
    """Challenge sent to the client to prompt for an OAuth device code"""

    # Component identifier; the same string is used by the matching response class
    component = CharField(default="ak-provider-oauth2-device-code")
OAuth Device code challenge
class OAuthDeviceCodeChallengeResponse(ChallengeResponse):
    """Response that includes the user-entered device code"""

    code = CharField()
    component = CharField(default="ak-provider-oauth2-device-code")

    def validate_code(self, code: str) -> HttpResponse | None:
        """Validate code and save the returned http response

        `code` is the string produced by the `code` CharField. The value
        returned here replaces the field's validated value, so
        validated_data["code"] ends up holding the HTTP response.

        Raises ValidationError when the validator view yields no response."""
        response = CodeValidatorView(code, request=self.stage.request).dispatch(self.stage.request)
        if not response:
            raise ValidationError(_("Invalid code"), "invalid")
        return response
Response that includes the user-entered device code
def
validate_code(self, code: int) -> django.http.response.HttpResponse | None:
def validate_code(self, code: str) -> HttpResponse | None:
    """Validate code and save the returned http response

    `code` is the string produced by the `code` CharField. The value
    returned here replaces the field's validated value, so
    validated_data["code"] ends up holding the HTTP response.

    Raises ValidationError when the validator view yields no response."""
    response = CodeValidatorView(code, request=self.stage.request).dispatch(self.stage.request)
    if not response:
        raise ValidationError(_("Invalid code"), "invalid")
    return response
Validate code and save the returned http response
class OAuthDeviceCodeStage(ChallengeStageView):
    """Stage that challenges the user to enter a device code"""

    response_class = OAuthDeviceCodeChallengeResponse

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the device-code challenge for the client"""
        payload = {
            "component": "ak-provider-oauth2-device-code",
        }
        return OAuthDeviceCodeChallenge(data=payload)

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Return whatever the validated response stored under "code" """
        validated = response.validated_data
        return validated["code"]
Flow challenge for users to enter device code
response_class =
<class 'OAuthDeviceCodeChallengeResponse'>
def get_challenge(self, *args, **kwargs) -> Challenge:
    """Build the device-code challenge for the client"""
    payload = {
        "component": "ak-provider-oauth2-device-code",
    }
    return OAuthDeviceCodeChallenge(data=payload)
Return the challenge that the client should solve
def
challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
    """Return whatever the validated response stored under "code" """
    validated = response.validated_data
    return validated["code"]
Callback when the challenge has the correct format