20
20
from bzrlib.lazy_import import lazy_import
21
21
lazy_import(globals(), """
22
from copy import deepcopy
23
from unittest import TestSuite
22
24
from warnings import warn
120
122
def open_downlevel(base):
121
123
"""Open a branch which may be of an old format."""
122
124
return Branch.open(base, _unsupported=True)
125
127
def open(base, _unsupported=False):
126
128
"""Open the branch rooted at base.
132
134
return control.open_branch(_unsupported)
135
def open_from_transport(transport, _unsupported=False):
136
"""Open the branch rooted at transport"""
137
control = bzrdir.BzrDir.open_from_transport(transport, _unsupported)
138
return control.open_branch(_unsupported)
141
def open_containing(url, possible_transports=None):
137
def open_containing(url):
142
138
"""Open an existing branch which contains url.
144
140
This probes for a branch at url, and searches upwards from there.
149
145
format, UnknownFormatError or UnsupportedFormatError are raised.
150
146
If there is one, it is returned, along with the unused portion of url.
152
control, relpath = bzrdir.BzrDir.open_containing(url,
148
control, relpath = bzrdir.BzrDir.open_containing(url)
154
149
return control.open_branch(), relpath
179
174
return self.get_config().get_nickname()
181
176
def _set_nick(self, nick):
182
self.get_config().set_user_option('nickname', nick, warn_masked=True)
177
self.get_config().set_user_option('nickname', nick)
184
179
nick = property(_get_nick, _set_nick)
513
508
def revision_id_to_revno(self, revision_id):
514
509
"""Given a revision id, return its revno"""
515
if _mod_revision.is_null(revision_id):
510
if revision_id is None:
517
512
revision_id = osutils.safe_revision_id(revision_id)
518
513
history = self.revision_history()
592
587
elif make_relative:
593
588
url = urlutils.relative_url(self.base, url)
594
config.set_user_option(name, url, warn_masked=True)
589
config.set_user_option(name, url)
596
591
def _get_config_location(self, name, config=None):
597
592
if config is None:
617
612
pattern is that the user can override it by specifying a
620
self.get_config().set_user_option('submit_branch', location,
615
self.get_config().set_user_option('submit_branch', location)
623
617
def get_public_branch(self):
624
618
"""Return the public location of the branch.
708
702
:param revision_id: The revision-id to truncate history at. May
709
703
be None to copy complete history.
711
if revision_id == _mod_revision.NULL_REVISION:
713
705
new_history = self.revision_history()
714
if revision_id is not None and new_history != []:
706
if revision_id is not None:
715
707
revision_id = osutils.safe_revision_id(revision_id)
717
709
new_history = new_history[:new_history.index(revision_id) + 1]
1413
1399
other_branch=None):
1414
1400
# stop_revision must be a descendant of last_revision
1415
1401
stop_graph = self.repository.get_revision_graph(revision_id)
1416
if (last_rev is not None and last_rev != _mod_revision.NULL_REVISION
1417
and last_rev not in stop_graph):
1402
if last_rev is not None and last_rev not in stop_graph:
1418
1403
# our previous tip is not merged into stop_revision
1419
1404
raise errors.DivergedBranches(self, other_branch)
1420
1405
# make a new revision history from the graph
1459
1444
stop_revision = osutils.safe_revision_id(stop_revision)
1460
1445
# whats the current last revision, before we fetch [and change it
1462
last_rev = _mod_revision.ensure_null(self.last_revision())
1447
last_rev = self.last_revision()
1463
1448
# we fetch here regardless of whether we need to so that we pickup
1464
1449
# filled in ghosts.
1465
1450
self.fetch(other, stop_revision)
1466
my_ancestry = self.repository.get_ancestry(last_rev,
1451
my_ancestry = self.repository.get_ancestry(last_rev)
1468
1452
if stop_revision in my_ancestry:
1469
1453
# last_revision is a descendant of stop_revision
1789
1773
# last_rev is not in the other_last_rev history, AND
1790
1774
# other_last_rev is not in our history, and do it without pulling
1791
1775
# history around
1792
last_rev = _mod_revision.ensure_null(self.last_revision())
1793
if last_rev != _mod_revision.NULL_REVISION:
1776
last_rev = self.last_revision()
1777
if last_rev is not None:
1794
1778
other.lock_read()
1796
1780
other_last_rev = other.last_revision()
1797
if not _mod_revision.is_null(other_last_rev):
1781
if other_last_rev is not None:
1798
1782
# neither branch is new, we have to do some work to
1799
1783
# ascertain diversion.
1800
1784
remote_graph = other.repository.get_revision_graph(
1823
1807
if master is not None:
1824
1808
old_tip = self.last_revision()
1825
1809
self.pull(master, overwrite=True)
1826
if old_tip in self.repository.get_ancestry(
1827
_mod_revision.ensure_null(self.last_revision()),
1810
if old_tip in self.repository.get_ancestry(self.last_revision()):
1972
1954
self._clear_cached_state()
1974
1956
def _check_history_violation(self, revision_id):
1975
last_revision = _mod_revision.ensure_null(self.last_revision())
1976
if _mod_revision.is_null(last_revision):
1957
last_revision = self.last_revision()
1958
if last_revision is None:
1978
1960
if last_revision not in self._lefthand_history(revision_id):
1979
1961
raise errors.AppendRevisionsOnlyViolation(self.base)
2043
2025
if config.get_user_option('bound') != 'True':
2046
config.set_user_option('bound', 'False', warn_masked=True)
2028
config.set_user_option('bound', 'False')
2049
2031
self._set_config_location('bound_location', location,
2051
config.set_user_option('bound', 'True', warn_masked=True)
2033
config.set_user_option('bound', 'True')
2054
2036
def _get_bound_location(self, bound):
2076
2058
value = 'False'
2077
self.get_config().set_user_option('append_revisions_only', value,
2059
self.get_config().set_user_option('append_revisions_only', value)
2080
2061
def _get_append_revisions_only(self):
2081
2062
value = self.get_config().get_user_option('append_revisions_only')
2096
2077
if revision_id is None:
2097
2078
revno, revision_id = self.last_revision_info()
2099
# To figure out the revno for a random revision, we need to build
2100
# the revision history, and count its length.
2101
# We don't care about the order, just how long it is.
2102
# Alternatively, we could start at the current location, and count
2103
# backwards. But there is no guarantee that we will find it since
2104
# it may be a merged revision.
2105
revno = len(list(self.repository.iter_reverse_revision_history(
2080
revno = self.revision_id_to_revno(revision_id)
2107
2081
destination.set_last_revision_info(revno, revision_id)
2109
2083
def _make_tags(self):
2110
2084
return BasicTags(self)
2087
class BranchTestProviderAdapter(object):
2088
"""A tool to generate a suite testing multiple branch formats at once.
2090
This is done by copying the test once for each transport and injecting
2091
the transport_server, transport_readonly_server, and branch_format
2092
classes into each copy. Each copy is also given a new id() to make it
2096
def __init__(self, transport_server, transport_readonly_server, formats,
2097
vfs_transport_factory=None):
2098
self._transport_server = transport_server
2099
self._transport_readonly_server = transport_readonly_server
2100
self._formats = formats
2102
def adapt(self, test):
2103
result = TestSuite()
2104
for branch_format, bzrdir_format in self._formats:
2105
new_test = deepcopy(test)
2106
new_test.transport_server = self._transport_server
2107
new_test.transport_readonly_server = self._transport_readonly_server
2108
new_test.bzrdir_format = bzrdir_format
2109
new_test.branch_format = branch_format
2110
def make_new_test_id():
2111
# the format can be either a class or an instance
2112
name = getattr(branch_format, '__name__',
2113
branch_format.__class__.__name__)
2114
new_id = "%s(%s)" % (new_test.id(), name)
2115
return lambda: new_id
2116
new_test.id = make_new_test_id()
2117
result.addTest(new_test)
2113
2121
######################################################################
2114
2122
# results of operations