# Source code for asv.plugins.mercurial

# Licensed under a 3-clause BSD style license - see LICENSE.rst

"""
Supports mercurial repositories for the benchmarked project.
"""

import os
import re
import sys

try:
    import hglib
except ImportError:
    # hglib is an optional dependency; Hg.__init__ raises ImportError("hglib")
    # when this placeholder is still None.
    hglib = None

from .. import util
from ..console import log
from ..repo import NoSuchNameError, Repo
class Hg(Repo):
    """Mercurial backend for the benchmarked project's repository.

    All repository access goes through ``hglib``.  Strings handed to hglib
    are encoded to bytes (``_encoding`` for revision names and revsets, the
    filesystem encoding for paths) and node ids coming back are decoded to
    ``str``.

    ``get_branch_name`` and ``_raise_bad_mirror_error`` are not defined in
    this class; they are presumably inherited from ``Repo``.
    """

    # Short identifier of the version control system handled by this class.
    dvcs = "hg"
    # Name of Mercurial's default branch (presumably consumed by the
    # inherited ``get_branch_name`` -- confirm against ``Repo``).
    _default_branch = "default"
    # Encoding used by ``_encode`` / ``_decode`` for revision names, revsets
    # and node ids.
    _encoding = "utf-8"

    def __init__(self, url, mirror_path):
        """Open the repository at ``url``, cloning it to ``mirror_path`` if needed.

        Parameters
        ----------
        url : str
            Repository URL or local path.  A leading ``"hg+"`` is stripped
            before cloning.
        mirror_path : str
            Where the local clone lives when ``url`` is not itself a local
            Mercurial repository.

        Raises
        ------
        ImportError
            If ``hglib`` could not be imported.
        """
        # TODO: shared repositories in Mercurial are only possible
        # through an extension, and it's not clear how to use those in
        # this context.  So here, we always make full clones for
        # each of the environments.

        # Set first so that __del__ is safe even if the rest of __init__ fails.
        self._repo = None

        self._path = os.path.abspath(mirror_path)
        self._pulled = False
        if hglib is None:
            raise ImportError("hglib")

        if self.is_local_repo(url):
            # Local repository, no need for mirror
            self._path = os.path.abspath(url)
            self._pulled = True
        elif not self.is_local_repo(self._path):
            if os.path.exists(self._path):
                # Something exists at the mirror path but it is not an hg
                # repository.
                self._raise_bad_mirror_error(self._path)

            # Clone is missing
            log.info("Cloning project")
            url = url.removeprefix("hg+")

            # Mercurial branches are global, so there is no need for
            # an analog of git --mirror
            hglib.clone(
                self._encode_filename(url),
                dest=self._encode_filename(self._path),
                noupdate=True,
            )

        self._repo = hglib.open(self._encode_filename(self._path))

    def __del__(self):
        """Close the hglib client, if one was opened."""
        if self._repo is not None:
            self._repo.close()
            self._repo = None

    def _decode(self, item):
        """Decode bytes received from hglib using ``_encoding``."""
        return item.decode(self._encoding)

    def _encode(self, item):
        """Encode a string for hglib using ``_encoding``."""
        return item.encode(self._encoding)

    def _encode_filename(self, filename):
        """Encode a path string using the filesystem encoding."""
        return filename.encode(sys.getfilesystemencoding())

    @classmethod
    def is_local_repo(cls, path):
        """Return True if ``path`` is a directory containing a ``.hg`` directory."""
        return os.path.isdir(path) and os.path.isdir(os.path.join(path, '.hg'))

    @classmethod
    def url_match(cls, url):
        """Return True if ``url`` looks like a Mercurial repository.

        Accepts ``hg+http(s)://`` URLs, ``http(s)://`` URLs ending in
        ``.hg``, ``ssh://hg@`` URLs, and paths of local Mercurial
        repositories.
        """
        patterns = (r'^hg\+https?://.*$', r'^https?://.*?\.hg$', r'^ssh://hg@.*$')
        if any(re.match(pattern, url) for pattern in patterns):
            return True

        # Check for a local path
        return cls.is_local_repo(url)

    def get_range_spec(self, commit_a, commit_b):
        """Return a revset for ``commit_a::commit_b`` that excludes ``commit_a``."""
        return f'{commit_a}::{commit_b} and not {commit_a}'

    def get_new_range_spec(self, latest_result, branch=None):
        """Return a revset from ``latest_result`` to the name of ``branch``."""
        return f'{latest_result}::{self.get_branch_name(branch)}'

    def pull(self):
        """Pull into the opened repository, at most once per instance."""
        # We assume the remote isn't updated during the run of asv
        # itself.
        if self._pulled:
            return

        log.info("Fetching recent changes")
        self._repo.pull()
        self._pulled = True

    def checkout(self, path, commit_hash):
        """Make ``path`` a clean working copy at ``commit_hash``.

        An existing directory is updated in place.  If that fails with an
        hglib ``CommandError`` or ``ServerError``, the directory is removed
        and cloned afresh from this repository's path.
        """
        # Need to pull -- the copy is not updated automatically, since
        # the repository data is not shared

        def checkout_existing():
            with hglib.open(self._encode_filename(path)) as subrepo:
                subrepo.pull()
                subrepo.update(self._encode(commit_hash), clean=True)
                # Enable the purge extension for this one command and run
                # ``purge --all``.
                subrepo.rawcommand([b"--config", b"extensions.purge=", b"purge", b"--all"])

        if os.path.isdir(path):
            try:
                checkout_existing()
            except (hglib.error.CommandError, hglib.error.ServerError):
                # Remove and re-clone
                util.long_path_rmtree(path)

        if not os.path.isdir(path):
            hglib.clone(self._encode_filename(self._path), dest=self._encode_filename(path))
            checkout_existing()

    def get_date(self, hash):
        """Return the date of revision ``hash`` in milliseconds since the epoch."""
        rev = self._repo.log(self._encode(hash))[0]
        # ``strftime("%s")`` is a non-standard glibc extension and is not
        # available on every platform; ``timestamp()`` is portable.  For the
        # naive local-time datetime that hglib is expected to provide, it
        # yields the same whole-second value that "%s" gives on Linux.
        return int(rev.date.timestamp()) * 1000

    def get_hashes_from_range(self, range_spec, **kwargs):
        """Return the node ids matching ``range_spec``, highest revision first.

        Extra keyword arguments are forwarded to the hglib ``log`` call.
        """
        range_spec = self._encode(f"sort({range_spec}, -rev)")
        return [self._decode(rev.node) for rev in self._repo.log(range_spec, **kwargs)]

    def get_hash_from_name(self, name):
        """Return the node id that ``name`` resolves to.

        ``None`` is replaced by the result of ``get_branch_name()``.

        Raises
        ------
        NoSuchNameError
            If Mercurial reports an unknown revision.
        """
        if name is None:
            name = self.get_branch_name()
        try:
            # NOTE(review): if the log result is empty (e.g. a revset that
            # matches nothing), the ``[0]`` raises IndexError -- confirm
            # whether that case should also map to NoSuchNameError.
            return self._decode(self._repo.log(self._encode(name))[0].node)
        except hglib.error.CommandError as err:
            if b'unknown revision' in err.err:
                raise NoSuchNameError(name) from err
            raise

    def get_hash_from_parent(self, name):
        """Return the node id of the first parent (``p1``) of ``name``."""
        return self.get_hash_from_name(f'p1({name})')

    def get_name_from_hash(self, commit):
        """Not implemented for Mercurial; always returns None."""
        # XXX: implement
        return None

    def get_tags(self):
        """Return a dict mapping tag name to node id for the ``tag()`` revset."""
        # NOTE(review): the whole ``tags`` field is used as one key; a
        # revision carrying several tags may therefore yield a single
        # combined key -- confirm against hglib's log output.
        return {
            self._decode(item.tags): self._decode(item.node)
            for item in self._repo.log(b"tag()")
        }

    def get_date_from_name(self, name):
        """Return the date of ``name``; same as ``get_date``."""
        return self.get_date(name)

    def get_branch_commits(self, branch):
        """Return the node ids of the commits on ``branch``, highest revision first.

        Uses the ``branch()`` revset when the version reported by hglib is
        at least 4.5, and ``ancestors()`` otherwise.
        """
        if self._repo.version >= (4, 5):
            query = "branch({0})"
        else:
            query = "ancestors({0})"

        return self.get_hashes_from_range(
            query.format(self.get_branch_name(branch)), followfirst=True
        )

    def get_revisions(self, commits):
        """Map each node id in ``commits`` to its position in ``log(all())``.

        Node ids that do not appear in the log are left out of the result.
        """
        # Build the lookup set once: the loop runs over every changeset in
        # the repository, so a list membership test here would be quadratic.
        wanted = set(commits)
        revisions = {}
        for index, item in enumerate(self._repo.log(b"all()")):
            node = self._decode(item.node)
            if node in wanted:
                revisions[node] = index
        return revisions