authentik.policies.views
authentik access helper classes
1"""authentik access helper classes""" 2 3from typing import Any 4 5from django.contrib import messages 6from django.contrib.auth.mixins import AccessMixin 7from django.http import Http404, HttpRequest, HttpResponse 8from django.utils.translation import gettext as _ 9from django.views.generic.base import View 10from structlog.stdlib import get_logger 11 12from authentik.core.apps import AppAccessWithoutBindings 13from authentik.core.models import Application, Provider, User 14from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException 15from authentik.flows.models import Flow, FlowDesignation 16from authentik.flows.planner import ( 17 PLAN_CONTEXT_APPLICATION, 18 PLAN_CONTEXT_POST, 19 FlowPlanner, 20) 21from authentik.flows.views.executor import ( 22 SESSION_KEY_POST, 23 ToDefaultFlow, 24) 25from authentik.lib.sentry import SentryIgnoredException 26from authentik.policies.denied import AccessDeniedResponse 27from authentik.policies.engine import PolicyEngine 28from authentik.policies.models import PolicyBindingModel 29from authentik.policies.types import PolicyRequest, PolicyResult 30 31LOGGER = get_logger() 32 33 34class RequestValidationError(SentryIgnoredException): 35 """Error raised in pre_permission_check, when a request is invalid.""" 36 37 response: HttpResponse | None 38 39 def __init__(self, response: HttpResponse | None = None): 40 super().__init__() 41 if response: 42 self.response = response 43 44 45class PolicyAccessView(AccessMixin, View): 46 """Mixin class for usage in Authorization views. 47 Provider functions to check application access, etc""" 48 49 provider: Provider | None = None 50 application: Application | None = None 51 52 def pre_permission_check(self): 53 """Optionally hook in before permission check to check if a request is valid. 54 Can raise `RequestValidationError` to return a response.""" 55 56 def resolve_provider_application(self): 57 """Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal 58 AccessDenied view to be shown. An Http404 exception 59 is not caught, and will return directly""" 60 raise NotImplementedError 61 62 def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: 63 try: 64 self.pre_permission_check() 65 except RequestValidationError as exc: 66 if exc.response: 67 return exc.response 68 return self.handle_no_permission() 69 try: 70 self.resolve_provider_application() 71 except (Application.DoesNotExist, Provider.DoesNotExist) as exc: 72 LOGGER.warning("failed to resolve application", exc=exc) 73 return self.handle_no_permission_authenticated( 74 PolicyResult(False, _("Failed to resolve application")) 75 ) 76 # Check if user is unauthenticated, so we pass the application 77 # for the identification stage 78 if not request.user.is_authenticated: 79 return self.handle_no_permission() 80 # Check permissions 81 result = self.user_has_access() 82 if not result.passing: 83 return self.handle_no_permission_authenticated(result) 84 return super().dispatch(request, *args, **kwargs) 85 86 def handle_no_permission(self) -> HttpResponse: 87 """User has no access and is not authenticated, so we remember the application 88 they try to access and redirect to the login URL. The application is saved to show 89 a hint on the Identification Stage what the user should login for.""" 90 flow_context = {} 91 authn_flow = None 92 if self.application: 93 flow_context[PLAN_CONTEXT_APPLICATION] = self.application 94 if self.provider and self.provider.authentication_flow: 95 authn_flow = self.provider.authentication_flow 96 # Because this view might get hit with a POST request, we need to preserve that data 97 # since later views might need it (mostly SAML) 98 if self.request.method.lower() == "post": 99 self.request.session[SESSION_KEY_POST] = self.request.POST 100 flow_context[PLAN_CONTEXT_POST] = self.request.POST 101 102 if not authn_flow: 103 authn_flow = ToDefaultFlow.get_flow(self.request, FlowDesignation.AUTHENTICATION) 104 if not authn_flow: 105 raise Http404 106 planner = FlowPlanner(authn_flow) 107 try: 108 plan = planner.plan(self.request, self.modify_flow_context(authn_flow, flow_context)) 109 except (FlowNonApplicableException, EmptyFlowException) as exc: 110 LOGGER.warning("Non-applicable authentication flow", exc=exc) 111 raise Http404 from None 112 return plan.to_redirect(self.request, authn_flow, next=self.request.get_full_path()) 113 114 def handle_no_permission_authenticated( 115 self, result: PolicyResult | None = None 116 ) -> HttpResponse: 117 """Function called when user has no permissions but is authenticated""" 118 response = AccessDeniedResponse(self.request) 119 if result: 120 response.policy_result = result 121 return response 122 123 def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest: 124 """optionally modify the policy request""" 125 return request 126 127 def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]: 128 """optionally modify the flow context which is used for the authentication flow""" 129 return context 130 131 def user_has_access( 132 self, user: User | None = None, pbm: PolicyBindingModel | None = None 133 ) -> PolicyResult: 134 """Check if user has access to application.""" 135 user = user or self.request.user 136 policy_engine = PolicyEngine( 137 pbm or self.application, user or self.request.user, self.request 138 ) 139 policy_engine.empty_result = AppAccessWithoutBindings.get() 140 policy_engine.use_cache = False 141 policy_engine.request = self.modify_policy_request(policy_engine.request) 142 policy_engine.build() 143 result = policy_engine.result 144 log_kwargs = {} 145 if pbm: 146 log_kwargs["pbm"] = pbm.pk 147 else: 148 log_kwargs["app"] = self.application.slug 149 LOGGER.debug( 150 "PolicyAccessView user_has_access", user=user.username, result=result, **log_kwargs 151 ) 152 if not result.passing: 153 for message in result.messages: 154 messages.error(self.request, _(message)) 155 return result
35class RequestValidationError(SentryIgnoredException): 36 """Error raised in pre_permission_check, when a request is invalid.""" 37 38 response: HttpResponse | None 39 40 def __init__(self, response: HttpResponse | None = None): 41 super().__init__() 42 if response: 43 self.response = response
Error raised in pre_permission_check, when a request is invalid.
46class PolicyAccessView(AccessMixin, View): 47 """Mixin class for usage in Authorization views. 48 Provider functions to check application access, etc""" 49 50 provider: Provider | None = None 51 application: Application | None = None 52 53 def pre_permission_check(self): 54 """Optionally hook in before permission check to check if a request is valid. 55 Can raise `RequestValidationError` to return a response.""" 56 57 def resolve_provider_application(self): 58 """Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal 59 AccessDenied view to be shown. An Http404 exception 60 is not caught, and will return directly""" 61 raise NotImplementedError 62 63 def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: 64 try: 65 self.pre_permission_check() 66 except RequestValidationError as exc: 67 if exc.response: 68 return exc.response 69 return self.handle_no_permission() 70 try: 71 self.resolve_provider_application() 72 except (Application.DoesNotExist, Provider.DoesNotExist) as exc: 73 LOGGER.warning("failed to resolve application", exc=exc) 74 return self.handle_no_permission_authenticated( 75 PolicyResult(False, _("Failed to resolve application")) 76 ) 77 # Check if user is unauthenticated, so we pass the application 78 # for the identification stage 79 if not request.user.is_authenticated: 80 return self.handle_no_permission() 81 # Check permissions 82 result = self.user_has_access() 83 if not result.passing: 84 return self.handle_no_permission_authenticated(result) 85 return super().dispatch(request, *args, **kwargs) 86 87 def handle_no_permission(self) -> HttpResponse: 88 """User has no access and is not authenticated, so we remember the application 89 they try to access and redirect to the login URL. The application is saved to show 90 a hint on the Identification Stage what the user should login for.""" 91 flow_context = {} 92 authn_flow = None 93 if self.application: 94 flow_context[PLAN_CONTEXT_APPLICATION] = self.application 95 if self.provider and self.provider.authentication_flow: 96 authn_flow = self.provider.authentication_flow 97 # Because this view might get hit with a POST request, we need to preserve that data 98 # since later views might need it (mostly SAML) 99 if self.request.method.lower() == "post": 100 self.request.session[SESSION_KEY_POST] = self.request.POST 101 flow_context[PLAN_CONTEXT_POST] = self.request.POST 102 103 if not authn_flow: 104 authn_flow = ToDefaultFlow.get_flow(self.request, FlowDesignation.AUTHENTICATION) 105 if not authn_flow: 106 raise Http404 107 planner = FlowPlanner(authn_flow) 108 try: 109 plan = planner.plan(self.request, self.modify_flow_context(authn_flow, flow_context)) 110 except (FlowNonApplicableException, EmptyFlowException) as exc: 111 LOGGER.warning("Non-applicable authentication flow", exc=exc) 112 raise Http404 from None 113 return plan.to_redirect(self.request, authn_flow, next=self.request.get_full_path()) 114 115 def handle_no_permission_authenticated( 116 self, result: PolicyResult | None = None 117 ) -> HttpResponse: 118 """Function called when user has no permissions but is authenticated""" 119 response = AccessDeniedResponse(self.request) 120 if result: 121 response.policy_result = result 122 return response 123 124 def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest: 125 """optionally modify the policy request""" 126 return request 127 128 def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]: 129 """optionally modify the flow context which is used for the authentication flow""" 130 return context 131 132 def user_has_access( 133 self, user: User | None = None, pbm: PolicyBindingModel | None = None 134 ) -> PolicyResult: 135 """Check if user has access to application.""" 136 user = user or self.request.user 137 policy_engine = PolicyEngine( 138 pbm or self.application, user or self.request.user, self.request 139 ) 140 policy_engine.empty_result = AppAccessWithoutBindings.get() 141 policy_engine.use_cache = False 142 policy_engine.request = self.modify_policy_request(policy_engine.request) 143 policy_engine.build() 144 result = policy_engine.result 145 log_kwargs = {} 146 if pbm: 147 log_kwargs["pbm"] = pbm.pk 148 else: 149 log_kwargs["app"] = self.application.slug 150 LOGGER.debug( 151 "PolicyAccessView user_has_access", user=user.username, result=result, **log_kwargs 152 ) 153 if not result.passing: 154 for message in result.messages: 155 messages.error(self.request, _(message)) 156 return result
Mixin class for usage in Authorization views. Provider functions to check application access, etc
53 def pre_permission_check(self): 54 """Optionally hook in before permission check to check if a request is valid. 55 Can raise `RequestValidationError` to return a response."""
Optionally hook in before permission check to check if a request is valid.
Can raise RequestValidationError to return a response.
57 def resolve_provider_application(self): 58 """Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal 59 AccessDenied view to be shown. An Http404 exception 60 is not caught, and will return directly""" 61 raise NotImplementedError
Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught, and will return directly
63 def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse: 64 try: 65 self.pre_permission_check() 66 except RequestValidationError as exc: 67 if exc.response: 68 return exc.response 69 return self.handle_no_permission() 70 try: 71 self.resolve_provider_application() 72 except (Application.DoesNotExist, Provider.DoesNotExist) as exc: 73 LOGGER.warning("failed to resolve application", exc=exc) 74 return self.handle_no_permission_authenticated( 75 PolicyResult(False, _("Failed to resolve application")) 76 ) 77 # Check if user is unauthenticated, so we pass the application 78 # for the identification stage 79 if not request.user.is_authenticated: 80 return self.handle_no_permission() 81 # Check permissions 82 result = self.user_has_access() 83 if not result.passing: 84 return self.handle_no_permission_authenticated(result) 85 return super().dispatch(request, *args, **kwargs)
87 def handle_no_permission(self) -> HttpResponse: 88 """User has no access and is not authenticated, so we remember the application 89 they try to access and redirect to the login URL. The application is saved to show 90 a hint on the Identification Stage what the user should login for.""" 91 flow_context = {} 92 authn_flow = None 93 if self.application: 94 flow_context[PLAN_CONTEXT_APPLICATION] = self.application 95 if self.provider and self.provider.authentication_flow: 96 authn_flow = self.provider.authentication_flow 97 # Because this view might get hit with a POST request, we need to preserve that data 98 # since later views might need it (mostly SAML) 99 if self.request.method.lower() == "post": 100 self.request.session[SESSION_KEY_POST] = self.request.POST 101 flow_context[PLAN_CONTEXT_POST] = self.request.POST 102 103 if not authn_flow: 104 authn_flow = ToDefaultFlow.get_flow(self.request, FlowDesignation.AUTHENTICATION) 105 if not authn_flow: 106 raise Http404 107 planner = FlowPlanner(authn_flow) 108 try: 109 plan = planner.plan(self.request, self.modify_flow_context(authn_flow, flow_context)) 110 except (FlowNonApplicableException, EmptyFlowException) as exc: 111 LOGGER.warning("Non-applicable authentication flow", exc=exc) 112 raise Http404 from None 113 return plan.to_redirect(self.request, authn_flow, next=self.request.get_full_path())
User has no access and is not authenticated, so we remember the application they try to access and redirect to the login URL. The application is saved to show a hint on the Identification Stage what the user should login for.
115 def handle_no_permission_authenticated( 116 self, result: PolicyResult | None = None 117 ) -> HttpResponse: 118 """Function called when user has no permissions but is authenticated""" 119 response = AccessDeniedResponse(self.request) 120 if result: 121 response.policy_result = result 122 return response
Function called when user has no permissions but is authenticated
124 def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest: 125 """optionally modify the policy request""" 126 return request
optionally modify the policy request
128 def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]: 129 """optionally modify the flow context which is used for the authentication flow""" 130 return context
optionally modify the flow context which is used for the authentication flow
132 def user_has_access( 133 self, user: User | None = None, pbm: PolicyBindingModel | None = None 134 ) -> PolicyResult: 135 """Check if user has access to application.""" 136 user = user or self.request.user 137 policy_engine = PolicyEngine( 138 pbm or self.application, user or self.request.user, self.request 139 ) 140 policy_engine.empty_result = AppAccessWithoutBindings.get() 141 policy_engine.use_cache = False 142 policy_engine.request = self.modify_policy_request(policy_engine.request) 143 policy_engine.build() 144 result = policy_engine.result 145 log_kwargs = {} 146 if pbm: 147 log_kwargs["pbm"] = pbm.pk 148 else: 149 log_kwargs["app"] = self.application.slug 150 LOGGER.debug( 151 "PolicyAccessView user_has_access", user=user.username, result=result, **log_kwargs 152 ) 153 if not result.passing: 154 for message in result.messages: 155 messages.error(self.request, _(message)) 156 return result
Check if user has access to application.