# Licensed under a 3-clause BSD style license - see LICENSE.rst
import math
import os
import traceback
from . import step_detect, util
from .util import geom_mean_na, is_na, mean_na
# This is the maximum number of points to include in summary graphs.
# It is based on the number of pixels in the summary graph display on
# a recent Retina MacBook Pro (3840 pixels across the screen, divided
# by 5 summaries across, divided by 2 for good measure and to account
# for width of the line).
[docs]
# NOTE: true division makes this the float 384.0, not an int.  It is the
# default ``num_points`` argument of ``resample_data`` in this module.
RESAMPLED_POINTS = 3840 / 5 / 2
[docs]
class GraphSet:
    """
    Manage multiple :py:class:`Graph` objects.

    Graphs are indexed by their ``path`` and are additionally grouped by
    benchmark name, in the order in which they were first requested.
    """

    def __init__(self):
        super().__init__()
        # Graph.path -> Graph; one entry per distinct benchmark/params pair.
        self._graphs = {}
        # benchmark name -> list of the Graph objects registered for it.
        self._groups = {}

    def get_graph(self, benchmark_name, params):
        """
        Return the graph for a benchmark/parameter combination.

        The graph is created and registered on first use; later calls
        with a combination that maps to the same ``Graph.path`` return
        the already registered object.

        Parameters
        ----------
        benchmark_name : str
            Name of the benchmark.
        params : dict
            Parameters describing the benchmark.

        Returns
        -------
        graph : Graph
        """
        # A throwaway instance is built only to learn the path key.
        candidate = Graph(benchmark_name, params)
        if candidate.path not in self._graphs:
            self._graphs[candidate.path] = candidate
            self._groups.setdefault(benchmark_name, []).append(candidate)
        return self._graphs[candidate.path]

    def get_graph_group(self, benchmark_name):
        """Return the list of graphs registered for ``benchmark_name``
        (an empty list if there are none)."""
        return self._groups.get(benchmark_name, [])

    def get_params(self):
        """Return all params used in graphs and their corresponding values set"""
        params = {}
        for graph in self._graphs.values():
            for key, value in graph.params.items():
                # Every key is recorded, but falsy values (None, '') are
                # not added to its value set.
                values = params.setdefault(key, set())
                if value:
                    values.add(value)
        return params

    def detect_steps(self, pool=None, dots=None):
        """
        Run step detection on every graph and wait for the results.

        Parameters
        ----------
        pool : multiprocessing.Pool, optional
            Passed on to :py:meth:`Graph.detect_steps`.
        dots : callable, optional
            Progress callback taking no arguments, called once per graph:
            right after the computation when running without a pool, or
            right after the result has been collected when a pool is used.
        """
        for graph in self._graphs.values():
            graph.detect_steps(pool)
            if dots is not None and pool is None:
                dots()

        # Wait for results to compute
        for graph in self._graphs.values():
            graph.get_steps()
            if dots is not None and pool is not None:
                dots()

    def get_summary_graphs(self, dots=None):
        """
        Yield one summary graph per benchmark group.

        ``dots``, if given, is called with no arguments after each yielded
        graph, when the generator is resumed.
        """
        for graphs in self._groups.values():
            yield make_summary_graph(graphs)
            if dots is not None:
                dots()

    def save(self, html_dir, dots=None):
        """
        Save every graph below ``html_dir``.

        ``dots``, if given, is called with no arguments after each graph
        has been saved.
        """
        for graph in self._graphs.values():
            graph.save(html_dir)
            if dots is not None:
                dots()

    def __iter__(self):
        """Iterate over ``(path, graph)`` pairs."""
        return iter(self._graphs.items())

    def __len__(self):
        """Return the number of registered graphs."""
        return len(self._graphs)
[docs]
class Graph:
    """
    Manages a single "line" in the resulting plots for the front end.

    Unlike "results", which contain the timings for a single commit,
    these contain the timings for a single benchmark.
    """

    def __init__(self, benchmark_name, params):
        """
        Initially the graph contains no data.  It must be added using
        multiple calls to `add_data_point`.

        Parameters
        ----------
        benchmark_name : str
            A unique string to identify the benchmark, and display in
            the frontend.

        params : dict of str -> str
            A dictionary of parameters describing the benchmark.
        """
        self.benchmark_name = benchmark_name
        # Must be assigned before ``path`` is derived from it below.
        self.params = params
        # revision -> list of value lists, one per non-missing data point
        # added for that revision.
        self.data_points = {}
        # revision -> list of weight lists, parallel to ``data_points``.
        self.data_weights = {}
        self.path = self.get_file_path(self.params, benchmark_name)
        # Number of data series; fixed by the first non-missing data point.
        self.n_series = None
        # Stays True until a sequence-valued data point is added.
        self.scalar_series = True
        # Step detection results (lists) or pending async results, one per
        # series; None until ``detect_steps`` has run.
        self._steps = None

    @classmethod
    def get_file_path(cls, params, benchmark_name):
        """
        Get a file path understood by the JS frontend, corresponding
        on the given parameters and benchmark_name.

        The implementation must match asv.js:graph_to_path
        """
        parts = []
        for key, val in params.items():
            if val is None:
                part = f'{key}-null'
            elif val:
                part = f'{key}-{val}'
            else:
                # Falsy but not None (e.g. ''): the key alone is used.
                part = f'{key}'
            parts.append(util.sanitize_filename(part))
        # Sorted so the path does not depend on dict ordering.
        parts.sort()
        parts.insert(0, 'graphs')
        parts.append(util.sanitize_filename(benchmark_name))
        return os.path.join(*parts)

    def add_data_point(self, revision, value, weight=None):
        """
        Add a data point to the graph.

        Parameters
        ----------
        revision : int
            An integer value representing the commit revision in the commit log
        value : float or list
            The value(s) to plot in the benchmark.
        weight : float or list or None
            Weights corresponding to the values.
            Missing estimates are indicated with None.

        Raises
        ------
        ValueError
            If the number of values differs from that of earlier data
            points.
        """
        # The revision is registered even when the value is missing.
        self.data_points.setdefault(revision, [])
        self.data_weights.setdefault(revision, [])

        if is_na(value):
            return

        if not hasattr(value, '__len__'):
            value = [value]
            weight = [weight]
        else:
            self.scalar_series = False

        if self.n_series is None:
            self.n_series = len(value)
        elif len(value) != self.n_series:
            raise ValueError("Mismatching number of data series in graph")

        if weight is None:
            weight = [None] * len(value)

        self.data_points[revision].append(value)
        self.data_weights[revision].append(weight)

    def get_data(self):
        """
        Get the sorted and reduced data and weights.

        Returns
        -------
        val : list of (revision, value, weight)
            One entry per revision, in increasing revision order, with
            leading and trailing revisions that have no data removed.
            For scalar graphs ``value`` and ``weight`` are single items,
            otherwise they are lists with ``n_series`` entries.
        """
        if self.n_series is None:
            # No non-null data points
            self.n_series = 1

        def mean_axis0(samples):
            if not samples:
                return [None] * self.n_series
            return [mean_na(sample[j] for sample in samples)
                    for j in range(self.n_series)]

        # Average data over commit log
        val = [
            (revision,
             mean_axis0(self.data_points[revision]),
             mean_axis0(self.data_weights[revision]))
            for revision in sorted(self.data_points)
        ]

        # Discard missing data at edges.  Slicing between the first and the
        # last row that holds data also drops the row directly after a
        # lone valid row when everything following it is missing.
        with_data = [idx for idx, (_, values, _) in enumerate(val)
                     if any(not is_na(v) for v in values)]
        val = val[with_data[0]:with_data[-1] + 1] if with_data else []

        # Single-element series
        if self.scalar_series:
            val = [(k, v[0], w[0]) for k, v, w in val]

        return val

    def save(self, html_dir):
        """
        Save the graph to a .json file used by the frontend.

        Parameters
        ----------
        html_dir : str
            The root of the HTML tree.
        """
        filename = os.path.join(html_dir, self.path + ".json")

        # Drop weights
        val = [v[:2] for v in self.get_data()]

        util.write_json(filename, val, compact=True)

    def detect_steps(self, pool=None):
        """
        Run step detection algorithm on the graph data.

        Afterward, the results can be obtained via get_steps()

        Parameters
        ----------
        pool : multiprocessing.Pool, optional
            Pool to use for asynchronous jobs.
            If not given, run in serial.
        """
        if self._steps is not None:
            # Already computed
            return

        val = self.get_data()

        if not val:
            # Nothing to compute; give every series its own empty list
            # rather than n references to one shared list.
            self._steps = [[] for _ in range(self.n_series)]
            return

        if self.scalar_series:
            items = [val]
        else:
            items = [[(v[0], v[1][j], v[2][j]) for v in val]
                     for j in range(self.n_series)]

        if pool is None:
            self._steps = [_compute_graph_steps(item, reraise=False)
                           for item in items]
        else:
            self._steps = [pool.apply_async(_compute_graph_steps, (item,))
                           for item in items]

    def get_steps(self):
        """
        Return results from step detection.

        Returns
        -------
        steps : list of (left, right, val, min, err)
            Result of fitting a piecewise function to the graph.
            Missing data points do not necessarily belong in any piece.
            The values are: `left` (inclusive) and `right` (exclusive) specify
            a revision interval, `val` the median value in the interval, `min`
            the minimum value in the interval, and `err` the mean deviation from
            the median.

            For non-scalar graphs, a list of such lists is returned, one
            per data series.
        """
        if self._steps is None:
            self.detect_steps()

        # Anything that is not a list is a pending async result: resolve
        # it in place so later calls return immediately.
        for j, item in enumerate(self._steps):
            if not isinstance(item, list):
                self._steps[j] = item.get()

        if self.scalar_series:
            return self._steps[0]
        else:
            return self._steps
[docs]
def _compute_graph_steps(data, reraise=True):
    """
    Run step detection on one data series and express the result in
    revision coordinates.

    Parameters
    ----------
    data : list of (revision, value, weight)
        The data series to analyse.
    reraise : bool, optional
        If true (default), any exception is converted into a
        ``util.ParallelFailure`` carrying the message, the exception class
        and the formatted traceback.  If false, the original exception
        propagates unchanged.

    Returns
    -------
    steps : list of (left, right, val, min, err)
        ``left`` and ``right`` are revisions; ``right`` is one past the
        revision of the last data point in the step.
    """
    try:
        revisions = [point[0] for point in data]
        values = [point[1] for point in data]
        weights = [point[2] for point in data]

        detected = step_detect.detect_steps(values, weights)

        # The detector reports positions as indices into the series;
        # translate them back into revisions.
        return [
            (revisions[first], revisions[stop - 1] + 1, level, lowest, spread)
            for first, stop, level, lowest, spread in detected
        ]
    except BaseException as exc:
        if not reraise:
            raise
        raise util.ParallelFailure(str(exc), exc.__class__, traceback.format_exc())
[docs]
def make_summary_graph(graphs):
    """
    Build a single summary :py:class:`Graph` out of several graphs.

    The data of all ``graphs`` is combined onto a common revision axis,
    reduced to one averaged series, resampled, and stored in a new graph
    that carries the benchmark name of the first input graph and the
    params ``{'summary': ''}``.

    Parameters
    ----------
    graphs : list of Graph
        Graphs to summarize; must contain at least one graph.

    Returns
    -------
    graph : Graph
    """
    revisions, series = _combine_graph_data(graphs)
    averaged = _compute_summary_data_series(*series)

    # Resample
    points = resample_data(list(zip(revisions, averaged)))

    # Return as a graph
    summary = Graph(graphs[0].benchmark_name, {'summary': ''})
    for revision, value in points:
        summary.add_data_point(revision, value)
    return summary
[docs]
def _compute_summary_data_series(*ys):
    """
    Reduce several equally long data series to one summary series.

    For input series::

        y0, y1, ...

    the result is::

        [geom_mean([y0[0], y1[0], ...]),
         geom_mean([y0[1], y1[1], ...]),
         ...]

    Gaps in each series are filled separately (see
    ``_fill_missing_data``) before averaging.  A position that is missing
    from *every* original series stays missing (None) in the result, even
    if filling produced values for it.
    """
    # Fill missing data
    interpolated = [_fill_missing_data(series) for series in ys]

    summary = []
    for idx in range(len(ys[0])):
        if all(is_na(series[idx]) for series in ys):
            # Missing from all original series
            summary.append(None)
        else:
            # Geometric mean over the filled series
            summary.append(geom_mean_na(series[idx] for series in interpolated))
    return summary
[docs]
def _fill_missing_data(y, max_gap_fraction=0.1):
"""
Fill missing data to series by linearly interpolating inside gaps
that are smaller than ``max_gap_fraction`` of total available
points.
"""
# Count number of valid values in each series
valid_count = sum(int(not is_na(v)) for v in y)
max_gap_size = math.ceil(max_gap_fraction * valid_count)
# Fill gaps of missing data in each series
filled = list(y)
prev = None
prev_idx = 0
for i, v in enumerate(y):
if not is_na(v):
gap_size = i - prev_idx - 1
if 0 < gap_size <= max_gap_size and not is_na(prev):
# Interpolate gap
for k in range(1, gap_size + 1):
filled[prev_idx + k] = (v * k + (gap_size + 1 - k) * prev) / (gap_size + 1)
prev = v
prev_idx = i
return filled
[docs]
def _combine_graph_data(graphs):
"""
Combine data series from multiple graphs to compatible form
Parameters
----------
graphs : list of Graph
Input data.
Returns
-------
x : list of float
List of x-coordinates, in increasing order
ys : list of list of float
List of data series, [y0, y1, y2, ...],
where y0, y1, y2, ... are each lists of y-coordinates
corresponding to `x`, one for each data series in each graph.
When some of the graphs do not have data for a given x-value,
the missing data is indicated by None values.
"""
datasets = [graph.get_data() for graph in graphs]
n_series = sum(graph.n_series for graph in graphs)
# Find distinct x-values
x = set()
for dataset in datasets:
x.update(k for k, _, _ in dataset)
x = sorted(x)
x_idx = dict(zip(x, range(len(x))))
# Get y-values
ys = [[None] * len(x_idx) for j in range(n_series)]
pos = 0
for dataset, graph in zip(datasets, graphs):
for k, v, dv in dataset:
i = x_idx[k]
if graph.scalar_series:
v = [v]
for j, y in enumerate(v):
ys[pos + j][i] = y
pos += graph.n_series
return x, ys
[docs]
def resample_data(val, num_points=RESAMPLED_POINTS):
    """
    Reduce the number of points in a series by averaging over bins of
    equal revision width.

    Parameters
    ----------
    val : list of (revision, value)
        Data points with integer revisions.  The points are consumed in
        list order, so they are expected to be sorted by increasing
        revision.
    num_points : number, optional
        Series with fewer points than this are returned unchanged.  For
        longer series the bin width is
        ``int((max_revision - min_revision) / num_points)`` revisions.

    Returns
    -------
    new_val : list of (revision, value)
        ``val`` itself if it is short enough; otherwise one point per
        non-empty bin, placed at the bin's exclusive upper revision bound
        and carrying the ``mean_na`` of the values in the bin.
    """
    if not val or len(val) < num_points:
        return val

    min_revision = min(x[0] for x in val)
    max_revision = max(x[0] for x in val)
    step_size = int((max_revision - min_revision) / num_points)
    if step_size == 0:
        # Revision span too narrow to split: use a single bin.
        step_size = max_revision - min_revision + 1

    new_val = []
    j = 0
    # Bins are half-open, [i - step_size, i).  The "+ 1" on the stop value
    # guarantees a final bin whose upper bound lies strictly above
    # max_revision; without it the newest data point is dropped whenever
    # (max_revision - min_revision) is a multiple of step_size.
    for i in range(min_revision + step_size, max_revision + step_size + 1, step_size):
        chunk = []
        while j < len(val) and val[j][0] < i:
            chunk.append(val[j][1])
            j += 1
        if chunk:
            new_val.append((i, mean_na(chunk)))
    return new_val