authentik.enterprise.policies.unique_password.tasks

 1from django.db.models.aggregates import Count
 2from django.utils.translation import gettext_lazy as _
 3from dramatiq.actor import actor
 4from structlog import get_logger
 5
 6from authentik.enterprise.policies.unique_password.models import (
 7    UniquePasswordPolicy,
 8    UserPasswordHistory,
 9)
10from authentik.tasks.middleware import CurrentTask
11
12LOGGER = get_logger()
13
14
15@actor(
16    description=_(
17        "Check if any UniquePasswordPolicy exists, and if not, purge the password history table."
18    )
19)
20def check_and_purge_password_history():
21    self = CurrentTask.get_task()
22
23    if not UniquePasswordPolicy.objects.exists():
24        UserPasswordHistory.objects.all().delete()
25        LOGGER.debug("Purged UserPasswordHistory table as no policies are in use")
26        self.info("Successfully purged UserPasswordHistory")
27        return
28
29    self.info("Not purging password histories, a unique password policy exists")
30
31
32@actor(description=_("Remove user password history that are too old."))
33def trim_password_histories():
34    """Removes rows from UserPasswordHistory older than
35    the `n` most recent entries.
36
37    The `n` is defined by the largest configured value for all bound
38    UniquePasswordPolicy policies.
39    """
40
41    self = CurrentTask.get_task()
42
43    # No policy, we'll let the cleanup above do its thing
44    if not UniquePasswordPolicy.objects.exists():
45        return
46
47    num_rows_to_preserve = 0
48    for policy in UniquePasswordPolicy.objects.all():
49        num_rows_to_preserve = max(num_rows_to_preserve, policy.num_historical_passwords)
50
51    all_pks_to_keep = []
52
53    # Get all users who have password history entries
54    users_with_history = (
55        UserPasswordHistory.objects.values("user")
56        .annotate(count=Count("user"))
57        .filter(count__gt=0)
58        .values_list("user", flat=True)
59    )
60    for user_pk in users_with_history:
61        entries = UserPasswordHistory.objects.filter(user__pk=user_pk)
62        pks_to_keep = entries.order_by("-created_at")[:num_rows_to_preserve].values_list(
63            "pk", flat=True
64        )
65        all_pks_to_keep.extend(pks_to_keep)
66
67    num_deleted, _ = UserPasswordHistory.objects.exclude(pk__in=all_pks_to_keep).delete()
68    LOGGER.debug("Deleted stale password history records", count=num_deleted)
69    self.info(f"Delete {num_deleted} stale password history records")
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
trim_password_histories = Actor(<function trim_password_histories>, queue_name='default', actor_name='trim_password_histories')

Removes rows from UserPasswordHistory older than the n most recent entries.

The n is defined by the largest configured value for all bound UniquePasswordPolicy policies.