authentik.flows.signals
authentik flow signals
1"""authentik flow signals""" 2 3from django.core.cache import cache 4from django.db import connection 5from django.db.models.signals import post_save, pre_delete 6from django.dispatch import receiver 7from structlog.stdlib import get_logger 8 9from authentik.flows.apps import GAUGE_FLOWS_CACHED 10from authentik.flows.planner import CACHE_PREFIX 11from authentik.root.monitoring import monitoring_set 12 13LOGGER = get_logger() 14 15 16def delete_cache_prefix(prefix: str) -> int: 17 """Delete keys prefixed with `prefix` and return count of deleted keys.""" 18 keys = cache.keys(prefix) 19 cache.delete_many(keys) 20 return len(keys) 21 22 23@receiver(monitoring_set) 24def monitoring_set_flows(sender, **kwargs): 25 """set flow gauges""" 26 GAUGE_FLOWS_CACHED.labels(tenant=connection.schema_name).set( 27 len(cache.keys(f"{CACHE_PREFIX}*") or []) 28 ) 29 30 31@receiver(post_save) 32@receiver(pre_delete) 33def invalidate_flow_cache(sender, instance, **_): 34 """Invalidate flow cache when flow is updated""" 35 from authentik.flows.models import Flow, FlowStageBinding, Stage 36 from authentik.flows.planner import cache_key 37 38 if isinstance(instance, Flow): 39 total = delete_cache_prefix(f"{cache_key(instance)}*") 40 LOGGER.debug("Invalidating Flow cache", flow=instance, len=total) 41 if isinstance(instance, FlowStageBinding) and instance.target_id: 42 total = delete_cache_prefix(f"{cache_key(instance.target)}*") 43 LOGGER.debug("Invalidating Flow cache from FlowStageBinding", binding=instance, len=total) 44 if isinstance(instance, Stage): 45 total = 0 46 for binding in FlowStageBinding.objects.filter(stage=instance): 47 prefix = cache_key(binding.target) 48 total += delete_cache_prefix(f"{prefix}*") 49 LOGGER.debug("Invalidating Flow cache from Stage", stage=instance, len=total)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def
delete_cache_prefix(prefix: str) -> int:
17def delete_cache_prefix(prefix: str) -> int: 18 """Delete keys prefixed with `prefix` and return count of deleted keys.""" 19 keys = cache.keys(prefix) 20 cache.delete_many(keys) 21 return len(keys)
Delete keys prefixed with prefix and return count of deleted keys.
@receiver(monitoring_set)
def
monitoring_set_flows(sender, **kwargs):
24@receiver(monitoring_set) 25def monitoring_set_flows(sender, **kwargs): 26 """set flow gauges""" 27 GAUGE_FLOWS_CACHED.labels(tenant=connection.schema_name).set( 28 len(cache.keys(f"{CACHE_PREFIX}*") or []) 29 )
set flow gauges
@receiver(post_save)
@receiver(pre_delete)
def
invalidate_flow_cache(sender, instance, **_):
32@receiver(post_save) 33@receiver(pre_delete) 34def invalidate_flow_cache(sender, instance, **_): 35 """Invalidate flow cache when flow is updated""" 36 from authentik.flows.models import Flow, FlowStageBinding, Stage 37 from authentik.flows.planner import cache_key 38 39 if isinstance(instance, Flow): 40 total = delete_cache_prefix(f"{cache_key(instance)}*") 41 LOGGER.debug("Invalidating Flow cache", flow=instance, len=total) 42 if isinstance(instance, FlowStageBinding) and instance.target_id: 43 total = delete_cache_prefix(f"{cache_key(instance.target)}*") 44 LOGGER.debug("Invalidating Flow cache from FlowStageBinding", binding=instance, len=total) 45 if isinstance(instance, Stage): 46 total = 0 47 for binding in FlowStageBinding.objects.filter(stage=instance): 48 prefix = cache_key(binding.target) 49 total += delete_cache_prefix(f"{prefix}*") 50 LOGGER.debug("Invalidating Flow cache from Stage", stage=instance, len=total)
Invalidate flow cache when flow is updated