authentik.flows.management.commands.benchmark

authentik benchmark command

  1"""authentik benchmark command"""
  2
  3from csv import DictWriter
  4from multiprocessing import Manager, cpu_count, get_context
  5from sys import stdout
  6from time import time
  7
  8from django import db
  9from django.core.management.base import BaseCommand
 10from django.test import RequestFactory
 11from structlog.stdlib import get_logger
 12
 13from authentik import authentik_version
 14from authentik.core.tests.utils import create_test_admin_user
 15from authentik.flows.models import Flow
 16from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner
 17
LOGGER = get_logger()
# Use an explicit "fork" start method so worker processes inherit the parent
# interpreter state (presumably including Django's configured settings —
# NOTE(review): spawn would require re-initializing Django in each child).
FORK_CTX = get_context("fork")
# Base class for benchmark worker processes (multiprocessing ForkProcess).
PROCESS_CLASS = FORK_CTX.Process
 21
 22
 23class FlowPlanProcess(PROCESS_CLASS):  # pragma: no cover
 24    """Test process which executes flow planner"""
 25
 26    def __init__(self, index, return_dict, flow, user) -> None:
 27        super().__init__()
 28        self.index = index
 29        self.return_dict = return_dict
 30        self.flow = flow
 31        self.user = user
 32        self.request = RequestFactory().get("/")
 33
 34    def run(self):
 35        """Execute 1000 flow plans"""
 36        LOGGER.info(f"Proc {self.index} Running")
 37
 38        def test_inner():
 39            planner = FlowPlanner(self.flow)
 40            planner.use_cache = False
 41            planner.plan(self.request, {PLAN_CONTEXT_PENDING_USER: self.user})
 42
 43        diffs = []
 44        for _ in range(1000):
 45            start = time()
 46            test_inner()
 47            end = time()
 48            diffs.append(end - start)
 49        self.return_dict[self.index] = diffs
 50
 51
 52class Command(BaseCommand):
 53    """Benchmark authentik"""
 54
 55    def add_arguments(self, parser):
 56        parser.add_argument(
 57            "-p",
 58            "--processes",
 59            default=cpu_count(),
 60            action="store",
 61            help="How many processes should be started.",
 62        )
 63        parser.add_argument(
 64            "--csv",
 65            action="store_true",
 66            help="Output results as CSV",
 67        )
 68
 69    def benchmark_flows(self, proc_count):
 70        """Get full recovery link"""
 71        flow = Flow.objects.get(slug="default-authentication-flow")
 72        user = create_test_admin_user()
 73        manager = Manager()
 74        return_dict = manager.dict()
 75
 76        jobs = []
 77        db.connections.close_all()
 78        for i in range(proc_count):
 79            proc = FlowPlanProcess(i, return_dict, flow, user)
 80            jobs.append(proc)
 81            proc.start()
 82
 83        for proc in jobs:
 84            proc.join()
 85        return return_dict.values()
 86
 87    def handle(self, *args, **options):
 88        """Start benchmark"""
 89        proc_count = options.get("processes", 1)
 90        all_values = self.benchmark_flows(proc_count)
 91        if options.get("csv"):
 92            self.output_csv(all_values)
 93        else:
 94            self.output_overview(all_values)
 95
 96    def output_overview(self, values):
 97        """Output results human readable"""
 98        total_max: int = max(max(inner) for inner in values)
 99        total_min: int = min(min(inner) for inner in values)
100        total_avg = sum(sum(inner) for inner in values) / sum(len(inner) for inner in values)
101
102        print(f"Version: {authentik_version()}")
103        print(f"Processes: {len(values)}")
104        print(f"\tMax: {total_max * 100}ms")
105        print(f"\tMin: {total_min * 100}ms")
106        print(f"\tAvg: {total_avg * 100}ms")
107
108    def output_csv(self, values):
109        """Output results as CSV"""
110        proc_count = len(values)
111        fieldnames = [f"proc_{idx}" for idx in range(proc_count)]
112        writer = DictWriter(stdout, fieldnames=fieldnames)
113
114        writer.writeheader()
115        for run_idx in range(len(values[0])):
116            row_dict = {}
117            for proc_idx in range(proc_count):
118                row_dict[f"proc_{proc_idx}"] = values[proc_idx][run_idx] * 100
119            writer.writerow(row_dict)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
FORK_CTX = <multiprocessing.context.ForkContext object>
PROCESS_CLASS = <class 'multiprocessing.context.ForkProcess'>
class FlowPlanProcess(multiprocessing.context.ForkProcess):
24class FlowPlanProcess(PROCESS_CLASS):  # pragma: no cover
25    """Test process which executes flow planner"""
26
27    def __init__(self, index, return_dict, flow, user) -> None:
28        super().__init__()
29        self.index = index
30        self.return_dict = return_dict
31        self.flow = flow
32        self.user = user
33        self.request = RequestFactory().get("/")
34
35    def run(self):
36        """Execute 1000 flow plans"""
37        LOGGER.info(f"Proc {self.index} Running")
38
39        def test_inner():
40            planner = FlowPlanner(self.flow)
41            planner.use_cache = False
42            planner.plan(self.request, {PLAN_CONTEXT_PENDING_USER: self.user})
43
44        diffs = []
45        for _ in range(1000):
46            start = time()
47            test_inner()
48            end = time()
49            diffs.append(end - start)
50        self.return_dict[self.index] = diffs

Test process which executes flow planner

FlowPlanProcess(index, return_dict, flow, user)
27    def __init__(self, index, return_dict, flow, user) -> None:
28        super().__init__()
29        self.index = index
30        self.return_dict = return_dict
31        self.flow = flow
32        self.user = user
33        self.request = RequestFactory().get("/")
index
return_dict
flow
user
request
def run(self):
35    def run(self):
36        """Execute 1000 flow plans"""
37        LOGGER.info(f"Proc {self.index} Running")
38
39        def test_inner():
40            planner = FlowPlanner(self.flow)
41            planner.use_cache = False
42            planner.plan(self.request, {PLAN_CONTEXT_PENDING_USER: self.user})
43
44        diffs = []
45        for _ in range(1000):
46            start = time()
47            test_inner()
48            end = time()
49            diffs.append(end - start)
50        self.return_dict[self.index] = diffs

Execute 1000 flow plans

class Command(django.core.management.base.BaseCommand):
 53class Command(BaseCommand):
 54    """Benchmark authentik"""
 55
 56    def add_arguments(self, parser):
 57        parser.add_argument(
 58            "-p",
 59            "--processes",
 60            default=cpu_count(),
 61            action="store",
 62            help="How many processes should be started.",
 63        )
 64        parser.add_argument(
 65            "--csv",
 66            action="store_true",
 67            help="Output results as CSV",
 68        )
 69
 70    def benchmark_flows(self, proc_count):
 71        """Get full recovery link"""
 72        flow = Flow.objects.get(slug="default-authentication-flow")
 73        user = create_test_admin_user()
 74        manager = Manager()
 75        return_dict = manager.dict()
 76
 77        jobs = []
 78        db.connections.close_all()
 79        for i in range(proc_count):
 80            proc = FlowPlanProcess(i, return_dict, flow, user)
 81            jobs.append(proc)
 82            proc.start()
 83
 84        for proc in jobs:
 85            proc.join()
 86        return return_dict.values()
 87
 88    def handle(self, *args, **options):
 89        """Start benchmark"""
 90        proc_count = options.get("processes", 1)
 91        all_values = self.benchmark_flows(proc_count)
 92        if options.get("csv"):
 93            self.output_csv(all_values)
 94        else:
 95            self.output_overview(all_values)
 96
 97    def output_overview(self, values):
 98        """Output results human readable"""
 99        total_max: int = max(max(inner) for inner in values)
100        total_min: int = min(min(inner) for inner in values)
101        total_avg = sum(sum(inner) for inner in values) / sum(len(inner) for inner in values)
102
103        print(f"Version: {authentik_version()}")
104        print(f"Processes: {len(values)}")
105        print(f"\tMax: {total_max * 100}ms")
106        print(f"\tMin: {total_min * 100}ms")
107        print(f"\tAvg: {total_avg * 100}ms")
108
109    def output_csv(self, values):
110        """Output results as CSV"""
111        proc_count = len(values)
112        fieldnames = [f"proc_{idx}" for idx in range(proc_count)]
113        writer = DictWriter(stdout, fieldnames=fieldnames)
114
115        writer.writeheader()
116        for run_idx in range(len(values[0])):
117            row_dict = {}
118            for proc_idx in range(proc_count):
119                row_dict[f"proc_{proc_idx}"] = values[proc_idx][run_idx] * 100
120            writer.writerow(row_dict)

Benchmark authentik

def add_arguments(self, parser):
56    def add_arguments(self, parser):
57        parser.add_argument(
58            "-p",
59            "--processes",
60            default=cpu_count(),
61            action="store",
62            help="How many processes should be started.",
63        )
64        parser.add_argument(
65            "--csv",
66            action="store_true",
67            help="Output results as CSV",
68        )

Entry point for subclassed commands to add custom arguments.

def benchmark_flows(self, proc_count):
70    def benchmark_flows(self, proc_count):
71        """Get full recovery link"""
72        flow = Flow.objects.get(slug="default-authentication-flow")
73        user = create_test_admin_user()
74        manager = Manager()
75        return_dict = manager.dict()
76
77        jobs = []
78        db.connections.close_all()
79        for i in range(proc_count):
80            proc = FlowPlanProcess(i, return_dict, flow, user)
81            jobs.append(proc)
82            proc.start()
83
84        for proc in jobs:
85            proc.join()
86        return return_dict.values()

Get full recovery link

def handle(self, *args, **options):
88    def handle(self, *args, **options):
89        """Start benchmark"""
90        proc_count = options.get("processes", 1)
91        all_values = self.benchmark_flows(proc_count)
92        if options.get("csv"):
93            self.output_csv(all_values)
94        else:
95            self.output_overview(all_values)

Start benchmark

def output_overview(self, values):
 97    def output_overview(self, values):
 98        """Output results human readable"""
 99        total_max: int = max(max(inner) for inner in values)
100        total_min: int = min(min(inner) for inner in values)
101        total_avg = sum(sum(inner) for inner in values) / sum(len(inner) for inner in values)
102
103        print(f"Version: {authentik_version()}")
104        print(f"Processes: {len(values)}")
105        print(f"\tMax: {total_max * 100}ms")
106        print(f"\tMin: {total_min * 100}ms")
107        print(f"\tAvg: {total_avg * 100}ms")

Output results human readable

def output_csv(self, values):
109    def output_csv(self, values):
110        """Output results as CSV"""
111        proc_count = len(values)
112        fieldnames = [f"proc_{idx}" for idx in range(proc_count)]
113        writer = DictWriter(stdout, fieldnames=fieldnames)
114
115        writer.writeheader()
116        for run_idx in range(len(values[0])):
117            row_dict = {}
118            for proc_idx in range(proc_count):
119                row_dict[f"proc_{proc_idx}"] = values[proc_idx][run_idx] * 100
120            writer.writerow(row_dict)

Output results as CSV