# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Supports mercurial repositories for the benchmarked project.
"""
import os
import re
import sys
try:
import hglib
except ImportError:
from .. import util
from ..console import log
from ..repo import NoSuchNameError, Repo
class Hg(Repo):
    """
    Mercurial repository backend, driven through the ``hglib`` client.

    Hashes, names and revsets are handled as ``str`` on the Python side and
    converted to ``bytes`` right before they are handed to ``hglib``.
    """

    # NOTE(review): ``_decode``/``_encode`` read ``self._encoding``, which is
    # not defined in the visible part of this class -- presumably it is
    # provided by ``Repo`` or by an attribute outside this view; confirm.
    _default_branch = "default"

    def __init__(self, url, mirror_path):
        """
        Open the repository at *url*, cloning it into *mirror_path* if needed.

        Parameters
        ----------
        url : str
            Either a path to a local Mercurial repository, or a remote URL
            (an ``hg+`` prefix is stripped before cloning).
        mirror_path : str
            Directory holding the local clone of a remote repository.

        Raises
        ------
        ImportError
            If ``hglib`` is not available.
        """
        # TODO: shared repositories in Mercurial are only possible
        # through an extension, and it's not clear how to use those in
        # this context.  So here, we always make full clones for
        # each of the environments.
        self._repo = None  # Initialize, so that __del__ is always safe
        self._path = os.path.abspath(mirror_path)
        # Always define the flag: pull() reads it, and previously it was
        # only assigned in the local-repository branch below.
        self._pulled = False

        if hglib is None:
            raise ImportError("hglib")

        if self.is_local_repo(url):
            # Local repository, no need for mirror
            self._path = os.path.abspath(url)
            self._pulled = True
        elif not self.is_local_repo(self._path):
            if os.path.exists(self._path):
                # Something exists at the mirror path, but it is not an
                # hg repository.
                self._raise_bad_mirror_error(self._path)

            # Clone is missing
            log.info("Cloning project")
            url = url.removeprefix("hg+")

            # Mercurial branches are global, so there is no need for
            # an analog of git --mirror
            hglib.clone(
                self._encode_filename(url),
                dest=self._encode_filename(self._path),
                noupdate=True,
            )

        self._repo = hglib.open(self._encode_filename(self._path))

    def __del__(self):
        """Close the hglib client, if one was opened."""
        # getattr guards against instances whose __init__ never ran far
        # enough to create the attribute.
        repo = getattr(self, "_repo", None)
        if repo is not None:
            repo.close()
            self._repo = None

    def _decode(self, item):
        """Decode *item* (bytes) to ``str`` using ``self._encoding``."""
        return item.decode(self._encoding)

    def _encode(self, item):
        """Encode *item* (str) to ``bytes`` using ``self._encoding``."""
        return item.encode(self._encoding)

    def _encode_filename(self, filename):
        """Encode *filename* to ``bytes`` with the filesystem encoding."""
        return filename.encode(sys.getfilesystemencoding())

    @classmethod
    def is_local_repo(cls, path):
        """Return True if *path* is a directory containing a ``.hg`` directory."""
        return os.path.isdir(path) and os.path.isdir(os.path.join(path, '.hg'))

    @classmethod
    def url_match(cls, url):
        """
        Return True if *url* looks like a Mercurial repository.

        A URL matches when it fits one of the known remote URL patterns, or
        when it is a path to a local repository.
        """
        regexes = (r'^hg\+https?://.*$', r'^https?://.*?\.hg$', r'^ssh://hg@.*$')
        if any(re.match(regex, url) for regex in regexes):
            return True

        # Check for a local path
        return cls.is_local_repo(url)

    def get_range_spec(self, commit_a, commit_b):
        """Return a revset for the range *commit_a*::*commit_b*, excluding *commit_a*."""
        return f'{commit_a}::{commit_b} and not {commit_a}'

    def get_new_range_spec(self, latest_result, branch=None):
        """Return a revset spanning from *latest_result* to the head of *branch*."""
        return f'{latest_result}::{self.get_branch_name(branch)}'

    def pull(self):
        """Pull changes into the repository, at most once per instance."""
        # We assume the remote isn't updated during the run of asv
        # itself.
        if self._pulled:
            return

        log.info("Fetching recent changes")
        self._repo.pull()
        self._pulled = True

    def checkout(self, path, commit_hash):
        """
        Make *path* a clean working copy at *commit_hash*.

        An existing directory is updated in place; if that fails with an
        hglib command or server error, it is removed and cloned again.
        """

        # Need to pull -- the copy is not updated automatically, since
        # the repository data is not shared
        def checkout_existing():
            with hglib.open(self._encode_filename(path)) as subrepo:
                subrepo.pull()
                subrepo.update(self._encode(commit_hash), clean=True)
                # Run "purge --all" with the purge extension enabled for
                # this one command.
                subrepo.rawcommand([b"--config", b"extensions.purge=", b"purge", b"--all"])

        if os.path.isdir(path):
            try:
                checkout_existing()
            except (hglib.error.CommandError, hglib.error.ServerError):
                # Remove and re-clone
                util.long_path_rmtree(path)

        if not os.path.isdir(path):
            hglib.clone(self._encode_filename(self._path), dest=self._encode_filename(path))
            checkout_existing()

    def get_date(self, hash):
        """
        Return the date of revision *hash* in milliseconds since the epoch.

        The value has whole-second resolution (seconds multiplied by 1000).
        """
        rev = self._repo.log(self._encode(hash))[0]
        # datetime.timestamp() is the portable replacement for
        # strftime("%s"), which is a platform-specific extension and is
        # rejected on e.g. Windows.  For a naive datetime both interpret
        # the value as local time.
        # NOTE(review): assumes rev.date is a datetime with whole seconds,
        # as hglib appears to produce -- confirm.
        return int(rev.date.timestamp()) * 1000

    def get_hashes_from_range(self, range_spec, **kwargs):
        """
        Return the hashes selected by *range_spec*, sorted by descending revision.

        Extra keyword arguments are forwarded to the hglib ``log`` call.
        """
        range_spec = self._encode(f"sort({range_spec}, -rev)")
        return [self._decode(rev.node) for rev in self._repo.log(range_spec, **kwargs)]

    def get_hash_from_name(self, name):
        """
        Resolve *name* to a changeset hash.

        If *name* is None, the name returned by ``get_branch_name()`` is used.

        Raises
        ------
        NoSuchNameError
            If Mercurial reports the name as an unknown revision.
        """
        if name is None:
            name = self.get_branch_name()
        try:
            return self._decode(self._repo.log(self._encode(name))[0].node)
        except hglib.error.CommandError as err:
            if b'unknown revision' in err.err:
                raise NoSuchNameError(name) from err
            raise

    def get_hash_from_parent(self, name):
        """Return the hash of the first parent of *name*."""
        return self.get_hash_from_name(f'p1({name})')

    def get_name_from_hash(self, commit):
        """Not implemented for Mercurial; always returns None."""
        # XXX: implement
        return None

    def get_date_from_name(self, name):
        """Return the date of *name*; same as ``get_date(name)``."""
        return self.get_date(name)

    def get_branch_commits(self, branch):
        """Return the hashes of the commits on *branch*."""
        # The revset used depends on the Mercurial version reported by
        # the hglib client.
        if self._repo.version >= (4, 5):
            query = "branch({0})"
        else:
            query = "ancestors({0})"
        return self.get_hashes_from_range(
            query.format(self.get_branch_name(branch)), followfirst=True
        )

    def get_revisions(self, commits):
        """
        Map each hash in *commits* to its position in the ``all()`` log output.

        Hashes that are not found in the repository are omitted.
        """
        # Build the lookup set once instead of scanning *commits* for
        # every changeset in the repository.
        wanted = frozenset(commits)
        revisions = {}
        for i, item in enumerate(self._repo.log(b"all()")):
            node = self._decode(item.node)
            if node in wanted:
                revisions[node] = i
        return revisions