authentik.providers.oauth2.views.device_init

Device flow views

  1"""Device flow views"""
  2
  3from typing import Any
  4
  5from django.http import HttpRequest, HttpResponse
  6from django.utils.translation import gettext as _
  7from rest_framework.exceptions import ValidationError
  8from rest_framework.fields import CharField
  9from structlog.stdlib import get_logger
 10
 11from authentik.brands.models import Brand
 12from authentik.core.models import Application
 13from authentik.flows.challenge import Challenge, ChallengeResponse
 14from authentik.flows.exceptions import FlowNonApplicableException
 15from authentik.flows.models import in_memory_stage
 16from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO, FlowPlanner
 17from authentik.flows.stage import ChallengeStageView
 18from authentik.flows.views.executor import SESSION_KEY_PLAN
 19from authentik.policies.views import PolicyAccessView
 20from authentik.providers.oauth2.models import DeviceToken
 21from authentik.providers.oauth2.views.device_finish import (
 22    PLAN_CONTEXT_DEVICE,
 23    OAuthDeviceCodeFinishStage,
 24)
 25from authentik.providers.oauth2.views.userinfo import UserInfoView
 26from authentik.stages.consent.stage import (
 27    PLAN_CONTEXT_CONSENT_HEADER,
 28    PLAN_CONTEXT_CONSENT_PERMISSIONS,
 29)
 30
# Module-level structlog logger used by the views in this module.
LOGGER = get_logger()
# Query-string key under which DeviceEntryView looks for a user code.
QS_KEY_CODE = "code"  # nosec
 33
 34
 35class CodeValidatorView(PolicyAccessView):
 36    """Helper to validate frontside token"""
 37
 38    def __init__(self, code: str, **kwargs: Any) -> None:
 39        super().__init__(**kwargs)
 40        self.code = code
 41
 42    def resolve_provider_application(self):
 43        self.token = DeviceToken.objects.filter(user_code=self.code).first()
 44        if not self.token:
 45            raise Application.DoesNotExist
 46        self.provider = self.token.provider
 47        self.application = self.token.provider.application
 48
 49    def post(self, request: HttpRequest, *args, **kwargs):
 50        return self.get(request, *args, **kwargs)
 51
 52    def get(self, request: HttpRequest, *args, **kwargs):
 53        scope_descriptions = UserInfoView().get_scope_descriptions(self.token.scope, self.provider)
 54        planner = FlowPlanner(self.provider.authorization_flow)
 55        planner.allow_empty_flows = True
 56        planner.use_cache = False
 57        try:
 58            plan = planner.plan(
 59                request,
 60                {
 61                    PLAN_CONTEXT_SSO: True,
 62                    PLAN_CONTEXT_APPLICATION: self.application,
 63                    # OAuth2 related params
 64                    PLAN_CONTEXT_DEVICE: self.token,
 65                    # Consent related params
 66                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
 67                    % {"application": self.application.name},
 68                    PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
 69                },
 70            )
 71        except FlowNonApplicableException:
 72            LOGGER.warning("Flow not applicable to user")
 73            return None
 74        plan.append_stage(in_memory_stage(OAuthDeviceCodeFinishStage))
 75        return plan.to_redirect(self.request, self.token.provider.authorization_flow)
 76
 77
 78class DeviceEntryView(PolicyAccessView):
 79    """View used to initiate the device-code flow, url entered by endusers"""
 80
 81    def dispatch(self, request: HttpRequest) -> HttpResponse:
 82        brand: Brand = request.brand
 83        device_flow = brand.flow_device_code
 84        if not device_flow:
 85            LOGGER.info("Brand has no device code flow configured", brand=brand)
 86            return HttpResponse(status=404)
 87        if QS_KEY_CODE in request.GET:
 88            validation = CodeValidatorView(request.GET[QS_KEY_CODE], request=request).dispatch(
 89                request
 90            )
 91            if validation:
 92                return validation
 93            LOGGER.info("Got code from query parameter but no matching token found")
 94
 95        # Regardless, we start the planner and return to it
 96        planner = FlowPlanner(device_flow)
 97        planner.allow_empty_flows = True
 98        try:
 99            plan = planner.plan(self.request)
100        except FlowNonApplicableException:
101            LOGGER.warning("Flow not applicable to user")
102            return HttpResponse(status=404)
103        plan.append_stage(in_memory_stage(OAuthDeviceCodeStage))
104
105        self.request.session[SESSION_KEY_PLAN] = plan
106        return plan.to_redirect(self.request, device_flow)
107
108
class OAuthDeviceCodeChallenge(Challenge):
    """Challenge asking the client for an OAuth device code."""

    # Component identifier; defaults to the device-code component.
    component = CharField(default="ak-provider-oauth2-device-code")
113
114
class OAuthDeviceCodeChallengeResponse(ChallengeResponse):
    """Response that includes the user-entered device code"""

    # Device code as typed by the user.
    code = CharField()
    component = CharField(default="ak-provider-oauth2-device-code")

    def validate_code(self, code: str) -> HttpResponse:
        """Validate the code and return the HTTP response produced for it.

        The code is passed to ``CodeValidatorView`` and that view's ``dispatch()``
        result is returned in place of the raw code, so ``validated_data["code"]``
        ends up holding a response (read back by
        ``OAuthDeviceCodeStage.challenge_valid``).
        NOTE(review): presumably invoked through the serializer's
        ``validate_<field>`` hook - confirm against ``ChallengeResponse``.

        Raises:
            ValidationError: when the validator view returns a falsy result.
        """
        validator = CodeValidatorView(code, request=self.stage.request)
        response = validator.dispatch(self.stage.request)
        if not response:
            raise ValidationError(_("Invalid code"), "invalid")
        return response
127
128
class OAuthDeviceCodeStage(ChallengeStageView):
    """Flow stage that asks the user to enter the device code."""

    response_class = OAuthDeviceCodeChallengeResponse

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the device-code challenge for the client."""
        challenge_data = {
            "component": "ak-provider-oauth2-device-code",
        }
        return OAuthDeviceCodeChallenge(data=challenge_data)

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Return the value stored under ``code`` in the validated data.

        ``OAuthDeviceCodeChallengeResponse.validate_code`` returns an HTTP response
        for the code, so this value is a response rather than the raw code.
        """
        code_response = response.validated_data["code"]
        return code_response
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
QS_KEY_CODE = 'code'
class CodeValidatorView(authentik.policies.views.PolicyAccessView):
class CodeValidatorView(PolicyAccessView):
    """Helper view that validates a user-entered device code.

    Within this module it is instantiated directly with the code and its
    ``dispatch()`` result is consumed by ``DeviceEntryView`` and
    ``OAuthDeviceCodeChallengeResponse``.
    """

    def __init__(self, code: str, **kwargs: Any) -> None:
        """Forward ``kwargs`` to the parent view and remember the code to validate."""
        super().__init__(**kwargs)
        self.code = code

    def resolve_provider_application(self):
        """Find the device token matching ``self.code`` and set provider/application.

        Raises:
            Application.DoesNotExist: when no device token has this user code.
        """
        matched = DeviceToken.objects.filter(user_code=self.code).first()
        self.token = matched
        if not matched:
            raise Application.DoesNotExist
        provider = matched.provider
        self.provider = provider
        self.application = provider.application

    def post(self, request: HttpRequest, *args, **kwargs):
        """Handle POST exactly like GET."""
        result = self.get(request, *args, **kwargs)
        return result

    def get(self, request: HttpRequest, *args, **kwargs):
        """Plan the provider's authorization flow for the resolved device token.

        Returns the result of ``plan.to_redirect`` on success, or ``None`` when the
        planner raises ``FlowNonApplicableException``.
        """
        permissions = UserInfoView().get_scope_descriptions(self.token.scope, self.provider)
        auth_flow = self.provider.authorization_flow
        planner = FlowPlanner(auth_flow)
        planner.allow_empty_flows = True
        planner.use_cache = False
        consent_header = _("You're about to sign into %(application)s.") % {
            "application": self.application.name
        }
        context = {
            PLAN_CONTEXT_SSO: True,
            PLAN_CONTEXT_APPLICATION: self.application,
            # OAuth2 related params
            PLAN_CONTEXT_DEVICE: self.token,
            # Consent related params
            PLAN_CONTEXT_CONSENT_HEADER: consent_header,
            PLAN_CONTEXT_CONSENT_PERMISSIONS: permissions,
        }
        try:
            plan = planner.plan(request, context)
        except FlowNonApplicableException:
            LOGGER.warning("Flow not applicable to user")
            # Callers treat a falsy result as "code could not be validated".
            return None
        plan.append_stage(in_memory_stage(OAuthDeviceCodeFinishStage))
        return plan.to_redirect(self.request, auth_flow)

Helper to validate frontside token

CodeValidatorView(code: str, **kwargs: Any)
39    def __init__(self, code: str, **kwargs: Any) -> None:
40        super().__init__(**kwargs)
41        self.code = code

Constructor. Called in the URLconf; can contain helpful extra keyword arguments, and other things.

code
def resolve_provider_application(self):
43    def resolve_provider_application(self):
44        self.token = DeviceToken.objects.filter(user_code=self.code).first()
45        if not self.token:
46            raise Application.DoesNotExist
47        self.provider = self.token.provider
48        self.application = self.token.provider.application

Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught, and will return directly

def post(self, request: django.http.request.HttpRequest, *args, **kwargs):
50    def post(self, request: HttpRequest, *args, **kwargs):
51        return self.get(request, *args, **kwargs)
def get(self, request: django.http.request.HttpRequest, *args, **kwargs):
53    def get(self, request: HttpRequest, *args, **kwargs):
54        scope_descriptions = UserInfoView().get_scope_descriptions(self.token.scope, self.provider)
55        planner = FlowPlanner(self.provider.authorization_flow)
56        planner.allow_empty_flows = True
57        planner.use_cache = False
58        try:
59            plan = planner.plan(
60                request,
61                {
62                    PLAN_CONTEXT_SSO: True,
63                    PLAN_CONTEXT_APPLICATION: self.application,
64                    # OAuth2 related params
65                    PLAN_CONTEXT_DEVICE: self.token,
66                    # Consent related params
67                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
68                    % {"application": self.application.name},
69                    PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
70                },
71            )
72        except FlowNonApplicableException:
73            LOGGER.warning("Flow not applicable to user")
74            return None
75        plan.append_stage(in_memory_stage(OAuthDeviceCodeFinishStage))
76        return plan.to_redirect(self.request, self.token.provider.authorization_flow)
class DeviceEntryView(authentik.policies.views.PolicyAccessView):
 79class DeviceEntryView(PolicyAccessView):
 80    """View used to initiate the device-code flow, url entered by endusers"""
 81
 82    def dispatch(self, request: HttpRequest) -> HttpResponse:
 83        brand: Brand = request.brand
 84        device_flow = brand.flow_device_code
 85        if not device_flow:
 86            LOGGER.info("Brand has no device code flow configured", brand=brand)
 87            return HttpResponse(status=404)
 88        if QS_KEY_CODE in request.GET:
 89            validation = CodeValidatorView(request.GET[QS_KEY_CODE], request=request).dispatch(
 90                request
 91            )
 92            if validation:
 93                return validation
 94            LOGGER.info("Got code from query parameter but no matching token found")
 95
 96        # Regardless, we start the planner and return to it
 97        planner = FlowPlanner(device_flow)
 98        planner.allow_empty_flows = True
 99        try:
100            plan = planner.plan(self.request)
101        except FlowNonApplicableException:
102            LOGGER.warning("Flow not applicable to user")
103            return HttpResponse(status=404)
104        plan.append_stage(in_memory_stage(OAuthDeviceCodeStage))
105
106        self.request.session[SESSION_KEY_PLAN] = plan
107        return plan.to_redirect(self.request, device_flow)

View used to initiate the device-code flow, url entered by endusers

def dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
 82    def dispatch(self, request: HttpRequest) -> HttpResponse:
 83        brand: Brand = request.brand
 84        device_flow = brand.flow_device_code
 85        if not device_flow:
 86            LOGGER.info("Brand has no device code flow configured", brand=brand)
 87            return HttpResponse(status=404)
 88        if QS_KEY_CODE in request.GET:
 89            validation = CodeValidatorView(request.GET[QS_KEY_CODE], request=request).dispatch(
 90                request
 91            )
 92            if validation:
 93                return validation
 94            LOGGER.info("Got code from query parameter but no matching token found")
 95
 96        # Regardless, we start the planner and return to it
 97        planner = FlowPlanner(device_flow)
 98        planner.allow_empty_flows = True
 99        try:
100            plan = planner.plan(self.request)
101        except FlowNonApplicableException:
102            LOGGER.warning("Flow not applicable to user")
103            return HttpResponse(status=404)
104        plan.append_stage(in_memory_stage(OAuthDeviceCodeStage))
105
106        self.request.session[SESSION_KEY_PLAN] = plan
107        return plan.to_redirect(self.request, device_flow)
class OAuthDeviceCodeChallenge(authentik.flows.challenge.Challenge):
class OAuthDeviceCodeChallenge(Challenge):
    """Challenge asking the client for an OAuth device code."""

    # Component identifier; defaults to the device-code component.
    component = CharField(default="ak-provider-oauth2-device-code")

OAuth Device code challenge

component
class OAuthDeviceCodeChallengeResponse(authentik.flows.challenge.ChallengeResponse):
class OAuthDeviceCodeChallengeResponse(ChallengeResponse):
    """Response that includes the user-entered device code"""

    # Device code as typed by the user.
    code = CharField()
    component = CharField(default="ak-provider-oauth2-device-code")

    def validate_code(self, code: str) -> HttpResponse:
        """Validate the code and return the HTTP response produced for it.

        The code is passed to ``CodeValidatorView`` and that view's ``dispatch()``
        result is returned in place of the raw code, so ``validated_data["code"]``
        ends up holding a response (read back by
        ``OAuthDeviceCodeStage.challenge_valid``).
        NOTE(review): presumably invoked through the serializer's
        ``validate_<field>`` hook - confirm against ``ChallengeResponse``.

        Raises:
            ValidationError: when the validator view returns a falsy result.
        """
        validator = CodeValidatorView(code, request=self.stage.request)
        response = validator.dispatch(self.stage.request)
        if not response:
            raise ValidationError(_("Invalid code"), "invalid")
        return response

Response that includes the user-entered device code

code
component
def validate_code(self, code: int) -> django.http.response.HttpResponse | None:
122    def validate_code(self, code: int) -> HttpResponse | None:
123        """Validate code and save the returned http response"""
124        response = CodeValidatorView(code, request=self.stage.request).dispatch(self.stage.request)
125        if not response:
126            raise ValidationError(_("Invalid code"), "invalid")
127        return response

Validate code and save the returned http response

class OAuthDeviceCodeStage(authentik.flows.stage.ChallengeStageView):
class OAuthDeviceCodeStage(ChallengeStageView):
    """Flow stage that asks the user to enter the device code."""

    response_class = OAuthDeviceCodeChallengeResponse

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the device-code challenge for the client."""
        challenge_data = {
            "component": "ak-provider-oauth2-device-code",
        }
        return OAuthDeviceCodeChallenge(data=challenge_data)

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Return the value stored under ``code`` in the validated data.

        ``OAuthDeviceCodeChallengeResponse.validate_code`` returns an HTTP response
        for the code, so this value is a response rather than the raw code.
        """
        code_response = response.validated_data["code"]
        return code_response

Flow challenge for users to enter device code

response_class = <class 'OAuthDeviceCodeChallengeResponse'>
def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
135    def get_challenge(self, *args, **kwargs) -> Challenge:
136        return OAuthDeviceCodeChallenge(
137            data={
138                "component": "ak-provider-oauth2-device-code",
139            }
140        )

Return the challenge that the client should solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
142    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
143        return response.validated_data["code"]

Callback when the challenge has the correct format