authentik.providers.oauth2.views.authorize
authentik OAuth2 Authorization views
"""authentik OAuth2 Authorization views"""

from dataclasses import InitVar, dataclass, field
from datetime import timedelta
from json import dumps
from re import error as RegexError
from re import fullmatch
from typing import Any
from urllib.parse import parse_qs, quote, urlencode, urlparse, urlsplit, urlunparse, urlunsplit
from uuid import uuid4

from django.conf import settings
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
from django.http.response import Http404, HttpResponseBadRequest
from django.shortcuts import get_object_or_404
from django.utils import timezone, translation
from django.utils.translation import gettext as _
from structlog.stdlib import get_logger

from authentik.common.oauth.constants import (
    FORBIDDEN_URI_SCHEMES,
    PKCE_METHOD_PLAIN,
    PKCE_METHOD_S256,
    PROMPT_CONSENT,
    PROMPT_LOGIN,
    PROMPT_NONE,
    QS_LOGIN_HINT,
    SCOPE_GITHUB,
    SCOPE_OFFLINE_ACCESS,
    SCOPE_OPENID,
    TOKEN_TYPE,
    UI_LOCALES,
)
from authentik.core.models import Application
from authentik.events.models import Event, EventAction
from authentik.events.signals import get_login_event
from authentik.flows.challenge import (
    PLAN_CONTEXT_TITLE,
    AutosubmitChallenge,
    HttpChallengeResponse,
)
from authentik.flows.exceptions import FlowNonApplicableException
from authentik.flows.models import Flow, in_memory_stage
from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO, FlowPlanner
from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, StageView
from authentik.lib.utils.time import timedelta_from_string
from authentik.lib.views import bad_request_message
from authentik.policies.types import PolicyRequest
from authentik.policies.views import PolicyAccessView, RequestValidationError
from authentik.providers.oauth2.errors import (
    AuthorizeError,
    ClientIdError,
    OAuth2Error,
    RedirectUriError,
)
from authentik.providers.oauth2.id_token import IDToken
from authentik.providers.oauth2.models import (
    AccessToken,
    AuthorizationCode,
    GrantType,
    OAuth2Provider,
    RedirectURIMatchingMode,
    ResponseMode,
    ResponseTypes,
    ScopeMapping,
)
from authentik.providers.oauth2.utils import HttpResponseRedirectScheme
from authentik.providers.oauth2.views.userinfo import UserInfoView
from authentik.stages.consent.models import ConsentMode, ConsentStage
from authentik.stages.consent.stage import (
    PLAN_CONTEXT_CONSENT_HEADER,
    PLAN_CONTEXT_CONSENT_PERMISSIONS,
)

LOGGER = get_logger()

# Flow plan context key under which the validated authorization parameters are stored
PLAN_CONTEXT_PARAMS = "goauthentik.io/providers/oauth2/params"
# Session key remembering the login event that was current when a re-login was triggered
SESSION_KEY_LAST_LOGIN_UID = "authentik/providers/oauth2/last_login_uid"

# Values of the `prompt` parameter that are honoured; anything else is dropped
ALLOWED_PROMPT_PARAMS = {PROMPT_NONE, PROMPT_CONSENT, PROMPT_LOGIN}


@dataclass(slots=True)
class OAuthAuthorizationParams:
    """Parameters required to authorize an OAuth Client

    Constructing an instance validates it: `__post_init__` looks up the provider by
    `client_id` and then checks redirect URI, grant, scope, nonce and PKCE settings,
    raising `ClientIdError`, `RedirectUriError` or `AuthorizeError` on failure.
    """

    client_id: str
    redirect_uri: str
    response_type: str
    response_mode: str | None
    scope: set[str]
    state: str
    nonce: str | None
    prompt: set[str]
    grant_type: str

    provider: OAuth2Provider = field(default_factory=OAuth2Provider)

    request: str | None = None

    max_age: int | None = None

    code_challenge: str | None = None
    code_challenge_method: str | None = None

    github_compat: InitVar[bool] = False

    @staticmethod
    def from_request(request: HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
        """
        Get all the params used by the Authorization Code Flow
        (and also for the Implicit and Hybrid).

        A `max_age` value that is not an integer is ignored (and logged) instead of
        raising `ValueError`, which no caller in this module handles.

        See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
        """
        # Because in this endpoint we handle both GET
        # and POST request.
        query_dict = request.POST if request.method == "POST" else request.GET
        state = query_dict.get("state")
        redirect_uri = query_dict.get("redirect_uri", "")

        response_type = query_dict.get("response_type", "")

        # Validate and check the response_mode against the predefined dict
        # Set to Query or Fragment if not defined in request
        # (the defaulting itself happens in check_grant)
        response_mode = query_dict.get("response_mode")

        # max_age is client-controlled input; never let a malformed value escape as ValueError
        raw_max_age = query_dict.get("max_age")
        max_age = None
        if raw_max_age:
            try:
                max_age = int(raw_max_age)
            except ValueError:
                LOGGER.warning("Ignoring non-integer max_age", max_age=raw_max_age)
        return OAuthAuthorizationParams(
            client_id=query_dict.get("client_id", ""),
            redirect_uri=redirect_uri,
            response_type=response_type,
            response_mode=response_mode,
            grant_type="",
            scope=set(query_dict.get("scope", "").split()),
            state=state,
            nonce=query_dict.get("nonce"),
            prompt=ALLOWED_PROMPT_PARAMS.intersection(set(query_dict.get("prompt", "").split())),
            request=query_dict.get("request", None),
            max_age=max_age,
            code_challenge=query_dict.get("code_challenge"),
            code_challenge_method=query_dict.get("code_challenge_method", "plain"),
            github_compat=github_compat,
        )

    def __post_init__(self, github_compat=False):
        """Resolve the provider from `client_id` and run all validations.

        The redirect URI is checked first, before any check that raises an
        `AuthorizeError` carrying that redirect URI.
        """
        self.provider: OAuth2Provider = OAuth2Provider.objects.filter(
            client_id=self.client_id
        ).first()
        if not self.provider:
            LOGGER.warning("Invalid client identifier", client_id=self.client_id)
            raise ClientIdError(client_id=self.client_id)
        self.check_redirect_uri()
        self.check_grant()
        self.check_scope(github_compat)
        # Request objects (the `request` parameter) are not supported
        if self.request:
            raise AuthorizeError(
                self.redirect_uri, "request_not_supported", self.grant_type, self.state
            )
        self.check_nonce()
        self.check_code_challenge()

    def check_grant(self):
        """Check grant

        Derives `grant_type` from `response_type`, verifies the provider allows it and
        fills in a default `response_mode` when none (or an unknown one) was requested.
        """
        # Determine which flow to use.
        if self.response_type in [ResponseTypes.CODE]:
            self.grant_type = GrantType.AUTHORIZATION_CODE
        elif self.response_type in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
        ]:
            self.grant_type = GrantType.IMPLICIT
        elif self.response_type in [
            ResponseTypes.CODE_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            self.grant_type = GrantType.HYBRID
        # Grant type validation.
        if not self.grant_type:
            LOGGER.warning("Invalid response type", type=self.response_type)
            raise AuthorizeError(self.redirect_uri, "unsupported_response_type", "", self.state)

        if self.grant_type not in self.provider.grant_types:
            LOGGER.warning("Invalid grant_type for provider", grant_type=self.grant_type)
            raise AuthorizeError(self.redirect_uri, "invalid_request", self.grant_type, self.state)

        if self.response_mode not in ResponseMode.values:
            self.response_mode = ResponseMode.QUERY

            # NOTE(review): nesting reconstructed from a whitespace-collapsed source; the
            # fragment default is applied only when no valid response_mode was requested,
            # so that an explicit form_post remains usable for implicit/hybrid flows.
            if self.grant_type in [GrantType.IMPLICIT, GrantType.HYBRID]:
                self.response_mode = ResponseMode.FRAGMENT

    def check_redirect_uri(self):
        """Redirect URI validation."""
        allowed_redirect_urls = self.provider.authorization_redirect_uris
        if not self.redirect_uri:
            LOGGER.warning("Missing redirect uri.")
            raise RedirectUriError("", allowed_redirect_urls).with_cause("redirect_uri_missing")

        match_found = False
        for allowed in allowed_redirect_urls:
            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
                if self.redirect_uri == allowed.url:
                    match_found = True
                    break
            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
                try:
                    if fullmatch(allowed.url, self.redirect_uri):
                        match_found = True
                        break
                except RegexError as exc:
                    # A broken pattern only disables that entry; keep trying the others
                    LOGGER.warning(
                        "Failed to parse regular expression",
                        exc=exc,
                        url=allowed.url,
                        provider=self.provider,
                    )
        if not match_found:
            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
                "redirect_uri_no_match"
            )
        # Check against forbidden schemes
        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
                "redirect_uri_forbidden_scheme"
            )

    def check_scope(self, github_compat=False):
        """Ensure openid scope is set in Hybrid flows, or when requesting an id_token"""
        default_scope_names = set(
            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
                "scope_name", flat=True
            )
        )
        if len(self.scope) == 0:
            self.scope = default_scope_names
            LOGGER.info(
                "No scopes requested, defaulting to all configured scopes", scopes=self.scope
            )
        scopes_to_check = self.scope
        if github_compat:
            # GitHub-specific scopes are not counted as "not configured"
            scopes_to_check = self.scope - SCOPE_GITHUB
        if not scopes_to_check.issubset(default_scope_names):
            LOGGER.info(
                "Application requested scopes not configured, setting to overlap",
                scope_allowed=default_scope_names,
                scope_given=self.scope,
            )
            self.scope = self.scope.intersection(default_scope_names)
        if SCOPE_OPENID not in self.scope and (
            self.grant_type == GrantType.HYBRID
            or self.response_type in [ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN]
        ):
            LOGGER.warning("Missing 'openid' scope.")
            raise AuthorizeError(
                self.redirect_uri, "invalid_scope", self.grant_type, self.state
            ).with_cause("scope_openid_missing")
        if SCOPE_OFFLINE_ACCESS in self.scope:
            # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
            # Don't explicitly request consent with offline_access, as the spec allows for
            # "other conditions for processing the request permitting offline access to the
            # requested resources are in place"
            # which we interpret as "the admin picks an authorization flow with or without consent"
            if self.response_type not in [
                ResponseTypes.CODE,
                ResponseTypes.CODE_TOKEN,
                ResponseTypes.CODE_ID_TOKEN,
                ResponseTypes.CODE_ID_TOKEN_TOKEN,
            ]:
                # offline_access requires a response type that has some sort of token
                # Spec says to ignore the scope when the response_type wouldn't result
                # in an authorization code being generated
                self.scope.remove(SCOPE_OFFLINE_ACCESS)

    def check_nonce(self):
        """Nonce parameter validation."""
        # nonce is required for all flows that return an id_token from the authorization endpoint,
        # see https://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthRequest or
        # https://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken and
        # https://bitbucket.org/openid/connect/issues/972/nonce-requirement-in-hybrid-auth-request
        if self.response_type not in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            return
        if SCOPE_OPENID not in self.scope:
            return
        if not self.nonce:
            LOGGER.warning("Missing nonce for OpenID Request")
            raise AuthorizeError(
                self.redirect_uri, "invalid_request", self.grant_type, self.state
            ).with_cause("nonce_missing")

    def check_code_challenge(self):
        """PKCE validation of the transformation method."""
        if self.code_challenge and self.code_challenge_method not in [
            PKCE_METHOD_PLAIN,
            PKCE_METHOD_S256,
        ]:
            raise AuthorizeError(
                self.redirect_uri,
                "invalid_request",
                self.grant_type,
                self.state,
                f"Unsupported challenge method {self.code_challenge_method}",
            )

    def create_code(self, request: HttpRequest) -> AuthorizationCode:
        """Create an AuthorizationCode object for the request

        The returned object is not saved; the caller is responsible for that.
        """
        auth_event = get_login_event(request)

        now = timezone.now()

        code = AuthorizationCode(
            user=request.user,
            provider=self.provider,
            # Fall back to the current time when no login event is available
            auth_time=auth_event.created if auth_event else now,
            code=uuid4().hex,
            expires=now + timedelta_from_string(self.provider.access_code_validity),
            scope=self.scope,
            nonce=self.nonce,
            session=request.session["authenticatedsession"],
        )

        if self.code_challenge and self.code_challenge_method:
            code.code_challenge = self.code_challenge
            code.code_challenge_method = self.code_challenge_method

        return code


class AuthorizationFlowInitView(PolicyAccessView):
    """OAuth2 Flow initializer, checks access to application and starts flow"""

    params: OAuthAuthorizationParams
    # Enable GitHub compatibility (only allow for scopes which are handled
    # differently for github compat)
    github_compat = False

    def pre_permission_check(self):
        """Check prompt parameter before checking permission/authentication,
        see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6"""
        # Quick sanity check at the beginning to prevent event spamming
        if len(self.request.GET) < 1:
            raise Http404
        try:
            self.params = OAuthAuthorizationParams.from_request(
                self.request, github_compat=self.github_compat
            )
        except AuthorizeError as error:
            LOGGER.warning(error.description, redirect_uri=error.redirect_uri, cause=error.cause)
            raise RequestValidationError(error.get_response(self.request)) from None
        except OAuth2Error as error:
            LOGGER.warning(error.description, cause=error.cause)
            raise RequestValidationError(
                bad_request_message(self.request, error.description, title=error.error)
            ) from None
        except OAuth2Provider.DoesNotExist:
            raise Http404 from None
        if PROMPT_NONE in self.params.prompt and not self.request.user.is_authenticated:
            # When "prompt" is set to "none" but the user is not logged in, show an error message
            error = AuthorizeError(
                self.params.redirect_uri,
                "login_required",
                self.params.grant_type,
                self.params.state,
            )
            raise RequestValidationError(error.get_response(self.request))

    def resolve_provider_application(self):
        """Set `provider` (looked up by the `client_id` query parameter, 404 if there is
        none) and `application` (the provider's application)"""
        client_id = self.request.GET.get("client_id")
        self.provider = get_object_or_404(OAuth2Provider, client_id=client_id)
        self.application = self.provider.application

    def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
        """Pass the login hint query parameter, when present, on as the pending user
        identifier"""
        if QS_LOGIN_HINT in self.request.GET:
            context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = self.request.GET.get(QS_LOGIN_HINT)
        return super().modify_flow_context(flow, context)

    def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
        """Expose the validated OAuth parameters to policies via the request context"""
        request.context["oauth_scopes"] = self.params.scope
        request.context["oauth_grant_type"] = self.params.grant_type
        request.context["oauth_code_challenge"] = self.params.code_challenge
        request.context["oauth_code_challenge_method"] = self.params.code_challenge_method
        request.context["oauth_max_age"] = self.params.max_age
        request.context["oauth_redirect_uri"] = self.params.redirect_uri
        request.context["oauth_response_type"] = self.params.response_type
        return request

    def dispatch_with_language(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Activate language from OIDC specific ui_locales parameter, picking the earliest one
        available"""
        selected_language = None
        if UI_LOCALES in self.request.GET:
            languages = str(self.request.GET[UI_LOCALES]).split(" ")
            for language in languages:
                if translation.check_for_language(language):
                    selected_language = translation.get_supported_language_variant(language)
                    LOGGER.debug(
                        "Activating language from oidc ui_locales", locale=selected_language
                    )
                    break
        translation.activate(selected_language)
        response = super().dispatch(request, *args, **kwargs)
        if selected_language:
            # Persist the choice for subsequent requests
            response.set_cookie(
                settings.LANGUAGE_COOKIE_NAME,
                selected_language,
                max_age=settings.LANGUAGE_COOKIE_AGE,
                path=settings.LANGUAGE_COOKIE_PATH,
                domain=settings.LANGUAGE_COOKIE_DOMAIN,
                secure=settings.LANGUAGE_COOKIE_SECURE,
                httponly=settings.LANGUAGE_COOKIE_HTTPONLY,
                samesite=settings.LANGUAGE_COOKIE_SAMESITE,
            )
            if isinstance(response, HttpResponseRedirect):
                # Carry the language over to the redirect target as a `locale` parameter
                parsed_url = urlparse(response.url)
                args = parse_qs(parsed_url.query)
                args["locale"] = selected_language
                response["Location"] = urlunparse(
                    parsed_url._replace(query=urlencode(args, quote_via=quote, doseq=True))
                )
        return response

    def dispatch(self, request: HttpRequest, *args, **kwargs):
        """Dispatch the request through `dispatch_with_language`"""
        # Activate language before parsing params (error messages should be localized)
        return self.dispatch_with_language(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Start FlowPlanner, return to flow executor shell"""
        # Require a login event to be set, otherwise make the user re-login
        login_event = get_login_event(request)
        if not login_event:
            LOGGER.warning("request with no login event")
            return self.handle_no_permission()
        login_uid = str(login_event.pk)
        # After we've checked permissions, and the user has access, check if we need
        # to re-authenticate the user
        if self.params.max_age:
            # Attempt to check via the session's login event if set, otherwise we can't
            # check
            login_time = login_event.created
            current_age: timedelta = timezone.now() - login_time
            if current_age.total_seconds() > self.params.max_age:
                LOGGER.debug(
                    "Triggering authentication as max_age requirement",
                    max_age=self.params.max_age,
                    ago=int(current_age.total_seconds()),
                )
                # Since we already need to re-authenticate the user, set the old login UID
                # in case this request has both max_age and prompt=login
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        # If prompt=login, we need to re-authenticate the user regardless
        # Check if we're not already doing the re-authentication
        if PROMPT_LOGIN in self.params.prompt:
            # No previous login UID saved, so save the current uid and trigger
            # re-login, or previous login UID matches current one, so no re-login happened yet
            if (
                SESSION_KEY_LAST_LOGIN_UID not in self.request.session
                or login_uid == self.request.session[SESSION_KEY_LAST_LOGIN_UID]
            ):
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        scope_descriptions = UserInfoView().get_scope_descriptions(
            self.params.scope, self.params.provider
        )
        # Regardless, we start the planner and return to it
        planner = FlowPlanner(self.provider.authorization_flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(
                self.request,
                {
                    PLAN_CONTEXT_SSO: True,
                    PLAN_CONTEXT_APPLICATION: self.application,
                    # OAuth2 related params
                    PLAN_CONTEXT_PARAMS: self.params,
                    # Consent related params
                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
                    % {"application": self.application.name},
                    PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
                },
            )
        except FlowNonApplicableException:
            return self.handle_no_permission_authenticated()
        # OpenID clients can specify a `prompt` parameter, and if its set to consent we
        # need to inject a consent stage
        if PROMPT_CONSENT in self.params.prompt:
            if not any(isinstance(x.stage, ConsentStage) for x in plan.bindings):
                # Plan does not have any consent stage, so we add an in-memory one
                stage = ConsentStage(
                    name="OAuth2 Provider In-memory consent stage",
                    mode=ConsentMode.ALWAYS_REQUIRE,
                )
                plan.append_stage(stage)

        plan.append_stage(in_memory_stage(OAuthFulfillmentStage))

        return plan.to_redirect(
            self.request,
            self.provider.authorization_flow,
            # We can only skip the flow executor and directly go to the final redirect URL if
            # we can submit the data to the RP via URL
            allowed_silent_types=(
                [OAuthFulfillmentStage]
                if self.params.response_mode in [ResponseMode.QUERY, ResponseMode.FRAGMENT]
                else []
            ),
        )


class OAuthFulfillmentStage(StageView):
    """Final stage, restores params from Flow."""

    params: OAuthAuthorizationParams
    provider: OAuth2Provider
    application: Application

    def redirect(self, uri: str) -> HttpResponse:
        """Redirect using HttpResponseRedirectScheme, compatible with non-http schemes"""
        parsed = urlparse(uri)

        if self.params.response_mode == ResponseMode.FORM_POST:
            # parse_qs returns a dictionary with values wrapped in lists, however
            # we need a flat dictionary for the autosubmit challenge

            # this picks the first item in the list if the value is a list,
            # otherwise just the value as-is
            query_params = dict(
                (k, v[0] if isinstance(v, list) else v) for k, v in parse_qs(parsed.query).items()
            )

            challenge = AutosubmitChallenge(
                data={
                    "component": "ak-stage-autosubmit",
                    "title": self.executor.plan.context.get(
                        PLAN_CONTEXT_TITLE,
                        _("Redirecting to {app}...".format_map({"app": self.application.name})),
                    ),
                    "url": self.params.redirect_uri,
                    "attrs": query_params,
                }
            )

            challenge.is_valid()
            self.executor.stage_ok()
            return HttpChallengeResponse(
                challenge=challenge,
            )
        self.executor.stage_ok()
        return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Wrapper when this stage gets hit with a post request"""
        return self.get(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """final Stage of an OAuth2 Flow"""
        if PLAN_CONTEXT_PARAMS not in self.executor.plan.context:
            LOGGER.warning("Got to fulfillment stage with no pending context")
            return HttpResponseBadRequest()
        self.params: OAuthAuthorizationParams = self.executor.plan.context.pop(PLAN_CONTEXT_PARAMS)
        self.application: Application = self.executor.plan.context.pop(PLAN_CONTEXT_APPLICATION)
        self.provider = get_object_or_404(OAuth2Provider, pk=self.application.provider_id)
        try:
            # At this point we don't need to check permissions anymore
            if {PROMPT_NONE, PROMPT_CONSENT}.issubset(self.params.prompt):
                raise AuthorizeError(
                    self.params.redirect_uri,
                    "consent_required",
                    self.params.grant_type,
                    self.params.state,
                )
            Event.new(
                EventAction.AUTHORIZE_APPLICATION,
                authorized_application=self.application,
                flow=self.executor.plan.flow_pk,
                scopes=" ".join(self.params.scope),
            ).from_http(self.request)
            return self.redirect(self.create_response_uri())
        except (ClientIdError, RedirectUriError) as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()

            return bad_request_message(request, error.description, title=error.error)
        except AuthorizeError as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()
            return error.get_response(self.request)

    def create_response_uri(self) -> str:
        """Create a final Response URI the user is redirected to.

        Raises `AuthorizeError` ("server_error") when an `OAuth2Error` occurs or the
        response mode is none of query, fragment or form_post.
        """
        uri = urlsplit(self.params.redirect_uri)

        try:
            code = None

            if self.params.grant_type in [
                GrantType.AUTHORIZATION_CODE,
                GrantType.HYBRID,
            ]:
                code = self.params.create_code(self.request)
                code.save()

            if self.params.response_mode == ResponseMode.QUERY:
                query_params = parse_qs(uri.query)
                query_params["code"] = code.code
                query_params["state"] = [str(self.params.state) if self.params.state else ""]

                uri = uri._replace(query=urlencode(query_params, doseq=True))
                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FRAGMENT:
                query_fragment = {}
                if self.params.grant_type in [GrantType.AUTHORIZATION_CODE]:
                    query_fragment["code"] = code.code
                    query_fragment["state"] = [str(self.params.state) if self.params.state else ""]
                else:
                    query_fragment = self.create_implicit_response(code)

                uri = uri._replace(
                    fragment=uri.fragment + urlencode(query_fragment, doseq=True),
                )

                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FORM_POST:
                # The values are carried in the query string here; `redirect` turns them
                # into the attributes of an autosubmit challenge for form_post
                post_params = {}
                if self.params.grant_type in [GrantType.AUTHORIZATION_CODE]:
                    post_params["code"] = code.code
                    post_params["state"] = [str(self.params.state) if self.params.state else ""]
                else:
                    post_params = self.create_implicit_response(code)

                uri = uri._replace(query=urlencode(post_params, doseq=True))

                return urlunsplit(uri)

            raise OAuth2Error()
        except OAuth2Error as error:
            LOGGER.warning("Error when trying to create response uri", error=error)
            raise AuthorizeError(
                self.params.redirect_uri,
                "server_error",
                self.params.grant_type,
                self.params.state,
            ) from None

    def create_implicit_response(self, code: AuthorizationCode | None) -> dict:
        """Create implicit response's URL Fragment dictionary"""
        query_fragment = {}
        auth_event = get_login_event(self.request)

        now = timezone.now()
        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
        token = AccessToken(
            user=self.request.user,
            scope=self.params.scope,
            expires=access_token_expiry,
            provider=self.provider,
            auth_time=auth_event.created if auth_event else now,
            session=self.request.session["authenticatedsession"],
        )

        id_token = IDToken.new(self.provider, token, self.request)
        id_token.nonce = self.params.nonce

        if self.params.response_type in [
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            id_token.c_hash = code.c_hash
        token.id_token = id_token

        # Check if response_type must include access_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
            ResponseTypes.CODE_TOKEN,
        ]:
            query_fragment["access_token"] = token.token
            # Get at_hash of the current token and update the id_token
            id_token.at_hash = token.at_hash

        # Check if response_type must include id_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            query_fragment["id_token"] = self.provider.encode(id_token.to_dict())
        token._id_token = dumps(id_token.to_dict())

        token.save()

        # Code parameter must be present if it's Hybrid Flow.
        if self.params.grant_type == GrantType.HYBRID:
            query_fragment["code"] = code.code

        query_fragment["token_type"] = TOKEN_TYPE
        query_fragment["expires_in"] = int(
            timedelta_from_string(self.provider.access_token_validity).total_seconds()
        )
        query_fragment["state"] = self.params.state if self.params.state else ""
        return query_fragment
@dataclass(slots=True)
class OAuthAuthorizationParams:
    """Parameters required to authorize an OAuth Client

    Constructing an instance validates it: `__post_init__` looks up the provider by
    `client_id` and then checks redirect URI, grant, scope, nonce and PKCE settings,
    raising `ClientIdError`, `RedirectUriError` or `AuthorizeError` on failure.
    """

    client_id: str
    redirect_uri: str
    response_type: str
    response_mode: str | None
    scope: set[str]
    state: str
    nonce: str | None
    prompt: set[str]
    grant_type: str

    provider: OAuth2Provider = field(default_factory=OAuth2Provider)

    request: str | None = None

    max_age: int | None = None

    code_challenge: str | None = None
    code_challenge_method: str | None = None

    github_compat: InitVar[bool] = False

    @staticmethod
    def from_request(request: HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
        """
        Get all the params used by the Authorization Code Flow
        (and also for the Implicit and Hybrid).

        A `max_age` value that is not an integer is ignored (and logged) instead of
        raising `ValueError` out of this method.

        See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
        """
        # Because in this endpoint we handle both GET
        # and POST request.
        query_dict = request.POST if request.method == "POST" else request.GET
        state = query_dict.get("state")
        redirect_uri = query_dict.get("redirect_uri", "")

        response_type = query_dict.get("response_type", "")

        # Validate and check the response_mode against the predefined dict
        # Set to Query or Fragment if not defined in request
        # (the defaulting itself happens in check_grant)
        response_mode = query_dict.get("response_mode")

        # max_age is client-controlled input; never let a malformed value escape as ValueError
        raw_max_age = query_dict.get("max_age")
        max_age = None
        if raw_max_age:
            try:
                max_age = int(raw_max_age)
            except ValueError:
                LOGGER.warning("Ignoring non-integer max_age", max_age=raw_max_age)
        return OAuthAuthorizationParams(
            client_id=query_dict.get("client_id", ""),
            redirect_uri=redirect_uri,
            response_type=response_type,
            response_mode=response_mode,
            grant_type="",
            scope=set(query_dict.get("scope", "").split()),
            state=state,
            nonce=query_dict.get("nonce"),
            prompt=ALLOWED_PROMPT_PARAMS.intersection(set(query_dict.get("prompt", "").split())),
            request=query_dict.get("request", None),
            max_age=max_age,
            code_challenge=query_dict.get("code_challenge"),
            code_challenge_method=query_dict.get("code_challenge_method", "plain"),
            github_compat=github_compat,
        )

    def __post_init__(self, github_compat=False):
        """Resolve the provider from `client_id` and run all validations.

        The redirect URI is checked first, before any check that raises an
        `AuthorizeError` carrying that redirect URI.
        """
        self.provider: OAuth2Provider = OAuth2Provider.objects.filter(
            client_id=self.client_id
        ).first()
        if not self.provider:
            LOGGER.warning("Invalid client identifier", client_id=self.client_id)
            raise ClientIdError(client_id=self.client_id)
        self.check_redirect_uri()
        self.check_grant()
        self.check_scope(github_compat)
        # Request objects (the `request` parameter) are not supported
        if self.request:
            raise AuthorizeError(
                self.redirect_uri, "request_not_supported", self.grant_type, self.state
            )
        self.check_nonce()
        self.check_code_challenge()

    def check_grant(self):
        """Check grant

        Derives `grant_type` from `response_type`, verifies the provider allows it and
        fills in a default `response_mode` when none (or an unknown one) was requested.
        """
        # Determine which flow to use.
        if self.response_type in [ResponseTypes.CODE]:
            self.grant_type = GrantType.AUTHORIZATION_CODE
        elif self.response_type in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
        ]:
            self.grant_type = GrantType.IMPLICIT
        elif self.response_type in [
            ResponseTypes.CODE_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            self.grant_type = GrantType.HYBRID
        # Grant type validation.
        if not self.grant_type:
            LOGGER.warning("Invalid response type", type=self.response_type)
            raise AuthorizeError(self.redirect_uri, "unsupported_response_type", "", self.state)

        if self.grant_type not in self.provider.grant_types:
            LOGGER.warning("Invalid grant_type for provider", grant_type=self.grant_type)
            raise AuthorizeError(self.redirect_uri, "invalid_request", self.grant_type, self.state)

        if self.response_mode not in ResponseMode.values:
            self.response_mode = ResponseMode.QUERY

            # NOTE(review): nesting reconstructed from a whitespace-collapsed source; the
            # fragment default is applied only when no valid response_mode was requested,
            # so that an explicitly requested form_post is kept for implicit/hybrid flows.
            if self.grant_type in [GrantType.IMPLICIT, GrantType.HYBRID]:
                self.response_mode = ResponseMode.FRAGMENT

    def check_redirect_uri(self):
        """Redirect URI validation."""
        allowed_redirect_urls = self.provider.authorization_redirect_uris
        if not self.redirect_uri:
            LOGGER.warning("Missing redirect uri.")
            raise RedirectUriError("", allowed_redirect_urls).with_cause("redirect_uri_missing")

        match_found = False
        for allowed in allowed_redirect_urls:
            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
                if self.redirect_uri == allowed.url:
                    match_found = True
                    break
            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
                try:
                    if fullmatch(allowed.url, self.redirect_uri):
                        match_found = True
                        break
                except RegexError as exc:
                    # A broken pattern only disables that entry; keep trying the others
                    LOGGER.warning(
                        "Failed to parse regular expression",
                        exc=exc,
                        url=allowed.url,
                        provider=self.provider,
                    )
        if not match_found:
            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
                "redirect_uri_no_match"
            )
        # Check against forbidden schemes
        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
                "redirect_uri_forbidden_scheme"
            )

    def check_scope(self, github_compat=False):
        """Ensure openid scope is set in Hybrid flows, or when requesting an id_token"""
        default_scope_names = set(
            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
                "scope_name", flat=True
            )
        )
        if len(self.scope) == 0:
            self.scope = default_scope_names
            LOGGER.info(
                "No scopes requested, defaulting to all configured scopes", scopes=self.scope
            )
        scopes_to_check = self.scope
        if github_compat:
            # GitHub-specific scopes are not counted as "not configured"
            scopes_to_check = self.scope - SCOPE_GITHUB
        if not scopes_to_check.issubset(default_scope_names):
            LOGGER.info(
                "Application requested scopes not configured, setting to overlap",
                scope_allowed=default_scope_names,
                scope_given=self.scope,
            )
            self.scope = self.scope.intersection(default_scope_names)
        if SCOPE_OPENID not in self.scope and (
            self.grant_type == GrantType.HYBRID
            or self.response_type in [ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN]
        ):
            LOGGER.warning("Missing 'openid' scope.")
            raise AuthorizeError(
                self.redirect_uri, "invalid_scope", self.grant_type, self.state
            ).with_cause("scope_openid_missing")
        if SCOPE_OFFLINE_ACCESS in self.scope:
            # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
            # Don't explicitly request consent with offline_access, as the spec allows for
            # "other conditions for processing the request permitting offline access to the
            # requested resources are in place"
            # which we interpret as "the admin picks an authorization flow with or without consent"
            if self.response_type not in [
                ResponseTypes.CODE,
                ResponseTypes.CODE_TOKEN,
                ResponseTypes.CODE_ID_TOKEN,
                ResponseTypes.CODE_ID_TOKEN_TOKEN,
            ]:
                # offline_access requires a response type that has some sort of token
                # Spec says to ignore the scope when the response_type wouldn't result
                # in an authorization code being generated
                self.scope.remove(SCOPE_OFFLINE_ACCESS)

    def check_nonce(self):
        """Nonce parameter validation."""
        # nonce is required for all flows that return an id_token from the authorization endpoint,
        # see https://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthRequest or
        # https://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken and
        # https://bitbucket.org/openid/connect/issues/972/nonce-requirement-in-hybrid-auth-request
        if self.response_type not in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            return
        if SCOPE_OPENID not in self.scope:
            return
        if not self.nonce:
            LOGGER.warning("Missing nonce for OpenID Request")
            raise AuthorizeError(
                self.redirect_uri, "invalid_request", self.grant_type, self.state
            ).with_cause("nonce_missing")

    def check_code_challenge(self):
        """PKCE validation of the transformation method."""
        if self.code_challenge and self.code_challenge_method not in [
            PKCE_METHOD_PLAIN,
            PKCE_METHOD_S256,
        ]:
            raise AuthorizeError(
                self.redirect_uri,
                "invalid_request",
                self.grant_type,
                self.state,
                f"Unsupported challenge method {self.code_challenge_method}",
            )

    def create_code(self, request: HttpRequest) -> AuthorizationCode:
        """Create an AuthorizationCode object for the request

        The returned object is not saved; the caller is responsible for that.
        """
        auth_event = get_login_event(request)

        now = timezone.now()

        code = AuthorizationCode(
            user=request.user,
            provider=self.provider,
            # Fall back to the current time when no login event is available
            auth_time=auth_event.created if auth_event else now,
            code=uuid4().hex,
            expires=now + timedelta_from_string(self.provider.access_code_validity),
            scope=self.scope,
            nonce=self.nonce,
            session=request.session["authenticatedsession"],
        )

        if self.code_challenge and self.code_challenge_method:
            code.code_challenge = self.code_challenge
            code.code_challenge_method = self.code_challenge_method

        return code
Parameters required to authorize an OAuth Client
@staticmethod
def from_request(request: HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
    """
    Build the authorization parameters from an incoming HTTP request.

    Covers the Authorization Code flow as well as the Implicit and Hybrid flows.

    See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
    """
    # This endpoint is reachable via GET and POST, so read the parameters
    # from whichever dictionary matches the request method.
    if request.method == "POST":
        submitted = request.POST
    else:
        submitted = request.GET

    # Only keep prompt values this provider knows how to handle
    requested_prompts = set(submitted.get("prompt", "").split())
    requested_scopes = set(submitted.get("scope", "").split())

    # A missing or empty max_age means "no constraint"
    raw_max_age = submitted.get("max_age")
    if raw_max_age:
        parsed_max_age = int(raw_max_age)
    else:
        parsed_max_age = None

    return OAuthAuthorizationParams(
        client_id=submitted.get("client_id", ""),
        redirect_uri=submitted.get("redirect_uri", ""),
        response_type=submitted.get("response_type", ""),
        # Left as False when not supplied; the effective mode is picked
        # later on, once the grant type is known
        response_mode=submitted.get("response_mode", False),
        grant_type="",
        scope=requested_scopes,
        state=submitted.get("state"),
        nonce=submitted.get("nonce"),
        prompt=ALLOWED_PROMPT_PARAMS.intersection(requested_prompts),
        request=submitted.get("request", None),
        max_age=parsed_max_age,
        code_challenge=submitted.get("code_challenge"),
        code_challenge_method=submitted.get("code_challenge_method", "plain"),
        github_compat=github_compat,
    )
Get all the params used by the Authorization Code Flow (and also for the Implicit and Hybrid).
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
def check_grant(self):
    """Derive the grant type from the response type and validate it.

    Also settles on the response mode when the request did not supply a
    recognised one.
    """
    # Determine which flow to use.
    requested = self.response_type
    if requested == ResponseTypes.CODE:
        self.grant_type = GrantType.AUTHORIZATION_CODE
    elif requested in (ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN):
        self.grant_type = GrantType.IMPLICIT
    elif requested in (
        ResponseTypes.CODE_TOKEN,
        ResponseTypes.CODE_ID_TOKEN,
        ResponseTypes.CODE_ID_TOKEN_TOKEN,
    ):
        self.grant_type = GrantType.HYBRID

    # Grant type validation.
    if not self.grant_type:
        LOGGER.warning("Invalid response type", type=self.response_type)
        raise AuthorizeError(self.redirect_uri, "unsupported_response_type", "", self.state)

    if self.grant_type not in self.provider.grant_types:
        LOGGER.warning("Invalid grant_type for provider", grant_type=self.grant_type)
        raise AuthorizeError(self.redirect_uri, "invalid_request", self.grant_type, self.state)

    # Fall back to a default response mode only when the requested one is
    # unknown: fragment for implicit/hybrid grants, query otherwise.
    if self.response_mode not in ResponseMode.values:
        if self.grant_type in (GrantType.IMPLICIT, GrantType.HYBRID):
            self.response_mode = ResponseMode.FRAGMENT
        else:
            self.response_mode = ResponseMode.QUERY
Check grant
def check_redirect_uri(self):
    """Validate the requested redirect URI against the provider's allowed URIs.

    Raises RedirectUriError when the URI is missing, matches none of the
    configured entries, or uses a forbidden scheme.
    """
    configured = self.provider.authorization_redirect_uris
    if not self.redirect_uri:
        LOGGER.warning("Missing redirect uri.")
        raise RedirectUriError("", configured).with_cause("redirect_uri_missing")

    for entry in configured:
        if (
            entry.matching_mode == RedirectURIMatchingMode.STRICT
            and self.redirect_uri == entry.url
        ):
            break
        if entry.matching_mode == RedirectURIMatchingMode.REGEX:
            try:
                if fullmatch(entry.url, self.redirect_uri):
                    break
            except RegexError as exc:
                # A broken pattern only disqualifies this entry; keep
                # looking through the remaining ones
                LOGGER.warning(
                    "Failed to parse regular expression",
                    exc=exc,
                    url=entry.url,
                    provider=self.provider,
                )
    else:
        # Loop ran to completion without hitting a matching entry
        raise RedirectUriError(self.redirect_uri, configured).with_cause(
            "redirect_uri_no_match"
        )

    # Check against forbidden schemes
    scheme = urlparse(self.redirect_uri).scheme
    if scheme in FORBIDDEN_URI_SCHEMES:
        raise RedirectUriError(self.redirect_uri, configured).with_cause(
            "redirect_uri_forbidden_scheme"
        )
Redirect URI validation.
def check_scope(self, github_compat=False):
    """Normalise the requested scopes and enforce the OpenID scope rules.

    Requests without scopes receive every scope configured on the provider;
    requests naming unconfigured scopes are reduced to the configured ones.
    Raises AuthorizeError when `openid` is required but absent.
    """
    configured = set(
        ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
            "scope_name", flat=True
        )
    )
    if not self.scope:
        self.scope = configured
        LOGGER.info(
            "No scopes requested, defaulting to all configured scopes", scopes=self.scope
        )

    # In GitHub compatibility mode the GitHub scopes are exempt from the check
    if github_compat:
        candidate_scopes = self.scope - SCOPE_GITHUB
    else:
        candidate_scopes = self.scope
    if not candidate_scopes.issubset(configured):
        LOGGER.info(
            "Application requested scopes not configured, setting to overlap",
            scope_allowed=configured,
            scope_given=self.scope,
        )
        self.scope = self.scope.intersection(configured)

    if SCOPE_OPENID not in self.scope:
        needs_openid = self.grant_type == GrantType.HYBRID or self.response_type in (
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
        )
        if needs_openid:
            LOGGER.warning("Missing 'openid' scope.")
            raise AuthorizeError(
                self.redirect_uri, "invalid_scope", self.grant_type, self.state
            ).with_cause("scope_openid_missing")

    # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
    # Don't explicitly request consent with offline_access, as the spec allows for
    # "other conditions for processing the request permitting offline access to the
    # requested resources are in place"
    # which we interpret as "the admin picks an authorization flow with or without consent"
    code_response_types = (
        ResponseTypes.CODE,
        ResponseTypes.CODE_TOKEN,
        ResponseTypes.CODE_ID_TOKEN,
        ResponseTypes.CODE_ID_TOKEN_TOKEN,
    )
    if SCOPE_OFFLINE_ACCESS in self.scope and self.response_type not in code_response_types:
        # Spec says to ignore the scope when the response_type wouldn't result
        # in an authorization code being generated
        self.scope.remove(SCOPE_OFFLINE_ACCESS)
Ensure openid scope is set in Hybrid flows, or when requesting an id_token
def check_nonce(self):
    """Require a nonce when the authorization endpoint returns an id_token.

    Raises AuthorizeError when an OpenID request with such a response type
    carries no nonce.
    """
    # nonce is required for all flows that return an id_token from the authorization endpoint,
    # see https://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthRequest or
    # https://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken and
    # https://bitbucket.org/openid/connect/issues/972/nonce-requirement-in-hybrid-auth-request
    id_token_response_types = (
        ResponseTypes.ID_TOKEN,
        ResponseTypes.ID_TOKEN_TOKEN,
        ResponseTypes.CODE_ID_TOKEN,
        ResponseTypes.CODE_ID_TOKEN_TOKEN,
    )
    returns_id_token = self.response_type in id_token_response_types
    if returns_id_token and SCOPE_OPENID in self.scope and not self.nonce:
        LOGGER.warning("Missing nonce for OpenID Request")
        raise AuthorizeError(
            self.redirect_uri, "invalid_request", self.grant_type, self.state
        ).with_cause("nonce_missing")
Nonce parameter validation.
def check_code_challenge(self):
    """Validate the PKCE transformation method.

    Only checked when a code challenge was supplied; raises AuthorizeError
    for any method other than the plain and S256 ones.
    """
    if not self.code_challenge:
        # No PKCE in this request, nothing to validate
        return
    if self.code_challenge_method in (PKCE_METHOD_PLAIN, PKCE_METHOD_S256):
        return
    raise AuthorizeError(
        self.redirect_uri,
        "invalid_request",
        self.grant_type,
        self.state,
        f"Unsupported challenge method {self.code_challenge_method}",
    )
PKCE validation of the transformation method.
def create_code(self, request: HttpRequest) -> AuthorizationCode:
    """Build an (unsaved) AuthorizationCode for the given request."""
    login_event = get_login_event(request)
    issued_at = timezone.now()

    # Use the time of the recorded login when there is one, otherwise "now"
    if login_event:
        authenticated_at = login_event.created
    else:
        authenticated_at = issued_at

    lifetime = timedelta_from_string(self.provider.access_code_validity)
    authorization_code = AuthorizationCode(
        user=request.user,
        provider=self.provider,
        auth_time=authenticated_at,
        code=uuid4().hex,
        expires=issued_at + lifetime,
        scope=self.scope,
        nonce=self.nonce,
        session=request.session["authenticatedsession"],
    )

    # Only carry PKCE data over when both the challenge and its method are set
    if self.code_challenge and self.code_challenge_method:
        authorization_code.code_challenge = self.code_challenge
        authorization_code.code_challenge_method = self.code_challenge_method

    return authorization_code
Create an AuthorizationCode object for the request
class AuthorizationFlowInitView(PolicyAccessView):
    """OAuth2 Flow initializer, checks access to application and starts flow"""

    params: OAuthAuthorizationParams
    # Enable GitHub compatibility (only allow for scopes which are handled
    # differently for github compat)
    github_compat = False

    def pre_permission_check(self):
        """Check prompt parameter before checking permission/authentication,
        see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6"""
        # Quick sanity check at the beginning to prevent event spamming
        if len(self.request.GET) < 1:
            raise Http404
        try:
            self.params = OAuthAuthorizationParams.from_request(
                self.request, github_compat=self.github_compat
            )
        except AuthorizeError as error:
            LOGGER.warning(error.description, redirect_uri=error.redirect_uri, cause=error.cause)
            raise RequestValidationError(error.get_response(self.request)) from None
        except OAuth2Error as error:
            LOGGER.warning(error.description, cause=error.cause)
            raise RequestValidationError(
                bad_request_message(self.request, error.description, title=error.error)
            ) from None
        except OAuth2Provider.DoesNotExist:
            raise Http404 from None
        if PROMPT_NONE in self.params.prompt and not self.request.user.is_authenticated:
            # When "prompt" is set to "none" but the user is not logged in, show an error message
            error = AuthorizeError(
                self.params.redirect_uri,
                "login_required",
                self.params.grant_type,
                self.params.state,
            )
            raise RequestValidationError(error.get_response(self.request))

    def resolve_provider_application(self):
        """Look up the provider by the `client_id` query parameter and store it,
        together with its application, on the view. Responds with a 404 when no
        provider has that client ID."""
        client_id = self.request.GET.get("client_id")
        self.provider = get_object_or_404(OAuth2Provider, client_id=client_id)
        self.application = self.provider.application

    def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
        """Pass the login hint from the query string on as the pending user identifier"""
        if QS_LOGIN_HINT in self.request.GET:
            context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = self.request.GET.get(QS_LOGIN_HINT)
        return super().modify_flow_context(flow, context)

    def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
        """Expose the parsed OAuth parameters to policies via the request context"""
        request.context["oauth_scopes"] = self.params.scope
        request.context["oauth_grant_type"] = self.params.grant_type
        request.context["oauth_code_challenge"] = self.params.code_challenge
        request.context["oauth_code_challenge_method"] = self.params.code_challenge_method
        request.context["oauth_max_age"] = self.params.max_age
        request.context["oauth_redirect_uri"] = self.params.redirect_uri
        request.context["oauth_response_type"] = self.params.response_type
        return request

    def dispatch_with_language(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Activate language from OIDC specific ui_locales parameter, picking the earliest one
        available"""
        selected_language = None
        if UI_LOCALES in self.request.GET:
            languages = str(self.request.GET[UI_LOCALES]).split(" ")
            for language in languages:
                if not translation.check_for_language(language):
                    continue
                try:
                    selected_language = translation.get_supported_language_variant(language)
                except LookupError:
                    # Translation files exist for this code, but the language is not
                    # enabled in settings.LANGUAGES. ui_locales is client-controlled,
                    # so skip to the next preference instead of failing the request.
                    continue
                LOGGER.debug("Activating language from oidc ui_locales", locale=selected_language)
                break
        translation.activate(selected_language)
        response = super().dispatch(request, *args, **kwargs)
        if selected_language:
            # Persist the choice so later requests keep using the same language
            response.set_cookie(
                settings.LANGUAGE_COOKIE_NAME,
                selected_language,
                max_age=settings.LANGUAGE_COOKIE_AGE,
                path=settings.LANGUAGE_COOKIE_PATH,
                domain=settings.LANGUAGE_COOKIE_DOMAIN,
                secure=settings.LANGUAGE_COOKIE_SECURE,
                httponly=settings.LANGUAGE_COOKIE_HTTPONLY,
                samesite=settings.LANGUAGE_COOKIE_SAMESITE,
            )
            if isinstance(response, HttpResponseRedirect):
                # Also carry the language along in the redirect target's query string
                parsed_url = urlparse(response.url)
                args = parse_qs(parsed_url.query)
                args["locale"] = selected_language
                response["Location"] = urlunparse(
                    parsed_url._replace(query=urlencode(args, quote_via=quote, doseq=True))
                )
        return response

    def dispatch(self, request: HttpRequest, *args, **kwargs):
        # Activate language before parsing params (error messages should be localized)
        return self.dispatch_with_language(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Start FlowPlanner, return to flow executor shell"""
        # Require a login event to be set, otherwise make the user re-login
        login_event = get_login_event(request)
        if not login_event:
            LOGGER.warning("request with no login event")
            return self.handle_no_permission()
        login_uid = str(login_event.pk)
        # After we've checked permissions, and the user has access, check if we need
        # to re-authenticate the user
        if self.params.max_age:
            # Attempt to check via the session's login event if set, otherwise we can't
            # check
            login_time = login_event.created
            current_age: timedelta = timezone.now() - login_time
            if current_age.total_seconds() > self.params.max_age:
                LOGGER.debug(
                    "Triggering authentication as max_age requirement",
                    max_age=self.params.max_age,
                    ago=int(current_age.total_seconds()),
                )
                # Since we already need to re-authenticate the user, set the old login UID
                # in case this request has both max_age and prompt=login
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        # If prompt=login, we need to re-authenticate the user regardless
        # Check if we're not already doing the re-authentication
        if PROMPT_LOGIN in self.params.prompt:
            # No previous login UID saved, so save the current uid and trigger
            # re-login, or previous login UID matches current one, so no re-login happened yet
            if (
                SESSION_KEY_LAST_LOGIN_UID not in self.request.session
                or login_uid == self.request.session[SESSION_KEY_LAST_LOGIN_UID]
            ):
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        scope_descriptions = UserInfoView().get_scope_descriptions(
            self.params.scope, self.params.provider
        )
        # Regardless, we start the planner and return to it
        planner = FlowPlanner(self.provider.authorization_flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(
                self.request,
                {
                    PLAN_CONTEXT_SSO: True,
                    PLAN_CONTEXT_APPLICATION: self.application,
                    # OAuth2 related params
                    PLAN_CONTEXT_PARAMS: self.params,
                    # Consent related params
                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
                    % {"application": self.application.name},
                    PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
                },
            )
        except FlowNonApplicableException:
            return self.handle_no_permission_authenticated()
        # OpenID clients can specify a `prompt` parameter, and if its set to consent we
        # need to inject a consent stage
        if PROMPT_CONSENT in self.params.prompt:
            if not any(isinstance(x.stage, ConsentStage) for x in plan.bindings):
                # Plan does not have any consent stage, so we add an in-memory one
                stage = ConsentStage(
                    name="OAuth2 Provider In-memory consent stage",
                    mode=ConsentMode.ALWAYS_REQUIRE,
                )
                plan.append_stage(stage)

        plan.append_stage(in_memory_stage(OAuthFulfillmentStage))

        return plan.to_redirect(
            self.request,
            self.provider.authorization_flow,
            # We can only skip the flow executor and directly go to the final redirect URL if
            # we can submit the data to the RP via URL
            allowed_silent_types=(
                [OAuthFulfillmentStage]
                if self.params.response_mode in [ResponseMode.QUERY, ResponseMode.FRAGMENT]
                else []
            ),
        )
OAuth2 Flow initializer, checks access to application and starts flow
def pre_permission_check(self):
    """Parse the authorization parameters and honour `prompt=none` before any
    permission or authentication check runs,
    see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6"""
    # Quick sanity check at the beginning to prevent event spamming
    if not self.request.GET:
        raise Http404

    try:
        self.params = OAuthAuthorizationParams.from_request(
            self.request, github_compat=self.github_compat
        )
    except AuthorizeError as error:
        LOGGER.warning(error.description, redirect_uri=error.redirect_uri, cause=error.cause)
        raise RequestValidationError(error.get_response(self.request)) from None
    except OAuth2Error as error:
        LOGGER.warning(error.description, cause=error.cause)
        failure_page = bad_request_message(self.request, error.description, title=error.error)
        raise RequestValidationError(failure_page) from None
    except OAuth2Provider.DoesNotExist:
        raise Http404 from None

    if PROMPT_NONE not in self.params.prompt or self.request.user.is_authenticated:
        return
    # When "prompt" is set to "none" but the user is not logged in, show an error message
    login_required = AuthorizeError(
        self.params.redirect_uri,
        "login_required",
        self.params.grant_type,
        self.params.state,
    )
    raise RequestValidationError(login_required.get_response(self.request))
Check prompt parameter before checking permission/authentication, see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6
def resolve_provider_application(self):
    """Set self.provider from the `client_id` query parameter (404 when no
    provider matches) and self.application from that provider."""
    provider = get_object_or_404(OAuth2Provider, client_id=self.request.GET.get("client_id"))
    self.provider = provider
    self.application = self.provider.application
Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught, and will return directly
def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
    """Seed the flow context with the login hint from the query string, if any,
    then defer to the parent implementation."""
    query = self.request.GET
    if QS_LOGIN_HINT in query:
        hint = query.get(QS_LOGIN_HINT)
        context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = hint
    return super().modify_flow_context(flow, context)
optionally modify the flow context which is used for the authentication flow
def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
    """Copy the parsed OAuth parameters into the policy request context and
    return the same request object."""
    # (context key, attribute on self.params), written in this order
    exposed = (
        ("oauth_scopes", "scope"),
        ("oauth_grant_type", "grant_type"),
        ("oauth_code_challenge", "code_challenge"),
        ("oauth_code_challenge_method", "code_challenge_method"),
        ("oauth_max_age", "max_age"),
        ("oauth_redirect_uri", "redirect_uri"),
        ("oauth_response_type", "response_type"),
    )
    for context_key, attribute in exposed:
        request.context[context_key] = getattr(self.params, attribute)
    return request
optionally modify the policy request
def dispatch_with_language(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Activate language from OIDC specific ui_locales parameter, picking the earliest one
    available.

    The chosen language is activated before the request is dispatched, stored
    in the language cookie on the response and, for redirects, appended to the
    target URL as the `locale` query parameter.
    """
    selected_language = None
    if UI_LOCALES in self.request.GET:
        languages = str(self.request.GET[UI_LOCALES]).split(" ")
        for language in languages:
            if not translation.check_for_language(language):
                continue
            try:
                selected_language = translation.get_supported_language_variant(language)
            except LookupError:
                # Translation files exist for this code, but the language is not
                # enabled in settings.LANGUAGES. ui_locales is client-controlled,
                # so skip to the next preference instead of failing the request.
                continue
            LOGGER.debug("Activating language from oidc ui_locales", locale=selected_language)
            break
    translation.activate(selected_language)
    response = super().dispatch(request, *args, **kwargs)
    if selected_language:
        # Persist the choice so later requests keep using the same language
        response.set_cookie(
            settings.LANGUAGE_COOKIE_NAME,
            selected_language,
            max_age=settings.LANGUAGE_COOKIE_AGE,
            path=settings.LANGUAGE_COOKIE_PATH,
            domain=settings.LANGUAGE_COOKIE_DOMAIN,
            secure=settings.LANGUAGE_COOKIE_SECURE,
            httponly=settings.LANGUAGE_COOKIE_HTTPONLY,
            samesite=settings.LANGUAGE_COOKIE_SAMESITE,
        )
        if isinstance(response, HttpResponseRedirect):
            # Also carry the language along in the redirect target's query string
            parsed_url = urlparse(response.url)
            args = parse_qs(parsed_url.query)
            args["locale"] = selected_language
            response["Location"] = urlunparse(
                parsed_url._replace(query=urlencode(args, quote_via=quote, doseq=True))
            )
    return response
Activate language from OIDC specific ui_locales parameter, picking the earliest one available
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Plan the authorization flow and hand over to the flow executor,
    sending the user back to authentication first where required."""
    # Require a login event to be set, otherwise make the user re-login
    login_event = get_login_event(request)
    if not login_event:
        LOGGER.warning("request with no login event")
        return self.handle_no_permission()
    login_uid = str(login_event.pk)
    session = self.request.session

    # After we've checked permissions, and the user has access, check if we need
    # to re-authenticate the user
    if self.params.max_age:
        seconds_since_login = (timezone.now() - login_event.created).total_seconds()
        if seconds_since_login > self.params.max_age:
            LOGGER.debug(
                "Triggering authentication as max_age requirement",
                max_age=self.params.max_age,
                ago=int(seconds_since_login),
            )
            # Since we already need to re-authenticate the user, set the old login UID
            # in case this request has both max_age and prompt=login
            session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
            return self.handle_no_permission()

    # If prompt=login, we need to re-authenticate the user regardless,
    # unless that re-authentication has already happened
    if PROMPT_LOGIN in self.params.prompt:
        # Either no login UID was remembered yet, or the remembered one is still
        # the current login, meaning no re-login has taken place so far
        relogin_outstanding = (
            SESSION_KEY_LAST_LOGIN_UID not in session
            or login_uid == session[SESSION_KEY_LAST_LOGIN_UID]
        )
        if relogin_outstanding:
            session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
            return self.handle_no_permission()

    scope_descriptions = UserInfoView().get_scope_descriptions(
        self.params.scope, self.params.provider
    )
    # Regardless, we start the planner and return to it
    planner = FlowPlanner(self.provider.authorization_flow)
    planner.allow_empty_flows = True
    consent_header = _("You're about to sign into %(application)s.") % {
        "application": self.application.name
    }
    plan_context = {
        PLAN_CONTEXT_SSO: True,
        PLAN_CONTEXT_APPLICATION: self.application,
        # OAuth2 related params
        PLAN_CONTEXT_PARAMS: self.params,
        # Consent related params
        PLAN_CONTEXT_CONSENT_HEADER: consent_header,
        PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
    }
    try:
        plan = planner.plan(self.request, plan_context)
    except FlowNonApplicableException:
        return self.handle_no_permission_authenticated()

    # OpenID clients can specify a `prompt` parameter, and if its set to consent we
    # need to inject a consent stage, unless the plan already contains one
    if PROMPT_CONSENT in self.params.prompt:
        has_consent_stage = any(
            isinstance(binding.stage, ConsentStage) for binding in plan.bindings
        )
        if not has_consent_stage:
            plan.append_stage(
                ConsentStage(
                    name="OAuth2 Provider In-memory consent stage",
                    mode=ConsentMode.ALWAYS_REQUIRE,
                )
            )

    plan.append_stage(in_memory_stage(OAuthFulfillmentStage))

    # We can only skip the flow executor and directly go to the final redirect URL if
    # we can submit the data to the RP via URL
    if self.params.response_mode in (ResponseMode.QUERY, ResponseMode.FRAGMENT):
        silent_types = [OAuthFulfillmentStage]
    else:
        silent_types = []
    return plan.to_redirect(
        self.request,
        self.provider.authorization_flow,
        allowed_silent_types=silent_types,
    )
Start FlowPlanner, return to flow executor shell
class OAuthFulfillmentStage(StageView):
    """Final stage, restores params from Flow."""

    params: OAuthAuthorizationParams
    provider: OAuth2Provider
    application: Application

    def redirect(self, uri: str) -> HttpResponse:
        """Redirect using HttpResponseRedirectScheme, compatible with non-http schemes.

        For the form_post response mode an auto-submitting form challenge is
        returned instead of an HTTP redirect.
        """
        parsed = urlparse(uri)

        if self.params.response_mode == ResponseMode.FORM_POST:
            # parse_qs returns a dictionary with values wrapped in lists, however
            # we need a flat dictionary for the autosubmit challenge

            # this picks the first item in the list if the value is a list,
            # otherwise just the value as-is
            query_params = {
                k: v[0] if isinstance(v, list) else v for k, v in parse_qs(parsed.query).items()
            }

            challenge = AutosubmitChallenge(
                data={
                    "component": "ak-stage-autosubmit",
                    "title": self.executor.plan.context.get(
                        PLAN_CONTEXT_TITLE,
                        # Translate the template first and interpolate afterwards;
                        # an already-interpolated string never matches a catalogue entry
                        _("Redirecting to {app}...").format_map({"app": self.application.name}),
                    ),
                    "url": self.params.redirect_uri,
                    "attrs": query_params,
                }
            )

            challenge.is_valid()
            self.executor.stage_ok()
            return HttpChallengeResponse(
                challenge=challenge,
            )
        self.executor.stage_ok()
        return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Wrapper when this stage gets hit with a post request"""
        return self.get(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """final Stage of an OAuth2 Flow"""
        if PLAN_CONTEXT_PARAMS not in self.executor.plan.context:
            LOGGER.warning("Got to fulfillment stage with no pending context")
            return HttpResponseBadRequest()
        self.params: OAuthAuthorizationParams = self.executor.plan.context.pop(PLAN_CONTEXT_PARAMS)
        self.application: Application = self.executor.plan.context.pop(PLAN_CONTEXT_APPLICATION)
        self.provider = get_object_or_404(OAuth2Provider, pk=self.application.provider_id)
        try:
            # At this point we don't need to check permissions anymore
            if {PROMPT_NONE, PROMPT_CONSENT}.issubset(self.params.prompt):
                raise AuthorizeError(
                    self.params.redirect_uri,
                    "consent_required",
                    self.params.grant_type,
                    self.params.state,
                )
            Event.new(
                EventAction.AUTHORIZE_APPLICATION,
                authorized_application=self.application,
                flow=self.executor.plan.flow_pk,
                scopes=" ".join(self.params.scope),
            ).from_http(self.request)
            return self.redirect(self.create_response_uri())
        except (ClientIdError, RedirectUriError) as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()

            return bad_request_message(request, error.description, title=error.error)
        except AuthorizeError as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()
            return error.get_response(self.request)

    def create_response_uri(self) -> str:
        """Create a final Response URI the user is redirected to.

        Raises AuthorizeError ("server_error") when the response cannot be built.
        """
        uri = urlsplit(self.params.redirect_uri)

        try:
            code = None

            # An authorization code is only issued for code and hybrid grants
            if self.params.grant_type in [
                GrantType.AUTHORIZATION_CODE,
                GrantType.HYBRID,
            ]:
                code = self.params.create_code(self.request)
                code.save()

            if self.params.response_mode == ResponseMode.QUERY:
                # Keep whatever query parameters the redirect URI already carries
                query_params = parse_qs(uri.query)
                query_params["code"] = code.code
                query_params["state"] = [str(self.params.state) if self.params.state else ""]

                uri = uri._replace(query=urlencode(query_params, doseq=True))
                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FRAGMENT:
                query_fragment = {}
                if self.params.grant_type in [GrantType.AUTHORIZATION_CODE]:
                    query_fragment["code"] = code.code
                    query_fragment["state"] = [str(self.params.state) if self.params.state else ""]
                else:
                    query_fragment = self.create_implicit_response(code)

                uri = uri._replace(
                    fragment=uri.fragment + urlencode(query_fragment, doseq=True),
                )

                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FORM_POST:
                post_params = {}
                if self.params.grant_type in [GrantType.AUTHORIZATION_CODE]:
                    post_params["code"] = code.code
                    post_params["state"] = [str(self.params.state) if self.params.state else ""]
                else:
                    post_params = self.create_implicit_response(code)

                # The values travel in the query string here; redirect() unpacks
                # them again to build the auto-submit form
                uri = uri._replace(query=urlencode(post_params, doseq=True))

                return urlunsplit(uri)

            # None of the known response modes matched
            raise OAuth2Error()
        except OAuth2Error as error:
            LOGGER.warning("Error when trying to create response uri", error=error)
            raise AuthorizeError(
                self.params.redirect_uri,
                "server_error",
                self.params.grant_type,
                self.params.state,
            ) from None

    def create_implicit_response(self, code: AuthorizationCode | None) -> dict:
        """Create implicit response's URL Fragment dictionary"""
        query_fragment = {}
        auth_event = get_login_event(self.request)

        now = timezone.now()
        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
        token = AccessToken(
            user=self.request.user,
            scope=self.params.scope,
            expires=access_token_expiry,
            provider=self.provider,
            auth_time=auth_event.created if auth_event else now,
            session=self.request.session["authenticatedsession"],
        )

        id_token = IDToken.new(self.provider, token, self.request)
        id_token.nonce = self.params.nonce

        if self.params.response_type in [
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            id_token.c_hash = code.c_hash
        token.id_token = id_token

        # Check if response_type must include access_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
            ResponseTypes.CODE_TOKEN,
        ]:
            query_fragment["access_token"] = token.token
            # Get at_hash of the current token and update the id_token
            id_token.at_hash = token.at_hash

        # Check if response_type must include id_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            query_fragment["id_token"] = self.provider.encode(id_token.to_dict())
        token._id_token = dumps(id_token.to_dict())

        token.save()

        # Code parameter must be present if it's Hybrid Flow.
        if self.params.grant_type == GrantType.HYBRID:
            query_fragment["code"] = code.code

        query_fragment["token_type"] = TOKEN_TYPE
        query_fragment["expires_in"] = int(
            timedelta_from_string(self.provider.access_token_validity).total_seconds()
        )
        query_fragment["state"] = self.params.state if self.params.state else ""
        return query_fragment
Final stage, restores params from Flow.
def redirect(self, uri: str) -> HttpResponse:
    """Finish the stage and send the user agent to ``uri``.

    In ``form_post`` response mode the query string of ``uri`` is converted into
    the attributes of an autosubmit challenge that targets
    ``self.params.redirect_uri``. In every other mode a redirect response to
    ``uri`` is returned, with the URI's own scheme allowed so that non-http
    schemes work.

    Args:
        uri: The fully built response URI, including response parameters.

    Returns:
        Either the autosubmit challenge response or the redirect response.
    """
    parsed = urlparse(uri)

    if self.params.response_mode == ResponseMode.FORM_POST:
        # parse_qs wraps every value in a list, but the autosubmit challenge
        # needs a flat mapping, so only the first value of each key is kept.
        query_params = {
            key: values[0] if isinstance(values, list) else values
            for key, values in parse_qs(parsed.query).items()
        }

        # Translate the template first and interpolate afterwards: looking up the
        # already-formatted string would never match the msgid in the catalogue.
        default_title = _("Redirecting to {app}...").format_map({"app": self.application.name})

        challenge = AutosubmitChallenge(
            data={
                "component": "ak-stage-autosubmit",
                "title": self.executor.plan.context.get(PLAN_CONTEXT_TITLE, default_title),
                "url": self.params.redirect_uri,
                "attrs": query_params,
            }
        )

        # The validation result is deliberately not inspected here.
        challenge.is_valid()
        self.executor.stage_ok()
        return HttpChallengeResponse(
            challenge=challenge,
        )

    self.executor.stage_ok()
    return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])
Redirect using HttpResponseRedirectScheme, compatible with non-http schemes
def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Handle a POST to this stage by delegating to the GET handler."""
    response = self.get(request, *args, **kwargs)
    return response
Wrapper when this stage gets hit with a post request
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Complete the OAuth2 authorization and answer the client.

    Pops the authorization parameters and the application from the flow plan
    context, records an authorize-application event and redirects to the
    response URI. Responds with a bare 400 when no parameters are pending in
    the plan context. OAuth2 errors raised along the way are logged as events,
    invalidate the stage, and are turned into the matching error response.
    """
    plan = self.executor.plan
    if PLAN_CONTEXT_PARAMS not in plan.context:
        LOGGER.warning("Got to fulfillment stage with no pending context")
        return HttpResponseBadRequest()

    self.params: OAuthAuthorizationParams = plan.context.pop(PLAN_CONTEXT_PARAMS)
    self.application: Application = plan.context.pop(PLAN_CONTEXT_APPLICATION)
    self.provider = get_object_or_404(OAuth2Provider, pk=self.application.provider_id)

    try:
        # At this point we don't need to check permissions anymore
        contradictory_prompts = {PROMPT_NONE, PROMPT_CONSENT}
        if contradictory_prompts.issubset(self.params.prompt):
            raise AuthorizeError(
                self.params.redirect_uri,
                "consent_required",
                self.params.grant_type,
                self.params.state,
            )
        event = Event.new(
            EventAction.AUTHORIZE_APPLICATION,
            authorized_application=self.application,
            flow=plan.flow_pk,
            scopes=" ".join(self.params.scope),
        )
        event.from_http(self.request)
        response_uri = self.create_response_uri()
        return self.redirect(response_uri)
    except (ClientIdError, RedirectUriError, AuthorizeError) as error:
        error.to_event(application=self.application).from_http(request)
        self.executor.stage_invalid()
        # Client-id and redirect-uri problems are shown as a plain error page;
        # anything else lets the error build its own response.
        if isinstance(error, (ClientIdError, RedirectUriError)):
            return bad_request_message(request, error.description, title=error.error)
        return error.get_response(self.request)
Final stage of an OAuth2 flow
def create_response_uri(self) -> str:
    """Build the URI the user is sent to once authorization succeeded.

    An authorization code is created and saved for the authorization-code and
    hybrid grant types. Depending on the response mode, the response parameters
    are placed in the query string (``query`` and ``form_post``) or in the
    fragment (``fragment``) of the redirect URI.

    Raises:
        AuthorizeError: With ``server_error`` when the response mode is not
            handled or an ``OAuth2Error`` occurs while building the URI.
    """
    target = urlsplit(self.params.redirect_uri)

    try:
        code = None
        if self.params.grant_type in (GrantType.AUTHORIZATION_CODE, GrantType.HYBRID):
            code = self.params.create_code(self.request)
            code.save()

        mode = self.params.response_mode
        if mode == ResponseMode.QUERY:
            # Keep whatever query parameters the redirect URI already carries.
            merged = parse_qs(target.query)
            merged["code"] = code.code
            merged["state"] = [str(self.params.state) if self.params.state else ""]
            target = target._replace(query=urlencode(merged, doseq=True))
        elif mode == ResponseMode.FRAGMENT:
            if self.params.grant_type == GrantType.AUTHORIZATION_CODE:
                payload = {
                    "code": code.code,
                    "state": [str(self.params.state) if self.params.state else ""],
                }
            else:
                payload = self.create_implicit_response(code)
            target = target._replace(
                fragment=target.fragment + urlencode(payload, doseq=True),
            )
        elif mode == ResponseMode.FORM_POST:
            if self.params.grant_type == GrantType.AUTHORIZATION_CODE:
                payload = {
                    "code": code.code,
                    "state": [str(self.params.state) if self.params.state else ""],
                }
            else:
                payload = self.create_implicit_response(code)
            # The parameters travel in the query string of the returned URI.
            target = target._replace(query=urlencode(payload, doseq=True))
        else:
            raise OAuth2Error()

        return urlunsplit(target)
    except OAuth2Error as error:
        LOGGER.warning("Error when trying to create response uri", error=error)
        raise AuthorizeError(
            self.params.redirect_uri,
            "server_error",
            self.params.grant_type,
            self.params.state,
        ) from None
Create a final Response URI the user is redirected to.
def create_implicit_response(self, code: AuthorizationCode | None) -> dict:
    """Assemble the response parameters for implicit and hybrid requests.

    Creates and saves an access token for the requesting user together with an
    ID token, then returns a dictionary holding, depending on the requested
    response type, ``access_token``, ``id_token`` and ``code``, always followed
    by ``token_type``, ``expires_in`` and ``state``.

    Args:
        code: The authorization code created for this request, if any. It is
            read for the code-bearing response types and the hybrid grant type.
    """
    carries_code_hash = (
        ResponseTypes.CODE_ID_TOKEN,
        ResponseTypes.CODE_ID_TOKEN_TOKEN,
    )
    carries_access_token = (
        ResponseTypes.ID_TOKEN_TOKEN,
        ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ResponseTypes.CODE_TOKEN,
    )
    carries_id_token = (
        ResponseTypes.ID_TOKEN,
        ResponseTypes.ID_TOKEN_TOKEN,
        ResponseTypes.CODE_ID_TOKEN,
        ResponseTypes.CODE_ID_TOKEN_TOKEN,
    )

    response = {}
    login_event = get_login_event(self.request)

    issued_at = timezone.now()
    expires_at = issued_at + timedelta_from_string(self.provider.access_token_validity)
    # Fall back to the current time when no login event is available.
    auth_time = login_event.created if login_event else issued_at
    access_token = AccessToken(
        user=self.request.user,
        scope=self.params.scope,
        expires=expires_at,
        provider=self.provider,
        auth_time=auth_time,
        session=self.request.session["authenticatedsession"],
    )

    id_token = IDToken.new(self.provider, access_token, self.request)
    id_token.nonce = self.params.nonce

    response_type = self.params.response_type
    if response_type in carries_code_hash:
        id_token.c_hash = code.c_hash
    access_token.id_token = id_token

    # Response types that hand out the access token directly.
    if response_type in carries_access_token:
        response["access_token"] = access_token.token
        # The ID token references the access token through its at_hash.
        id_token.at_hash = access_token.at_hash

    # Response types that hand out the ID token directly.
    if response_type in carries_id_token:
        response["id_token"] = self.provider.encode(id_token.to_dict())
        access_token._id_token = dumps(id_token.to_dict())

    access_token.save()

    # Hybrid flow responses must also carry the authorization code.
    if self.params.grant_type == GrantType.HYBRID:
        response["code"] = code.code

    lifetime = timedelta_from_string(self.provider.access_token_validity)
    response["token_type"] = TOKEN_TYPE
    response["expires_in"] = int(lifetime.total_seconds())
    response["state"] = self.params.state or ""
    return response
Create implicit response's URL Fragment dictionary