authentik.policies.views

authentik access helper classes

  1"""authentik access helper classes"""
  2
  3from typing import Any
  4
  5from django.contrib import messages
  6from django.contrib.auth.mixins import AccessMixin
  7from django.http import Http404, HttpRequest, HttpResponse
  8from django.utils.translation import gettext as _
  9from django.views.generic.base import View
 10from structlog.stdlib import get_logger
 11
 12from authentik.core.apps import AppAccessWithoutBindings
 13from authentik.core.models import Application, Provider, User
 14from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException
 15from authentik.flows.models import Flow, FlowDesignation
 16from authentik.flows.planner import (
 17    PLAN_CONTEXT_APPLICATION,
 18    PLAN_CONTEXT_POST,
 19    FlowPlanner,
 20)
 21from authentik.flows.views.executor import (
 22    SESSION_KEY_POST,
 23    ToDefaultFlow,
 24)
 25from authentik.lib.sentry import SentryIgnoredException
 26from authentik.policies.denied import AccessDeniedResponse
 27from authentik.policies.engine import PolicyEngine
 28from authentik.policies.models import PolicyBindingModel
 29from authentik.policies.types import PolicyRequest, PolicyResult
 30
 31LOGGER = get_logger()
 32
 33
 34class RequestValidationError(SentryIgnoredException):
 35    """Error raised in pre_permission_check, when a request is invalid."""
 36
 37    response: HttpResponse | None
 38
 39    def __init__(self, response: HttpResponse | None = None):
 40        super().__init__()
 41        if response:
 42            self.response = response
 43
 44
 45class PolicyAccessView(AccessMixin, View):
 46    """Mixin class for usage in Authorization views.
 47    Provider functions to check application access, etc"""
 48
 49    provider: Provider | None = None
 50    application: Application | None = None
 51
 52    def pre_permission_check(self):
 53        """Optionally hook in before permission check to check if a request is valid.
 54        Can raise `RequestValidationError` to return a response."""
 55
 56    def resolve_provider_application(self):
 57        """Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal
 58        AccessDenied view to be shown. An Http404 exception
 59        is not caught, and will return directly"""
 60        raise NotImplementedError
 61
 62    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
 63        try:
 64            self.pre_permission_check()
 65        except RequestValidationError as exc:
 66            if exc.response:
 67                return exc.response
 68            return self.handle_no_permission()
 69        try:
 70            self.resolve_provider_application()
 71        except (Application.DoesNotExist, Provider.DoesNotExist) as exc:
 72            LOGGER.warning("failed to resolve application", exc=exc)
 73            return self.handle_no_permission_authenticated(
 74                PolicyResult(False, _("Failed to resolve application"))
 75            )
 76        # Check if user is unauthenticated, so we pass the application
 77        # for the identification stage
 78        if not request.user.is_authenticated:
 79            return self.handle_no_permission()
 80        # Check permissions
 81        result = self.user_has_access()
 82        if not result.passing:
 83            return self.handle_no_permission_authenticated(result)
 84        return super().dispatch(request, *args, **kwargs)
 85
 86    def handle_no_permission(self) -> HttpResponse:
 87        """User has no access and is not authenticated, so we remember the application
 88        they try to access and redirect to the login URL. The application is saved to show
 89        a hint on the Identification Stage what the user should login for."""
 90        flow_context = {}
 91        authn_flow = None
 92        if self.application:
 93            flow_context[PLAN_CONTEXT_APPLICATION] = self.application
 94            if self.provider and self.provider.authentication_flow:
 95                authn_flow = self.provider.authentication_flow
 96        # Because this view might get hit with a POST request, we need to preserve that data
 97        # since later views might need it (mostly SAML)
 98        if self.request.method.lower() == "post":
 99            self.request.session[SESSION_KEY_POST] = self.request.POST
100            flow_context[PLAN_CONTEXT_POST] = self.request.POST
101
102        if not authn_flow:
103            authn_flow = ToDefaultFlow.get_flow(self.request, FlowDesignation.AUTHENTICATION)
104            if not authn_flow:
105                raise Http404
106        planner = FlowPlanner(authn_flow)
107        try:
108            plan = planner.plan(self.request, self.modify_flow_context(authn_flow, flow_context))
109        except (FlowNonApplicableException, EmptyFlowException) as exc:
110            LOGGER.warning("Non-applicable authentication flow", exc=exc)
111            raise Http404 from None
112        return plan.to_redirect(self.request, authn_flow, next=self.request.get_full_path())
113
114    def handle_no_permission_authenticated(
115        self, result: PolicyResult | None = None
116    ) -> HttpResponse:
117        """Function called when user has no permissions but is authenticated"""
118        response = AccessDeniedResponse(self.request)
119        if result:
120            response.policy_result = result
121        return response
122
123    def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
124        """optionally modify the policy request"""
125        return request
126
127    def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
128        """optionally modify the flow context which is used for the authentication flow"""
129        return context
130
131    def user_has_access(
132        self, user: User | None = None, pbm: PolicyBindingModel | None = None
133    ) -> PolicyResult:
134        """Check if user has access to application."""
135        user = user or self.request.user
136        policy_engine = PolicyEngine(
137            pbm or self.application, user or self.request.user, self.request
138        )
139        policy_engine.empty_result = AppAccessWithoutBindings.get()
140        policy_engine.use_cache = False
141        policy_engine.request = self.modify_policy_request(policy_engine.request)
142        policy_engine.build()
143        result = policy_engine.result
144        log_kwargs = {}
145        if pbm:
146            log_kwargs["pbm"] = pbm.pk
147        else:
148            log_kwargs["app"] = self.application.slug
149        LOGGER.debug(
150            "PolicyAccessView user_has_access", user=user.username, result=result, **log_kwargs
151        )
152        if not result.passing:
153            for message in result.messages:
154                messages.error(self.request, _(message))
155        return result
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class RequestValidationError(authentik.lib.sentry.SentryIgnoredException):
class RequestValidationError(SentryIgnoredException):
    """Error raised in pre_permission_check, when a request is invalid.

    The optional ``response`` is the response that should be returned to the client
    instead of continuing with the permission check. It is ``None`` when the error
    was raised without one."""

    # Class-level default so reading `.response` can never raise AttributeError;
    # a bare annotation alone does not create the attribute.
    response: HttpResponse | None = None

    def __init__(self, response: HttpResponse | None = None):
        """Create the error, optionally carrying the response to return.

        Args:
            response: response to hand back to the client, or None if there is none.
        """
        super().__init__()
        # Assign unconditionally: skipping the assignment when no response is given
        # would leave the instance without the attribute its consumers read.
        self.response = response

Error raised in pre_permission_check, when a request is invalid.

RequestValidationError(response: django.http.response.HttpResponse | None = None)
40    def __init__(self, response: HttpResponse | None = None):
41        super().__init__()
42        if response:
43            self.response = response
response: django.http.response.HttpResponse | None
class PolicyAccessView(django.contrib.auth.mixins.AccessMixin, django.views.generic.base.View):
 46class PolicyAccessView(AccessMixin, View):
 47    """Mixin class for usage in Authorization views.
 48    Provider functions to check application access, etc"""
 49
 50    provider: Provider | None = None
 51    application: Application | None = None
 52
 53    def pre_permission_check(self):
 54        """Optionally hook in before permission check to check if a request is valid.
 55        Can raise `RequestValidationError` to return a response."""
 56
 57    def resolve_provider_application(self):
 58        """Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal
 59        AccessDenied view to be shown. An Http404 exception
 60        is not caught, and will return directly"""
 61        raise NotImplementedError
 62
 63    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
 64        try:
 65            self.pre_permission_check()
 66        except RequestValidationError as exc:
 67            if exc.response:
 68                return exc.response
 69            return self.handle_no_permission()
 70        try:
 71            self.resolve_provider_application()
 72        except (Application.DoesNotExist, Provider.DoesNotExist) as exc:
 73            LOGGER.warning("failed to resolve application", exc=exc)
 74            return self.handle_no_permission_authenticated(
 75                PolicyResult(False, _("Failed to resolve application"))
 76            )
 77        # Check if user is unauthenticated, so we pass the application
 78        # for the identification stage
 79        if not request.user.is_authenticated:
 80            return self.handle_no_permission()
 81        # Check permissions
 82        result = self.user_has_access()
 83        if not result.passing:
 84            return self.handle_no_permission_authenticated(result)
 85        return super().dispatch(request, *args, **kwargs)
 86
 87    def handle_no_permission(self) -> HttpResponse:
 88        """User has no access and is not authenticated, so we remember the application
 89        they try to access and redirect to the login URL. The application is saved to show
 90        a hint on the Identification Stage what the user should login for."""
 91        flow_context = {}
 92        authn_flow = None
 93        if self.application:
 94            flow_context[PLAN_CONTEXT_APPLICATION] = self.application
 95            if self.provider and self.provider.authentication_flow:
 96                authn_flow = self.provider.authentication_flow
 97        # Because this view might get hit with a POST request, we need to preserve that data
 98        # since later views might need it (mostly SAML)
 99        if self.request.method.lower() == "post":
100            self.request.session[SESSION_KEY_POST] = self.request.POST
101            flow_context[PLAN_CONTEXT_POST] = self.request.POST
102
103        if not authn_flow:
104            authn_flow = ToDefaultFlow.get_flow(self.request, FlowDesignation.AUTHENTICATION)
105            if not authn_flow:
106                raise Http404
107        planner = FlowPlanner(authn_flow)
108        try:
109            plan = planner.plan(self.request, self.modify_flow_context(authn_flow, flow_context))
110        except (FlowNonApplicableException, EmptyFlowException) as exc:
111            LOGGER.warning("Non-applicable authentication flow", exc=exc)
112            raise Http404 from None
113        return plan.to_redirect(self.request, authn_flow, next=self.request.get_full_path())
114
115    def handle_no_permission_authenticated(
116        self, result: PolicyResult | None = None
117    ) -> HttpResponse:
118        """Function called when user has no permissions but is authenticated"""
119        response = AccessDeniedResponse(self.request)
120        if result:
121            response.policy_result = result
122        return response
123
124    def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
125        """optionally modify the policy request"""
126        return request
127
128    def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
129        """optionally modify the flow context which is used for the authentication flow"""
130        return context
131
132    def user_has_access(
133        self, user: User | None = None, pbm: PolicyBindingModel | None = None
134    ) -> PolicyResult:
135        """Check if user has access to application."""
136        user = user or self.request.user
137        policy_engine = PolicyEngine(
138            pbm or self.application, user or self.request.user, self.request
139        )
140        policy_engine.empty_result = AppAccessWithoutBindings.get()
141        policy_engine.use_cache = False
142        policy_engine.request = self.modify_policy_request(policy_engine.request)
143        policy_engine.build()
144        result = policy_engine.result
145        log_kwargs = {}
146        if pbm:
147            log_kwargs["pbm"] = pbm.pk
148        else:
149            log_kwargs["app"] = self.application.slug
150        LOGGER.debug(
151            "PolicyAccessView user_has_access", user=user.username, result=result, **log_kwargs
152        )
153        if not result.passing:
154            for message in result.messages:
155                messages.error(self.request, _(message))
156        return result

Mixin class for usage in Authorization views. Provides functions to check application access, etc.

provider: authentik.core.models.Provider | None = None
application: authentik.core.models.Application | None = None
def pre_permission_check(self):
53    def pre_permission_check(self):
54        """Optionally hook in before permission check to check if a request is valid.
55        Can raise `RequestValidationError` to return a response."""

Optionally hook in before permission check to check if a request is valid. Can raise RequestValidationError to return a response.

def resolve_provider_application(self):
57    def resolve_provider_application(self):
58        """Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal
59        AccessDenied view to be shown. An Http404 exception
60        is not caught, and will return directly"""
61        raise NotImplementedError

Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught, and will return directly

def dispatch( self, request: django.http.request.HttpRequest, *args: Any, **kwargs: Any) -> django.http.response.HttpResponse:
63    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
64        try:
65            self.pre_permission_check()
66        except RequestValidationError as exc:
67            if exc.response:
68                return exc.response
69            return self.handle_no_permission()
70        try:
71            self.resolve_provider_application()
72        except (Application.DoesNotExist, Provider.DoesNotExist) as exc:
73            LOGGER.warning("failed to resolve application", exc=exc)
74            return self.handle_no_permission_authenticated(
75                PolicyResult(False, _("Failed to resolve application"))
76            )
77        # Check if user is unauthenticated, so we pass the application
78        # for the identification stage
79        if not request.user.is_authenticated:
80            return self.handle_no_permission()
81        # Check permissions
82        result = self.user_has_access()
83        if not result.passing:
84            return self.handle_no_permission_authenticated(result)
85        return super().dispatch(request, *args, **kwargs)
def handle_no_permission(self) -> django.http.response.HttpResponse:
 87    def handle_no_permission(self) -> HttpResponse:
 88        """User has no access and is not authenticated, so we remember the application
 89        they try to access and redirect to the login URL. The application is saved to show
 90        a hint on the Identification Stage what the user should login for."""
 91        flow_context = {}
 92        authn_flow = None
 93        if self.application:
 94            flow_context[PLAN_CONTEXT_APPLICATION] = self.application
 95            if self.provider and self.provider.authentication_flow:
 96                authn_flow = self.provider.authentication_flow
 97        # Because this view might get hit with a POST request, we need to preserve that data
 98        # since later views might need it (mostly SAML)
 99        if self.request.method.lower() == "post":
100            self.request.session[SESSION_KEY_POST] = self.request.POST
101            flow_context[PLAN_CONTEXT_POST] = self.request.POST
102
103        if not authn_flow:
104            authn_flow = ToDefaultFlow.get_flow(self.request, FlowDesignation.AUTHENTICATION)
105            if not authn_flow:
106                raise Http404
107        planner = FlowPlanner(authn_flow)
108        try:
109            plan = planner.plan(self.request, self.modify_flow_context(authn_flow, flow_context))
110        except (FlowNonApplicableException, EmptyFlowException) as exc:
111            LOGGER.warning("Non-applicable authentication flow", exc=exc)
112            raise Http404 from None
113        return plan.to_redirect(self.request, authn_flow, next=self.request.get_full_path())

User has no access and is not authenticated, so we remember the application they try to access and redirect to the login URL. The application is saved to show a hint on the Identification Stage what the user should log in for.

def handle_no_permission_authenticated( self, result: authentik.policies.types.PolicyResult | None = None) -> django.http.response.HttpResponse:
115    def handle_no_permission_authenticated(
116        self, result: PolicyResult | None = None
117    ) -> HttpResponse:
118        """Function called when user has no permissions but is authenticated"""
119        response = AccessDeniedResponse(self.request)
120        if result:
121            response.policy_result = result
122        return response

Function called when user has no permissions but is authenticated

def modify_policy_request( self, request: authentik.policies.types.PolicyRequest) -> authentik.policies.types.PolicyRequest:
124    def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
125        """optionally modify the policy request"""
126        return request

optionally modify the policy request

def modify_flow_context( self, flow: authentik.flows.models.Flow, context: dict[str, typing.Any]) -> dict[str, typing.Any]:
128    def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
129        """optionally modify the flow context which is used for the authentication flow"""
130        return context

optionally modify the flow context which is used for the authentication flow

def user_has_access( self, user: authentik.core.models.User | None = None, pbm: authentik.policies.models.PolicyBindingModel | None = None) -> authentik.policies.types.PolicyResult:
132    def user_has_access(
133        self, user: User | None = None, pbm: PolicyBindingModel | None = None
134    ) -> PolicyResult:
135        """Check if user has access to application."""
136        user = user or self.request.user
137        policy_engine = PolicyEngine(
138            pbm or self.application, user or self.request.user, self.request
139        )
140        policy_engine.empty_result = AppAccessWithoutBindings.get()
141        policy_engine.use_cache = False
142        policy_engine.request = self.modify_policy_request(policy_engine.request)
143        policy_engine.build()
144        result = policy_engine.result
145        log_kwargs = {}
146        if pbm:
147            log_kwargs["pbm"] = pbm.pk
148        else:
149            log_kwargs["app"] = self.application.slug
150        LOGGER.debug(
151            "PolicyAccessView user_has_access", user=user.username, result=result, **log_kwargs
152        )
153        if not result.passing:
154            for message in result.messages:
155                messages.error(self.request, _(message))
156        return result

Check if user has access to application.