Source code for asv.commands.run

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import os
import sys
import logging
import traceback
import time
import argparse
import textwrap
from collections import defaultdict

from . import Command, common_args
from ..benchmarks import Benchmarks
from ..console import log
from ..machine import Machine
from ..repo import get_repo, NoSuchNameError
from ..results import Results, get_existing_hashes, iter_results_for_machine_and_hash
from ..runner import run_benchmarks, skip_benchmarks
from .. import environment, util
from .setup import Setup
from .show import Show


def _do_build(args):
    env, conf, repo, commit_hash = args
    started_at = time.time()
    success = False
    try:
        with log.set_level(logging.WARN):
            env.install_project(conf, repo, commit_hash)
        success = True
    except util.ProcessError:
        pass
    duration = time.time() - started_at
    return (env.name, (success, duration))

def _do_build_multiprocess(args_sets):
    """
    multiprocessing callback to build the project in one particular
    environment.
    """
    try:
        res = []
        for args in args_sets:
            res.append(_do_build(args))
        return res
    except BaseException as exc:
        raise util.ParallelFailure(str(exc), exc.__class__, traceback.format_exc())

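# A minimal sketch of how the two helpers above are expected to be driven
# (the real call site is in Run.run() below).  The grouping by
# ``env.dir_name`` mirrors that call site: environments sharing a build
# directory are built serially inside one worker.
#
#     args_sets = defaultdict(list)
#     for env in environments:
#         args_sets[env.dir_name].append((env, conf, repo, commit_hash))
#     pool = util.get_multiprocessing_pool(parallel)
#     for worker_result in pool.map(_do_build_multiprocess, args_sets.values()):
#         # each worker returns a list of (env.name, (success, duration)) pairs
#         successes.update(dict(worker_result))
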
class Run(Command):
    @classmethod
    def setup_arguments(cls, subparsers):
        parser = subparsers.add_parser(
            "run", help="Run a benchmark suite",
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description=textwrap.dedent("""
                Run a benchmark suite.

                examples:
                  asv run main             run for one branch
                  asv run main^!           run for one commit (git)
                  asv run "--merges main"  run for only merge commits (git)
                """))

        cls._setup_arguments(parser)

        parser.set_defaults(func=cls.run_from_args)

        return parser
    @classmethod
    def _setup_arguments(cls, parser, env_default_same=False):
        parser.add_argument(
            'range', nargs='?', default=None,
            help="""Range of commits to benchmark.  For a git repository,
            this is passed as the first argument to ``git rev-list``; for a
            Mercurial repository, to the ``hg log`` command.  See the
            'specifying ranges' section of the `gitrevisions` manpage, or
            'hg help revisions', for more info.  Also accepts the special
            values 'NEW', 'ALL', 'EXISTING', and 'HASHFILE:xxx'.  'NEW' will
            benchmark all commits since the latest benchmarked on this
            machine.  'ALL' will benchmark all commits in the project.
            'EXISTING' will benchmark against all commits for which there
            are existing benchmarks on any machine.  'HASHFILE:xxx' will
            benchmark only a specific set of hashes given in the file named
            'xxx' ('-' means stdin), which must have one hash per line.  By
            default, will benchmark the head of each configured branch.""")
        parser.add_argument(
            "--date-period", type=common_args.time_period, default=None,
            help="""Pick only one commit in each given time period.
            For example: 1d (daily), 1w (weekly), 1y (yearly).""")
        parser.add_argument(
            "--steps", "-s", type=common_args.positive_int, default=None,
            help="""Maximum number of steps to benchmark.  This is used to
            subsample the commits determined by range to a reasonable
            number.""")
        common_args.add_bench(parser)
        parser.add_argument(
            "--profile", "-p", action="store_true",
            help="""In addition to timing, run the benchmarks through the
            `cProfile` profiler and store the results.""")
        common_args.add_parallel(parser)
        common_args.add_show_stderr(parser)
        parser.add_argument(
            "--durations", action="store", metavar='N', nargs='?',
            type=common_args.positive_int_or_inf, default=0, const='all',
            help="Display total duration for N (or 'all') slowest benchmarks")
        parser.add_argument(
            "--quick", "-q", action="store_true",
            help="""Do a "quick" run, where each benchmark function is run
            only once.  This is useful to find basic errors in the benchmark
            functions faster.  The results are unlikely to be useful, and
            thus are not saved.""")
        common_args.add_environment(parser, default_same=env_default_same)
        parser.add_argument(
            "--set-commit-hash", default=None,
            help="""Set the commit hash to use when recording benchmark
            results.  This allows results to be saved even when using an
            existing environment.""")
        common_args.add_launch_method(parser)
        parser.add_argument(
            "--dry-run", "-n", action="store_true", default=None,
            help="""Do not save any results to disk.""")
        common_args.add_machine(parser)
        parser.add_argument(
            "--skip-existing-successful", action="store_true",
            help="""Skip running benchmarks that have previous successful
            results""")
        parser.add_argument(
            "--skip-existing-failed", action="store_true",
            help="""Skip running benchmarks that have previous failed
            results""")
        parser.add_argument(
            "--skip-existing-commits", action="store_true",
            help="""Skip running benchmarks for commits that have existing
            results""")
        parser.add_argument(
            "--skip-existing", "-k", action="store_true",
            help="""Skip running benchmarks that have previous successful
            or failed results""")
        common_args.add_record_samples(parser)
        parser.add_argument(
            "--interleave-rounds", action="store_true", default=False,
            help="""Interleave benchmarks with multiple rounds across
            commits.  This can avoid measurement biases from commit
            ordering, but can take longer.""")
        parser.add_argument(
            "--no-interleave-rounds", action="store_false",
            dest="interleave_rounds")
        # Backward compatibility for '--(no-)interleave-rounds'
        parser.add_argument(
            "--interleave-processes", action="store_true", default=False,
            dest="interleave_rounds", help=argparse.SUPPRESS)
        parser.add_argument(
            "--no-interleave-processes", action="store_false",
            dest="interleave_rounds", help=argparse.SUPPRESS)
        parser.add_argument(
            "--no-pull", action="store_true",
            help="Do not pull the repository")
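    # Illustrative command lines built from the options defined above.  The
    # option spellings come straight from this parser; the chosen values
    # (step counts, file names, periods) are only examples:
    #
    #     asv run NEW --steps 10            # subsample unbenchmarked commits
    #     asv run ALL --date-period 1w      # one commit per week, whole history
    #     asv run HASHFILE:hashes.txt -k    # hashes from a file, skip existing
    #     asv run main^! --profile          # one commit, with cProfile output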
    @classmethod
    def run_from_conf_args(cls, conf, args, **kwargs):
        return cls.run(
            conf=conf, range_spec=args.range, steps=args.steps,
            date_period=args.date_period,
            bench=args.bench, attribute=args.attribute, parallel=args.parallel,
            show_stderr=args.show_stderr, quick=args.quick,
            profile=args.profile, env_spec=args.env_spec,
            set_commit_hash=args.set_commit_hash,
            dry_run=args.dry_run,
            machine=args.machine,
            skip_successful=args.skip_existing_successful or args.skip_existing,
            skip_failed=args.skip_existing_failed or args.skip_existing,
            skip_existing_commits=args.skip_existing_commits,
            record_samples=args.record_samples,
            append_samples=args.append_samples,
            pull=not args.no_pull,
            interleave_rounds=args.interleave_rounds,
            launch_method=args.launch_method,
            durations=args.durations,
            **kwargs
        )
    @classmethod
    def run(cls, conf, range_spec=None, steps=None, date_period=None,
            bench=None, attribute=None, parallel=1, show_stderr=False,
            quick=False, profile=False, env_spec=None, set_commit_hash=None,
            dry_run=False, machine=None, _machine_file=None,
            skip_successful=False, skip_failed=False,
            skip_existing_commits=False, record_samples=False,
            append_samples=False, pull=True, interleave_rounds=False,
            launch_method=None, durations=0, _returns={}):
        machine_params = Machine.load(
            machine_name=machine,
            _path=_machine_file,
            interactive=True)
        machine_params.save(conf.results_dir)

        environments = list(environment.get_environments(conf, env_spec))

        if environment.is_existing_only(environments) and set_commit_hash is None:
            # No repository required, so skip using it
            conf.dvcs = "none"

        has_existing_env = any(isinstance(env, environment.ExistingEnvironment)
                               for env in environments)

        if interleave_rounds:
            if dry_run:
                raise util.UserError("--interleave-rounds and --dry-run "
                                     "cannot be used together")
            if has_existing_env:
                raise util.UserError("--interleave-rounds cannot be used with existing "
                                     "environment (or python=same)")
        elif interleave_rounds is None:
            # Enable if possible
            interleave_rounds = not (dry_run or has_existing_env)

        if append_samples:
            record_samples = True

        repo = get_repo(conf)
        if pull:
            repo.pull()

        if set_commit_hash is not None:
            set_commit_hash = repo.get_hash_from_name(set_commit_hash)

        # Track failures across the run command
        failures = False

        # Comparison period for date_period filtering
        old_commit_hashes = None

        if range_spec is None:
            try:
                commit_hashes = list(set([repo.get_hash_from_name(branch)
                                          for branch in conf.branches]))
            except NoSuchNameError as exc:
                raise util.UserError(f'Unknown branch {exc} in configuration')
        elif range_spec == 'EXISTING':
            commit_hashes = get_existing_hashes(conf.results_dir)
        elif range_spec == "NEW":
            # New commits on each configured branch
            old_commit_hashes = get_existing_hashes(conf.results_dir)
            commit_hashes = repo.get_new_branch_commits(conf.branches, old_commit_hashes)
        elif range_spec == "ALL":
            # All commits on each configured branch
            commit_hashes = repo.get_new_branch_commits(conf.branches, [])
        elif isinstance(range_spec, str) and range_spec.startswith('HASHFILE:'):
            hashfn = range_spec[9:]
            if hashfn == '-':
                hashstr = sys.stdin.read()
            elif os.path.isfile(hashfn):
                with open(hashfn, 'r') as f:
                    hashstr = f.read()
            else:
                log.error(f'Requested commit hash file "{hashfn}" is not a file')
                return 1
            commit_hashes = []
            for h in hashstr.split("\n"):
                h = h.strip()
                if h:
                    try:
                        commit_hashes.append(repo.get_hash_from_name(h))
                    except NoSuchNameError:
                        log.warning(f"Unknown commit hash {h} in input file")
        elif isinstance(range_spec, list):
            commit_hashes = range_spec
        else:
            commit_hashes = repo.get_hashes_from_range(range_spec)

        if date_period is not None:
            commit_hashes = repo.filter_date_period(commit_hashes, date_period,
                                                    old_commit_hashes)

        if steps is not None:
            commit_hashes = util.pick_n(commit_hashes, steps)

        if len(commit_hashes) == 0:
            log.error("No commit hashes selected")
            return 1

        Setup.perform_setup(environments, parallel=parallel)
        if len(environments) == 0:
            log.error("No environments selected")
            return 1

        if range_spec is not None:
            for env in environments:
                if not env.can_install_project():
                    raise util.UserError(
                        "No range spec may be specified if benchmarking in "
                        "an existing environment")

        benchmarks = Benchmarks.discover(conf, repo, environments,
                                         commit_hashes, regex=bench)

        benchmarks.save()

        if len(benchmarks) == 0:
            if bench == ["just-discover"]:
                return 0
            else:
                log.error("No benchmarks selected")
                return 1

        benchmark_count = len(benchmarks)
        steps = len(commit_hashes) * benchmark_count * len(environments)

        log.info(
            f"Running {steps} total benchmarks "
            f"({len(commit_hashes)} commits * {len(environments)} "
            f"environments * {len(benchmarks)} benchmarks)")

        parallel, multiprocessing = util.get_multiprocessing(parallel)

        _returns['benchmarks'] = benchmarks
        _returns['environments'] = environments
        _returns['machine_params'] = machine_params.__dict__

        if attribute and 'rounds' in attribute:
            max_rounds = int(attribute['rounds'])
        else:
            max_rounds = max(b.get('rounds', 1) for b in benchmarks.values())

        log.set_nitems(steps * max_rounds)

        # Collect existing results that should be skipped, keyed either by
        # commit hash (whole commit skipped) or by (commit hash, env name).
        skipped_benchmarks = defaultdict(lambda: set())

        for commit_hash in commit_hashes:
            if skip_successful or skip_failed or skip_existing_commits:
                try:
                    for result in iter_results_for_machine_and_hash(
                            conf.results_dir, machine_params.machine, commit_hash):

                        if skip_existing_commits:
                            skipped_benchmarks[commit_hash] = True
                            break

                        for key in result.get_result_keys(benchmarks):
                            if key not in benchmarks:
                                continue

                            value = result.get_result_value(key, benchmarks[key]['params'])
                            failed = value is None or (isinstance(value, list) and
                                                       None in value)

                            if skip_failed and failed:
                                skipped_benchmarks[(commit_hash, result.env_name)].add(key)
                            if skip_successful and not failed:
                                skipped_benchmarks[(commit_hash, result.env_name)].add(key)
                except OSError:
                    pass

        if interleave_rounds:
            run_round_set = [[j] for j in range(max_rounds, 0, -1)]
        else:
            run_round_set = [None]

        def iter_rounds_commits():
            for run_rounds in run_round_set:
                if interleave_rounds and run_rounds[0] % 2 == 0:
                    for commit_hash in commit_hashes[::-1]:
                        yield run_rounds, commit_hash
                else:
                    for commit_hash in commit_hashes:
                        yield run_rounds, commit_hash

        build_durations = defaultdict(lambda: 0)

        for run_rounds, commit_hash in iter_rounds_commits():
            if commit_hash in skipped_benchmarks:
                for env in environments:
                    for bench in benchmarks:
                        if interleave_rounds:
                            log.step()
                        else:
                            for j in range(max_rounds):
                                log.step()
                continue

            for env in environments:
                skip_list = skipped_benchmarks[(commit_hash, env.name)]
                for bench in benchmarks:
                    if bench in skip_list:
                        if interleave_rounds:
                            log.step()
                        else:
                            for j in range(max_rounds):
                                log.step()

            active_environments = [env for env in environments
                                   if set(benchmarks.keys())
                                   .difference(skipped_benchmarks[(commit_hash, env.name)])]

            if not active_environments:
                continue

            if commit_hash:
                if interleave_rounds:
                    round_info = f" (round {max_rounds - run_rounds[0] + 1}/{max_rounds})"
                else:
                    round_info = ""

                commit_name = repo.get_decorated_hash(commit_hash, 8)
                log.info(
                    f"For {conf.project} commit {commit_name}{round_info}:")

            with log.indent():
                for subenv in util.iter_chunks(active_environments, parallel):
                    successes = dict([(env.name, (env.installed_commit_hash == commit_hash, 0))
                                      for env in subenv])
                    env_to_install = [env for env in subenv
                                      if env.installed_commit_hash != commit_hash]

                    subenv_name = ', '.join([x.name for x in env_to_install])

                    if subenv_name:
                        log.info(f"Building for {subenv_name}")

                        with log.indent():
                            args = [(env, conf, repo, commit_hash)
                                    for env in env_to_install]
                            if parallel != 1:
                                # Parallel run only for environments with different dir_names
                                args_sets = defaultdict(list)
                                for arg in args:
                                    args_sets[arg[0].dir_name].append(arg)
                                args_sets = args_sets.values()

                                try:
                                    pool = util.get_multiprocessing_pool(parallel)
                                    try:
                                        res = []
                                        for r in pool.map(_do_build_multiprocess, args_sets):
                                            res.extend(r)
                                        successes.update(dict(res))
                                        pool.close()
                                        pool.join()
                                    finally:
                                        pool.terminate()
                                except util.ParallelFailure as exc:
                                    exc.reraise()
                            else:
                                successes.update(dict(map(_do_build, args)))

                    for env in subenv:
                        success, duration = successes[env.name]

                        build_duration_key = (commit_hash, env.name)
                        build_durations[build_duration_key] += duration
                        build_duration = build_durations[build_duration_key]

                        params = dict(machine_params.__dict__)
                        params['python'] = env.python
                        params.update(env.requirements)

                        skip_save = (dry_run or
                                     (isinstance(env, environment.ExistingEnvironment) and
                                      set_commit_hash is None))

                        skip_list = skipped_benchmarks[(commit_hash, env.name)]
                        benchmark_set = benchmarks.filter_out(skip_list)

                        if set_commit_hash is not None:
                            commit_hash = set_commit_hash

                        result = Results(
                            params,
                            env.requirements,
                            commit_hash,
                            repo.get_date(commit_hash),
                            env.python,
                            env.name,
                            env.env_vars
                        )

                        if not skip_save:
                            result.load_data(conf.results_dir)

                        if build_duration != 0:
                            result.set_build_duration(build_duration)

                        # If we are interleaving commits, we need to
                        # append samples (except for the first round)
                        # and record samples (except for the final
                        # round).
                        force_append_samples = (interleave_rounds and
                                                run_rounds[0] < max_rounds)
                        force_record_samples = (interleave_rounds and
                                                run_rounds[0] > 1)

                        if success:
                            run_benchmarks(
                                benchmark_set, env, results=result,
                                show_stderr=show_stderr, quick=quick,
                                profile=profile,
                                extra_params=attribute,
                                record_samples=(record_samples or force_record_samples),
                                append_samples=(append_samples or force_append_samples),
                                run_rounds=run_rounds,
                                launch_method=launch_method)
                        else:
                            skip_benchmarks(benchmark_set, env, results=result)

                        if not skip_save:
                            result.save(conf.results_dir)

                        failures = failures or any(
                            code != 0 for code in result.errcode.values())

                        if durations > 0:
                            duration_set = Show._get_durations([(machine, result)],
                                                               benchmark_set)
                            log.info(cls.format_durations(
                                duration_set[(machine, env.name)], durations))

        if failures:
            return 2
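    # Hedged usage sketch: driving the classmethod above directly from Python
    # rather than via the CLI.  ``conf`` would normally come from asv's
    # configuration loader; the file name and keyword values shown here are
    # only examples, not required settings.
    #
    #     from asv import config
    #     conf = config.Config.load("asv.conf.json")
    #     retcode = Run.run(conf, range_spec="NEW", steps=5,
    #                       show_stderr=True, durations=10)
    #     # per the code above: returns 1 if nothing could be selected,
    #     # 2 if any benchmark exited with a nonzero error code,
    #     # and None on a clean run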
    @classmethod
    def format_durations(cls, durations, num_durations):
        items = list(durations.items())
        items.sort(key=lambda x: (-x[1], x[0]))

        rows = [["benchmark", "total duration"]]

        total = 0
        for j, (name, duration) in enumerate(items):
            if j >= num_durations:
                rows.append(["...", "..."])
                break
            rows.append([name, util.human_time(duration)])

        total = sum(durations.values())
        rows.append(["total", util.human_time(total)])

        msg = util.format_text_table(rows, num_headers=1)

        return msg
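    # Hedged example of the expected input shape: ``durations`` maps benchmark
    # names to cumulative seconds and ``num_durations`` caps how many rows are
    # listed before the "..." filler row (the names and numbers below are
    # made up for illustration):
    #
    #     table = Run.format_durations(
    #         {"benchmarks.TimeSuite.time_iterkeys": 12.5,
    #          "benchmarks.MemSuite.mem_list": 3.1},
    #         num_durations=1)
    #     # -> a text table with the slowest benchmark, a "..." row, and a
    #     #    "total" row formatted via util.human_time()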