authentik.policies.process

authentik policy task

  1"""authentik policy task"""
  2
  3from multiprocessing import get_context
  4from multiprocessing.connection import Connection
  5from time import perf_counter
  6
  7from django.core.cache import cache
  8from sentry_sdk import start_span
  9from sentry_sdk.tracing import Span
 10from structlog.stdlib import get_logger
 11
 12from authentik.events.models import Event, EventAction
 13from authentik.lib.config import CONFIG
 14from authentik.lib.utils.errors import exception_to_dict
 15from authentik.policies.exceptions import PolicyException
 16from authentik.policies.models import PolicyBinding
 17from authentik.policies.types import CACHE_PREFIX, PolicyRequest, PolicyResult
 18
 19LOGGER = get_logger()
 20
 21FORK_CTX = get_context("fork")
 22CACHE_TIMEOUT = CONFIG.get_int("cache.timeout_policies")
 23PROCESS_CLASS = FORK_CTX.Process
 24
 25
 26def cache_key(binding: PolicyBinding, request: PolicyRequest) -> str:
 27    """Generate Cache key for policy"""
 28    prefix = f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}_"
 29    if request.http_request and hasattr(request.http_request, "session"):
 30        prefix += f"_{request.http_request.session.session_key}"
 31    if request.user:
 32        prefix += f"#{request.user.pk}"
 33    return prefix
 34
 35
 36class PolicyProcess(PROCESS_CLASS):
 37    """Evaluate a single policy within a separate process"""
 38
 39    connection: Connection
 40    binding: PolicyBinding
 41    request: PolicyRequest
 42
 43    def __init__(
 44        self,
 45        binding: PolicyBinding,
 46        request: PolicyRequest,
 47        connection: Connection | None,
 48    ):
 49        super().__init__()
 50        self.binding = binding
 51        self.request = request
 52        if not isinstance(self.request, PolicyRequest):
 53            raise ValueError(f"{self.request} is not a Policy Request.")
 54        if connection:
 55            self.connection = connection
 56
 57    def create_event(self, action: str, message: str, **kwargs):
 58        """Create event with common values from `self.request` and `self.binding`."""
 59        event = Event.new(
 60            action=action,
 61            message=message,
 62            policy_uuid=self.binding.policy.policy_uuid.hex,
 63            binding=self.binding,
 64            request=self.request,
 65            **kwargs,
 66        )
 67        event.set_user(self.request.user)
 68        if self.request.http_request:
 69            event.from_http(self.request.http_request)
 70        else:
 71            event.save()
 72
 73    def execute(self) -> PolicyResult:
 74        """Run actual policy, returns result"""
 75        LOGGER.debug(
 76            "P_ENG(proc): Running policy",
 77            policy=self.binding.policy,
 78            user=self.request.user.username,
 79            # this is used for filtering in access checking where logs are sent to the admin
 80            process="PolicyProcess",
 81        )
 82        try:
 83            policy_result = self.binding.passes(self.request)
 84            # Invert result if policy.negate is set
 85            if self.binding.negate:
 86                policy_result.passing = not policy_result.passing
 87            if self.binding.policy and not self.request.debug:
 88                if self.binding.policy.execution_logging:
 89                    self.create_event(
 90                        EventAction.POLICY_EXECUTION,
 91                        message="Policy Execution",
 92                        result=policy_result,
 93                    )
 94        except PolicyException as exc:
 95            # Either use passed original exception or whatever we have
 96            src_exc = exc.src_exc if exc.src_exc else exc
 97            # Create policy exception event, only when we're not debugging
 98            if not self.request.debug:
 99                self.create_event(
100                    EventAction.POLICY_EXCEPTION,
101                    message="Policy failed to execute",
102                    exception=exception_to_dict(src_exc),
103                )
104            LOGGER.debug("P_ENG(proc): error, using failure result", exc=src_exc)
105            policy_result = PolicyResult(self.binding.failure_result, str(src_exc))
106        policy_result.source_binding = self.binding
107        should_cache = self.request.should_cache
108        if should_cache:
109            key = cache_key(self.binding, self.request)
110            cache.set(key, policy_result, CACHE_TIMEOUT)
111        LOGGER.debug(
112            "P_ENG(proc): finished",
113            policy=self.binding.policy,
114            cached=should_cache,
115            result=policy_result,
116            # this is used for filtering in access checking where logs are sent to the admin
117            process="PolicyProcess",
118            passing=policy_result.passing,
119            user=self.request.user.username,
120        )
121        return policy_result
122
123    def profiling_wrapper(self):
124        """Run with profiling enabled"""
125        with start_span(
126            op="authentik.policy.process.execute",
127        ) as span:
128            span: Span
129            span.set_data("policy", self.binding.policy)
130            span.set_data("request", self.request)
131            return self.execute()
132
133    def run(self):  # pragma: no cover
134        """Task wrapper to run policy checking"""
135        result = None
136        try:
137            start = perf_counter()
138            result = self.profiling_wrapper()
139            end = perf_counter()
140            result._exec_time = max((end - start), 0)
141        except Exception as exc:  # noqa
142            LOGGER.warning("Policy failed to run", exc=exc)
143            result = PolicyResult(False, str(exc))
144        finally:
145            self.connection.send(result)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
FORK_CTX = <multiprocessing.context.ForkContext object>
CACHE_TIMEOUT = 300
PROCESS_CLASS = <class 'multiprocessing.context.ForkProcess'>
def cache_key( binding: authentik.policies.models.PolicyBinding, request: authentik.policies.types.PolicyRequest) -> str:
27def cache_key(binding: PolicyBinding, request: PolicyRequest) -> str:
28    """Generate Cache key for policy"""
29    prefix = f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}_"
30    if request.http_request and hasattr(request.http_request, "session"):
31        prefix += f"_{request.http_request.session.session_key}"
32    if request.user:
33        prefix += f"#{request.user.pk}"
34    return prefix

Generate Cache key for policy

class PolicyProcess(multiprocessing.context.ForkProcess):
 37class PolicyProcess(PROCESS_CLASS):
 38    """Evaluate a single policy within a separate process"""
 39
 40    connection: Connection
 41    binding: PolicyBinding
 42    request: PolicyRequest
 43
 44    def __init__(
 45        self,
 46        binding: PolicyBinding,
 47        request: PolicyRequest,
 48        connection: Connection | None,
 49    ):
 50        super().__init__()
 51        self.binding = binding
 52        self.request = request
 53        if not isinstance(self.request, PolicyRequest):
 54            raise ValueError(f"{self.request} is not a Policy Request.")
 55        if connection:
 56            self.connection = connection
 57
 58    def create_event(self, action: str, message: str, **kwargs):
 59        """Create event with common values from `self.request` and `self.binding`."""
 60        event = Event.new(
 61            action=action,
 62            message=message,
 63            policy_uuid=self.binding.policy.policy_uuid.hex,
 64            binding=self.binding,
 65            request=self.request,
 66            **kwargs,
 67        )
 68        event.set_user(self.request.user)
 69        if self.request.http_request:
 70            event.from_http(self.request.http_request)
 71        else:
 72            event.save()
 73
 74    def execute(self) -> PolicyResult:
 75        """Run actual policy, returns result"""
 76        LOGGER.debug(
 77            "P_ENG(proc): Running policy",
 78            policy=self.binding.policy,
 79            user=self.request.user.username,
 80            # this is used for filtering in access checking where logs are sent to the admin
 81            process="PolicyProcess",
 82        )
 83        try:
 84            policy_result = self.binding.passes(self.request)
 85            # Invert result if policy.negate is set
 86            if self.binding.negate:
 87                policy_result.passing = not policy_result.passing
 88            if self.binding.policy and not self.request.debug:
 89                if self.binding.policy.execution_logging:
 90                    self.create_event(
 91                        EventAction.POLICY_EXECUTION,
 92                        message="Policy Execution",
 93                        result=policy_result,
 94                    )
 95        except PolicyException as exc:
 96            # Either use passed original exception or whatever we have
 97            src_exc = exc.src_exc if exc.src_exc else exc
 98            # Create policy exception event, only when we're not debugging
 99            if not self.request.debug:
100                self.create_event(
101                    EventAction.POLICY_EXCEPTION,
102                    message="Policy failed to execute",
103                    exception=exception_to_dict(src_exc),
104                )
105            LOGGER.debug("P_ENG(proc): error, using failure result", exc=src_exc)
106            policy_result = PolicyResult(self.binding.failure_result, str(src_exc))
107        policy_result.source_binding = self.binding
108        should_cache = self.request.should_cache
109        if should_cache:
110            key = cache_key(self.binding, self.request)
111            cache.set(key, policy_result, CACHE_TIMEOUT)
112        LOGGER.debug(
113            "P_ENG(proc): finished",
114            policy=self.binding.policy,
115            cached=should_cache,
116            result=policy_result,
117            # this is used for filtering in access checking where logs are sent to the admin
118            process="PolicyProcess",
119            passing=policy_result.passing,
120            user=self.request.user.username,
121        )
122        return policy_result
123
124    def profiling_wrapper(self):
125        """Run with profiling enabled"""
126        with start_span(
127            op="authentik.policy.process.execute",
128        ) as span:
129            span: Span
130            span.set_data("policy", self.binding.policy)
131            span.set_data("request", self.request)
132            return self.execute()
133
134    def run(self):  # pragma: no cover
135        """Task wrapper to run policy checking"""
136        result = None
137        try:
138            start = perf_counter()
139            result = self.profiling_wrapper()
140            end = perf_counter()
141            result._exec_time = max((end - start), 0)
142        except Exception as exc:  # noqa
143            LOGGER.warning("Policy failed to run", exc=exc)
144            result = PolicyResult(False, str(exc))
145        finally:
146            self.connection.send(result)

Evaluate a single policy within a separate process

PolicyProcess( binding: authentik.policies.models.PolicyBinding, request: authentik.policies.types.PolicyRequest, connection: multiprocessing.connection.Connection | None)
44    def __init__(
45        self,
46        binding: PolicyBinding,
47        request: PolicyRequest,
48        connection: Connection | None,
49    ):
50        super().__init__()
51        self.binding = binding
52        self.request = request
53        if not isinstance(self.request, PolicyRequest):
54            raise ValueError(f"{self.request} is not a Policy Request.")
55        if connection:
56            self.connection = connection
connection: multiprocessing.connection.Connection
def create_event(self, action: str, message: str, **kwargs):
58    def create_event(self, action: str, message: str, **kwargs):
59        """Create event with common values from `self.request` and `self.binding`."""
60        event = Event.new(
61            action=action,
62            message=message,
63            policy_uuid=self.binding.policy.policy_uuid.hex,
64            binding=self.binding,
65            request=self.request,
66            **kwargs,
67        )
68        event.set_user(self.request.user)
69        if self.request.http_request:
70            event.from_http(self.request.http_request)
71        else:
72            event.save()

Create event with common values from self.request and self.binding.

def execute(self) -> authentik.policies.types.PolicyResult:
 74    def execute(self) -> PolicyResult:
 75        """Run actual policy, returns result"""
 76        LOGGER.debug(
 77            "P_ENG(proc): Running policy",
 78            policy=self.binding.policy,
 79            user=self.request.user.username,
 80            # this is used for filtering in access checking where logs are sent to the admin
 81            process="PolicyProcess",
 82        )
 83        try:
 84            policy_result = self.binding.passes(self.request)
 85            # Invert result if policy.negate is set
 86            if self.binding.negate:
 87                policy_result.passing = not policy_result.passing
 88            if self.binding.policy and not self.request.debug:
 89                if self.binding.policy.execution_logging:
 90                    self.create_event(
 91                        EventAction.POLICY_EXECUTION,
 92                        message="Policy Execution",
 93                        result=policy_result,
 94                    )
 95        except PolicyException as exc:
 96            # Either use passed original exception or whatever we have
 97            src_exc = exc.src_exc if exc.src_exc else exc
 98            # Create policy exception event, only when we're not debugging
 99            if not self.request.debug:
100                self.create_event(
101                    EventAction.POLICY_EXCEPTION,
102                    message="Policy failed to execute",
103                    exception=exception_to_dict(src_exc),
104                )
105            LOGGER.debug("P_ENG(proc): error, using failure result", exc=src_exc)
106            policy_result = PolicyResult(self.binding.failure_result, str(src_exc))
107        policy_result.source_binding = self.binding
108        should_cache = self.request.should_cache
109        if should_cache:
110            key = cache_key(self.binding, self.request)
111            cache.set(key, policy_result, CACHE_TIMEOUT)
112        LOGGER.debug(
113            "P_ENG(proc): finished",
114            policy=self.binding.policy,
115            cached=should_cache,
116            result=policy_result,
117            # this is used for filtering in access checking where logs are sent to the admin
118            process="PolicyProcess",
119            passing=policy_result.passing,
120            user=self.request.user.username,
121        )
122        return policy_result

Run actual policy, returns result

def profiling_wrapper(self):
124    def profiling_wrapper(self):
125        """Run with profiling enabled"""
126        with start_span(
127            op="authentik.policy.process.execute",
128        ) as span:
129            span: Span
130            span.set_data("policy", self.binding.policy)
131            span.set_data("request", self.request)
132            return self.execute()

Run with profiling enabled

def run(self):
134    def run(self):  # pragma: no cover
135        """Task wrapper to run policy checking"""
136        result = None
137        try:
138            start = perf_counter()
139            result = self.profiling_wrapper()
140            end = perf_counter()
141            result._exec_time = max((end - start), 0)
142        except Exception as exc:  # noqa
143            LOGGER.warning("Policy failed to run", exc=exc)
144            result = PolicyResult(False, str(exc))
145        finally:
146            self.connection.send(result)

Task wrapper to run policy checking