authentik.flows.signals

authentik flow signals

 1"""authentik flow signals"""
 2
 3from django.core.cache import cache
 4from django.db import connection
 5from django.db.models.signals import post_save, pre_delete
 6from django.dispatch import receiver
 7from structlog.stdlib import get_logger
 8
 9from authentik.flows.apps import GAUGE_FLOWS_CACHED
10from authentik.flows.planner import CACHE_PREFIX
11from authentik.root.monitoring import monitoring_set
12
13LOGGER = get_logger()
14
15
def delete_cache_prefix(prefix: str) -> int:
    """Delete keys prefixed with `prefix` and return count of deleted keys.

    `prefix` is passed to `cache.keys()` unchanged, so the caller supplies any
    wildcard itself (the callers in this module pass ``f"{key}*"``).

    Returns 0, without issuing a delete, when the lookup yields no keys.
    """
    # The key lookup may come back falsy instead of as a list (presumably None
    # when the backend swallows a connection error -- confirm against the
    # configured cache backend). monitoring_set_flows guards the same call
    # with `or []`; do the same here so delete_many()/len() cannot raise
    # TypeError inside a signal receiver.
    keys = cache.keys(prefix) or []
    if not keys:
        return 0
    cache.delete_many(keys)
    return len(keys)
21
22
@receiver(monitoring_set)
def monitoring_set_flows(sender, **kwargs):
    """Set the flows-cached gauge to the number of matching cache keys.

    The gauge is labelled with the current connection's schema name and set to
    the count of cache keys matching ``f"{CACHE_PREFIX}*"``.
    """
    gauge = GAUGE_FLOWS_CACHED.labels(tenant=connection.schema_name)
    # A falsy lookup result is counted as zero keys.
    matching_keys = cache.keys(f"{CACHE_PREFIX}*") or []
    gauge.set(len(matching_keys))
29
30
@receiver(post_save)
@receiver(pre_delete)
def invalidate_flow_cache(sender, instance, **_):
    """Invalidate cached flow entries when a related model is saved or deleted.

    Registered for `post_save` and `pre_delete` without a sender filter, so
    instances that are not a Flow, FlowStageBinding, or Stage fall through
    every check below and nothing happens.
    """
    # Imported at call time rather than module level -- presumably to avoid a
    # circular import with the models module; TODO confirm.
    from authentik.flows.models import Flow, FlowStageBinding, Stage
    from authentik.flows.planner import cache_key

    # The flow itself changed: drop everything cached under its key.
    if isinstance(instance, Flow):
        pattern = f"{cache_key(instance)}*"
        removed = delete_cache_prefix(pattern)
        LOGGER.debug("Invalidating Flow cache", flow=instance, len=removed)

    # A binding changed: drop the cache of the flow it targets, if one is set.
    if isinstance(instance, FlowStageBinding) and instance.target_id:
        pattern = f"{cache_key(instance.target)}*"
        removed = delete_cache_prefix(pattern)
        LOGGER.debug("Invalidating Flow cache from FlowStageBinding", binding=instance, len=removed)

    # A stage changed: drop the cache of every flow that binds this stage.
    if isinstance(instance, Stage):
        stage_bindings = FlowStageBinding.objects.filter(stage=instance)
        removed = sum(
            delete_cache_prefix(f"{cache_key(flow_binding.target)}*")
            for flow_binding in stage_bindings
        )
        LOGGER.debug("Invalidating Flow cache from Stage", stage=instance, len=removed)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def delete_cache_prefix(prefix: str) -> int:
def delete_cache_prefix(prefix: str) -> int:
    """Delete keys prefixed with `prefix` and return count of deleted keys.

    `prefix` is passed to `cache.keys()` unchanged, so the caller supplies any
    wildcard itself (the callers in this module pass ``f"{key}*"``).

    Returns 0, without issuing a delete, when the lookup yields no keys.
    """
    # The key lookup may come back falsy instead of as a list (presumably None
    # when the backend swallows a connection error -- confirm against the
    # configured cache backend). monitoring_set_flows guards the same call
    # with `or []`; do the same here so delete_many()/len() cannot raise
    # TypeError inside a signal receiver.
    keys = cache.keys(prefix) or []
    if not keys:
        return 0
    cache.delete_many(keys)
    return len(keys)

Delete keys prefixed with `prefix` and return count of deleted keys.

@receiver(monitoring_set)
def monitoring_set_flows(sender, **kwargs):
@receiver(monitoring_set)
def monitoring_set_flows(sender, **kwargs):
    """Set the flows-cached gauge to the number of matching cache keys.

    The gauge is labelled with the current connection's schema name and set to
    the count of cache keys matching ``f"{CACHE_PREFIX}*"``.
    """
    gauge = GAUGE_FLOWS_CACHED.labels(tenant=connection.schema_name)
    # A falsy lookup result is counted as zero keys.
    matching_keys = cache.keys(f"{CACHE_PREFIX}*") or []
    gauge.set(len(matching_keys))

Set flow gauges.

@receiver(post_save)
@receiver(pre_delete)
def invalidate_flow_cache(sender, instance, **_):
@receiver(post_save)
@receiver(pre_delete)
def invalidate_flow_cache(sender, instance, **_):
    """Invalidate cached flow entries when a related model is saved or deleted.

    Registered for `post_save` and `pre_delete` without a sender filter, so
    instances that are not a Flow, FlowStageBinding, or Stage fall through
    every check below and nothing happens.
    """
    # Imported at call time rather than module level -- presumably to avoid a
    # circular import with the models module; TODO confirm.
    from authentik.flows.models import Flow, FlowStageBinding, Stage
    from authentik.flows.planner import cache_key

    # The flow itself changed: drop everything cached under its key.
    if isinstance(instance, Flow):
        pattern = f"{cache_key(instance)}*"
        removed = delete_cache_prefix(pattern)
        LOGGER.debug("Invalidating Flow cache", flow=instance, len=removed)

    # A binding changed: drop the cache of the flow it targets, if one is set.
    if isinstance(instance, FlowStageBinding) and instance.target_id:
        pattern = f"{cache_key(instance.target)}*"
        removed = delete_cache_prefix(pattern)
        LOGGER.debug("Invalidating Flow cache from FlowStageBinding", binding=instance, len=removed)

    # A stage changed: drop the cache of every flow that binds this stage.
    if isinstance(instance, Stage):
        stage_bindings = FlowStageBinding.objects.filter(stage=instance)
        removed = sum(
            delete_cache_prefix(f"{cache_key(flow_binding.target)}*")
            for flow_binding in stage_bindings
        )
        LOGGER.debug("Invalidating Flow cache from Stage", stage=instance, len=removed)

Invalidate the flow cache when a Flow, FlowStageBinding, or Stage is saved or is about to be deleted.