authentik.lib.utils.db
authentik database utilities
1"""authentik database utilities""" 2 3import gc 4from collections.abc import Generator 5 6from django.db import reset_queries 7from django.db.models import Model, QuerySet 8 9 10def chunked_queryset[T: Model](queryset: QuerySet[T], chunk_size: int = 1_000) -> Generator[T]: 11 if not queryset.exists(): 12 return [] 13 14 def get_chunks(qs: QuerySet) -> Generator[QuerySet[T]]: 15 qs = qs.order_by("pk") 16 pks = qs.values_list("pk", flat=True) 17 # The outer queryset.exists() guard can race with a concurrent 18 # transaction that deletes the last matching row (or with a 19 # different isolation-level snapshot), so by the time this 20 # generator starts iterating the queryset may be empty and 21 # pks[0] would raise IndexError and crash the caller. Using 22 # .first() returns None on an empty queryset, which we bail 23 # out on cleanly. See goauthentik/authentik#21643. 24 start_pk = pks.first() 25 if start_pk is None: 26 return 27 while True: 28 try: 29 end_pk = pks.filter(pk__gte=start_pk)[chunk_size] 30 except IndexError: 31 break 32 yield qs.filter(pk__gte=start_pk, pk__lt=end_pk) 33 start_pk = end_pk 34 yield qs.filter(pk__gte=start_pk) 35 36 for chunk in get_chunks(queryset): 37 reset_queries() 38 gc.collect() 39 yield from chunk.iterator(chunk_size=chunk_size)
def chunked_queryset(queryset: django.db.models.query.QuerySet, chunk_size: int = 1000) -> Generator[T]:
11def chunked_queryset[T: Model](queryset: QuerySet[T], chunk_size: int = 1_000) -> Generator[T]: 12 if not queryset.exists(): 13 return [] 14 15 def get_chunks(qs: QuerySet) -> Generator[QuerySet[T]]: 16 qs = qs.order_by("pk") 17 pks = qs.values_list("pk", flat=True) 18 # The outer queryset.exists() guard can race with a concurrent 19 # transaction that deletes the last matching row (or with a 20 # different isolation-level snapshot), so by the time this 21 # generator starts iterating the queryset may be empty and 22 # pks[0] would raise IndexError and crash the caller. Using 23 # .first() returns None on an empty queryset, which we bail 24 # out on cleanly. See goauthentik/authentik#21643. 25 start_pk = pks.first() 26 if start_pk is None: 27 return 28 while True: 29 try: 30 end_pk = pks.filter(pk__gte=start_pk)[chunk_size] 31 except IndexError: 32 break 33 yield qs.filter(pk__gte=start_pk, pk__lt=end_pk) 34 start_pk = end_pk 35 yield qs.filter(pk__gte=start_pk) 36 37 for chunk in get_chunks(queryset): 38 reset_queries() 39 gc.collect() 40 yield from chunk.iterator(chunk_size=chunk_size)