authentik.flows.management.commands.benchmark
authentik benchmark command
1"""authentik benchmark command""" 2 3from csv import DictWriter 4from multiprocessing import Manager, cpu_count, get_context 5from sys import stdout 6from time import time 7 8from django import db 9from django.core.management.base import BaseCommand 10from django.test import RequestFactory 11from structlog.stdlib import get_logger 12 13from authentik import authentik_version 14from authentik.core.tests.utils import create_test_admin_user 15from authentik.flows.models import Flow 16from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner 17 18LOGGER = get_logger() 19FORK_CTX = get_context("fork") 20PROCESS_CLASS = FORK_CTX.Process 21 22 23class FlowPlanProcess(PROCESS_CLASS): # pragma: no cover 24 """Test process which executes flow planner""" 25 26 def __init__(self, index, return_dict, flow, user) -> None: 27 super().__init__() 28 self.index = index 29 self.return_dict = return_dict 30 self.flow = flow 31 self.user = user 32 self.request = RequestFactory().get("/") 33 34 def run(self): 35 """Execute 1000 flow plans""" 36 LOGGER.info(f"Proc {self.index} Running") 37 38 def test_inner(): 39 planner = FlowPlanner(self.flow) 40 planner.use_cache = False 41 planner.plan(self.request, {PLAN_CONTEXT_PENDING_USER: self.user}) 42 43 diffs = [] 44 for _ in range(1000): 45 start = time() 46 test_inner() 47 end = time() 48 diffs.append(end - start) 49 self.return_dict[self.index] = diffs 50 51 52class Command(BaseCommand): 53 """Benchmark authentik""" 54 55 def add_arguments(self, parser): 56 parser.add_argument( 57 "-p", 58 "--processes", 59 default=cpu_count(), 60 action="store", 61 help="How many processes should be started.", 62 ) 63 parser.add_argument( 64 "--csv", 65 action="store_true", 66 help="Output results as CSV", 67 ) 68 69 def benchmark_flows(self, proc_count): 70 """Get full recovery link""" 71 flow = Flow.objects.get(slug="default-authentication-flow") 72 user = create_test_admin_user() 73 manager = Manager() 74 return_dict = manager.dict() 75 76 jobs = [] 77 db.connections.close_all() 78 for i in range(proc_count): 79 proc = FlowPlanProcess(i, return_dict, flow, user) 80 jobs.append(proc) 81 proc.start() 82 83 for proc in jobs: 84 proc.join() 85 return return_dict.values() 86 87 def handle(self, *args, **options): 88 """Start benchmark""" 89 proc_count = options.get("processes", 1) 90 all_values = self.benchmark_flows(proc_count) 91 if options.get("csv"): 92 self.output_csv(all_values) 93 else: 94 self.output_overview(all_values) 95 96 def output_overview(self, values): 97 """Output results human readable""" 98 total_max: int = max(max(inner) for inner in values) 99 total_min: int = min(min(inner) for inner in values) 100 total_avg = sum(sum(inner) for inner in values) / sum(len(inner) for inner in values) 101 102 print(f"Version: {authentik_version()}") 103 print(f"Processes: {len(values)}") 104 print(f"\tMax: {total_max * 100}ms") 105 print(f"\tMin: {total_min * 100}ms") 106 print(f"\tAvg: {total_avg * 100}ms") 107 108 def output_csv(self, values): 109 """Output results as CSV""" 110 proc_count = len(values) 111 fieldnames = [f"proc_{idx}" for idx in range(proc_count)] 112 writer = DictWriter(stdout, fieldnames=fieldnames) 113 114 writer.writeheader() 115 for run_idx in range(len(values[0])): 116 row_dict = {} 117 for proc_idx in range(proc_count): 118 row_dict[f"proc_{proc_idx}"] = values[proc_idx][run_idx] * 100 119 writer.writerow(row_dict)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
FORK_CTX =
<multiprocessing.context.ForkContext object>
PROCESS_CLASS =
<class 'multiprocessing.context.ForkProcess'>
class
FlowPlanProcess(multiprocessing.context.ForkProcess):
24class FlowPlanProcess(PROCESS_CLASS): # pragma: no cover 25 """Test process which executes flow planner""" 26 27 def __init__(self, index, return_dict, flow, user) -> None: 28 super().__init__() 29 self.index = index 30 self.return_dict = return_dict 31 self.flow = flow 32 self.user = user 33 self.request = RequestFactory().get("/") 34 35 def run(self): 36 """Execute 1000 flow plans""" 37 LOGGER.info(f"Proc {self.index} Running") 38 39 def test_inner(): 40 planner = FlowPlanner(self.flow) 41 planner.use_cache = False 42 planner.plan(self.request, {PLAN_CONTEXT_PENDING_USER: self.user}) 43 44 diffs = [] 45 for _ in range(1000): 46 start = time() 47 test_inner() 48 end = time() 49 diffs.append(end - start) 50 self.return_dict[self.index] = diffs
Test process which executes flow planner
def
run(self):
35 def run(self): 36 """Execute 1000 flow plans""" 37 LOGGER.info(f"Proc {self.index} Running") 38 39 def test_inner(): 40 planner = FlowPlanner(self.flow) 41 planner.use_cache = False 42 planner.plan(self.request, {PLAN_CONTEXT_PENDING_USER: self.user}) 43 44 diffs = [] 45 for _ in range(1000): 46 start = time() 47 test_inner() 48 end = time() 49 diffs.append(end - start) 50 self.return_dict[self.index] = diffs
Execute 1000 flow plans
class
Command(django.core.management.base.BaseCommand):
53class Command(BaseCommand): 54 """Benchmark authentik""" 55 56 def add_arguments(self, parser): 57 parser.add_argument( 58 "-p", 59 "--processes", 60 default=cpu_count(), 61 action="store", 62 help="How many processes should be started.", 63 ) 64 parser.add_argument( 65 "--csv", 66 action="store_true", 67 help="Output results as CSV", 68 ) 69 70 def benchmark_flows(self, proc_count): 71 """Get full recovery link""" 72 flow = Flow.objects.get(slug="default-authentication-flow") 73 user = create_test_admin_user() 74 manager = Manager() 75 return_dict = manager.dict() 76 77 jobs = [] 78 db.connections.close_all() 79 for i in range(proc_count): 80 proc = FlowPlanProcess(i, return_dict, flow, user) 81 jobs.append(proc) 82 proc.start() 83 84 for proc in jobs: 85 proc.join() 86 return return_dict.values() 87 88 def handle(self, *args, **options): 89 """Start benchmark""" 90 proc_count = options.get("processes", 1) 91 all_values = self.benchmark_flows(proc_count) 92 if options.get("csv"): 93 self.output_csv(all_values) 94 else: 95 self.output_overview(all_values) 96 97 def output_overview(self, values): 98 """Output results human readable""" 99 total_max: int = max(max(inner) for inner in values) 100 total_min: int = min(min(inner) for inner in values) 101 total_avg = sum(sum(inner) for inner in values) / sum(len(inner) for inner in values) 102 103 print(f"Version: {authentik_version()}") 104 print(f"Processes: {len(values)}") 105 print(f"\tMax: {total_max * 100}ms") 106 print(f"\tMin: {total_min * 100}ms") 107 print(f"\tAvg: {total_avg * 100}ms") 108 109 def output_csv(self, values): 110 """Output results as CSV""" 111 proc_count = len(values) 112 fieldnames = [f"proc_{idx}" for idx in range(proc_count)] 113 writer = DictWriter(stdout, fieldnames=fieldnames) 114 115 writer.writeheader() 116 for run_idx in range(len(values[0])): 117 row_dict = {} 118 for proc_idx in range(proc_count): 119 row_dict[f"proc_{proc_idx}"] = values[proc_idx][run_idx] * 100 120 writer.writerow(row_dict)
Benchmark authentik
def
add_arguments(self, parser):
56 def add_arguments(self, parser): 57 parser.add_argument( 58 "-p", 59 "--processes", 60 default=cpu_count(), 61 action="store", 62 help="How many processes should be started.", 63 ) 64 parser.add_argument( 65 "--csv", 66 action="store_true", 67 help="Output results as CSV", 68 )
Entry point for subclassed commands to add custom arguments.
def
benchmark_flows(self, proc_count):
70 def benchmark_flows(self, proc_count): 71 """Get full recovery link""" 72 flow = Flow.objects.get(slug="default-authentication-flow") 73 user = create_test_admin_user() 74 manager = Manager() 75 return_dict = manager.dict() 76 77 jobs = [] 78 db.connections.close_all() 79 for i in range(proc_count): 80 proc = FlowPlanProcess(i, return_dict, flow, user) 81 jobs.append(proc) 82 proc.start() 83 84 for proc in jobs: 85 proc.join() 86 return return_dict.values()
Get full recovery link
def
handle(self, *args, **options):
88 def handle(self, *args, **options): 89 """Start benchmark""" 90 proc_count = options.get("processes", 1) 91 all_values = self.benchmark_flows(proc_count) 92 if options.get("csv"): 93 self.output_csv(all_values) 94 else: 95 self.output_overview(all_values)
Start benchmark
def
output_overview(self, values):
97 def output_overview(self, values): 98 """Output results human readable""" 99 total_max: int = max(max(inner) for inner in values) 100 total_min: int = min(min(inner) for inner in values) 101 total_avg = sum(sum(inner) for inner in values) / sum(len(inner) for inner in values) 102 103 print(f"Version: {authentik_version()}") 104 print(f"Processes: {len(values)}") 105 print(f"\tMax: {total_max * 100}ms") 106 print(f"\tMin: {total_min * 100}ms") 107 print(f"\tAvg: {total_avg * 100}ms")
Output results human readable
def
output_csv(self, values):
109 def output_csv(self, values): 110 """Output results as CSV""" 111 proc_count = len(values) 112 fieldnames = [f"proc_{idx}" for idx in range(proc_count)] 113 writer = DictWriter(stdout, fieldnames=fieldnames) 114 115 writer.writeheader() 116 for run_idx in range(len(values[0])): 117 row_dict = {} 118 for proc_idx in range(proc_count): 119 row_dict[f"proc_{proc_idx}"] = values[proc_idx][run_idx] * 100 120 writer.writerow(row_dict)
Output results as CSV