authentik.policies.process
authentik policy task
1"""authentik policy task""" 2 3from multiprocessing import get_context 4from multiprocessing.connection import Connection 5from time import perf_counter 6 7from django.core.cache import cache 8from sentry_sdk import start_span 9from sentry_sdk.tracing import Span 10from structlog.stdlib import get_logger 11 12from authentik.events.models import Event, EventAction 13from authentik.lib.config import CONFIG 14from authentik.lib.utils.errors import exception_to_dict 15from authentik.policies.exceptions import PolicyException 16from authentik.policies.models import PolicyBinding 17from authentik.policies.types import CACHE_PREFIX, PolicyRequest, PolicyResult 18 19LOGGER = get_logger() 20 21FORK_CTX = get_context("fork") 22CACHE_TIMEOUT = CONFIG.get_int("cache.timeout_policies") 23PROCESS_CLASS = FORK_CTX.Process 24 25 26def cache_key(binding: PolicyBinding, request: PolicyRequest) -> str: 27 """Generate Cache key for policy""" 28 prefix = f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}_" 29 if request.http_request and hasattr(request.http_request, "session"): 30 prefix += f"_{request.http_request.session.session_key}" 31 if request.user: 32 prefix += f"#{request.user.pk}" 33 return prefix 34 35 36class PolicyProcess(PROCESS_CLASS): 37 """Evaluate a single policy within a separate process""" 38 39 connection: Connection 40 binding: PolicyBinding 41 request: PolicyRequest 42 43 def __init__( 44 self, 45 binding: PolicyBinding, 46 request: PolicyRequest, 47 connection: Connection | None, 48 ): 49 super().__init__() 50 self.binding = binding 51 self.request = request 52 if not isinstance(self.request, PolicyRequest): 53 raise ValueError(f"{self.request} is not a Policy Request.") 54 if connection: 55 self.connection = connection 56 57 def create_event(self, action: str, message: str, **kwargs): 58 """Create event with common values from `self.request` and `self.binding`.""" 59 event = Event.new( 60 action=action, 61 message=message, 62 policy_uuid=self.binding.policy.policy_uuid.hex, 63 binding=self.binding, 64 request=self.request, 65 **kwargs, 66 ) 67 event.set_user(self.request.user) 68 if self.request.http_request: 69 event.from_http(self.request.http_request) 70 else: 71 event.save() 72 73 def execute(self) -> PolicyResult: 74 """Run actual policy, returns result""" 75 LOGGER.debug( 76 "P_ENG(proc): Running policy", 77 policy=self.binding.policy, 78 user=self.request.user.username, 79 # this is used for filtering in access checking where logs are sent to the admin 80 process="PolicyProcess", 81 ) 82 try: 83 policy_result = self.binding.passes(self.request) 84 # Invert result if policy.negate is set 85 if self.binding.negate: 86 policy_result.passing = not policy_result.passing 87 if self.binding.policy and not self.request.debug: 88 if self.binding.policy.execution_logging: 89 self.create_event( 90 EventAction.POLICY_EXECUTION, 91 message="Policy Execution", 92 result=policy_result, 93 ) 94 except PolicyException as exc: 95 # Either use passed original exception or whatever we have 96 src_exc = exc.src_exc if exc.src_exc else exc 97 # Create policy exception event, only when we're not debugging 98 if not self.request.debug: 99 self.create_event( 100 EventAction.POLICY_EXCEPTION, 101 message="Policy failed to execute", 102 exception=exception_to_dict(src_exc), 103 ) 104 LOGGER.debug("P_ENG(proc): error, using failure result", exc=src_exc) 105 policy_result = PolicyResult(self.binding.failure_result, str(src_exc)) 106 policy_result.source_binding = self.binding 107 should_cache = self.request.should_cache 108 if should_cache: 109 key = cache_key(self.binding, self.request) 110 cache.set(key, policy_result, CACHE_TIMEOUT) 111 LOGGER.debug( 112 "P_ENG(proc): finished", 113 policy=self.binding.policy, 114 cached=should_cache, 115 result=policy_result, 116 # this is used for filtering in access checking where logs are sent to the admin 117 process="PolicyProcess", 118 passing=policy_result.passing, 119 user=self.request.user.username, 120 ) 121 return policy_result 122 123 def profiling_wrapper(self): 124 """Run with profiling enabled""" 125 with start_span( 126 op="authentik.policy.process.execute", 127 ) as span: 128 span: Span 129 span.set_data("policy", self.binding.policy) 130 span.set_data("request", self.request) 131 return self.execute() 132 133 def run(self): # pragma: no cover 134 """Task wrapper to run policy checking""" 135 result = None 136 try: 137 start = perf_counter() 138 result = self.profiling_wrapper() 139 end = perf_counter() 140 result._exec_time = max((end - start), 0) 141 except Exception as exc: # noqa 142 LOGGER.warning("Policy failed to run", exc=exc) 143 result = PolicyResult(False, str(exc)) 144 finally: 145 self.connection.send(result)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
FORK_CTX =
<multiprocessing.context.ForkContext object>
CACHE_TIMEOUT =
300
PROCESS_CLASS =
<class 'multiprocessing.context.ForkProcess'>
def
cache_key( binding: authentik.policies.models.PolicyBinding, request: authentik.policies.types.PolicyRequest) -> str:
27def cache_key(binding: PolicyBinding, request: PolicyRequest) -> str: 28 """Generate Cache key for policy""" 29 prefix = f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}_" 30 if request.http_request and hasattr(request.http_request, "session"): 31 prefix += f"_{request.http_request.session.session_key}" 32 if request.user: 33 prefix += f"#{request.user.pk}" 34 return prefix
Generate Cache key for policy
class
PolicyProcess(multiprocessing.context.ForkProcess):
37class PolicyProcess(PROCESS_CLASS): 38 """Evaluate a single policy within a separate process""" 39 40 connection: Connection 41 binding: PolicyBinding 42 request: PolicyRequest 43 44 def __init__( 45 self, 46 binding: PolicyBinding, 47 request: PolicyRequest, 48 connection: Connection | None, 49 ): 50 super().__init__() 51 self.binding = binding 52 self.request = request 53 if not isinstance(self.request, PolicyRequest): 54 raise ValueError(f"{self.request} is not a Policy Request.") 55 if connection: 56 self.connection = connection 57 58 def create_event(self, action: str, message: str, **kwargs): 59 """Create event with common values from `self.request` and `self.binding`.""" 60 event = Event.new( 61 action=action, 62 message=message, 63 policy_uuid=self.binding.policy.policy_uuid.hex, 64 binding=self.binding, 65 request=self.request, 66 **kwargs, 67 ) 68 event.set_user(self.request.user) 69 if self.request.http_request: 70 event.from_http(self.request.http_request) 71 else: 72 event.save() 73 74 def execute(self) -> PolicyResult: 75 """Run actual policy, returns result""" 76 LOGGER.debug( 77 "P_ENG(proc): Running policy", 78 policy=self.binding.policy, 79 user=self.request.user.username, 80 # this is used for filtering in access checking where logs are sent to the admin 81 process="PolicyProcess", 82 ) 83 try: 84 policy_result = self.binding.passes(self.request) 85 # Invert result if policy.negate is set 86 if self.binding.negate: 87 policy_result.passing = not policy_result.passing 88 if self.binding.policy and not self.request.debug: 89 if self.binding.policy.execution_logging: 90 self.create_event( 91 EventAction.POLICY_EXECUTION, 92 message="Policy Execution", 93 result=policy_result, 94 ) 95 except PolicyException as exc: 96 # Either use passed original exception or whatever we have 97 src_exc = exc.src_exc if exc.src_exc else exc 98 # Create policy exception event, only when we're not debugging 99 if not self.request.debug: 100 self.create_event( 101 EventAction.POLICY_EXCEPTION, 102 message="Policy failed to execute", 103 exception=exception_to_dict(src_exc), 104 ) 105 LOGGER.debug("P_ENG(proc): error, using failure result", exc=src_exc) 106 policy_result = PolicyResult(self.binding.failure_result, str(src_exc)) 107 policy_result.source_binding = self.binding 108 should_cache = self.request.should_cache 109 if should_cache: 110 key = cache_key(self.binding, self.request) 111 cache.set(key, policy_result, CACHE_TIMEOUT) 112 LOGGER.debug( 113 "P_ENG(proc): finished", 114 policy=self.binding.policy, 115 cached=should_cache, 116 result=policy_result, 117 # this is used for filtering in access checking where logs are sent to the admin 118 process="PolicyProcess", 119 passing=policy_result.passing, 120 user=self.request.user.username, 121 ) 122 return policy_result 123 124 def profiling_wrapper(self): 125 """Run with profiling enabled""" 126 with start_span( 127 op="authentik.policy.process.execute", 128 ) as span: 129 span: Span 130 span.set_data("policy", self.binding.policy) 131 span.set_data("request", self.request) 132 return self.execute() 133 134 def run(self): # pragma: no cover 135 """Task wrapper to run policy checking""" 136 result = None 137 try: 138 start = perf_counter() 139 result = self.profiling_wrapper() 140 end = perf_counter() 141 result._exec_time = max((end - start), 0) 142 except Exception as exc: # noqa 143 LOGGER.warning("Policy failed to run", exc=exc) 144 result = PolicyResult(False, str(exc)) 145 finally: 146 self.connection.send(result)
Evaluate a single policy within a separate process
PolicyProcess( binding: authentik.policies.models.PolicyBinding, request: authentik.policies.types.PolicyRequest, connection: multiprocessing.connection.Connection | None)
44 def __init__( 45 self, 46 binding: PolicyBinding, 47 request: PolicyRequest, 48 connection: Connection | None, 49 ): 50 super().__init__() 51 self.binding = binding 52 self.request = request 53 if not isinstance(self.request, PolicyRequest): 54 raise ValueError(f"{self.request} is not a Policy Request.") 55 if connection: 56 self.connection = connection
def
create_event(self, action: str, message: str, **kwargs):
58 def create_event(self, action: str, message: str, **kwargs): 59 """Create event with common values from `self.request` and `self.binding`.""" 60 event = Event.new( 61 action=action, 62 message=message, 63 policy_uuid=self.binding.policy.policy_uuid.hex, 64 binding=self.binding, 65 request=self.request, 66 **kwargs, 67 ) 68 event.set_user(self.request.user) 69 if self.request.http_request: 70 event.from_http(self.request.http_request) 71 else: 72 event.save()
Create event with common values from self.request and self.binding.
74 def execute(self) -> PolicyResult: 75 """Run actual policy, returns result""" 76 LOGGER.debug( 77 "P_ENG(proc): Running policy", 78 policy=self.binding.policy, 79 user=self.request.user.username, 80 # this is used for filtering in access checking where logs are sent to the admin 81 process="PolicyProcess", 82 ) 83 try: 84 policy_result = self.binding.passes(self.request) 85 # Invert result if policy.negate is set 86 if self.binding.negate: 87 policy_result.passing = not policy_result.passing 88 if self.binding.policy and not self.request.debug: 89 if self.binding.policy.execution_logging: 90 self.create_event( 91 EventAction.POLICY_EXECUTION, 92 message="Policy Execution", 93 result=policy_result, 94 ) 95 except PolicyException as exc: 96 # Either use passed original exception or whatever we have 97 src_exc = exc.src_exc if exc.src_exc else exc 98 # Create policy exception event, only when we're not debugging 99 if not self.request.debug: 100 self.create_event( 101 EventAction.POLICY_EXCEPTION, 102 message="Policy failed to execute", 103 exception=exception_to_dict(src_exc), 104 ) 105 LOGGER.debug("P_ENG(proc): error, using failure result", exc=src_exc) 106 policy_result = PolicyResult(self.binding.failure_result, str(src_exc)) 107 policy_result.source_binding = self.binding 108 should_cache = self.request.should_cache 109 if should_cache: 110 key = cache_key(self.binding, self.request) 111 cache.set(key, policy_result, CACHE_TIMEOUT) 112 LOGGER.debug( 113 "P_ENG(proc): finished", 114 policy=self.binding.policy, 115 cached=should_cache, 116 result=policy_result, 117 # this is used for filtering in access checking where logs are sent to the admin 118 process="PolicyProcess", 119 passing=policy_result.passing, 120 user=self.request.user.username, 121 ) 122 return policy_result
Run actual policy, returns result
def
profiling_wrapper(self):
124 def profiling_wrapper(self): 125 """Run with profiling enabled""" 126 with start_span( 127 op="authentik.policy.process.execute", 128 ) as span: 129 span: Span 130 span.set_data("policy", self.binding.policy) 131 span.set_data("request", self.request) 132 return self.execute()
Run with profiling enabled
def
run(self):
134 def run(self): # pragma: no cover 135 """Task wrapper to run policy checking""" 136 result = None 137 try: 138 start = perf_counter() 139 result = self.profiling_wrapper() 140 end = perf_counter() 141 result._exec_time = max((end - start), 0) 142 except Exception as exc: # noqa 143 LOGGER.warning("Policy failed to run", exc=exc) 144 result = PolicyResult(False, str(exc)) 145 finally: 146 self.connection.send(result)
Task wrapper to run policy checking