# Licensed under a 3-clause BSD style license - see LICENSE.rst
import datetime
import itertools
import json
import math
import os
import pstats
import socket
import struct
import sys
import tempfile
import threading
import time
import traceback
from . import util
from .console import log
from .results import Results, format_benchmark_result
# Can't use benchmark.__file__, because that points to the compiled
# file, so it can't be run by another version of Python.
[docs]
# Absolute path of the ``benchmark.py`` file located in the same directory as
# this module; the spawners in this file pass it as the script to execute.
BENCHMARK_RUN_SCRIPT = os.path.join(os.path.dirname(os.path.abspath(__file__)), "benchmark.py")
[docs]
# Error code recorded in place of the process exit code when a benchmark's
# result file cannot be parsed as JSON (see _run_benchmark_single_param).
JSON_ERROR_RETCODE = -257
[docs]
# Record type returned by fail_benchmark(), run_benchmark() and
# _run_benchmark_single_param().  It is built by the project helper
# ``util.namedtuple_with_doc`` from a type name, the field names and a
# documentation string (presumably attached as the type's docstring -- confirm
# against ``util``).
BenchmarkResult = util.namedtuple_with_doc(
'BenchmarkResult',
['result', 'samples', 'number', 'errcode', 'stderr', 'profile'],
"""
Postprocessed benchmark result
Attributes
----------
result : list of object
List of numeric values of the benchmarks (one for each parameter
combination).
Values are `None` if benchmark failed or NaN if it was skipped.
samples : list of {list, None}
List of lists of sampled raw data points (or Nones if
no sampling done).
number : list of {dict, None}
List of actual repeat counts for each sample (or Nones if
no sampling done).
errcode : int
Process exit code
stderr : str
Process stdout/stderr output
profile : bytes
If `profile` is `True` and run was at least partially successful,
this key will be a byte string containing the cProfile data.
Otherwise, None.
""",
)
[docs]
def skip_benchmarks(benchmarks, env, results=None):
    """
    Record every benchmark of a set as skipped for one environment.

    Each benchmark gets a failed-result placeholder (as produced by
    `fail_benchmark`) stored in the results object, and a warning is
    logged for the environment and for each benchmark.

    Parameters
    ----------
    benchmarks : Benchmarks
        Set of benchmarks to skip
    env : Environment
        Environment to skip them in
    results : Results, optional
        Where to store the results.
        If omitted, stored to a new unnamed Results object.

    Returns
    -------
    results : Results
        Benchmark results.
    """
    target = Results.unnamed() if results is None else results

    log.warning(f"Skipping {env.name}")
    with log.indent():
        for bench_name, bench in benchmarks.items():
            log.step()
            log.warning(f'{bench_name} skipped')
            timestamp = datetime.datetime.now(datetime.timezone.utc)
            target.add_result(
                bench,
                fail_benchmark(bench),
                selected_idx=benchmarks.benchmark_selection.get(bench_name),
                started_at=timestamp,
            )

    return target
[docs]
def run_benchmarks(
    benchmarks,
    env,
    results=None,
    show_stderr=False,
    quick=False,
    profile=False,
    extra_params=None,
    record_samples=False,
    append_samples=False,
    run_rounds=None,
    launch_method=None,
):
    """
    Run all of the benchmarks in the given `Environment`.

    Parameters
    ----------
    benchmarks : Benchmarks
        Benchmarks to run
    env : Environment object
        Environment in which to run the benchmarks.
    results : Results, optional
        Where to store the results.
        If omitted, stored to a new unnamed Results object.
    show_stderr : bool, optional
        When `True`, display any stderr emitted by the benchmark.
    quick : bool, optional
        When `True`, run each benchmark function exactly once.
        This is useful to quickly find errors in the benchmark
        functions, without taking the time necessary to get
        accurate timings.
    profile : bool, optional
        When `True`, run the benchmark through the `cProfile`
        profiler.
    extra_params : dict, optional
        Override values for benchmark attributes.
    record_samples : bool, optional
        Whether to retain result samples or discard them.
    append_samples : bool, optional
        Whether to retain any previously measured result samples
        and use them in statistics computations.
    run_rounds : sequence of int, optional
        Run rounds for benchmarks with multiple rounds.
        If None, run all rounds.
    launch_method : {'auto', 'spawn', 'forkserver'}, optional
        Benchmark launching method to use.

    Returns
    -------
    results : Results
        Benchmark results.

    Notes
    -----
    The spawner is created inside the guarded section, so that an error
    raised while creating it (for example an unavailable launch method)
    still restores the log indentation.
    """
    # Work on a private copy so the caller's mapping is never modified
    if extra_params is None:
        extra_params = {}
    else:
        extra_params = dict(extra_params)

    if quick:
        extra_params['number'] = 1
        extra_params['repeat'] = 1
        extra_params['warmup_time'] = 0
        extra_params['rounds'] = 1

    if results is None:
        results = Results.unnamed()

    # Find all setup_cache routines needed
    setup_cache_timeout = {}
    benchmark_order = {}
    cache_users = {}
    max_rounds = 0

    def get_rounds(benchmark):
        """Get number of rounds to use for a job"""
        if 'rounds' in extra_params:
            return int(extra_params['rounds'])
        else:
            return int(benchmark.get('rounds', 1))

    for name, benchmark in sorted(benchmarks.items()):
        # Set benchmark timeout from config if not set
        # gh-13 gh-973
        if benchmark.get('timeout') is None:
            benchmark['timeout'] = extra_params.get('timeout', env._default_benchmark_timeout)
        key = benchmark.get('setup_cache_key')
        setup_cache_timeout[key] = max(
            benchmark.get('setup_cache_timeout', benchmark['timeout']),
            setup_cache_timeout.get(key, 0),
        )
        benchmark_order.setdefault(key, []).append((name, benchmark))
        max_rounds = max(max_rounds, get_rounds(benchmark))
        cache_users.setdefault(key, set()).add(name)

    if run_rounds is None:
        run_rounds = list(range(1, max_rounds + 1))

    # Interleave benchmark runs, in setup_cache order
    existing_results = results.get_result_keys(benchmarks)

    def iter_run_items():
        # Rounds are walked from the highest number down, so that round 1
        # (the "final" one, whose result gets logged) is executed last.
        for run_round in run_rounds[::-1]:
            for setup_cache_key, benchmark_set in benchmark_order.items():
                for name, benchmark in benchmark_set:
                    log.step()
                    rounds = get_rounds(benchmark)
                    if run_round > rounds:
                        if (
                            not append_samples
                            and run_round == run_rounds[-1]
                            and name in existing_results
                        ):
                            # We need to remove samples here so that
                            # append_samples=False has an effect on all
                            # benchmarks regardless of whether they were
                            # run this round.
                            selected_idx = benchmarks.benchmark_selection.get(name)
                            results.remove_samples(name, selected_idx)
                        continue
                    is_final = run_round == 1
                    yield name, benchmark, setup_cache_key, is_final

    # Run benchmarks in order
    cache_dirs = {None: None}
    failed_benchmarks = set()
    failed_setup_cache = {}
    if append_samples:
        previous_result_keys = existing_results
    else:
        previous_result_keys = set()
    benchmark_durations = {}

    log.info(f"Benchmarking {env.name}")
    partial_info_time = None

    # The indent context is entered/exited by hand and always exited with
    # (None, None, None), exactly as before, so its exit path does not depend
    # on whether an exception is propagating.
    indent = log.indent()
    indent.__enter__()

    # Stays None if get_spawner() itself raises; checked in the finally clause.
    spawner = None

    try:
        spawner = get_spawner(env, benchmarks.benchmark_dir, launch_method=launch_method)

        # Preimport benchmark suite (if using forkserver)
        success, out = spawner.preimport()

        if success:
            if show_stderr and out:
                log.info("Importing benchmark suite produced output:")
                with log.indent():
                    log.error(out.rstrip())
        else:
            log.warning("Importing benchmark suite failed (skipping all benchmarks).")
            if show_stderr and out:
                with log.indent():
                    log.error(out)

            stderr = 'asv: benchmark suite import failed'
            for name, benchmark, setup_cache_key, is_final in iter_run_items():
                if name in failed_benchmarks:
                    continue

                selected_idx = benchmarks.benchmark_selection.get(name)
                started_at = datetime.datetime.now(datetime.timezone.utc)
                res = fail_benchmark(benchmark, stderr=stderr)
                results.add_result(
                    benchmark,
                    res,
                    selected_idx=selected_idx,
                    started_at=started_at,
                    record_samples=record_samples,
                )
                failed_benchmarks.add(name)

            return results

        # Run benchmarks
        for name, benchmark, setup_cache_key, is_final in iter_run_items():
            selected_idx = benchmarks.benchmark_selection.get(name)
            started_at = datetime.datetime.now(datetime.timezone.utc)

            # Don't try to rerun failed benchmarks
            if name in failed_benchmarks:
                if is_final:
                    partial_info_time = None
                    log.info(name, reserve_space=True)
                    log_benchmark_result(results, benchmark, show_stderr=show_stderr)
                continue

            # Setup cache first, if needed
            if setup_cache_key is None:
                cache_dir = None
            elif setup_cache_key in cache_dirs:
                cache_dir = cache_dirs[setup_cache_key]
            elif setup_cache_key not in failed_setup_cache:
                partial_info_time = None
                log.info(f"Setting up {setup_cache_key}", reserve_space=True)
                params_str = json.dumps({'cpu_affinity': extra_params.get('cpu_affinity')})
                cache_dir, stderr = spawner.create_setup_cache(
                    name, setup_cache_timeout[setup_cache_key], params_str
                )
                if cache_dir is not None:
                    log.add_padded('ok')
                    cache_dirs[setup_cache_key] = cache_dir
                else:
                    log.add_padded('failed')
                    if stderr and show_stderr:
                        with log.indent():
                            log.error(stderr)
                    failed_setup_cache[setup_cache_key] = stderr

                duration = (
                    datetime.datetime.now(datetime.timezone.utc) - started_at
                ).total_seconds()
                results.set_setup_cache_duration(setup_cache_key, duration)
                # Restart the clock so cache setup is not counted as benchmark time
                started_at = datetime.datetime.now(datetime.timezone.utc)

            if setup_cache_key in failed_setup_cache:
                # Mark benchmark as failed
                partial_info_time = None
                log.warning(f'{name} skipped (setup_cache failed)')
                stderr = f'asv: setup_cache failed\n\n{failed_setup_cache[setup_cache_key]}'
                res = fail_benchmark(benchmark, stderr=stderr)
                results.add_result(
                    benchmark,
                    res,
                    selected_idx=selected_idx,
                    started_at=started_at,
                    record_samples=record_samples,
                )
                failed_benchmarks.add(name)
                continue

            # If appending to previous results, make sure to use the
            # same value for 'number' attribute.
            cur_extra_params = extra_params
            if name in previous_result_keys:
                cur_extra_params = []
                prev_stats = results.get_result_stats(name, benchmark['params'])
                for s in prev_stats:
                    if s is None or 'number' not in s:
                        p = extra_params
                    else:
                        p = dict(extra_params)
                        p['number'] = s['number']
                    cur_extra_params.append(p)

            # Run benchmark
            if is_final:
                partial_info_time = None
                log.info(name, reserve_space=True)
            elif partial_info_time is None or time.time() > partial_info_time + 30:
                partial_info_time = time.time()
                log.info(f'Running ({name}--)')

            res = run_benchmark(
                benchmark,
                spawner,
                profile=profile,
                selected_idx=selected_idx,
                extra_params=cur_extra_params,
                cwd=cache_dir,
            )

            # Retain runtime durations
            ended_at = datetime.datetime.now(datetime.timezone.utc)
            if name in benchmark_durations:
                benchmark_durations[name] += (ended_at - started_at).total_seconds()
            else:
                benchmark_durations[name] = (ended_at - started_at).total_seconds()

            # Save result
            results.add_result(
                benchmark,
                res,
                selected_idx=selected_idx,
                started_at=started_at,
                duration=benchmark_durations[name],
                record_samples=(not is_final or record_samples),
                append_samples=(name in previous_result_keys),
            )

            previous_result_keys.add(name)

            if all(r is None for r in res.result):
                failed_benchmarks.add(name)

            # Log result
            if is_final:
                partial_info_time = None
                log_benchmark_result(results, benchmark, show_stderr=show_stderr)
            else:
                log.add('.')

            # Cleanup setup cache, if no users left
            if cache_dir is not None and is_final:
                cache_users[setup_cache_key].remove(name)
                if not cache_users[setup_cache_key]:
                    # No users of this cache left, perform cleanup
                    util.long_path_rmtree(cache_dir, True)
                    del cache_dirs[setup_cache_key]
    finally:
        # Cleanup any dangling caches
        for cache_dir in cache_dirs.values():
            if cache_dir is not None:
                util.long_path_rmtree(cache_dir, True)
        indent.__exit__(None, None, None)
        if spawner is not None:
            spawner.close()

    return results
[docs]
def get_spawner(env, benchmark_dir, launch_method):
    """
    Build the benchmark process spawner for the requested launch method.

    Parameters
    ----------
    env : Environment
        Environment handed to the spawner.
    benchmark_dir : str
        Benchmark directory handed to the spawner.
    launch_method : {None, 'auto', 'spawn', 'forkserver'}
        `None` and ``'auto'`` are treated as ``'spawn'``.

    Returns
    -------
    spawner : Spawner
        A `Spawner` for ``'spawn'``, a `ForkServer` for ``'forkserver'``.

    Raises
    ------
    util.UserError
        If ``'forkserver'`` is requested but `os.fork` or
        `socket.AF_UNIX` is missing.
    ValueError
        If `launch_method` is none of the accepted values.
    """
    # Don't use ForkServer as default as threading and fork are not compatible
    method = "spawn" if launch_method in (None, 'auto') else launch_method

    if method == "spawn":
        return Spawner(env, benchmark_dir)

    if method != "forkserver":
        raise ValueError(f"Invalid launch_method: {method}")

    fork_supported = hasattr(os, 'fork') and hasattr(socket, 'AF_UNIX')
    if not fork_supported:
        raise util.UserError("'forkserver' launch method not available on this platform")
    return ForkServer(env, benchmark_dir)
[docs]
def log_benchmark_result(results, benchmark, show_stderr=False):
    """
    Write the formatted result of one benchmark to the log.

    Parameters
    ----------
    results : Results
        Results object holding the benchmark's value, stderr and errcode.
    benchmark : dict
        Benchmark object dict; its ``'name'`` entry is used for lookups.
    show_stderr : bool, optional
        When `True`, also log the captured program output. An
        "exit status" line is appended to it for error codes other than
        None, 0, the timeout code and the JSON-parse error code.
    """
    summary, extra = format_benchmark_result(results, benchmark)
    log.add_padded(summary)
    if extra:
        log.info(extra, color='default')

    # Dump program output
    bench_name = benchmark['name']
    stderr = results.stderr.get(bench_name)
    errcode = results.errcode.get(bench_name)

    unremarkable_codes = (None, 0, util.TIMEOUT_RETCODE, JSON_ERROR_RETCODE)
    if errcode not in unremarkable_codes:
        # Display also error code
        status_line = f"asv: benchmark failed (exit status {errcode})"
        stderr = f"{stderr}\n{status_line}" if stderr else status_line

    if show_stderr and stderr:
        with log.indent():
            log.error(stderr)
[docs]
def fail_benchmark(benchmark, stderr='', errcode=1):
    """
    Build the BenchmarkResult that stands for a failed benchmark.

    The result, samples and number lists hold one `None` per parameter
    combination (a single `None` for an unparameterized benchmark).

    Parameters
    ----------
    benchmark : dict
        Benchmark object dict; only its ``'params'`` entry is read.
    stderr : str, optional
        Output text to store in the result.
    errcode : int, optional
        Error code to store in the result.

    Returns
    -------
    result : BenchmarkResult
        Placeholder result with no profile data.
    """
    param_axes = benchmark['params']
    if param_axes:
        # One slot for every element of the cartesian product of the axes
        slot_count = sum(1 for _ in itertools.product(*param_axes))
    else:
        slot_count = 1

    return BenchmarkResult(
        result=[None] * slot_count,
        samples=[None] * slot_count,
        number=[None] * slot_count,
        errcode=errcode,
        stderr=stderr,
        profile=None,
    )
[docs]
def run_benchmark(
    benchmark, spawner, profile, selected_idx=None, extra_params=None, cwd=None, prev_result=None
):
    """
    Run one benchmark over all of its parameter combinations.

    Combinations whose index is not in `selected_idx` are not run and
    are reported as NaN. The outputs of the individual runs are joined
    with blank lines, the last non-zero error code is kept, and the
    profile data of all runs is merged.

    Parameters
    ----------
    benchmark : dict
        Benchmark object dict
    spawner : Spawner
        Benchmark process spawner
    profile : bool
        Whether to run with profile
    selected_idx : set, optional
        Set of parameter indices to run for.
    extra_params : {dict, list}, optional
        Additional parameters to pass to the benchmark.
        If a list, each entry should correspond to a benchmark
        parameter combination.
    cwd : str, optional
        Working directory to run the benchmark in.
        If None, run in a temporary directory.
    prev_result : optional
        Accepted but not used by this function.

    Returns
    -------
    result : BenchmarkResult
        Result data.
    """
    if extra_params is None:
        extra_params = {}
    overrides_per_combination = isinstance(extra_params, list)

    if benchmark['params']:
        combinations = enumerate(itertools.product(*benchmark['params']))
    else:
        combinations = [(0, None)]

    values = []
    sample_sets = []
    repeat_counts = []
    profile_blobs = []
    output_chunks = []
    final_errcode = 0

    for idx, _combination in combinations:
        if selected_idx is not None and idx not in selected_idx:
            # Deselected combination: keep the slot, mark the value as skipped
            values.append(math.nan)
            sample_sets.append(None)
            repeat_counts.append(None)
            profile_blobs.append(None)
            continue

        overrides = extra_params[idx] if overrides_per_combination else extra_params
        single = _run_benchmark_single_param(
            benchmark, spawner, idx, extra_params=overrides, profile=profile, cwd=cwd
        )

        values.extend(single.result)
        sample_sets.extend(single.samples)
        repeat_counts.extend(single.number)
        profile_blobs.append(single.profile)

        if single.stderr:
            output_chunks.append(single.stderr)
        if single.errcode != 0:
            final_errcode = single.errcode

    return BenchmarkResult(
        result=values,
        samples=sample_sets,
        number=repeat_counts,
        errcode=final_errcode,
        stderr="\n\n".join(output_chunks).strip(),
        profile=_combine_profile_data(profile_blobs),
    )
[docs]
def _run_benchmark_single_param(benchmark, spawner, param_idx, profile, extra_params, cwd):
    """
    Run a benchmark, for single parameter combination index in case it
    is parameterized

    Parameters
    ----------
    benchmark : dict
        Benchmark object dict
    spawner : Spawner
        Benchmark process spawner
    param_idx : {int, None}
        Parameter index to run benchmark for
    profile : bool
        Whether to run with profile
    extra_params : dict
        Additional parameters to pass to the benchmark
    cwd : {str, None}
        Working directory to run the benchmark in.
        If None, run in a temporary directory.

    Returns
    -------
    result : BenchmarkResult
        Result data.

    Raises
    ------
    util.UserError
        If the run is interrupted with a KeyboardInterrupt.

    Notes
    -----
    All temporary files and directories are created inside the guarded
    section and removed afterwards, including when one of the later
    allocations fails.
    """
    name = benchmark['name']
    if benchmark['params']:
        name += f'-{param_idx}'

    # Serialize first: if this raises, nothing has been created on disk yet.
    params_str = json.dumps(extra_params)

    # Track what actually exists so the finally clause removes exactly that.
    # The literal string 'None' is what gets passed on when not profiling.
    profile_path = 'None'
    profile_file_created = False
    real_cwd = cwd
    result_file_name = None

    try:
        if profile:
            profile_fd, profile_path = tempfile.mkstemp()
            profile_file_created = True
            os.close(profile_fd)

        if cwd is None:
            real_cwd = tempfile.mkdtemp()

        result_file = tempfile.NamedTemporaryFile(delete=False)
        result_file_name = result_file.name
        # Only the name is needed; the benchmark process writes the content
        result_file.close()

        out, errcode = spawner.run(
            name=name,
            params_str=params_str,
            profile_path=profile_path,
            result_file_name=result_file_name,
            timeout=benchmark['timeout'],
            cwd=real_cwd,
        )

        if errcode != 0:
            if errcode == util.TIMEOUT_RETCODE:
                out += f"\n\nasv: benchmark timed out (timeout {benchmark['timeout']}s)\n"

            result = None
            samples = None
            number = None
        else:
            with open(result_file_name, 'r') as stream:
                data = stream.read()

            try:
                data = json.loads(data)
            except ValueError as exc:
                data = None
                errcode = JSON_ERROR_RETCODE
                out += f"\n\nasv: failed to parse benchmark result: {exc}\n"

            # Special parsing for timing benchmark results
            if isinstance(data, dict) and 'samples' in data and 'number' in data:
                result = True
                samples = data['samples']
                number = data['number']
            else:
                result = data
                samples = None
                number = None

        if benchmark['params'] and out:
            # Pick the single parameter combination that belongs to param_idx
            (params,) = itertools.islice(
                itertools.product(*benchmark['params']), param_idx, param_idx + 1
            )
            out = f"For parameters: {', '.join(params)}\n{out}"

        if profile:
            with open(profile_path, 'rb') as profile_fd:
                profile_data = profile_fd.read()
            # An empty profile file counts as "no profile"
            profile_data = profile_data if profile_data else None
        else:
            profile_data = None

        return BenchmarkResult(
            result=[result],
            samples=[samples],
            number=[number],
            errcode=errcode,
            stderr=out.strip(),
            profile=profile_data,
        )
    except KeyboardInterrupt:
        spawner.interrupt()
        raise util.UserError("Interrupted.")
    finally:
        if result_file_name is not None:
            os.remove(result_file_name)
        if profile_file_created:
            os.remove(profile_path)
        if cwd is None and real_cwd is not None:
            util.long_path_rmtree(real_cwd, True)
[docs]
class Spawner:
    """
    Manage launching individual benchmark.py commands

    Every command is started through ``env.run`` with the process
    environment extended by ``env.env_vars``.
    """

    def __init__(self, env, benchmark_dir):
        """
        Parameters
        ----------
        env : Environment
            Environment whose ``run`` method and ``env_vars`` are used to
            launch the benchmark script.
        benchmark_dir : str
            Benchmark directory; stored as an absolute path.
        """
        # Needed by create_setup_cache() and run() below
        self.env = env
        self.benchmark_dir = os.path.abspath(benchmark_dir)
        self.interrupted = False

    def interrupt(self):
        """Record that the run was interrupted."""
        self.interrupted = True

    def create_setup_cache(self, benchmark_id, timeout, params_str):
        """
        Run the ``setup_cache`` command of the benchmark script in a fresh
        temporary directory.

        Returns
        -------
        cache_dir : {str, None}
            The directory on success; None on failure (the directory is
            removed again).
        stderr : {str, None}
            None on success; the captured output plus an exit status line
            on failure.
        """
        cache_dir = tempfile.mkdtemp()

        env_vars = dict(os.environ)
        env_vars.update(self.env.env_vars)

        out, _, errcode = self.env.run(
            [
                BENCHMARK_RUN_SCRIPT,
                'setup_cache',
                os.path.abspath(self.benchmark_dir),
                benchmark_id,
                params_str,
            ],
            dots=False,
            display_error=False,
            return_stderr=True,
            valid_return_codes=None,
            redirect_stderr=True,
            cwd=cache_dir,
            timeout=timeout,
            env=env_vars,
        )

        if errcode == 0:
            return cache_dir, None
        else:
            util.long_path_rmtree(cache_dir, True)
            out += f'\nasv: setup_cache failed (exit status {errcode})'
            return None, out.strip()

    def run(self, name, params_str, profile_path, result_file_name, timeout, cwd):
        """
        Run the ``run`` command of the benchmark script for one benchmark.

        Returns
        -------
        out : str
            Captured output of the process.
        errcode : int
            Exit code reported by ``env.run``.
        """
        env_vars = dict(os.environ)
        env_vars.update(self.env.env_vars)

        out, _, errcode = self.env.run(
            [
                BENCHMARK_RUN_SCRIPT,
                'run',
                os.path.abspath(self.benchmark_dir),
                name,
                params_str,
                profile_path,
                result_file_name,
            ],
            dots=False,
            timeout=timeout,
            display_error=False,
            return_stderr=True,
            redirect_stderr=True,
            valid_return_codes=None,
            cwd=cwd,
            env=env_vars,
        )
        return out, errcode

    def preimport(self):
        """Nothing to pre-import for this spawner; always reports success."""
        return True, ""

    def close(self):
        """Release spawner resources; this spawner holds none."""
        pass
[docs]
class ForkServer(Spawner):
    """
    Spawner that talks to a long-running ``run_server`` benchmark process.

    The server process is started in the constructor; commands are sent
    to it as length-prefixed JSON messages over a Unix domain socket
    located in a private temporary directory.
    """

    def __init__(self, env, root):
        """
        Start the server process and wait until its socket exists.

        Raises
        ------
        RuntimeError
            If `os.fork` or `os.setpgid` is missing, or if the server's
            output stream ends before the socket appears.
        """
        super().__init__(env, root)

        if not (hasattr(os, 'fork') and hasattr(os, 'setpgid')):
            raise RuntimeError("ForkServer only available on POSIX")

        self.tmp_dir = tempfile.mkdtemp(prefix='asv-forkserver-')
        self.socket_name = os.path.join(self.tmp_dir, 'socket')

        env_vars = dict(os.environ)
        env_vars.update(env.env_vars)

        self.server_proc = env.run(
            [BENCHMARK_RUN_SCRIPT, 'run_server', self.benchmark_dir, self.socket_name],
            return_popen=True,
            redirect_stderr=True,
            env=env_vars,
        )

        self._server_output = None
        self.stdout_reader_thread = threading.Thread(target=self._stdout_reader)
        self.stdout_reader_thread.start()

        # Wait for the socket to appear
        # (the reader thread ends once the server's output stream is exhausted,
        # which bounds this loop)
        while self.stdout_reader_thread.is_alive():
            if os.path.exists(self.socket_name):
                break
            time.sleep(0.05)

        if not os.path.exists(self.socket_name):
            os.rmdir(self.tmp_dir)
            raise RuntimeError("Failed to start server thread")

    def _stdout_reader(self):
        """Thread target: collect the server's whole output as text."""
        try:
            out = self.server_proc.stdout.read()
            self.server_proc.stdout.close()
            out = out.decode('utf-8', 'replace')
        except Exception:
            # Keep the traceback as the "output" so close() can report it
            out = traceback.format_exc()
        self._server_output = out

    def run(self, name, params_str, profile_path, result_file_name, timeout, cwd):
        """
        Ask the server to run one benchmark.

        Returns
        -------
        out : str
            The ``'out'`` entry of the server's reply.
        errcode : int
            The ``'errcode'`` entry of the server's reply.
        """
        msg = {
            'action': 'run',
            'benchmark_id': name,
            'params_str': params_str,
            'profile_path': profile_path,
            'result_file': result_file_name,
            'timeout': timeout,
            'cwd': cwd,
        }
        result = self._send_command(msg)
        return result['out'], result['errcode']

    def preimport(self):
        """
        Ask the server to pre-import the benchmark suite.

        Returns
        -------
        success : bool
            False if sending the command raised an exception.
        out : str
            The server's reply, or a crash description on failure.
        """
        success = True
        out = ""

        try:
            out = self._send_command({'action': 'preimport'})
        except Exception as exc:
            success = False
            out = "asv: benchmark runner crashed\n"
            if isinstance(exc, util.UserError):
                out += str(exc)
            else:
                out += traceback.format_exc()
            out = out.rstrip()
        return success, out

    def _send_command(self, msg):
        """
        Send one JSON command to the server and return the decoded reply.

        Both directions use an 8-byte little-endian length followed by
        UTF-8 encoded JSON.

        Raises
        ------
        OSError
            If the socket cannot be connected after five attempts.
        util.UserError
            If the exchange fails and the server process has exited.
        """
        msg = json.dumps(msg)
        msg = msg.encode('utf-8')

        s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            # Connect (with wait+retry)
            for retry in range(5, 0, -1):
                try:
                    s.connect(self.socket_name)
                    break
                except OSError:
                    if retry > 1:
                        time.sleep(0.2)
                    else:
                        raise

            # Send command
            try:
                s.sendall(struct.pack('<Q', len(msg)))
                s.sendall(msg)

                # Read result
                (read_size,) = struct.unpack('<Q', util.recvall(s, 8))
                result_text = util.recvall(s, read_size)
                result_text = result_text.decode('utf-8')
                result = json.loads(result_text)
            except Exception:
                # Prefer reporting a dead server over the low-level error
                exitcode = self.server_proc.poll()
                if exitcode is not None:
                    raise util.UserError(f"Process exited with code {exitcode}")
                raise
        finally:
            # Also reached when connecting fails, so the socket never leaks
            s.close()
        return result

    def close(self):
        """
        Stop the server process, report its output and remove the
        temporary directory.
        """
        import signal

        # Check for termination
        if self.server_proc.poll() is None:
            util._killpg_safe(self.server_proc.pid, signal.SIGINT)
        if self.server_proc.poll() is None:
            time.sleep(0.1)
        if self.server_proc.poll() is None:
            # Kill process group
            util._killpg_safe(self.server_proc.pid, signal.SIGKILL)

        self.server_proc.wait()
        self.stdout_reader_thread.join()

        if self._server_output and not self.interrupted:
            with log.indent():
                log.error("asv: forkserver:")
                log.error(self._server_output)

        util.long_path_rmtree(self.tmp_dir)
[docs]
def _combine_profile_data(datasets):
    """
    Merge several profile dumps into a single one.

    Parameters
    ----------
    datasets : list of {bytes, None}
        Profile data blobs; None entries are ignored.

    Returns
    -------
    data : {bytes, None}
        None if no blob is left, the blob itself if exactly one is left,
        otherwise the dump of the `pstats.Stats` built from all of them.
    """
    blobs = [blob for blob in datasets if blob is not None]
    if not blobs:
        return None
    if len(blobs) == 1:
        return blobs[0]

    # Load and combine stats; pstats reads from files, so each blob goes
    # through a short-lived temporary file.
    merged = None
    for blob in blobs:
        scratch = tempfile.NamedTemporaryFile(delete=False)
        try:
            scratch.write(blob)
            scratch.close()
            if merged is None:
                merged = pstats.Stats(scratch.name)
            else:
                merged.add(scratch.name)
        finally:
            os.remove(scratch.name)

    # Write combined stats out
    dump_file = tempfile.NamedTemporaryFile(delete=False)
    try:
        dump_file.close()
        merged.dump_stats(dump_file.name)
        with open(dump_file.name, 'rb') as stream:
            return stream.read()
    finally:
        os.remove(dump_file.name)