authentik.providers.oauth2.views.authorize
authentik OAuth2 Authorization views
"""authentik OAuth2 Authorization views"""

from dataclasses import InitVar, dataclass, field
from datetime import timedelta
from json import dumps
from re import error as RegexError
from re import fullmatch
from typing import Any
from urllib.parse import parse_qs, quote, urlencode, urlparse, urlsplit, urlunparse, urlunsplit
from uuid import uuid4

from django.conf import settings
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
from django.http.response import Http404, HttpResponseBadRequest
from django.shortcuts import get_object_or_404
from django.utils import timezone, translation
from django.utils.translation import gettext as _
from structlog.stdlib import get_logger

from authentik.common.oauth.constants import (
    PKCE_METHOD_PLAIN,
    PKCE_METHOD_S256,
    PROMPT_CONSENT,
    PROMPT_LOGIN,
    PROMPT_NONE,
    QS_LOGIN_HINT,
    SCOPE_GITHUB,
    SCOPE_OFFLINE_ACCESS,
    SCOPE_OPENID,
    TOKEN_TYPE,
    UI_LOCALES,
)
from authentik.core.models import Application
from authentik.events.models import Event, EventAction
from authentik.events.signals import get_login_event
from authentik.flows.challenge import (
    PLAN_CONTEXT_TITLE,
    AutosubmitChallenge,
    HttpChallengeResponse,
)
from authentik.flows.exceptions import FlowNonApplicableException
from authentik.flows.models import Flow, in_memory_stage
from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO, FlowPlanner
from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, StageView
from authentik.lib.utils.time import timedelta_from_string
from authentik.lib.views import bad_request_message
from authentik.policies.types import PolicyRequest
from authentik.policies.views import PolicyAccessView, RequestValidationError
from authentik.providers.oauth2.errors import (
    AuthorizeError,
    ClientIdError,
    OAuth2Error,
    RedirectUriError,
)
from authentik.providers.oauth2.id_token import IDToken
from authentik.providers.oauth2.models import (
    AccessToken,
    AuthorizationCode,
    GrantTypes,
    OAuth2Provider,
    RedirectURI,
    RedirectURIMatchingMode,
    ResponseMode,
    ResponseTypes,
    ScopeMapping,
)
from authentik.providers.oauth2.utils import HttpResponseRedirectScheme
from authentik.providers.oauth2.views.userinfo import UserInfoView
from authentik.stages.consent.models import ConsentMode, ConsentStage
from authentik.stages.consent.stage import (
    PLAN_CONTEXT_CONSENT_HEADER,
    PLAN_CONTEXT_CONSENT_PERMISSIONS,
)

LOGGER = get_logger()

# Flow plan context key under which the validated authorization params are stored
PLAN_CONTEXT_PARAMS = "goauthentik.io/providers/oauth2/params"
# Session key holding the login event UID seen when a re-authentication was triggered
SESSION_KEY_LAST_LOGIN_UID = "authentik/providers/oauth2/last_login_uid"

# `prompt` values that are honoured; anything else in the request is dropped
ALLOWED_PROMPT_PARAMS = {PROMPT_NONE, PROMPT_CONSENT, PROMPT_LOGIN}
# Redirect URI schemes that are rejected even when they match an allowed entry
FORBIDDEN_URI_SCHEMES = {"javascript", "data", "vbscript"}


@dataclass(slots=True)
class OAuthAuthorizationParams:
    """Parameters required to authorize an OAuth Client

    Constructing an instance looks up the provider by `client_id` and runs all
    validations (see `__post_init__`), so a successfully created object is already
    validated.
    """

    client_id: str
    redirect_uri: str
    response_type: str
    response_mode: str | None
    scope: set[str]
    state: str
    nonce: str | None
    prompt: set[str]
    grant_type: str

    provider: OAuth2Provider = field(default_factory=OAuth2Provider)

    request: str | None = None

    max_age: int | None = None

    code_challenge: str | None = None
    code_challenge_method: str | None = None

    github_compat: InitVar[bool] = False

    @staticmethod
    def from_request(request: HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
        """
        Get all the params used by the Authorization Code Flow
        (and also for the Implicit and Hybrid).

        Parameters are read from the POST body for POST requests and from the query
        string otherwise. A `max_age` value that is not an integer is logged and
        treated as if the parameter was not sent.

        Raises whatever `__post_init__` raises while validating the parameters.

        See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
        """
        # Because in this endpoint we handle both GET
        # and POST request.
        query_dict = request.POST if request.method == "POST" else request.GET
        state = query_dict.get("state")
        redirect_uri = query_dict.get("redirect_uri", "")

        response_type = query_dict.get("response_type", "")

        # Validate and check the response_mode against the predefined dict
        # Set to Query or Fragment if not defined in request
        response_mode = query_dict.get("response_mode", False)

        # max_age is client-controlled; a non-numeric value must not surface as an
        # unhandled ValueError, so it is treated the same as an omitted parameter
        raw_max_age = query_dict.get("max_age")
        max_age = None
        if raw_max_age:
            try:
                max_age = int(raw_max_age)
            except ValueError:
                LOGGER.warning("Ignoring non-integer max_age", max_age=raw_max_age)
        return OAuthAuthorizationParams(
            client_id=query_dict.get("client_id", ""),
            redirect_uri=redirect_uri,
            response_type=response_type,
            response_mode=response_mode,
            grant_type="",
            scope=set(query_dict.get("scope", "").split()),
            state=state,
            nonce=query_dict.get("nonce"),
            prompt=ALLOWED_PROMPT_PARAMS.intersection(set(query_dict.get("prompt", "").split())),
            request=query_dict.get("request", None),
            max_age=max_age,
            code_challenge=query_dict.get("code_challenge"),
            code_challenge_method=query_dict.get("code_challenge_method", "plain"),
            github_compat=github_compat,
        )

    def __post_init__(self, github_compat=False):
        """Resolve the provider from `client_id` and validate all parameters.

        Raises ClientIdError when no provider matches, and the errors raised by the
        individual `check_*` methods. A non-empty `request` parameter is rejected with
        `request_not_supported`.
        """
        self.provider: OAuth2Provider = OAuth2Provider.objects.filter(
            client_id=self.client_id
        ).first()
        if not self.provider:
            LOGGER.warning("Invalid client identifier", client_id=self.client_id)
            raise ClientIdError(client_id=self.client_id)
        # The redirect URI is validated first, as later errors are sent to it
        self.check_redirect_uri()
        self.check_grant()
        self.check_scope(github_compat)
        if self.request:
            raise AuthorizeError(
                self.redirect_uri, "request_not_supported", self.grant_type, self.state
            )
        self.check_nonce()
        self.check_code_challenge()

    def check_grant(self):
        """Derive `grant_type` from `response_type` and normalize `response_mode`.

        Raises AuthorizeError (`unsupported_response_type`) for unknown response types.
        """
        # Determine which flow to use.
        if self.response_type in [ResponseTypes.CODE]:
            self.grant_type = GrantTypes.AUTHORIZATION_CODE
        elif self.response_type in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
        ]:
            self.grant_type = GrantTypes.IMPLICIT
        elif self.response_type in [
            ResponseTypes.CODE_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            self.grant_type = GrantTypes.HYBRID

        # Grant type validation.
        if not self.grant_type:
            LOGGER.warning("Invalid response type", type=self.response_type)
            raise AuthorizeError(self.redirect_uri, "unsupported_response_type", "", self.state)

        # No (valid) response_mode requested: default to query, or fragment for
        # flows that return tokens from the authorization endpoint
        if self.response_mode not in ResponseMode.values:
            self.response_mode = ResponseMode.QUERY

            if self.grant_type in [GrantTypes.IMPLICIT, GrantTypes.HYBRID]:
                self.response_mode = ResponseMode.FRAGMENT

    def check_redirect_uri(self):
        """Redirect URI validation.

        Raises RedirectUriError when the URI is missing, matches none of the provider's
        allowed entries, or uses a forbidden scheme. If the provider has no allowed
        redirect URIs yet, the requested one is saved as a strict entry.
        """
        allowed_redirect_urls = self.provider.redirect_uris
        if not self.redirect_uri:
            LOGGER.warning("Missing redirect uri.")
            raise RedirectUriError("", allowed_redirect_urls).with_cause("redirect_uri_missing")

        if len(allowed_redirect_urls) < 1:
            LOGGER.info("Setting redirect for blank redirect_uris", redirect=self.redirect_uri)
            self.provider.redirect_uris = [
                RedirectURI(RedirectURIMatchingMode.STRICT, self.redirect_uri)
            ]
            self.provider.save()
            allowed_redirect_urls = self.provider.redirect_uris

        match_found = False
        for allowed in allowed_redirect_urls:
            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
                if self.redirect_uri == allowed.url:
                    match_found = True
                    break
            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
                try:
                    if fullmatch(allowed.url, self.redirect_uri):
                        match_found = True
                        break
                except RegexError as exc:
                    # An invalid pattern only disables that entry, other entries
                    # are still checked
                    LOGGER.warning(
                        "Failed to parse regular expression",
                        exc=exc,
                        url=allowed.url,
                        provider=self.provider,
                    )
        if not match_found:
            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
                "redirect_uri_no_match"
            )
        # Check against forbidden schemes
        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
                "redirect_uri_forbidden_scheme"
            )

    def check_scope(self, github_compat=False):
        """Ensure openid scope is set in Hybrid flows, or when requesting an id_token

        Also defaults an empty scope to all scopes configured on the provider and
        reduces the requested scopes to the configured ones. With `github_compat` the
        scopes in SCOPE_GITHUB are left out of the subset check.
        """
        default_scope_names = set(
            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
                "scope_name", flat=True
            )
        )
        if len(self.scope) == 0:
            self.scope = default_scope_names
            LOGGER.info(
                "No scopes requested, defaulting to all configured scopes", scopes=self.scope
            )
        scopes_to_check = self.scope
        if github_compat:
            scopes_to_check = self.scope - SCOPE_GITHUB
        if not scopes_to_check.issubset(default_scope_names):
            LOGGER.info(
                "Application requested scopes not configured, setting to overlap",
                scope_allowed=default_scope_names,
                scope_given=self.scope,
            )
            self.scope = self.scope.intersection(default_scope_names)
        if SCOPE_OPENID not in self.scope and (
            self.grant_type == GrantTypes.HYBRID
            or self.response_type in [ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN]
        ):
            LOGGER.warning("Missing 'openid' scope.")
            raise AuthorizeError(
                self.redirect_uri, "invalid_scope", self.grant_type, self.state
            ).with_cause("scope_openid_missing")
        if SCOPE_OFFLINE_ACCESS in self.scope:
            # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
            # Don't explicitly request consent with offline_access, as the spec allows for
            # "other conditions for processing the request permitting offline access to the
            # requested resources are in place"
            # which we interpret as "the admin picks an authorization flow with or without consent"
            if self.response_type not in [
                ResponseTypes.CODE,
                ResponseTypes.CODE_TOKEN,
                ResponseTypes.CODE_ID_TOKEN,
                ResponseTypes.CODE_ID_TOKEN_TOKEN,
            ]:
                # offline_access requires a response type that has some sort of token
                # Spec says to ignore the scope when the response_type wouldn't result
                # in an authorization code being generated
                self.scope.remove(SCOPE_OFFLINE_ACCESS)

    def check_nonce(self):
        """Nonce parameter validation.

        Raises AuthorizeError (`invalid_request`) when an id_token is returned from the
        authorization endpoint for an openid request without a nonce.
        """
        # nonce is required for all flows that return an id_token from the authorization endpoint,
        # see https://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthRequest or
        # https://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken and
        # https://bitbucket.org/openid/connect/issues/972/nonce-requirement-in-hybrid-auth-request
        if self.response_type not in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            return
        if SCOPE_OPENID not in self.scope:
            return
        if not self.nonce:
            LOGGER.warning("Missing nonce for OpenID Request")
            raise AuthorizeError(
                self.redirect_uri, "invalid_request", self.grant_type, self.state
            ).with_cause("nonce_missing")

    def check_code_challenge(self):
        """PKCE validation of the transformation method.

        Only checked when a code challenge was sent; raises AuthorizeError
        (`invalid_request`) for methods other than plain and S256.
        """
        if self.code_challenge and self.code_challenge_method not in [
            PKCE_METHOD_PLAIN,
            PKCE_METHOD_S256,
        ]:
            raise AuthorizeError(
                self.redirect_uri,
                "invalid_request",
                self.grant_type,
                self.state,
                f"Unsupported challenge method {self.code_challenge_method}",
            )

    def create_code(self, request: HttpRequest) -> AuthorizationCode:
        """Create an AuthorizationCode object for the request

        The returned code is not saved; the caller is responsible for saving it.
        """
        auth_event = get_login_event(request)

        now = timezone.now()

        code = AuthorizationCode(
            user=request.user,
            provider=self.provider,
            # Fall back to the current time when no login event is available
            auth_time=auth_event.created if auth_event else now,
            code=uuid4().hex,
            expires=now + timedelta_from_string(self.provider.access_code_validity),
            scope=self.scope,
            nonce=self.nonce,
            session=request.session["authenticatedsession"],
        )

        if self.code_challenge and self.code_challenge_method:
            code.code_challenge = self.code_challenge
            code.code_challenge_method = self.code_challenge_method

        return code


class AuthorizationFlowInitView(PolicyAccessView):
    """OAuth2 Flow initializer, checks access to application and starts flow"""

    params: OAuthAuthorizationParams
    # Enable GitHub compatibility (only allow for scopes which are handled
    # differently for github compat)
    github_compat = False

    def pre_permission_check(self):
        """Check prompt parameter before checking permission/authentication,
        see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6

        Parses and validates the authorization parameters into `self.params`.
        Validation failures are converted to RequestValidationError carrying the
        response to send, or to Http404.
        """
        # Quick sanity check at the beginning to prevent event spamming
        if len(self.request.GET) < 1:
            raise Http404
        try:
            self.params = OAuthAuthorizationParams.from_request(
                self.request, github_compat=self.github_compat
            )
        except AuthorizeError as error:
            LOGGER.warning(error.description, redirect_uri=error.redirect_uri, cause=error.cause)
            raise RequestValidationError(error.get_response(self.request)) from None
        except OAuth2Error as error:
            LOGGER.warning(error.description, cause=error.cause)
            raise RequestValidationError(
                bad_request_message(self.request, error.description, title=error.error)
            ) from None
        except OAuth2Provider.DoesNotExist:
            raise Http404 from None
        if PROMPT_NONE in self.params.prompt and not self.request.user.is_authenticated:
            # When "prompt" is set to "none" but the user is not logged in, show an error message
            error = AuthorizeError(
                self.params.redirect_uri,
                "login_required",
                self.params.grant_type,
                self.params.state,
            )
            raise RequestValidationError(error.get_response(self.request))

    def resolve_provider_application(self):
        """Set `self.provider` and `self.application` from the `client_id` query parameter,
        raising a 404 when no provider matches"""
        client_id = self.request.GET.get("client_id")
        self.provider = get_object_or_404(OAuth2Provider, client_id=client_id)
        self.application = self.provider.application

    def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
        """Pass the login hint query parameter, if given, into the flow context as the
        pending user identifier"""
        if QS_LOGIN_HINT in self.request.GET:
            context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = self.request.GET.get(QS_LOGIN_HINT)
        return super().modify_flow_context(flow, context)

    def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
        """Add the validated OAuth2 parameters to the policy request context"""
        request.context["oauth_scopes"] = self.params.scope
        request.context["oauth_grant_type"] = self.params.grant_type
        request.context["oauth_code_challenge"] = self.params.code_challenge
        request.context["oauth_code_challenge_method"] = self.params.code_challenge_method
        request.context["oauth_max_age"] = self.params.max_age
        request.context["oauth_redirect_uri"] = self.params.redirect_uri
        request.context["oauth_response_type"] = self.params.response_type
        return request

    def dispatch_with_language(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Activate language from OIDC specific ui_locales parameter, picking the earliest one
        available

        When a language was selected it is also stored in the language cookie, and
        redirect responses get a `locale` query parameter added.
        """
        selected_language = None
        if UI_LOCALES in self.request.GET:
            languages = str(self.request.GET[UI_LOCALES]).split(" ")
            for language in languages:
                if translation.check_for_language(language):
                    selected_language = translation.get_supported_language_variant(language)
                    LOGGER.debug(
                        "Activating language from oidc ui_locales", locale=selected_language
                    )
                    break
        translation.activate(selected_language)
        response = super().dispatch(request, *args, **kwargs)
        if selected_language:
            response.set_cookie(
                settings.LANGUAGE_COOKIE_NAME,
                selected_language,
                max_age=settings.LANGUAGE_COOKIE_AGE,
                path=settings.LANGUAGE_COOKIE_PATH,
                domain=settings.LANGUAGE_COOKIE_DOMAIN,
                secure=settings.LANGUAGE_COOKIE_SECURE,
                httponly=settings.LANGUAGE_COOKIE_HTTPONLY,
                samesite=settings.LANGUAGE_COOKIE_SAMESITE,
            )
            if isinstance(response, HttpResponseRedirect):
                # Carry the selected language over to the redirect target
                parsed_url = urlparse(response.url)
                args = parse_qs(parsed_url.query)
                args["locale"] = selected_language
                response["Location"] = urlunparse(
                    parsed_url._replace(query=urlencode(args, quote_via=quote, doseq=True))
                )
        return response

    def dispatch(self, request: HttpRequest, *args, **kwargs):
        """Dispatch the request with the language from ui_locales activated"""
        # Activate language before parsing params (error messages should be localized)
        return self.dispatch_with_language(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Start FlowPlanner, return to flow executor shell"""
        # Require a login event to be set, otherwise make the user re-login
        login_event = get_login_event(request)
        if not login_event:
            LOGGER.warning("request with no login event")
            return self.handle_no_permission()
        login_uid = str(login_event.pk)
        # After we've checked permissions, and the user has access, check if we need
        # to re-authenticate the user
        if self.params.max_age:
            # Attempt to check via the session's login event if set, otherwise we can't
            # check
            login_time = login_event.created
            current_age: timedelta = timezone.now() - login_time
            if current_age.total_seconds() > self.params.max_age:
                LOGGER.debug(
                    "Triggering authentication as max_age requirement",
                    max_age=self.params.max_age,
                    ago=int(current_age.total_seconds()),
                )
                # Since we already need to re-authenticate the user, set the old login UID
                # in case this request has both max_age and prompt=login
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        # If prompt=login, we need to re-authenticate the user regardless
        # Check if we're not already doing the re-authentication
        if PROMPT_LOGIN in self.params.prompt:
            # No previous login UID saved, so save the current uid and trigger
            # re-login, or previous login UID matches current one, so no re-login happened yet
            if (
                SESSION_KEY_LAST_LOGIN_UID not in self.request.session
                or login_uid == self.request.session[SESSION_KEY_LAST_LOGIN_UID]
            ):
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        scope_descriptions = UserInfoView().get_scope_descriptions(
            self.params.scope, self.params.provider
        )
        # Regardless, we start the planner and return to it
        planner = FlowPlanner(self.provider.authorization_flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(
                self.request,
                {
                    PLAN_CONTEXT_SSO: True,
                    PLAN_CONTEXT_APPLICATION: self.application,
                    # OAuth2 related params
                    PLAN_CONTEXT_PARAMS: self.params,
                    # Consent related params
                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
                    % {"application": self.application.name},
                    PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
                },
            )
        except FlowNonApplicableException:
            return self.handle_no_permission_authenticated()
        # OpenID clients can specify a `prompt` parameter, and if its set to consent we
        # need to inject a consent stage
        if PROMPT_CONSENT in self.params.prompt:
            if not any(isinstance(x.stage, ConsentStage) for x in plan.bindings):
                # Plan does not have any consent stage, so we add an in-memory one
                stage = ConsentStage(
                    name="OAuth2 Provider In-memory consent stage",
                    mode=ConsentMode.ALWAYS_REQUIRE,
                )
                plan.append_stage(stage)

        plan.append_stage(in_memory_stage(OAuthFulfillmentStage))

        return plan.to_redirect(
            self.request,
            self.provider.authorization_flow,
            # We can only skip the flow executor and directly go to the final redirect URL if
            # we can submit the data to the RP via URL
            allowed_silent_types=(
                [OAuthFulfillmentStage]
                if self.params.response_mode in [ResponseMode.QUERY, ResponseMode.FRAGMENT]
                else []
            ),
        )


class OAuthFulfillmentStage(StageView):
    """Final stage, restores params from Flow."""

    params: OAuthAuthorizationParams
    provider: OAuth2Provider
    application: Application

    def redirect(self, uri: str) -> HttpResponse:
        """Redirect using HttpResponseRedirectScheme, compatible with non-http schemes

        For the form_post response mode an autosubmit challenge is returned instead,
        posting the query parameters of `uri` to the redirect URI.
        """
        parsed = urlparse(uri)

        if self.params.response_mode == ResponseMode.FORM_POST:
            # parse_qs returns a dictionary with values wrapped in lists, however
            # we need a flat dictionary for the autosubmit challenge

            # this picks the first item in the list if the value is a list,
            # otherwise just the value as-is
            query_params = dict(
                (k, v[0] if isinstance(v, list) else v) for k, v in parse_qs(parsed.query).items()
            )

            challenge = AutosubmitChallenge(
                data={
                    "component": "ak-stage-autosubmit",
                    "title": self.executor.plan.context.get(
                        PLAN_CONTEXT_TITLE,
                        # Translate the template first and format afterwards; formatting
                        # before the lookup would never match a catalog entry
                        _("Redirecting to {app}...").format_map({"app": self.application.name}),
                    ),
                    "url": self.params.redirect_uri,
                    "attrs": query_params,
                }
            )

            challenge.is_valid()
            self.executor.stage_ok()
            return HttpChallengeResponse(
                challenge=challenge,
            )
        self.executor.stage_ok()
        return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Wrapper when this stage gets hit with a post request"""
        return self.get(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """final Stage of an OAuth2 Flow

        Takes the params and application out of the plan context, records an
        authorization event and redirects to the client. OAuth2 errors are recorded as
        events and turned into error responses.
        """
        if PLAN_CONTEXT_PARAMS not in self.executor.plan.context:
            LOGGER.warning("Got to fulfillment stage with no pending context")
            return HttpResponseBadRequest()
        self.params: OAuthAuthorizationParams = self.executor.plan.context.pop(PLAN_CONTEXT_PARAMS)
        self.application: Application = self.executor.plan.context.pop(PLAN_CONTEXT_APPLICATION)
        self.provider = get_object_or_404(OAuth2Provider, pk=self.application.provider_id)
        try:
            # At this point we don't need to check permissions anymore
            if {PROMPT_NONE, PROMPT_CONSENT}.issubset(self.params.prompt):
                raise AuthorizeError(
                    self.params.redirect_uri,
                    "consent_required",
                    self.params.grant_type,
                    self.params.state,
                )
            Event.new(
                EventAction.AUTHORIZE_APPLICATION,
                authorized_application=self.application,
                flow=self.executor.plan.flow_pk,
                scopes=" ".join(self.params.scope),
            ).from_http(self.request)
            return self.redirect(self.create_response_uri())
        except (ClientIdError, RedirectUriError) as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()

            # These errors can't be sent to the redirect URI, so show them directly
            return bad_request_message(request, error.description, title=error.error)
        except AuthorizeError as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()
            return error.get_response(self.request)

    def create_response_uri(self) -> str:
        """Create a final Response URI the user is redirected to.

        Raises AuthorizeError (`server_error`) when an OAuth2Error occurs or the
        response mode is not handled.
        """
        uri = urlsplit(self.params.redirect_uri)

        try:
            code = None

            if self.params.grant_type in [
                GrantTypes.AUTHORIZATION_CODE,
                GrantTypes.HYBRID,
            ]:
                code = self.params.create_code(self.request)
                code.save()

            if self.params.response_mode == ResponseMode.QUERY:
                # NOTE(review): `code` is None for the implicit grant; this branch assumes
                # query mode is only reached with a code-bearing grant - confirm
                query_params = parse_qs(uri.query)
                query_params["code"] = code.code
                query_params["state"] = [str(self.params.state) if self.params.state else ""]

                uri = uri._replace(query=urlencode(query_params, doseq=True))
                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FRAGMENT:
                query_fragment = {}
                if self.params.grant_type in [GrantTypes.AUTHORIZATION_CODE]:
                    query_fragment["code"] = code.code
                    query_fragment["state"] = [str(self.params.state) if self.params.state else ""]
                else:
                    query_fragment = self.create_implicit_response(code)

                uri = uri._replace(
                    fragment=uri.fragment + urlencode(query_fragment, doseq=True),
                )

                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FORM_POST:
                post_params = {}
                if self.params.grant_type in [GrantTypes.AUTHORIZATION_CODE]:
                    post_params["code"] = code.code
                    post_params["state"] = [str(self.params.state) if self.params.state else ""]
                else:
                    post_params = self.create_implicit_response(code)

                # The params are carried in the query string here and turned into form
                # fields by `redirect`
                uri = uri._replace(query=urlencode(post_params, doseq=True))

                return urlunsplit(uri)

            raise OAuth2Error()
        except OAuth2Error as error:
            LOGGER.warning("Error when trying to create response uri", error=error)
            raise AuthorizeError(
                self.params.redirect_uri,
                "server_error",
                self.params.grant_type,
                self.params.state,
            ) from None

    def create_implicit_response(self, code: AuthorizationCode | None) -> dict:
        """Create implicit response's URL Fragment dictionary

        Creates and saves an AccessToken, and returns the response parameters
        (tokens depending on the response type, `code` for hybrid flows, `token_type`,
        `expires_in` and `state`).
        """
        query_fragment = {}
        auth_event = get_login_event(self.request)

        now = timezone.now()
        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
        token = AccessToken(
            user=self.request.user,
            scope=self.params.scope,
            expires=access_token_expiry,
            provider=self.provider,
            auth_time=auth_event.created if auth_event else now,
            session=self.request.session["authenticatedsession"],
        )

        id_token = IDToken.new(self.provider, token, self.request)
        id_token.nonce = self.params.nonce

        if self.params.response_type in [
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            id_token.c_hash = code.c_hash
        token.id_token = id_token

        # Check if response_type must include access_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
            ResponseTypes.CODE_TOKEN,
        ]:
            query_fragment["access_token"] = token.token
            # Get at_hash of the current token and update the id_token
            id_token.at_hash = token.at_hash

        # Check if response_type must include id_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            query_fragment["id_token"] = self.provider.encode(id_token.to_dict())
        token._id_token = dumps(id_token.to_dict())

        token.save()

        # Code parameter must be present if it's Hybrid Flow.
        if self.params.grant_type == GrantTypes.HYBRID:
            query_fragment["code"] = code.code

        query_fragment["token_type"] = TOKEN_TYPE
        query_fragment["expires_in"] = int(
            timedelta_from_string(self.provider.access_token_validity).total_seconds()
        )
        query_fragment["state"] = self.params.state if self.params.state else ""
        return query_fragment
85@dataclass(slots=True) 86class OAuthAuthorizationParams: 87 """Parameters required to authorize an OAuth Client""" 88 89 client_id: str 90 redirect_uri: str 91 response_type: str 92 response_mode: str | None 93 scope: set[str] 94 state: str 95 nonce: str | None 96 prompt: set[str] 97 grant_type: str 98 99 provider: OAuth2Provider = field(default_factory=OAuth2Provider) 100 101 request: str | None = None 102 103 max_age: int | None = None 104 105 code_challenge: str | None = None 106 code_challenge_method: str | None = None 107 108 github_compat: InitVar[bool] = False 109 110 @staticmethod 111 def from_request(request: HttpRequest, github_compat=False) -> OAuthAuthorizationParams: 112 """ 113 Get all the params used by the Authorization Code Flow 114 (and also for the Implicit and Hybrid). 115 116 See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest 117 """ 118 # Because in this endpoint we handle both GET 119 # and POST request. 120 query_dict = request.POST if request.method == "POST" else request.GET 121 state = query_dict.get("state") 122 redirect_uri = query_dict.get("redirect_uri", "") 123 124 response_type = query_dict.get("response_type", "") 125 126 # Validate and check the response_mode against the predefined dict 127 # Set to Query or Fragment if not defined in request 128 response_mode = query_dict.get("response_mode", False) 129 130 max_age = query_dict.get("max_age") 131 return OAuthAuthorizationParams( 132 client_id=query_dict.get("client_id", ""), 133 redirect_uri=redirect_uri, 134 response_type=response_type, 135 response_mode=response_mode, 136 grant_type="", 137 scope=set(query_dict.get("scope", "").split()), 138 state=state, 139 nonce=query_dict.get("nonce"), 140 prompt=ALLOWED_PROMPT_PARAMS.intersection(set(query_dict.get("prompt", "").split())), 141 request=query_dict.get("request", None), 142 max_age=int(max_age) if max_age else None, 143 code_challenge=query_dict.get("code_challenge"), 144 
code_challenge_method=query_dict.get("code_challenge_method", "plain"), 145 github_compat=github_compat, 146 ) 147 148 def __post_init__(self, github_compat=False): 149 self.provider: OAuth2Provider = OAuth2Provider.objects.filter( 150 client_id=self.client_id 151 ).first() 152 if not self.provider: 153 LOGGER.warning("Invalid client identifier", client_id=self.client_id) 154 raise ClientIdError(client_id=self.client_id) 155 self.check_redirect_uri() 156 self.check_grant() 157 self.check_scope(github_compat) 158 if self.request: 159 raise AuthorizeError( 160 self.redirect_uri, "request_not_supported", self.grant_type, self.state 161 ) 162 self.check_nonce() 163 self.check_code_challenge() 164 165 def check_grant(self): 166 """Check grant""" 167 # Determine which flow to use. 168 if self.response_type in [ResponseTypes.CODE]: 169 self.grant_type = GrantTypes.AUTHORIZATION_CODE 170 elif self.response_type in [ 171 ResponseTypes.ID_TOKEN, 172 ResponseTypes.ID_TOKEN_TOKEN, 173 ]: 174 self.grant_type = GrantTypes.IMPLICIT 175 elif self.response_type in [ 176 ResponseTypes.CODE_TOKEN, 177 ResponseTypes.CODE_ID_TOKEN, 178 ResponseTypes.CODE_ID_TOKEN_TOKEN, 179 ]: 180 self.grant_type = GrantTypes.HYBRID 181 182 # Grant type validation. 
183 if not self.grant_type: 184 LOGGER.warning("Invalid response type", type=self.response_type) 185 raise AuthorizeError(self.redirect_uri, "unsupported_response_type", "", self.state) 186 187 if self.response_mode not in ResponseMode.values: 188 self.response_mode = ResponseMode.QUERY 189 190 if self.grant_type in [GrantTypes.IMPLICIT, GrantTypes.HYBRID]: 191 self.response_mode = ResponseMode.FRAGMENT 192 193 def check_redirect_uri(self): 194 """Redirect URI validation.""" 195 allowed_redirect_urls = self.provider.redirect_uris 196 if not self.redirect_uri: 197 LOGGER.warning("Missing redirect uri.") 198 raise RedirectUriError("", allowed_redirect_urls).with_cause("redirect_uri_missing") 199 200 if len(allowed_redirect_urls) < 1: 201 LOGGER.info("Setting redirect for blank redirect_uris", redirect=self.redirect_uri) 202 self.provider.redirect_uris = [ 203 RedirectURI(RedirectURIMatchingMode.STRICT, self.redirect_uri) 204 ] 205 self.provider.save() 206 allowed_redirect_urls = self.provider.redirect_uris 207 208 match_found = False 209 for allowed in allowed_redirect_urls: 210 if allowed.matching_mode == RedirectURIMatchingMode.STRICT: 211 if self.redirect_uri == allowed.url: 212 match_found = True 213 break 214 if allowed.matching_mode == RedirectURIMatchingMode.REGEX: 215 try: 216 if fullmatch(allowed.url, self.redirect_uri): 217 match_found = True 218 break 219 except RegexError as exc: 220 LOGGER.warning( 221 "Failed to parse regular expression", 222 exc=exc, 223 url=allowed.url, 224 provider=self.provider, 225 ) 226 if not match_found: 227 raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause( 228 "redirect_uri_no_match" 229 ) 230 # Check against forbidden schemes 231 if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES: 232 raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause( 233 "redirect_uri_forbidden_scheme" 234 ) 235 236 def check_scope(self, github_compat=False): 237 """Ensure openid scope is set in 
Hybrid flows, or when requesting an id_token""" 238 default_scope_names = set( 239 ScopeMapping.objects.filter(provider__in=[self.provider]).values_list( 240 "scope_name", flat=True 241 ) 242 ) 243 if len(self.scope) == 0: 244 self.scope = default_scope_names 245 LOGGER.info( 246 "No scopes requested, defaulting to all configured scopes", scopes=self.scope 247 ) 248 scopes_to_check = self.scope 249 if github_compat: 250 scopes_to_check = self.scope - SCOPE_GITHUB 251 if not scopes_to_check.issubset(default_scope_names): 252 LOGGER.info( 253 "Application requested scopes not configured, setting to overlap", 254 scope_allowed=default_scope_names, 255 scope_given=self.scope, 256 ) 257 self.scope = self.scope.intersection(default_scope_names) 258 if SCOPE_OPENID not in self.scope and ( 259 self.grant_type == GrantTypes.HYBRID 260 or self.response_type in [ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN] 261 ): 262 LOGGER.warning("Missing 'openid' scope.") 263 raise AuthorizeError( 264 self.redirect_uri, "invalid_scope", self.grant_type, self.state 265 ).with_cause("scope_openid_missing") 266 if SCOPE_OFFLINE_ACCESS in self.scope: 267 # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess 268 # Don't explicitly request consent with offline_access, as the spec allows for 269 # "other conditions for processing the request permitting offline access to the 270 # requested resources are in place" 271 # which we interpret as "the admin picks an authorization flow with or without consent" 272 if self.response_type not in [ 273 ResponseTypes.CODE, 274 ResponseTypes.CODE_TOKEN, 275 ResponseTypes.CODE_ID_TOKEN, 276 ResponseTypes.CODE_ID_TOKEN_TOKEN, 277 ]: 278 # offline_access requires a response type that has some sort of token 279 # Spec says to ignore the scope when the response_type wouldn't result 280 # in an authorization code being generated 281 self.scope.remove(SCOPE_OFFLINE_ACCESS) 282 283 def check_nonce(self): 284 """Nonce parameter validation.""" 
285 # nonce is required for all flows that return an id_token from the authorization endpoint, 286 # see https://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthRequest or 287 # https://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken and 288 # https://bitbucket.org/openid/connect/issues/972/nonce-requirement-in-hybrid-auth-request 289 if self.response_type not in [ 290 ResponseTypes.ID_TOKEN, 291 ResponseTypes.ID_TOKEN_TOKEN, 292 ResponseTypes.CODE_ID_TOKEN, 293 ResponseTypes.CODE_ID_TOKEN_TOKEN, 294 ]: 295 return 296 if SCOPE_OPENID not in self.scope: 297 return 298 if not self.nonce: 299 LOGGER.warning("Missing nonce for OpenID Request") 300 raise AuthorizeError( 301 self.redirect_uri, "invalid_request", self.grant_type, self.state 302 ).with_cause("nonce_missing") 303 304 def check_code_challenge(self): 305 """PKCE validation of the transformation method.""" 306 if self.code_challenge and self.code_challenge_method not in [ 307 PKCE_METHOD_PLAIN, 308 PKCE_METHOD_S256, 309 ]: 310 raise AuthorizeError( 311 self.redirect_uri, 312 "invalid_request", 313 self.grant_type, 314 self.state, 315 f"Unsupported challenge method {self.code_challenge_method}", 316 ) 317 318 def create_code(self, request: HttpRequest) -> AuthorizationCode: 319 """Create an AuthorizationCode object for the request""" 320 auth_event = get_login_event(request) 321 322 now = timezone.now() 323 324 code = AuthorizationCode( 325 user=request.user, 326 provider=self.provider, 327 auth_time=auth_event.created if auth_event else now, 328 code=uuid4().hex, 329 expires=now + timedelta_from_string(self.provider.access_code_validity), 330 scope=self.scope, 331 nonce=self.nonce, 332 session=request.session["authenticatedsession"], 333 ) 334 335 if self.code_challenge and self.code_challenge_method: 336 code.code_challenge = self.code_challenge 337 code.code_challenge_method = self.code_challenge_method 338 339 return code
Parameters required to authorize an OAuth Client
@staticmethod
def from_request(request: HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
    """
    Get all the params used by the Authorization Code Flow
    (and also for the Implicit and Hybrid).

    See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest

    Parameters are read from the POST body for POST requests and from the
    query string otherwise. A `max_age` value that is not an integer is logged
    and treated as if the parameter had not been sent.
    """
    # Because in this endpoint we handle both GET
    # and POST request.
    query_dict = request.POST if request.method == "POST" else request.GET

    # max_age is supplied by the client, so it can't be trusted to be numeric;
    # a bare int() would turn a malformed value into an unhandled ValueError
    max_age = None
    raw_max_age = query_dict.get("max_age")
    if raw_max_age:
        try:
            max_age = int(raw_max_age)
        except ValueError:
            LOGGER.warning("Ignoring non-integer max_age", max_age=raw_max_age)

    return OAuthAuthorizationParams(
        client_id=query_dict.get("client_id", ""),
        redirect_uri=query_dict.get("redirect_uri", ""),
        response_type=query_dict.get("response_type", ""),
        # False when not sent; check_grant replaces any value that is not in
        # ResponseMode with Query or Fragment
        response_mode=query_dict.get("response_mode", False),
        # Empty until check_grant derives it from the response type
        grant_type="",
        scope=set(query_dict.get("scope", "").split()),
        state=query_dict.get("state"),
        nonce=query_dict.get("nonce"),
        # Only prompt values listed in ALLOWED_PROMPT_PARAMS are kept
        prompt=ALLOWED_PROMPT_PARAMS.intersection(set(query_dict.get("prompt", "").split())),
        request=query_dict.get("request", None),
        max_age=max_age,
        code_challenge=query_dict.get("code_challenge"),
        code_challenge_method=query_dict.get("code_challenge_method", "plain"),
        github_compat=github_compat,
    )
Get all the params used by the Authorization Code Flow (and also for the Implicit and Hybrid).
See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
def check_grant(self):
    """Derive the grant type from the response type and settle the response mode.

    Raises:
        AuthorizeError: `unsupported_response_type` when the response type maps
            to no grant type.
    """
    # Each group of response types selects exactly one flow
    grant_by_response_types = (
        ((ResponseTypes.CODE,), GrantTypes.AUTHORIZATION_CODE),
        (
            (ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN),
            GrantTypes.IMPLICIT,
        ),
        (
            (
                ResponseTypes.CODE_TOKEN,
                ResponseTypes.CODE_ID_TOKEN,
                ResponseTypes.CODE_ID_TOKEN_TOKEN,
            ),
            GrantTypes.HYBRID,
        ),
    )
    for response_types, grant in grant_by_response_types:
        if self.response_type in response_types:
            self.grant_type = grant
            break

    # Without a match the grant type keeps its previous (empty) value
    if not self.grant_type:
        LOGGER.warning("Invalid response type", type=self.response_type)
        raise AuthorizeError(self.redirect_uri, "unsupported_response_type", "", self.state)

    # A valid response mode requested by the client is used as-is
    if self.response_mode in ResponseMode.values:
        return
    # Otherwise fall back to the default for the flow
    if self.grant_type in [GrantTypes.IMPLICIT, GrantTypes.HYBRID]:
        self.response_mode = ResponseMode.FRAGMENT
    else:
        self.response_mode = ResponseMode.QUERY
Check grant
def check_redirect_uri(self):
    """Redirect URI validation.

    The requested redirect URI must be present, must match one of the
    provider's redirect URIs (exact comparison in strict mode, full regular
    expression match in regex mode) and must not use a forbidden scheme.
    When the provider has no redirect URIs yet, the requested URI is saved on
    the provider as a strict entry, unless its scheme is forbidden.

    Raises:
        RedirectUriError: with cause `redirect_uri_missing`,
            `redirect_uri_no_match` or `redirect_uri_forbidden_scheme`.
    """
    allowed_redirect_urls = self.provider.redirect_uris
    if not self.redirect_uri:
        LOGGER.warning("Missing redirect uri.")
        raise RedirectUriError("", allowed_redirect_urls).with_cause("redirect_uri_missing")

    if len(allowed_redirect_urls) < 1:
        # Reject forbidden schemes before persisting anything, otherwise the
        # provider would permanently store a URI that can never be used
        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
                "redirect_uri_forbidden_scheme"
            )
        LOGGER.info("Setting redirect for blank redirect_uris", redirect=self.redirect_uri)
        self.provider.redirect_uris = [
            RedirectURI(RedirectURIMatchingMode.STRICT, self.redirect_uri)
        ]
        self.provider.save()
        allowed_redirect_urls = self.provider.redirect_uris

    match_found = False
    for allowed in allowed_redirect_urls:
        if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
            if self.redirect_uri == allowed.url:
                match_found = True
                break
        if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
            try:
                if fullmatch(allowed.url, self.redirect_uri):
                    match_found = True
                    break
            except RegexError as exc:
                # An invalid pattern only disables this entry, the remaining
                # entries are still checked
                LOGGER.warning(
                    "Failed to parse regular expression",
                    exc=exc,
                    url=allowed.url,
                    provider=self.provider,
                )
    if not match_found:
        raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
            "redirect_uri_no_match"
        )
    # Check against forbidden schemes
    if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
        raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
            "redirect_uri_forbidden_scheme"
        )
Redirect URI validation.
def check_scope(self, github_compat=False):
    """Limit the requested scopes to the scope names mapped to the provider.

    An empty request defaults to every mapped scope name. `openid` must be
    present for hybrid flows and for the `id_token` response types, and
    `offline_access` is removed for response types without a code.

    Raises:
        AuthorizeError: `invalid_scope` when `openid` is required but missing.
    """
    configured = set(
        ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
            "scope_name", flat=True
        )
    )
    if not self.scope:
        self.scope = configured
        LOGGER.info(
            "No scopes requested, defaulting to all configured scopes", scopes=self.scope
        )

    # With github_compat the scopes in SCOPE_GITHUB are left out of the subset check
    candidate = self.scope - SCOPE_GITHUB if github_compat else self.scope
    if not candidate.issubset(configured):
        LOGGER.info(
            "Application requested scopes not configured, setting to overlap",
            scope_allowed=configured,
            scope_given=self.scope,
        )
        self.scope = self.scope.intersection(configured)

    openid_mandatory = self.grant_type == GrantTypes.HYBRID or self.response_type in [
        ResponseTypes.ID_TOKEN,
        ResponseTypes.ID_TOKEN_TOKEN,
    ]
    if openid_mandatory and SCOPE_OPENID not in self.scope:
        LOGGER.warning("Missing 'openid' scope.")
        raise AuthorizeError(
            self.redirect_uri, "invalid_scope", self.grant_type, self.state
        ).with_cause("scope_openid_missing")

    # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
    # Don't explicitly request consent with offline_access, as the spec allows for
    # "other conditions for processing the request permitting offline access to the
    # requested resources are in place"
    # which we interpret as "the admin picks an authorization flow with or without consent"
    code_response_types = [
        ResponseTypes.CODE,
        ResponseTypes.CODE_TOKEN,
        ResponseTypes.CODE_ID_TOKEN,
        ResponseTypes.CODE_ID_TOKEN_TOKEN,
    ]
    if SCOPE_OFFLINE_ACCESS in self.scope and self.response_type not in code_response_types:
        # Spec says to ignore the scope when the response_type wouldn't result
        # in an authorization code being generated
        self.scope.remove(SCOPE_OFFLINE_ACCESS)
Ensure openid scope is set in Hybrid flows, or when requesting an id_token
def check_nonce(self):
    """Require a nonce for OpenID requests whose response type contains an id_token.

    Raises:
        AuthorizeError: `invalid_request` with cause `nonce_missing`.
    """
    # nonce is required for all flows that return an id_token from the authorization endpoint,
    # see https://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthRequest or
    # https://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken and
    # https://bitbucket.org/openid/connect/issues/972/nonce-requirement-in-hybrid-auth-request
    id_token_response_types = [
        ResponseTypes.ID_TOKEN,
        ResponseTypes.ID_TOKEN_TOKEN,
        ResponseTypes.CODE_ID_TOKEN,
        ResponseTypes.CODE_ID_TOKEN_TOKEN,
    ]
    nonce_applies = (
        self.response_type in id_token_response_types and SCOPE_OPENID in self.scope
    )
    if not nonce_applies or self.nonce:
        return
    LOGGER.warning("Missing nonce for OpenID Request")
    raise AuthorizeError(
        self.redirect_uri, "invalid_request", self.grant_type, self.state
    ).with_cause("nonce_missing")
Nonce parameter validation.
def check_code_challenge(self):
    """Validate the PKCE transformation method.

    Only checked when a code challenge was sent.

    Raises:
        AuthorizeError: `invalid_request` when the method is neither plain nor S256.
    """
    if not self.code_challenge:
        return
    supported_methods = [PKCE_METHOD_PLAIN, PKCE_METHOD_S256]
    if self.code_challenge_method in supported_methods:
        return
    raise AuthorizeError(
        self.redirect_uri,
        "invalid_request",
        self.grant_type,
        self.state,
        f"Unsupported challenge method {self.code_challenge_method}",
    )
PKCE validation of the transformation method.
def create_code(self, request: HttpRequest) -> AuthorizationCode:
    """Build an AuthorizationCode for the request's user.

    The returned object is only constructed here, this method does not save it.
    """
    login_event = get_login_event(request)
    issued_at = timezone.now()
    # Without a login event the code's auth_time is the time of issuing
    authenticated_at = login_event.created if login_event else issued_at
    validity = timedelta_from_string(self.provider.access_code_validity)

    authorization_code = AuthorizationCode(
        user=request.user,
        provider=self.provider,
        auth_time=authenticated_at,
        code=uuid4().hex,
        expires=issued_at + validity,
        scope=self.scope,
        nonce=self.nonce,
        session=request.session["authenticatedsession"],
    )

    # The PKCE values are only stored when both challenge and method are set
    if not (self.code_challenge and self.code_challenge_method):
        return authorization_code
    authorization_code.code_challenge = self.code_challenge
    authorization_code.code_challenge_method = self.code_challenge_method
    return authorization_code
Create an AuthorizationCode object for the request
class AuthorizationFlowInitView(PolicyAccessView):
    """OAuth2 Flow initializer, checks access to application and starts flow"""

    # Parsed authorization parameters, set in pre_permission_check
    params: OAuthAuthorizationParams
    # Enable GitHub compatibility (only allow for scopes which are handled
    # differently for github compat)
    github_compat = False

    def pre_permission_check(self):
        """Check prompt parameter before checking permission/authentication,
        see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6

        Raises:
            Http404: when the request has no query parameters or the provider
                does not exist.
            RequestValidationError: when the parameters are invalid, or when
                `prompt=none` is requested by an unauthenticated user.
        """
        # Quick sanity check at the beginning to prevent event spamming
        if len(self.request.GET) < 1:
            raise Http404
        try:
            self.params = OAuthAuthorizationParams.from_request(
                self.request, github_compat=self.github_compat
            )
        except AuthorizeError as error:
            LOGGER.warning(error.description, redirect_uri=error.redirect_uri, cause=error.cause)
            raise RequestValidationError(error.get_response(self.request)) from None
        except OAuth2Error as error:
            LOGGER.warning(error.description, cause=error.cause)
            raise RequestValidationError(
                bad_request_message(self.request, error.description, title=error.error)
            ) from None
        except OAuth2Provider.DoesNotExist:
            raise Http404 from None
        if PROMPT_NONE in self.params.prompt and not self.request.user.is_authenticated:
            # When "prompt" is set to "none" but the user is not logged in, show an error message
            error = AuthorizeError(
                self.params.redirect_uri,
                "login_required",
                self.params.grant_type,
                self.params.state,
            )
            raise RequestValidationError(error.get_response(self.request))

    def resolve_provider_application(self):
        """Look up the provider by the `client_id` query parameter (404 when
        none matches) and take the application from it."""
        client_id = self.request.GET.get("client_id")
        self.provider = get_object_or_404(OAuth2Provider, client_id=client_id)
        self.application = self.provider.application

    def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
        """Copy the login hint query parameter, when sent, into the flow
        context as the pending user identifier."""
        if QS_LOGIN_HINT in self.request.GET:
            context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = self.request.GET.get(QS_LOGIN_HINT)
        return super().modify_flow_context(flow, context)

    def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
        """Add the parsed OAuth parameters to the policy request context
        under `oauth_`-prefixed keys."""
        request.context["oauth_scopes"] = self.params.scope
        request.context["oauth_grant_type"] = self.params.grant_type
        request.context["oauth_code_challenge"] = self.params.code_challenge
        request.context["oauth_code_challenge_method"] = self.params.code_challenge_method
        request.context["oauth_max_age"] = self.params.max_age
        request.context["oauth_redirect_uri"] = self.params.redirect_uri
        request.context["oauth_response_type"] = self.params.response_type
        return request

    def dispatch_with_language(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Activate language from OIDC specific ui_locales parameter, picking the earliest one
        available.

        When a language was selected, the response also sets the language
        cookie, and a redirect response gets a `locale` query parameter added
        to its Location.
        """
        selected_language = None
        if UI_LOCALES in self.request.GET:
            languages = str(self.request.GET[UI_LOCALES]).split(" ")
            for language in languages:
                if not translation.check_for_language(language):
                    continue
                try:
                    selected_language = translation.get_supported_language_variant(language)
                except LookupError:
                    # Django raises LookupError when no supported language
                    # matches; try the next requested locale instead of failing
                    continue
                LOGGER.debug("Activating language from oidc ui_locales", locale=selected_language)
                break
        translation.activate(selected_language)
        response = super().dispatch(request, *args, **kwargs)
        if selected_language:
            response.set_cookie(
                settings.LANGUAGE_COOKIE_NAME,
                selected_language,
                max_age=settings.LANGUAGE_COOKIE_AGE,
                path=settings.LANGUAGE_COOKIE_PATH,
                domain=settings.LANGUAGE_COOKIE_DOMAIN,
                secure=settings.LANGUAGE_COOKIE_SECURE,
                httponly=settings.LANGUAGE_COOKIE_HTTPONLY,
                samesite=settings.LANGUAGE_COOKIE_SAMESITE,
            )
            if isinstance(response, HttpResponseRedirect):
                parsed_url = urlparse(response.url)
                # keep_blank_values so parameters with an empty value survive
                # the round trip through parse_qs/urlencode
                query = parse_qs(parsed_url.query, keep_blank_values=True)
                query["locale"] = selected_language
                response["Location"] = urlunparse(
                    parsed_url._replace(query=urlencode(query, quote_via=quote, doseq=True))
                )
        return response

    def dispatch(self, request: HttpRequest, *args, **kwargs):
        """Dispatch the request through dispatch_with_language."""
        # Activate language before parsing params (error messages should be localized)
        return self.dispatch_with_language(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Start FlowPlanner, return to flow executor shell"""
        # Require a login event to be set, otherwise make the user re-login
        login_event = get_login_event(request)
        if not login_event:
            LOGGER.warning("request with no login event")
            return self.handle_no_permission()
        login_uid = str(login_event.pk)
        # After we've checked permissions, and the user has access, check if we need
        # to re-authenticate the user
        if self.params.max_age:
            # Attempt to check via the session's login event if set, otherwise we can't
            # check
            login_time = login_event.created
            current_age: timedelta = timezone.now() - login_time
            if current_age.total_seconds() > self.params.max_age:
                LOGGER.debug(
                    "Triggering authentication as max_age requirement",
                    max_age=self.params.max_age,
                    ago=int(current_age.total_seconds()),
                )
                # Since we already need to re-authenticate the user, set the old login UID
                # in case this request has both max_age and prompt=login
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        # If prompt=login, we need to re-authenticate the user regardless
        # Check if we're not already doing the re-authentication
        if PROMPT_LOGIN in self.params.prompt:
            # No previous login UID saved, so save the current uid and trigger
            # re-login, or previous login UID matches current one, so no re-login happened yet
            if (
                SESSION_KEY_LAST_LOGIN_UID not in self.request.session
                or login_uid == self.request.session[SESSION_KEY_LAST_LOGIN_UID]
            ):
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        scope_descriptions = UserInfoView().get_scope_descriptions(
            self.params.scope, self.params.provider
        )
        # Regardless, we start the planner and return to it
        planner = FlowPlanner(self.provider.authorization_flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(
                self.request,
                {
                    PLAN_CONTEXT_SSO: True,
                    PLAN_CONTEXT_APPLICATION: self.application,
                    # OAuth2 related params
                    PLAN_CONTEXT_PARAMS: self.params,
                    # Consent related params
                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
                    % {"application": self.application.name},
                    PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
                },
            )
        except FlowNonApplicableException:
            return self.handle_no_permission_authenticated()
        # OpenID clients can specify a `prompt` parameter, and if its set to consent we
        # need to inject a consent stage
        if PROMPT_CONSENT in self.params.prompt:
            if not any(isinstance(x.stage, ConsentStage) for x in plan.bindings):
                # Plan does not have any consent stage, so we add an in-memory one
                stage = ConsentStage(
                    name="OAuth2 Provider In-memory consent stage",
                    mode=ConsentMode.ALWAYS_REQUIRE,
                )
                plan.append_stage(stage)

        plan.append_stage(in_memory_stage(OAuthFulfillmentStage))

        return plan.to_redirect(
            self.request,
            self.provider.authorization_flow,
            # We can only skip the flow executor and directly go to the final redirect URL if
            # we can submit the data to the RP via URL
            allowed_silent_types=(
                [OAuthFulfillmentStage]
                if self.params.response_mode in [ResponseMode.QUERY, ResponseMode.FRAGMENT]
                else []
            ),
        )
OAuth2 Flow initializer, checks access to application and starts flow
def pre_permission_check(self):
    """Parse the authorization parameters and handle `prompt=none` before
    permissions and authentication are checked,
    see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6

    Raises:
        Http404: when the request has no query parameters or the provider
            does not exist.
        RequestValidationError: when the parameters are invalid, or when
            `prompt=none` is requested by an unauthenticated user.
    """
    # Quick sanity check at the beginning to prevent event spamming
    if not self.request.GET:
        raise Http404
    try:
        self.params = OAuthAuthorizationParams.from_request(
            self.request, github_compat=self.github_compat
        )
    except AuthorizeError as exc:
        LOGGER.warning(exc.description, redirect_uri=exc.redirect_uri, cause=exc.cause)
        raise RequestValidationError(exc.get_response(self.request)) from None
    except OAuth2Error as exc:
        LOGGER.warning(exc.description, cause=exc.cause)
        error_page = bad_request_message(self.request, exc.description, title=exc.error)
        raise RequestValidationError(error_page) from None
    except OAuth2Provider.DoesNotExist:
        raise Http404 from None

    if PROMPT_NONE not in self.params.prompt or self.request.user.is_authenticated:
        return
    # When "prompt" is set to "none" but the user is not logged in, show an error message
    login_required = AuthorizeError(
        self.params.redirect_uri,
        "login_required",
        self.params.grant_type,
        self.params.state,
    )
    raise RequestValidationError(login_required.get_response(self.request))
Check prompt parameter before checking permission/authentication, see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6
def resolve_provider_application(self):
    """Set `self.provider` from the `client_id` query parameter (404 when none
    matches) and `self.application` from that provider."""
    provider = get_object_or_404(OAuth2Provider, client_id=self.request.GET.get("client_id"))
    self.provider = provider
    self.application = provider.application
Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught, and will return directly
def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
    """Store the login hint query parameter, when sent, as the pending user
    identifier in the flow context, then defer to the parent implementation."""
    query = self.request.GET
    if QS_LOGIN_HINT in query:
        login_hint = query.get(QS_LOGIN_HINT)
        context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = login_hint
    return super().modify_flow_context(flow, context)
Optionally modify the flow context that is used for the authentication flow.
def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
    """Add the parsed OAuth parameters to the policy request context under
    `oauth_`-prefixed keys and return the same request."""
    params = self.params
    request.context.update(
        {
            "oauth_scopes": params.scope,
            "oauth_grant_type": params.grant_type,
            "oauth_code_challenge": params.code_challenge,
            "oauth_code_challenge_method": params.code_challenge_method,
            "oauth_max_age": params.max_age,
            "oauth_redirect_uri": params.redirect_uri,
            "oauth_response_type": params.response_type,
        }
    )
    return request
Optionally modify the policy request.
def dispatch_with_language(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Activate language from OIDC specific ui_locales parameter, picking the earliest one
    available.

    When a language was selected, the response also sets the language cookie,
    and a redirect response gets a `locale` query parameter added to its
    Location.
    """
    selected_language = None
    if UI_LOCALES in self.request.GET:
        languages = str(self.request.GET[UI_LOCALES]).split(" ")
        for language in languages:
            if not translation.check_for_language(language):
                continue
            try:
                selected_language = translation.get_supported_language_variant(language)
            except LookupError:
                # Django raises LookupError when no supported language matches;
                # try the next requested locale instead of failing the request
                continue
            LOGGER.debug("Activating language from oidc ui_locales", locale=selected_language)
            break
    translation.activate(selected_language)
    response = super().dispatch(request, *args, **kwargs)
    if selected_language:
        response.set_cookie(
            settings.LANGUAGE_COOKIE_NAME,
            selected_language,
            max_age=settings.LANGUAGE_COOKIE_AGE,
            path=settings.LANGUAGE_COOKIE_PATH,
            domain=settings.LANGUAGE_COOKIE_DOMAIN,
            secure=settings.LANGUAGE_COOKIE_SECURE,
            httponly=settings.LANGUAGE_COOKIE_HTTPONLY,
            samesite=settings.LANGUAGE_COOKIE_SAMESITE,
        )
        if isinstance(response, HttpResponseRedirect):
            parsed_url = urlparse(response.url)
            # keep_blank_values so parameters with an empty value survive the
            # round trip through parse_qs/urlencode
            query = parse_qs(parsed_url.query, keep_blank_values=True)
            query["locale"] = selected_language
            response["Location"] = urlunparse(
                parsed_url._replace(query=urlencode(query, quote_via=quote, doseq=True))
            )
    return response
Activate language from OIDC specific ui_locales parameter, picking the earliest one available
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Re-authenticate the user when required, then plan the authorization flow
    and redirect into it."""
    # Require a login event to be set, otherwise make the user re-login
    event = get_login_event(request)
    if not event:
        LOGGER.warning("request with no login event")
        return self.handle_no_permission()
    event_uid = str(event.pk)
    session = self.request.session

    # After we've checked permissions, and the user has access, check if we need
    # to re-authenticate the user
    max_age = self.params.max_age
    if max_age:
        # Age of the login is measured from the login event's creation time
        age_seconds = (timezone.now() - event.created).total_seconds()
        if age_seconds > max_age:
            LOGGER.debug(
                "Triggering authentication as max_age requirement",
                max_age=max_age,
                ago=int(age_seconds),
            )
            # Since we already need to re-authenticate the user, set the old login UID
            # in case this request has both max_age and prompt=login
            session[SESSION_KEY_LAST_LOGIN_UID] = event_uid
            return self.handle_no_permission()

    # If prompt=login, we need to re-authenticate the user regardless, unless that
    # already happened: either no login UID was saved yet, or the saved one still
    # matches the current login, so no re-login happened yet
    if PROMPT_LOGIN in self.params.prompt and (
        SESSION_KEY_LAST_LOGIN_UID not in session
        or event_uid == session[SESSION_KEY_LAST_LOGIN_UID]
    ):
        session[SESSION_KEY_LAST_LOGIN_UID] = event_uid
        return self.handle_no_permission()

    scope_descriptions = UserInfoView().get_scope_descriptions(
        self.params.scope, self.params.provider
    )
    # Regardless, we start the planner and return to it
    authorization_flow = self.provider.authorization_flow
    planner = FlowPlanner(authorization_flow)
    planner.allow_empty_flows = True
    consent_header = _("You're about to sign into %(application)s.") % {
        "application": self.application.name
    }
    plan_context = {
        PLAN_CONTEXT_SSO: True,
        PLAN_CONTEXT_APPLICATION: self.application,
        # OAuth2 related params
        PLAN_CONTEXT_PARAMS: self.params,
        # Consent related params
        PLAN_CONTEXT_CONSENT_HEADER: consent_header,
        PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
    }
    try:
        plan = planner.plan(self.request, plan_context)
    except FlowNonApplicableException:
        return self.handle_no_permission_authenticated()

    # OpenID clients can specify a `prompt` parameter, and if its set to consent we
    # need to inject a consent stage, unless the plan already has one
    if PROMPT_CONSENT in self.params.prompt and not any(
        isinstance(binding.stage, ConsentStage) for binding in plan.bindings
    ):
        plan.append_stage(
            ConsentStage(
                name="OAuth2 Provider In-memory consent stage",
                mode=ConsentMode.ALWAYS_REQUIRE,
            )
        )

    plan.append_stage(in_memory_stage(OAuthFulfillmentStage))

    # We can only skip the flow executor and directly go to the final redirect URL if
    # we can submit the data to the RP via URL
    if self.params.response_mode in [ResponseMode.QUERY, ResponseMode.FRAGMENT]:
        silent_types = [OAuthFulfillmentStage]
    else:
        silent_types = []
    return plan.to_redirect(
        self.request,
        self.provider.authorization_flow,
        allowed_silent_types=silent_types,
    )
Start FlowPlanner, return to flow executor shell
class OAuthFulfillmentStage(StageView):
    """Final stage, restores params from Flow."""

    # All three are set in get() from the flow plan context
    params: OAuthAuthorizationParams
    provider: OAuth2Provider
    application: Application

    def redirect(self, uri: str) -> HttpResponse:
        """Redirect using HttpResponseRedirectScheme, compatible with non-http schemes.

        In form_post mode the query parameters of `uri` are instead returned
        as an autosubmit challenge that posts them to the redirect URI.
        """
        parsed = urlparse(uri)

        if self.params.response_mode == ResponseMode.FORM_POST:
            # parse_qs returns a dictionary with values wrapped in lists, however
            # we need a flat dictionary for the autosubmit challenge

            # this picks the first item in the list if the value is a list,
            # otherwise just the value as-is
            query_params = {
                key: value[0] if isinstance(value, list) else value
                for key, value in parse_qs(parsed.query).items()
            }

            challenge = AutosubmitChallenge(
                data={
                    "component": "ak-stage-autosubmit",
                    "title": self.executor.plan.context.get(
                        PLAN_CONTEXT_TITLE,
                        # Look up the translation of the template first and fill in
                        # the name afterwards; formatting first would look up a
                        # string that is in no message catalog
                        _("Redirecting to {app}...").format_map({"app": self.application.name}),
                    ),
                    "url": self.params.redirect_uri,
                    "attrs": query_params,
                }
            )

            challenge.is_valid()
            self.executor.stage_ok()
            return HttpChallengeResponse(
                challenge=challenge,
            )
        self.executor.stage_ok()
        return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Wrapper when this stage gets hit with a post request"""
        return self.get(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Final stage of an OAuth2 flow.

        Takes the params and application out of the plan context, records the
        authorization event and answers with the response for the client.
        Errors are recorded as events and returned as error responses.
        """
        if PLAN_CONTEXT_PARAMS not in self.executor.plan.context:
            LOGGER.warning("Got to fulfillment stage with no pending context")
            return HttpResponseBadRequest()
        self.params: OAuthAuthorizationParams = self.executor.plan.context.pop(PLAN_CONTEXT_PARAMS)
        self.application: Application = self.executor.plan.context.pop(PLAN_CONTEXT_APPLICATION)
        self.provider = get_object_or_404(OAuth2Provider, pk=self.application.provider_id)
        try:
            # At this point we don't need to check permissions anymore
            if {PROMPT_NONE, PROMPT_CONSENT}.issubset(self.params.prompt):
                raise AuthorizeError(
                    self.params.redirect_uri,
                    "consent_required",
                    self.params.grant_type,
                    self.params.state,
                )
            Event.new(
                EventAction.AUTHORIZE_APPLICATION,
                authorized_application=self.application,
                flow=self.executor.plan.flow_pk,
                scopes=" ".join(self.params.scope),
            ).from_http(self.request)
            return self.redirect(self.create_response_uri())
        except (ClientIdError, RedirectUriError) as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()

            return bad_request_message(request, error.description, title=error.error)
        except AuthorizeError as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()
            return error.get_response(self.request)

    def create_response_uri(self) -> str:
        """Create a final Response URI the user is redirected to.

        Raises:
            AuthorizeError: `server_error` when no response can be built for
                the response mode.
        """
        uri = urlsplit(self.params.redirect_uri)

        try:
            code = None

            if self.params.grant_type in [
                GrantTypes.AUTHORIZATION_CODE,
                GrantTypes.HYBRID,
            ]:
                code = self.params.create_code(self.request)
                code.save()

            if self.params.response_mode == ResponseMode.QUERY:
                if code is None:
                    # Implicit grants create no code, so there is nothing to put
                    # in the query string; report an error instead of failing
                    # on `None.code`
                    raise OAuth2Error()
                # keep_blank_values so query parameters of the redirect URI that
                # have an empty value are retained
                query_params = parse_qs(uri.query, keep_blank_values=True)
                query_params["code"] = code.code
                query_params["state"] = [str(self.params.state) if self.params.state else ""]

                uri = uri._replace(query=urlencode(query_params, doseq=True))
                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FRAGMENT:
                query_fragment = {}
                if self.params.grant_type in [GrantTypes.AUTHORIZATION_CODE]:
                    query_fragment["code"] = code.code
                    query_fragment["state"] = [str(self.params.state) if self.params.state else ""]
                else:
                    query_fragment = self.create_implicit_response(code)

                uri = uri._replace(
                    fragment=uri.fragment + urlencode(query_fragment, doseq=True),
                )

                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FORM_POST:
                post_params = {}
                if self.params.grant_type in [GrantTypes.AUTHORIZATION_CODE]:
                    post_params["code"] = code.code
                    post_params["state"] = [str(self.params.state) if self.params.state else ""]
                else:
                    post_params = self.create_implicit_response(code)

                # The parameters travel in the query here, redirect() turns
                # them into the form fields
                uri = uri._replace(query=urlencode(post_params, doseq=True))

                return urlunsplit(uri)

            raise OAuth2Error()
        except OAuth2Error as error:
            LOGGER.warning("Error when trying to create response uri", error=error)
            raise AuthorizeError(
                self.params.redirect_uri,
                "server_error",
                self.params.grant_type,
                self.params.state,
            ) from None

    def create_implicit_response(self, code: AuthorizationCode | None) -> dict:
        """Create implicit response's URL Fragment dictionary.

        Creates and saves an access token with its id_token; which of
        `access_token`, `id_token` and `code` end up in the result depends on
        the response type and grant type.
        """
        query_fragment = {}
        auth_event = get_login_event(self.request)

        now = timezone.now()
        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
        token = AccessToken(
            user=self.request.user,
            scope=self.params.scope,
            expires=access_token_expiry,
            provider=self.provider,
            auth_time=auth_event.created if auth_event else now,
            session=self.request.session["authenticatedsession"],
        )

        id_token = IDToken.new(self.provider, token, self.request)
        id_token.nonce = self.params.nonce

        if self.params.response_type in [
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            id_token.c_hash = code.c_hash
        token.id_token = id_token

        # Check if response_type must include access_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
            ResponseTypes.CODE_TOKEN,
        ]:
            query_fragment["access_token"] = token.token
            # Get at_hash of the current token and update the id_token
            id_token.at_hash = token.at_hash

        # Check if response_type must include id_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            query_fragment["id_token"] = self.provider.encode(id_token.to_dict())
        # Serialized again so that changes made to the id_token above are stored
        token._id_token = dumps(id_token.to_dict())

        token.save()

        # Code parameter must be present if it's Hybrid Flow.
        if self.params.grant_type == GrantTypes.HYBRID:
            query_fragment["code"] = code.code

        query_fragment["token_type"] = TOKEN_TYPE
        query_fragment["expires_in"] = int(
            timedelta_from_string(self.provider.access_token_validity).total_seconds()
        )
        query_fragment["state"] = self.params.state if self.params.state else ""
        return query_fragment
Final stage, restores params from Flow.
def redirect(self, uri: str) -> HttpResponse:
    """Redirect using HttpResponseRedirectScheme, compatible with non-http schemes

    For every response mode except ``form_post`` the stage is marked as
    successful and a plain redirect to ``uri`` is returned; the scheme found in
    ``uri`` is passed as the only allowed scheme.

    For ``form_post`` the query string of ``uri`` is unpacked into a flat
    dictionary and returned as an autosubmit challenge that targets
    ``self.params.redirect_uri``.

    :param uri: Response URI, as built by ``create_response_uri``.
    :return: Either the redirect or the autosubmit challenge response.
    """
    parsed = urlparse(uri)

    if self.params.response_mode != ResponseMode.FORM_POST:
        self.executor.stage_ok()
        return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])

    # parse_qs always maps each key to a non-empty list of values, but the
    # autosubmit challenge needs a flat dictionary: keep the first value.
    query_params = {key: values[0] for key, values in parse_qs(parsed.query).items()}

    # Translate the template first and interpolate afterwards. Formatting
    # before calling gettext makes the lookup use the already-interpolated
    # string, which can never match an entry in the message catalog.
    default_title = _("Redirecting to {app}...").format_map({"app": self.application.name})

    challenge = AutosubmitChallenge(
        data={
            "component": "ak-stage-autosubmit",
            "title": self.executor.plan.context.get(PLAN_CONTEXT_TITLE, default_title),
            "url": self.params.redirect_uri,
            "attrs": query_params,
        }
    )

    # The result is intentionally not inspected here; the call is kept from
    # the original implementation.
    challenge.is_valid()
    self.executor.stage_ok()
    return HttpChallengeResponse(
        challenge=challenge,
    )
Redirect using HttpResponseRedirectScheme, compatible with non-http schemes
def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Handle a POST to this stage exactly like a GET.

    All positional and keyword arguments are forwarded unchanged to ``get``.
    """
    response = self.get(request, *args, **kwargs)
    return response
Wrapper when this stage gets hit with a post request
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Final stage of an OAuth2 flow.

    Takes the authorization params and the application out of the flow plan
    context, records an authorization event and redirects to the response URI.
    OAuth2 errors raised on the way are turned into an event, invalidate the
    stage and are answered with the matching error response.
    """
    plan = self.executor.plan
    if PLAN_CONTEXT_PARAMS not in plan.context:
        LOGGER.warning("Got to fulfillment stage with no pending context")
        return HttpResponseBadRequest()

    # Both entries are removed from the plan context while being restored.
    self.params: OAuthAuthorizationParams = plan.context.pop(PLAN_CONTEXT_PARAMS)
    self.application: Application = plan.context.pop(PLAN_CONTEXT_APPLICATION)
    self.provider = get_object_or_404(OAuth2Provider, pk=self.application.provider_id)

    try:
        # At this point we don't need to check permissions anymore
        requested_prompts = self.params.prompt
        if PROMPT_NONE in requested_prompts and PROMPT_CONSENT in requested_prompts:
            # "none" and "consent" were requested together
            raise AuthorizeError(
                self.params.redirect_uri,
                "consent_required",
                self.params.grant_type,
                self.params.state,
            )

        authorize_event = Event.new(
            EventAction.AUTHORIZE_APPLICATION,
            authorized_application=self.application,
            flow=plan.flow_pk,
            scopes=" ".join(self.params.scope),
        )
        authorize_event.from_http(self.request)

        response_uri = self.create_response_uri()
        return self.redirect(response_uri)
    except (ClientIdError, RedirectUriError) as exc:
        # These errors are answered with an error page, not via the error's
        # own response.
        exc.to_event(application=self.application).from_http(request)
        self.executor.stage_invalid()
        return bad_request_message(request, exc.description, title=exc.error)
    except AuthorizeError as exc:
        exc.to_event(application=self.application).from_http(request)
        self.executor.stage_invalid()
        return exc.get_response(self.request)
final Stage of an OAuth2 Flow
def create_response_uri(self) -> str:
    """Build the final response URI the user is sent to.

    An authorization code is created and saved for the authorization-code and
    hybrid grant types. Depending on the response mode, the response
    parameters are merged into the query string (``query``), appended to the
    fragment (``fragment``) or placed in the query string, replacing any
    existing one (``form_post``).

    :raises AuthorizeError: with ``server_error`` when an ``OAuth2Error``
        occurs, including the case of an unhandled response mode.
    """
    params = self.params
    target = urlsplit(params.redirect_uri)

    try:
        code = None
        if params.grant_type in (GrantTypes.AUTHORIZATION_CODE, GrantTypes.HYBRID):
            code = params.create_code(self.request)
            code.save()

        mode = params.response_mode

        if mode == ResponseMode.QUERY:
            # Keep whatever query parameters the redirect URI already carries
            merged = parse_qs(target.query)
            merged.update(
                code=code.code,
                state=[str(params.state) if params.state else ""],
            )
            return urlunsplit(target._replace(query=urlencode(merged, doseq=True)))

        if mode in (ResponseMode.FRAGMENT, ResponseMode.FORM_POST):
            # Both modes carry the same payload and only differ in where the
            # encoded payload is placed.
            if params.grant_type == GrantTypes.AUTHORIZATION_CODE:
                payload = {
                    "code": code.code,
                    "state": [str(params.state) if params.state else ""],
                }
            else:
                payload = self.create_implicit_response(code)

            encoded = urlencode(payload, doseq=True)
            if mode == ResponseMode.FRAGMENT:
                target = target._replace(fragment=target.fragment + encoded)
            else:
                target = target._replace(query=encoded)
            return urlunsplit(target)

        # No known response mode matched; handled by the except clause below
        raise OAuth2Error()
    except OAuth2Error as exc:
        LOGGER.warning("Error when trying to create response uri", error=exc)
        raise AuthorizeError(
            params.redirect_uri,
            "server_error",
            params.grant_type,
            params.state,
        ) from None
Create a final Response URI the user is redirected to.
def create_implicit_response(self, code: AuthorizationCode | None) -> dict:
    """Assemble the response parameters for the implicit and hybrid flows.

    Creates and saves an access token together with its ID token, then returns
    the dictionary of parameters selected by the requested response type.

    :param code: Authorization code created for this request, if any. Its
        ``c_hash`` and ``code`` are read when the response type or grant type
        calls for them.
    :return: Dictionary holding ``token_type``, ``expires_in`` and ``state``,
        plus ``access_token``, ``id_token`` and ``code`` where applicable.
    """
    response_type = self.params.response_type
    login_event = get_login_event(self.request)

    issued_at = timezone.now()
    token = AccessToken(
        user=self.request.user,
        scope=self.params.scope,
        expires=issued_at + timedelta_from_string(self.provider.access_token_validity),
        provider=self.provider,
        # Fall back to the current time when no login event is available
        auth_time=login_event.created if login_event else issued_at,
        session=self.request.session["authenticatedsession"],
    )

    id_token = IDToken.new(self.provider, token, self.request)
    id_token.nonce = self.params.nonce

    if response_type in (ResponseTypes.CODE_ID_TOKEN, ResponseTypes.CODE_ID_TOKEN_TOKEN):
        id_token.c_hash = code.c_hash
    token.id_token = id_token

    fragment = {}

    # Response types that return the access token itself
    if response_type in (
        ResponseTypes.ID_TOKEN_TOKEN,
        ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ResponseTypes.CODE_TOKEN,
    ):
        fragment["access_token"] = token.token
        # Copy the at_hash of the current token onto the id_token
        id_token.at_hash = token.at_hash

    # Response types that return the id_token
    if response_type in (
        ResponseTypes.ID_TOKEN,
        ResponseTypes.ID_TOKEN_TOKEN,
        ResponseTypes.CODE_ID_TOKEN,
        ResponseTypes.CODE_ID_TOKEN_TOKEN,
    ):
        fragment["id_token"] = self.provider.encode(id_token.to_dict())
        token._id_token = dumps(id_token.to_dict())

    token.save()

    # The hybrid flow additionally returns the authorization code
    if self.params.grant_type == GrantTypes.HYBRID:
        fragment["code"] = code.code

    lifetime = timedelta_from_string(self.provider.access_token_validity)
    fragment.update(
        token_type=TOKEN_TYPE,
        expires_in=int(lifetime.total_seconds()),
        state=self.params.state or "",
    )
    return fragment
Create implicit response's URL Fragment dictionary