20
20
from bzrlib.lazy_import import lazy_import
21
21
lazy_import(globals(), """
22
from copy import deepcopy
23
from unittest import TestSuite
24
22
from warnings import warn
122
120
def open_downlevel(base):
123
121
"""Open a branch which may be of an old format."""
124
122
return Branch.open(base, _unsupported=True)
127
125
def open(base, _unsupported=False):
128
126
"""Open the branch rooted at base.
134
132
return control.open_branch(_unsupported)
137
def open_containing(url):
135
def open_from_transport(transport, _unsupported=False):
136
"""Open the branch rooted at transport"""
137
control = bzrdir.BzrDir.open_from_transport(transport, _unsupported)
138
return control.open_branch(_unsupported)
141
def open_containing(url, possible_transports=None):
138
142
"""Open an existing branch which contains url.
140
144
This probes for a branch at url, and searches upwards from there.
145
149
format, UnknownFormatError or UnsupportedFormatError are raised.
146
150
If there is one, it is returned, along with the unused portion of url.
148
control, relpath = bzrdir.BzrDir.open_containing(url)
152
control, relpath = bzrdir.BzrDir.open_containing(url,
149
154
return control.open_branch(), relpath
174
179
return self.get_config().get_nickname()
176
181
def _set_nick(self, nick):
177
self.get_config().set_user_option('nickname', nick)
182
self.get_config().set_user_option('nickname', nick, warn_masked=True)
179
184
nick = property(_get_nick, _set_nick)
508
513
def revision_id_to_revno(self, revision_id):
509
514
"""Given a revision id, return its revno"""
510
if revision_id is None:
515
if _mod_revision.is_null(revision_id):
512
517
revision_id = osutils.safe_revision_id(revision_id)
513
518
history = self.revision_history()
587
592
elif make_relative:
588
593
url = urlutils.relative_url(self.base, url)
589
config.set_user_option(name, url)
594
config.set_user_option(name, url, warn_masked=True)
591
596
def _get_config_location(self, name, config=None):
592
597
if config is None:
612
617
pattern is that the user can override it by specifying a
615
self.get_config().set_user_option('submit_branch', location)
620
self.get_config().set_user_option('submit_branch', location,
617
623
def get_public_branch(self):
618
624
"""Return the public location of the branch.
702
708
:param revision_id: The revision-id to truncate history at. May
703
709
be None to copy complete history.
711
if revision_id == _mod_revision.NULL_REVISION:
705
713
new_history = self.revision_history()
706
if revision_id is not None:
714
if revision_id is not None and new_history != []:
707
715
revision_id = osutils.safe_revision_id(revision_id)
709
717
new_history = new_history[:new_history.index(revision_id) + 1]
1399
1413
other_branch=None):
1400
1414
# stop_revision must be a descendant of last_revision
1401
1415
stop_graph = self.repository.get_revision_graph(revision_id)
1402
if last_rev is not None and last_rev not in stop_graph:
1416
if (last_rev is not None and last_rev != _mod_revision.NULL_REVISION
1417
and last_rev not in stop_graph):
1403
1418
# our previous tip is not merged into stop_revision
1404
1419
raise errors.DivergedBranches(self, other_branch)
1405
1420
# make a new revision history from the graph
1444
1459
stop_revision = osutils.safe_revision_id(stop_revision)
1445
1460
# whats the current last revision, before we fetch [and change it
1447
last_rev = self.last_revision()
1462
last_rev = _mod_revision.ensure_null(self.last_revision())
1448
1463
# we fetch here regardless of whether we need to so that we pickup
1449
1464
# filled in ghosts.
1450
1465
self.fetch(other, stop_revision)
1451
my_ancestry = self.repository.get_ancestry(last_rev)
1466
my_ancestry = self.repository.get_ancestry(last_rev,
1452
1468
if stop_revision in my_ancestry:
1453
1469
# last_revision is a descendant of stop_revision
1773
1789
# last_rev is not in the other_last_rev history, AND
1774
1790
# other_last_rev is not in our history, and do it without pulling
1775
1791
# history around
1776
last_rev = self.last_revision()
1777
if last_rev is not None:
1792
last_rev = _mod_revision.ensure_null(self.last_revision())
1793
if last_rev != _mod_revision.NULL_REVISION:
1778
1794
other.lock_read()
1780
1796
other_last_rev = other.last_revision()
1781
if other_last_rev is not None:
1797
if not _mod_revision.is_null(other_last_rev):
1782
1798
# neither branch is new, we have to do some work to
1783
1799
# ascertain diversion.
1784
1800
remote_graph = other.repository.get_revision_graph(
1807
1823
if master is not None:
1808
1824
old_tip = self.last_revision()
1809
1825
self.pull(master, overwrite=True)
1810
if old_tip in self.repository.get_ancestry(self.last_revision()):
1826
if old_tip in self.repository.get_ancestry(
1827
_mod_revision.ensure_null(self.last_revision()),
1954
1972
self._clear_cached_state()
1956
1974
def _check_history_violation(self, revision_id):
1957
last_revision = self.last_revision()
1958
if last_revision is None:
1975
last_revision = _mod_revision.ensure_null(self.last_revision())
1976
if _mod_revision.is_null(last_revision):
1960
1978
if last_revision not in self._lefthand_history(revision_id):
1961
1979
raise errors.AppendRevisionsOnlyViolation(self.base)
2025
2043
if config.get_user_option('bound') != 'True':
2028
config.set_user_option('bound', 'False')
2046
config.set_user_option('bound', 'False', warn_masked=True)
2031
2049
self._set_config_location('bound_location', location,
2033
config.set_user_option('bound', 'True')
2051
config.set_user_option('bound', 'True', warn_masked=True)
2036
2054
def _get_bound_location(self, bound):
2058
2076
value = 'False'
2059
self.get_config().set_user_option('append_revisions_only', value)
2077
self.get_config().set_user_option('append_revisions_only', value,
2061
2080
def _get_append_revisions_only(self):
2062
2081
value = self.get_config().get_user_option('append_revisions_only')
2077
2096
if revision_id is None:
2078
2097
revno, revision_id = self.last_revision_info()
2080
revno = self.revision_id_to_revno(revision_id)
2099
# To figure out the revno for a random revision, we need to build
2100
# the revision history, and count its length.
2101
# We don't care about the order, just how long it is.
2102
# Alternatively, we could start at the current location, and count
2103
# backwards. But there is no guarantee that we will find it since
2104
# it may be a merged revision.
2105
revno = len(list(self.repository.iter_reverse_revision_history(
2081
2107
destination.set_last_revision_info(revno, revision_id)
2083
2109
def _make_tags(self):
2084
2110
return BasicTags(self)
2087
class BranchTestProviderAdapter(object):
2088
"""A tool to generate a suite testing multiple branch formats at once.
2090
This is done by copying the test once for each transport and injecting
2091
the transport_server, transport_readonly_server, and branch_format
2092
classes into each copy. Each copy is also given a new id() to make it
2096
def __init__(self, transport_server, transport_readonly_server, formats,
2097
vfs_transport_factory=None):
2098
self._transport_server = transport_server
2099
self._transport_readonly_server = transport_readonly_server
2100
self._formats = formats
2102
def adapt(self, test):
2103
result = TestSuite()
2104
for branch_format, bzrdir_format in self._formats:
2105
new_test = deepcopy(test)
2106
new_test.transport_server = self._transport_server
2107
new_test.transport_readonly_server = self._transport_readonly_server
2108
new_test.bzrdir_format = bzrdir_format
2109
new_test.branch_format = branch_format
2110
def make_new_test_id():
2111
# the format can be either a class or an instance
2112
name = getattr(branch_format, '__name__',
2113
branch_format.__class__.__name__)
2114
new_id = "%s(%s)" % (new_test.id(), name)
2115
return lambda: new_id
2116
new_test.id = make_new_test_id()
2117
result.addTest(new_test)
2121
2113
######################################################################
2122
2114
# results of operations