authentik.lib.utils.db

authentik database utilities

 1"""authentik database utilities"""
 2
 3import gc
 4from collections.abc import Generator
 5
 6from django.db import reset_queries
 7from django.db.models import Model, QuerySet
 8
 9
10def chunked_queryset[T: Model](queryset: QuerySet[T], chunk_size: int = 1_000) -> Generator[T]:
11    if not queryset.exists():
12        return []
13
14    def get_chunks(qs: QuerySet) -> Generator[QuerySet[T]]:
15        qs = qs.order_by("pk")
16        pks = qs.values_list("pk", flat=True)
17        # The outer queryset.exists() guard can race with a concurrent
18        # transaction that deletes the last matching row (or with a
19        # different isolation-level snapshot), so by the time this
20        # generator starts iterating the queryset may be empty and
21        # pks[0] would raise IndexError and crash the caller. Using
22        # .first() returns None on an empty queryset, which we bail
23        # out on cleanly. See goauthentik/authentik#21643.
24        start_pk = pks.first()
25        if start_pk is None:
26            return
27        while True:
28            try:
29                end_pk = pks.filter(pk__gte=start_pk)[chunk_size]
30            except IndexError:
31                break
32            yield qs.filter(pk__gte=start_pk, pk__lt=end_pk)
33            start_pk = end_pk
34        yield qs.filter(pk__gte=start_pk)
35
36    for chunk in get_chunks(queryset):
37        reset_queries()
38        gc.collect()
39        yield from chunk.iterator(chunk_size=chunk_size)
def chunked_queryset( queryset: django.db.models.query.QuerySet, chunk_size: int = 1000) -> Generator[T]:
11def chunked_queryset[T: Model](queryset: QuerySet[T], chunk_size: int = 1_000) -> Generator[T]:
12    if not queryset.exists():
13        return []
14
15    def get_chunks(qs: QuerySet) -> Generator[QuerySet[T]]:
16        qs = qs.order_by("pk")
17        pks = qs.values_list("pk", flat=True)
18        # The outer queryset.exists() guard can race with a concurrent
19        # transaction that deletes the last matching row (or with a
20        # different isolation-level snapshot), so by the time this
21        # generator starts iterating the queryset may be empty and
22        # pks[0] would raise IndexError and crash the caller. Using
23        # .first() returns None on an empty queryset, which we bail
24        # out on cleanly. See goauthentik/authentik#21643.
25        start_pk = pks.first()
26        if start_pk is None:
27            return
28        while True:
29            try:
30                end_pk = pks.filter(pk__gte=start_pk)[chunk_size]
31            except IndexError:
32                break
33            yield qs.filter(pk__gte=start_pk, pk__lt=end_pk)
34            start_pk = end_pk
35        yield qs.filter(pk__gte=start_pk)
36
37    for chunk in get_chunks(queryset):
38        reset_queries()
39        gc.collect()
40        yield from chunk.iterator(chunk_size=chunk_size)