1
# Copyright (C) 2005-2011 Canonical Ltd
1
# (C) 2005 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for branch implementations - tests a branch format."""
20
branch as _mod_branch,
35
from bzrlib.tests import (
38
from bzrlib.tests.http_server import HttpServer
39
from bzrlib.transport import memory
42
class TestTestCaseWithBranch(per_branch.TestCaseWithBranch):
44
def test_branch_format_matches_bzrdir_branch_format(self):
45
bzrdir_branch_format = self.bzrdir_format.get_branch_format()
46
self.assertIs(self.branch_format.__class__,
47
bzrdir_branch_format.__class__)
49
def test_make_branch_gets_expected_format(self):
50
branch = self.make_branch('.')
51
self.assertIs(self.branch_format.__class__,
52
branch._format.__class__)
55
class TestBranch(per_branch.TestCaseWithBranch):
57
def test_create_tree_with_merge(self):
58
tree = self.create_tree_with_merge()
60
self.addCleanup(tree.unlock)
61
graph = tree.branch.repository.get_graph()
62
ancestry_graph = graph.get_parent_map(
63
tree.branch.repository.all_revision_ids())
64
self.assertEqual({'rev-1':('null:',),
66
'rev-1.1.1':('rev-1', ),
67
'rev-3':('rev-2', 'rev-1.1.1', ),
70
def test_revision_ids_are_utf8(self):
71
wt = self.make_branch_and_tree('tree')
72
wt.commit('f', rev_id='rev1')
73
wt.commit('f', rev_id='rev2')
74
wt.commit('f', rev_id='rev3')
76
br = self.get_branch()
78
br.generate_revision_history('rev3')
79
rh = br.revision_history()
80
self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
81
for revision_id in rh:
82
self.assertIsInstance(revision_id, str)
83
last = br.last_revision()
84
self.assertEqual('rev3', last)
85
self.assertIsInstance(last, str)
86
revno, last = br.last_revision_info()
87
self.assertEqual(3, revno)
88
self.assertEqual('rev3', last)
89
self.assertIsInstance(last, str)
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
from bzrlib.branch import Branch
19
from bzrlib.clone import copy_branch
20
from bzrlib.commit import commit
21
import bzrlib.errors as errors
22
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
23
from bzrlib.selftest import TestCaseInTempDir
24
from bzrlib.trace import mutter
25
import bzrlib.transactions as transactions
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
28
class TestBranch(TestCaseInTempDir):
30
def test_append_revisions(self):
31
"""Test appending more than one revision"""
32
br = Branch.initialize(".")
33
br.append_revision("rev1")
34
self.assertEquals(br.revision_history(), ["rev1",])
35
br.append_revision("rev2", "rev3")
36
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
91
38
def test_fetch_revisions(self):
92
39
"""Test fetch-revision operation."""
93
wt = self.make_branch_and_tree('b1')
95
self.build_tree_contents([('b1/foo', 'hello')])
96
wt.add(['foo'], ['foo-id'])
97
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
99
b2 = self.make_branch('b2')
102
rev = b2.repository.get_revision('revision-1')
103
tree = b2.repository.revision_tree('revision-1')
105
self.addCleanup(tree.unlock)
106
self.assertEqual(tree.get_file_text('foo-id'), 'hello')
108
def test_get_revision_delta(self):
109
tree_a = self.make_branch_and_tree('a')
110
self.build_tree(['a/foo'])
111
tree_a.add('foo', 'file1')
112
tree_a.commit('rev1', rev_id='rev1')
113
self.build_tree(['a/vla'])
114
tree_a.add('vla', 'file2')
115
tree_a.commit('rev2', rev_id='rev2')
117
delta = tree_a.branch.get_revision_delta(1)
118
self.assertIsInstance(delta, _mod_delta.TreeDelta)
119
self.assertEqual([('foo', 'file1', 'file')], delta.added)
120
delta = tree_a.branch.get_revision_delta(2)
121
self.assertIsInstance(delta, _mod_delta.TreeDelta)
122
self.assertEqual([('vla', 'file2', 'file')], delta.added)
124
def get_unbalanced_tree_pair(self):
125
"""Return two branches, a and b, with one file in a."""
126
tree_a = self.make_branch_and_tree('a')
127
self.build_tree_contents([('a/b', 'b')])
129
tree_a.commit("silly commit", rev_id='A')
131
tree_b = self.make_branch_and_tree('b')
132
return tree_a, tree_b
134
def get_balanced_branch_pair(self):
135
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
136
tree_a, tree_b = self.get_unbalanced_tree_pair()
137
tree_b.branch.repository.fetch(tree_a.branch.repository)
138
return tree_a, tree_b
140
def test_clone_partial(self):
40
from bzrlib.fetch import Fetcher
43
b1 = Branch.initialize('b1')
44
b2 = Branch.initialize('b2')
45
file(os.sep.join(['b1', 'foo']), 'w').write('hello')
46
b1.add(['foo'], ['foo-id'])
47
b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
50
f = Fetcher(from_branch=b1, to_branch=b2)
51
eq = self.assertEquals
53
eq(f.last_revision, 'revision-1')
55
rev = b2.get_revision('revision-1')
56
tree = b2.revision_tree('revision-1')
57
eq(tree.get_file_text('foo-id'), 'hello')
59
def test_push_stores(self):
60
"""Copy the stores from one branch to another"""
62
br_a = Branch.initialize("a")
63
file('a/b', 'wb').write('b')
65
commit(br_a, "silly commit")
68
br_b = Branch.initialize("b")
69
self.assertRaises(NoSuchRevision, br_b.get_revision,
70
br_a.revision_history()[0])
71
br_a.push_stores(br_b)
72
rev = br_b.get_revision(br_a.revision_history()[0])
73
tree = br_b.revision_tree(br_a.revision_history()[0])
75
if tree.inventory[file_id].kind == "file":
76
tree.get_file(file_id).read()
79
def test_copy_branch(self):
80
"""Copy the stores from one branch to another"""
81
br_a, br_b = self.test_push_stores()
82
commit(br_b, "silly commit")
84
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
85
self.assertEqual(br_a.revision_history(), br_c.revision_history())
86
## # basis branches currently disabled for weave format
87
## self.assertFalse(br_b.last_revision() in br_c.revision_history())
88
## br_c.get_revision(br_b.last_revision())
90
def test_copy_partial(self):
141
91
"""Copy only part of the history of a branch."""
142
# TODO: RBC 20060208 test with a revision not on revision-history.
143
# what should that behaviour be ? Emailed the list.
144
# First, make a branch with two commits.
145
wt_a = self.make_branch_and_tree('a')
146
self.build_tree(['a/one'])
148
wt_a.commit('commit one', rev_id='1')
92
self.build_tree(['a/', 'a/one'])
93
br_a = Branch.initialize('a')
95
br_a.commit('commit one', rev_id='u@d-1')
149
96
self.build_tree(['a/two'])
151
wt_a.commit('commit two', rev_id='2')
152
# Now make a copy of the repository.
153
repo_b = self.make_repository('b')
154
wt_a.branch.repository.copy_content_into(repo_b)
155
# wt_a might be a lightweight checkout, so get a hold of the actual
156
# branch (because you can't do a partial clone of a lightweight
158
branch = wt_a.branch.bzrdir.open_branch()
159
# Then make a branch where the new repository is, but specify a revision
160
# ID. The new branch's history will stop at the specified revision.
161
br_b = branch.clone(repo_b.bzrdir, revision_id='1')
162
self.assertEqual('1', br_b.last_revision())
164
def get_parented_branch(self):
165
wt_a = self.make_branch_and_tree('a')
166
self.build_tree(['a/one'])
168
wt_a.commit('commit one', rev_id='1')
170
branch_b = wt_a.branch.bzrdir.sprout('b', revision_id='1').open_branch()
171
self.assertEqual(wt_a.branch.base, branch_b.get_parent())
174
def test_clone_branch_nickname(self):
175
# test the nick name is preserved always
176
raise tests.TestSkipped('XXX branch cloning is not yet tested.')
178
def test_clone_branch_parent(self):
179
# test the parent is preserved always
180
branch_b = self.get_parented_branch()
181
repo_c = self.make_repository('c')
182
branch_b.repository.copy_content_into(repo_c)
183
branch_c = branch_b.clone(repo_c.bzrdir)
184
self.assertNotEqual(None, branch_c.get_parent())
185
self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
187
# We can also set a specific parent, and it should be honored
188
random_parent = 'http://example.com/path/to/branch'
189
branch_b.set_parent(random_parent)
190
repo_d = self.make_repository('d')
191
branch_b.repository.copy_content_into(repo_d)
192
branch_d = branch_b.clone(repo_d.bzrdir)
193
self.assertEqual(random_parent, branch_d.get_parent())
195
def test_submit_branch(self):
196
"""Submit location can be queried and set"""
197
branch = self.make_branch('branch')
198
self.assertEqual(branch.get_submit_branch(), None)
199
branch.set_submit_branch('sftp://example.com')
200
self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
201
branch.set_submit_branch('sftp://example.net')
202
self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
204
def test_public_branch(self):
205
"""public location can be queried and set"""
206
branch = self.make_branch('branch')
207
self.assertEqual(branch.get_public_branch(), None)
208
branch.set_public_branch('sftp://example.com')
209
self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
210
branch.set_public_branch('sftp://example.net')
211
self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
212
branch.set_public_branch(None)
213
self.assertEqual(branch.get_public_branch(), None)
215
def test_record_initial_ghost(self):
216
"""Branches should support having ghosts."""
217
wt = self.make_branch_and_tree('.')
218
wt.set_parent_ids(['non:existent@rev--ision--0--2'],
219
allow_leftmost_as_ghost=True)
220
self.assertEqual(['non:existent@rev--ision--0--2'],
222
rev_id = wt.commit('commit against a ghost first parent.')
223
rev = wt.branch.repository.get_revision(rev_id)
224
self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
98
br_a.commit('commit two', rev_id='u@d-2')
99
br_b = copy_branch(br_a, 'b', revision='u@d-1')
100
self.assertEqual(br_b.last_revision(), 'u@d-1')
101
self.assertTrue(os.path.exists('b/one'))
102
self.assertFalse(os.path.exists('b/two'))
105
def test_record_initial_ghost_merge(self):
106
"""A pending merge with no revision present is still a merge."""
107
branch = Branch.initialize('.')
108
branch.add_pending_merge('non:existent@rev--ision--0--2')
109
branch.commit('pretend to merge nonexistent-revision', rev_id='first')
110
rev = branch.get_revision(branch.last_revision())
111
self.assertEqual(len(rev.parent_ids), 1)
225
112
# parent_sha1s is not populated now, WTF. rbc 20051003
226
113
self.assertEqual(len(rev.parent_sha1s), 0)
228
def test_record_two_ghosts(self):
229
"""Recording with all ghosts works."""
230
wt = self.make_branch_and_tree('.')
232
'foo@azkhazan-123123-abcabc',
233
'wibble@fofof--20050401--1928390812',
235
allow_leftmost_as_ghost=True)
236
rev_id = wt.commit("commit from ghost base with one merge")
237
# the revision should have been committed with two parents
238
rev = wt.branch.repository.get_revision(rev_id)
239
self.assertEqual(['foo@azkhazan-123123-abcabc',
240
'wibble@fofof--20050401--1928390812'],
243
def test_bad_revision(self):
244
self.assertRaises(errors.InvalidRevisionId,
245
self.get_branch().repository.get_revision,
114
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
248
116
# TODO 20051003 RBC:
249
# compare the gpg-to-sign info for a commit with a ghost and
117
# compare the gpg-to-sign info for a commit with a ghost and
250
118
# an identical tree without a ghost
251
119
# fetch missing should rewrite the TOC of weaves to list newly available parents.
121
def test_pending_merges(self):
122
"""Tracking pending-merged revisions."""
123
b = Branch.initialize('.')
125
self.assertEquals(b.pending_merges(), [])
126
b.add_pending_merge('foo@azkhazan-123123-abcabc')
127
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
128
b.add_pending_merge('foo@azkhazan-123123-abcabc')
129
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
130
b.add_pending_merge('wibble@fofof--20050401--1928390812')
131
self.assertEquals(b.pending_merges(),
132
['foo@azkhazan-123123-abcabc',
133
'wibble@fofof--20050401--1928390812'])
134
b.commit("commit from base with two merges")
135
rev = b.get_revision(b.revision_history()[0])
136
self.assertEquals(len(rev.parent_ids), 2)
137
self.assertEquals(rev.parent_ids[0],
138
'foo@azkhazan-123123-abcabc')
139
self.assertEquals(rev.parent_ids[1],
140
'wibble@fofof--20050401--1928390812')
141
# list should be cleared when we do a commit
142
self.assertEquals(b.pending_merges(), [])
253
144
def test_sign_existing_revision(self):
254
wt = self.make_branch_and_tree('.')
256
wt.commit("base", allow_pointless=True, rev_id='A')
146
branch = Branch.initialize('.')
147
branch.commit("base", allow_pointless=True, rev_id='A')
257
148
from bzrlib.testament import Testament
258
strategy = gpg.LoopbackGPGStrategy(None)
259
branch.repository.lock_write()
260
branch.repository.start_write_group()
261
branch.repository.sign_revision('A', strategy)
262
branch.repository.commit_write_group()
263
branch.repository.unlock()
264
self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
265
Testament.from_revision(branch.repository,
266
'A').as_short_text() +
267
'-----END PSEUDO-SIGNED CONTENT-----\n',
268
branch.repository.get_signature_text('A'))
270
def test_store_signature(self):
271
wt = self.make_branch_and_tree('.')
275
branch.repository.start_write_group()
277
branch.repository.store_revision_signature(
278
gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
280
branch.repository.abort_write_group()
283
branch.repository.commit_write_group()
286
# A signature without a revision should not be accessible.
287
self.assertRaises(errors.NoSuchRevision,
288
branch.repository.has_signature_for_revision_id,
290
wt.commit("base", allow_pointless=True, rev_id='A')
291
self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
292
'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
293
branch.repository.get_signature_text('A'))
295
def test_branch_keeps_signatures(self):
296
wt = self.make_branch_and_tree('source')
297
wt.commit('A', allow_pointless=True, rev_id='A')
298
repo = wt.branch.repository
300
repo.start_write_group()
301
repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
302
repo.commit_write_group()
304
#FIXME: clone should work to urls,
305
# wt.clone should work to disks.
306
self.build_tree(['target/'])
307
d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
308
self.assertEqual(repo.get_signature_text('A'),
309
d2.open_repository().get_signature_text('A'))
311
def test_nicks(self):
312
"""Test explicit and implicit branch nicknames.
314
Nicknames are implicitly the name of the branch's directory, unless an
315
explicit nickname is set. That is, an explicit nickname always
316
overrides the implicit one.
318
t = self.get_transport()
319
branch = self.make_branch('bzr.dev')
320
# The nick will be 'bzr.dev', because there is no explicit nick set.
321
self.assertEqual(branch.nick, 'bzr.dev')
322
# Move the branch to a different directory, 'bzr.ab'. Now that branch
323
# will report its nick as 'bzr.ab'.
324
t.move('bzr.dev', 'bzr.ab')
325
branch = _mod_branch.Branch.open(self.get_url('bzr.ab'))
326
self.assertEqual(branch.nick, 'bzr.ab')
327
# Set the branch nick explicitly. This will ensure there's a branch
328
# config file in the branch.
329
branch.nick = "Aaron's branch"
330
if not isinstance(branch, remote.RemoteBranch):
331
self.assertTrue(branch._transport.has("branch.conf"))
332
# Because the nick has been set explicitly, the nick is now always
333
# "Aaron's branch", regardless of directory name.
334
self.assertEqual(branch.nick, "Aaron's branch")
335
t.move('bzr.ab', 'integration')
336
branch = _mod_branch.Branch.open(self.get_url('integration'))
337
self.assertEqual(branch.nick, "Aaron's branch")
338
branch.nick = u"\u1234"
339
self.assertEqual(branch.nick, u"\u1234")
341
def test_commit_nicks(self):
342
"""Nicknames are committed to the revision"""
343
wt = self.make_branch_and_tree('bzr.dev')
345
branch.nick = "My happy branch"
346
wt.commit('My commit respect da nick.')
347
committed = branch.repository.get_revision(branch.last_revision())
348
self.assertEqual(committed.properties["branch-nick"],
351
def test_create_colocated(self):
353
repo = self.make_repository('.', shared=True)
354
except errors.IncompatibleFormat:
356
self.assertEquals(0, len(repo.bzrdir.list_branches()))
358
child_branch1 = self.branch_format.initialize(repo.bzrdir,
360
except (errors.UninitializableFormat, errors.NoColocatedBranchSupport):
361
# branch references are not default init'able and
362
# not all bzrdirs support colocated branches.
364
self.assertEquals(1, len(repo.bzrdir.list_branches()))
365
self.branch_format.initialize(repo.bzrdir, name='branch2')
366
self.assertEquals(2, len(repo.bzrdir.list_branches()))
368
def test_create_open_branch_uses_repository(self):
370
repo = self.make_repository('.', shared=True)
371
except errors.IncompatibleFormat:
373
child_transport = repo.bzrdir.root_transport.clone('child')
374
child_transport.mkdir('.')
375
child_dir = self.bzrdir_format.initialize_on_transport(child_transport)
377
child_branch = self.branch_format.initialize(child_dir)
378
except errors.UninitializableFormat:
379
# branch references are not default init'able.
381
self.assertEqual(repo.bzrdir.root_transport.base,
382
child_branch.repository.bzrdir.root_transport.base)
383
child_branch = _mod_branch.Branch.open(self.get_url('child'))
384
self.assertEqual(repo.bzrdir.root_transport.base,
385
child_branch.repository.bzrdir.root_transport.base)
387
def test_format_description(self):
388
tree = self.make_branch_and_tree('tree')
389
text = tree.branch._format.get_format_description()
390
self.assertTrue(len(text))
392
def test_get_commit_builder(self):
393
branch = self.make_branch(".")
395
builder = branch.get_commit_builder([])
396
self.assertIsInstance(builder, repository.CommitBuilder)
397
branch.repository.commit_write_group()
400
def test_generate_revision_history(self):
401
"""Create a fake revision history easily."""
402
tree = self.make_branch_and_tree('.')
403
rev1 = tree.commit('foo')
404
orig_history = tree.branch.revision_history()
405
rev2 = tree.commit('bar', allow_pointless=True)
406
tree.branch.generate_revision_history(rev1)
407
self.assertEqual(orig_history, tree.branch.revision_history())
409
def test_generate_revision_history_NULL_REVISION(self):
410
tree = self.make_branch_and_tree('.')
411
rev1 = tree.commit('foo')
412
tree.branch.generate_revision_history(revision.NULL_REVISION)
413
self.assertEqual([], tree.branch.revision_history())
415
def test_create_checkout(self):
416
tree_a = self.make_branch_and_tree('a')
417
branch_a = tree_a.branch
418
checkout_b = branch_a.create_checkout('b')
419
self.assertEqual('null:', checkout_b.last_revision())
420
checkout_b.commit('rev1', rev_id='rev1')
421
self.assertEqual('rev1', branch_a.last_revision())
422
self.assertNotEqual(checkout_b.branch.base, branch_a.base)
424
checkout_c = branch_a.create_checkout('c', lightweight=True)
425
self.assertEqual('rev1', checkout_c.last_revision())
426
checkout_c.commit('rev2', rev_id='rev2')
427
self.assertEqual('rev2', branch_a.last_revision())
428
self.assertEqual(checkout_c.branch.base, branch_a.base)
430
checkout_d = branch_a.create_checkout('d', lightweight=True)
431
self.assertEqual('rev2', checkout_d.last_revision())
432
checkout_e = branch_a.create_checkout('e')
433
self.assertEqual('rev2', checkout_e.last_revision())
435
def test_create_anonymous_lightweight_checkout(self):
436
"""A lightweight checkout from a readonly branch should succeed."""
437
tree_a = self.make_branch_and_tree('a')
438
rev_id = tree_a.commit('put some content in the branch')
439
# open the branch via a readonly transport
440
source_branch = _mod_branch.Branch.open(self.get_readonly_url('a'))
441
# sanity check that the test will be valid
442
self.assertRaises((errors.LockError, errors.TransportNotPossible),
443
source_branch.lock_write)
444
checkout = source_branch.create_checkout('c', lightweight=True)
445
self.assertEqual(rev_id, checkout.last_revision())
447
def test_create_anonymous_heavyweight_checkout(self):
448
"""A regular checkout from a readonly branch should succeed."""
449
tree_a = self.make_branch_and_tree('a')
450
rev_id = tree_a.commit('put some content in the branch')
451
# open the branch via a readonly transport
452
source_branch = _mod_branch.Branch.open(self.get_readonly_url('a'))
453
# sanity check that the test will be valid
454
self.assertRaises((errors.LockError, errors.TransportNotPossible),
455
source_branch.lock_write)
456
checkout = source_branch.create_checkout('c')
457
self.assertEqual(rev_id, checkout.last_revision())
459
def test_set_revision_history(self):
460
tree = self.make_branch_and_tree('a')
461
tree.commit('a commit', rev_id='rev1')
463
self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
464
br.set_revision_history, ["rev1"])
465
self.assertEquals(br.revision_history(), ["rev1"])
466
self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
467
br.set_revision_history, [])
468
self.assertEquals(br.revision_history(), [])
470
def test_heads_to_fetch(self):
471
# heads_to_fetch is a method that returns a collection of revids that
472
# need to be fetched to copy this branch into another repo. At a
473
# minimum this will include the tip.
474
# (In native formats, this is the tip + tags, but other formats may
475
# have other revs needed)
476
tree = self.make_branch_and_tree('a')
477
tree.commit('first commit', rev_id='rev1')
478
tree.commit('second commit', rev_id='rev2')
479
must_fetch, should_fetch = tree.branch.heads_to_fetch()
480
self.assertTrue('rev2' in must_fetch)
482
def test_heads_to_fetch_not_null_revision(self):
483
# NULL_REVISION does not appear in the result of heads_to_fetch, even
484
# for an empty branch.
485
tree = self.make_branch_and_tree('a')
486
must_fetch, should_fetch = tree.branch.heads_to_fetch()
487
self.assertFalse(revision.NULL_REVISION in must_fetch)
488
self.assertFalse(revision.NULL_REVISION in should_fetch)
491
class TestBranchFormat(per_branch.TestCaseWithBranch):
493
def test_branch_format_network_name(self):
494
br = self.make_branch('.')
496
network_name = format.network_name()
497
self.assertIsInstance(network_name, str)
498
# We want to test that the network_name matches the actual format on
499
# disk. For local branches that means that using network_name as a key
500
# in the registry gives back the same format. For remote branches we
501
# check that the network_name of the RemoteBranchFormat we have locally
502
# matches the actual format present on disk.
503
if isinstance(format, remote.RemoteBranchFormat):
505
real_branch = br._real_branch
506
self.assertEqual(real_branch._format.network_name(), network_name)
508
registry = _mod_branch.network_format_registry
509
looked_up_format = registry.get(network_name)
510
self.assertEqual(format.__class__, looked_up_format.__class__)
513
class ChrootedTests(per_branch.TestCaseWithBranch):
514
"""A support class that provides readonly urls outside the local namespace.
516
This is done by checking if self.transport_server is a MemoryServer. if it
517
is then we are chrooted already, if it is not then an HttpServer is used
522
super(ChrootedTests, self).setUp()
523
if not self.vfs_transport_factory == memory.MemoryServer:
524
self.transport_readonly_server = HttpServer
149
branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
150
self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
151
branch.revision_store.get('A', 'sig').read())
154
class TestRemote(TestCaseWithWebserver):
526
156
def test_open_containing(self):
527
self.assertRaises(errors.NotBranchError,
528
_mod_branch.Branch.open_containing,
529
self.get_readonly_url(''))
530
self.assertRaises(errors.NotBranchError,
531
_mod_branch.Branch.open_containing,
532
self.get_readonly_url('g/p/q'))
533
branch = self.make_branch('.')
534
branch, relpath = _mod_branch.Branch.open_containing(
535
self.get_readonly_url(''))
536
self.assertEqual('', relpath)
537
branch, relpath = _mod_branch.Branch.open_containing(
538
self.get_readonly_url('g/p/q'))
539
self.assertEqual('g/p/q', relpath)
157
self.assertRaises(NotBranchError, Branch.open_containing,
158
self.get_remote_url(''))
159
self.assertRaises(NotBranchError, Branch.open_containing,
160
self.get_remote_url('g/p/q'))
161
b = Branch.initialize('.')
162
Branch.open_containing(self.get_remote_url(''))
163
Branch.open_containing(self.get_remote_url('g/p/q'))
165
# TODO: rewrite this as a regular unittest, without relying on the displayed output
166
# >>> from bzrlib.commit import commit
167
# >>> bzrlib.trace.silent = True
168
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
171
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
172
# >>> br2 = ScratchBranch()
173
# >>> br2.update_revisions(br1)
175
# Added 1 inventories.
177
# >>> br2.revision_history()
179
# >>> br2.update_revisions(br1)
181
# >>> br1.text_store.total_size() == br2.text_store.total_size()
542
184
class InstrumentedTransaction(object):
551
class TestDecorator(object):
557
self._calls.append('lr')
559
def lock_write(self):
560
self._calls.append('lw')
563
self._calls.append('ul')
565
@_mod_branch.needs_read_lock
566
def do_with_read(self):
569
@_mod_branch.needs_read_lock
570
def except_with_read(self):
573
@_mod_branch.needs_write_lock
574
def do_with_write(self):
577
@_mod_branch.needs_write_lock
578
def except_with_write(self):
582
class TestDecorators(tests.TestCase):
584
def test_needs_read_lock(self):
585
branch = TestDecorator()
586
self.assertEqual(1, branch.do_with_read())
587
self.assertEqual(['lr', 'ul'], branch._calls)
589
def test_excepts_in_read_lock(self):
590
branch = TestDecorator()
591
self.assertRaises(RuntimeError, branch.except_with_read)
592
self.assertEqual(['lr', 'ul'], branch._calls)
594
def test_needs_write_lock(self):
595
branch = TestDecorator()
596
self.assertEqual(2, branch.do_with_write())
597
self.assertEqual(['lw', 'ul'], branch._calls)
599
def test_excepts_in_write_lock(self):
600
branch = TestDecorator()
601
self.assertRaises(RuntimeError, branch.except_with_write)
602
self.assertEqual(['lw', 'ul'], branch._calls)
605
class TestBranchPushLocations(per_branch.TestCaseWithBranch):
607
def test_get_push_location_unset(self):
608
self.assertEqual(None, self.get_branch().get_push_location())
610
def test_get_push_location_exact(self):
611
b = self.get_branch()
612
config.LocationConfig.from_string(
613
'[%s]\npush_location=foo\n' % (b.base,), b.base, save=True)
614
self.assertEqual("foo", self.get_branch().get_push_location())
616
def test_set_push_location(self):
617
branch = self.get_branch()
618
branch.set_push_location('foo')
619
self.assertEqual('foo', branch.get_push_location())
622
class TestChildSubmitFormats(per_branch.TestCaseWithBranch):
624
def test_get_child_submit_format_default(self):
625
self.assertEqual(None, self.get_branch().get_child_submit_format())
627
def test_get_child_submit_format(self):
628
branch = self.get_branch()
629
branch.get_config().set_user_option('child_submit_format', '10')
630
branch = self.get_branch()
631
self.assertEqual('10', branch.get_child_submit_format())
634
class TestFormat(per_branch.TestCaseWithBranch):
635
"""Tests for the format itself."""
637
def test_get_reference(self):
638
"""get_reference on all regular branches should return None."""
639
if not self.branch_format.is_supported():
640
# unsupported formats are not loopback testable
641
# because the default open will not open them and
642
# they may not be initializable.
644
made_branch = self.make_branch('.')
645
self.assertEqual(None,
646
made_branch._format.get_reference(made_branch.bzrdir))
648
def test_set_reference(self):
649
"""set_reference on all regular branches should be callable."""
650
if not self.branch_format.is_supported():
651
# unsupported formats are not loopback testable
652
# because the default open will not open them and
653
# they may not be initializable.
655
this_branch = self.make_branch('this')
656
other_branch = self.make_branch('other')
658
this_branch._format.set_reference(this_branch.bzrdir, None,
660
except NotImplementedError:
664
ref = this_branch._format.get_reference(this_branch.bzrdir)
665
self.assertEqual(ref, other_branch.base)
667
def test_format_initialize_find_open(self):
668
# loopback test to check the current format initializes to itself.
669
if not self.branch_format.is_supported():
670
# unsupported formats are not loopback testable
671
# because the default open will not open them and
672
# they may not be initializable.
674
# supported formats must be able to init and open
675
t = self.get_transport()
676
readonly_t = transport.get_transport_from_url(self.get_readonly_url())
677
made_branch = self.make_branch('.')
678
self.assertIsInstance(made_branch, _mod_branch.Branch)
680
# find it via bzrdir opening:
681
opened_control = bzrdir.BzrDir.open(readonly_t.base)
682
direct_opened_branch = opened_control.open_branch()
683
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
684
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
685
self.assertIsInstance(direct_opened_branch._format,
686
self.branch_format.__class__)
688
# find it via Branch.open
689
opened_branch = _mod_branch.Branch.open(readonly_t.base)
690
self.assertIsInstance(opened_branch, made_branch.__class__)
691
self.assertEqual(made_branch._format.__class__,
692
opened_branch._format.__class__)
693
# if it has a unique id string, can we probe for it ?
695
self.branch_format.get_format_string()
696
except NotImplementedError:
698
self.assertEqual(self.branch_format,
699
opened_control.find_branch_format())
702
class TestBound(per_branch.TestCaseWithBranch):
704
def test_bind_unbind(self):
705
branch = self.make_branch('1')
706
branch2 = self.make_branch('2')
709
except errors.UpgradeRequired:
710
raise tests.TestNotApplicable('Format does not support binding')
711
self.assertTrue(branch.unbind())
712
self.assertFalse(branch.unbind())
713
self.assertIs(None, branch.get_bound_location())
715
def test_old_bound_location(self):
716
branch = self.make_branch('branch1')
718
self.assertIs(None, branch.get_old_bound_location())
719
except errors.UpgradeRequired:
720
raise tests.TestNotApplicable(
721
'Format does not store old bound locations')
722
branch2 = self.make_branch('branch2')
724
self.assertIs(None, branch.get_old_bound_location())
726
self.assertContainsRe(branch.get_old_bound_location(), '\/branch2\/$')
728
def test_bind_diverged(self):
729
tree_a = self.make_branch_and_tree('tree_a')
730
tree_a.commit('rev1a')
731
tree_b = tree_a.bzrdir.sprout('tree_b').open_workingtree()
732
tree_a.commit('rev2a')
733
tree_b.commit('rev2b')
735
tree_b.branch.bind(tree_a.branch)
736
except errors.UpgradeRequired:
737
raise tests.TestNotApplicable('Format does not support binding')
739
def test_unbind_clears_cached_master_branch(self):
740
"""b.unbind clears any cached value of b.get_master_branch."""
741
master = self.make_branch('master')
742
branch = self.make_branch('branch')
745
except errors.UpgradeRequired:
746
raise tests.TestNotApplicable('Format does not support binding')
747
self.addCleanup(branch.lock_write().unlock)
748
self.assertNotEqual(None, branch.get_master_branch())
750
self.assertEqual(None, branch.get_master_branch())
752
def test_unlocked_does_not_cache_master_branch(self):
753
"""Unlocked branches do not cache the result of get_master_branch."""
754
master = self.make_branch('master')
755
branch1 = self.make_branch('branch')
758
except errors.UpgradeRequired:
759
raise tests.TestNotApplicable('Format does not support binding')
761
branch2 = branch1.bzrdir.open_branch()
762
self.assertNotEqual(None, branch1.get_master_branch())
763
# Unbind the branch via branch2. branch1 isn't locked so will
764
# immediately return the new value for get_master_branch.
766
self.assertEqual(None, branch1.get_master_branch())
768
def test_bind_clears_cached_master_branch(self):
769
"""b.bind clears any cached value of b.get_master_branch."""
770
master1 = self.make_branch('master1')
771
master2 = self.make_branch('master2')
772
branch = self.make_branch('branch')
775
except errors.UpgradeRequired:
776
raise tests.TestNotApplicable('Format does not support binding')
777
self.addCleanup(branch.lock_write().unlock)
778
self.assertNotEqual(None, branch.get_master_branch())
780
self.assertEqual('.', urlutils.relative_url(self.get_url('master2'),
781
branch.get_master_branch().base))
783
def test_set_bound_location_clears_cached_master_branch(self):
784
"""b.set_bound_location clears any cached value of b.get_master_branch.
786
master1 = self.make_branch('master1')
787
master2 = self.make_branch('master2')
788
branch = self.make_branch('branch')
791
except errors.UpgradeRequired:
792
raise tests.TestNotApplicable('Format does not support binding')
793
self.addCleanup(branch.lock_write().unlock)
794
self.assertNotEqual(None, branch.get_master_branch())
795
branch.set_bound_location(self.get_url('master2'))
796
self.assertEqual('.', urlutils.relative_url(self.get_url('master2'),
797
branch.get_master_branch().base))
800
class TestStrict(per_branch.TestCaseWithBranch):
802
def test_strict_history(self):
803
tree1 = self.make_branch_and_tree('tree1')
805
tree1.branch.set_append_revisions_only(True)
806
except errors.UpgradeRequired:
807
raise tests.TestSkipped('Format does not support strict history')
808
tree1.commit('empty commit')
809
tree2 = tree1.bzrdir.sprout('tree2').open_workingtree()
810
tree2.commit('empty commit 2')
811
tree1.pull(tree2.branch)
812
tree1.commit('empty commit 3')
813
tree2.commit('empty commit 4')
814
self.assertRaises(errors.DivergedBranches, tree1.pull, tree2.branch)
815
tree2.merge_from_branch(tree1.branch)
816
tree2.commit('empty commit 5')
817
self.assertRaises(errors.AppendRevisionsOnlyViolation, tree1.pull,
819
tree3 = tree1.bzrdir.sprout('tree3').open_workingtree()
820
tree3.merge_from_branch(tree2.branch)
821
tree3.commit('empty commit 6')
822
tree2.pull(tree3.branch)
825
class TestIgnoreFallbacksParameter(per_branch.TestCaseWithBranch):
827
def make_branch_with_fallback(self):
828
fallback = self.make_branch('fallback')
829
if not fallback._format.supports_stacking():
830
raise tests.TestNotApplicable("format does not support stacking")
831
stacked = self.make_branch('stacked')
832
stacked.set_stacked_on_url(fallback.base)
835
def test_fallbacks_not_opened(self):
836
stacked = self.make_branch_with_fallback()
837
self.get_transport('').rename('fallback', 'moved')
838
reopened = stacked.bzrdir.open_branch(ignore_fallbacks=True)
839
self.assertEqual([], reopened.repository._fallback_repositories)
841
def test_fallbacks_are_opened(self):
842
stacked = self.make_branch_with_fallback()
843
reopened = stacked.bzrdir.open_branch(ignore_fallbacks=False)
844
self.assertLength(1, reopened.repository._fallback_repositories)
847
class TestReferenceLocation(per_branch.TestCaseWithBranch):
849
def test_reference_parent(self):
850
tree = self.make_branch_and_tree('tree')
851
subtree = self.make_branch_and_tree('tree/subtree')
852
subtree.set_root_id('subtree-id')
854
tree.add_reference(subtree)
855
except errors.UnsupportedOperation:
856
raise tests.TestNotApplicable('Tree cannot hold references.')
857
reference_parent = tree.branch.reference_parent('subtree-id',
859
self.assertEqual(subtree.branch.base, reference_parent.base)
861
def test_reference_parent_accepts_possible_transports(self):
862
tree = self.make_branch_and_tree('tree')
863
subtree = self.make_branch_and_tree('tree/subtree')
864
subtree.set_root_id('subtree-id')
866
tree.add_reference(subtree)
867
except errors.UnsupportedOperation:
868
raise tests.TestNotApplicable('Tree cannot hold references.')
869
reference_parent = tree.branch.reference_parent('subtree-id',
870
'subtree', possible_transports=[subtree.bzrdir.root_transport])
872
def test_get_reference_info(self):
873
branch = self.make_branch('branch')
875
path, loc = branch.get_reference_info('file-id')
876
except errors.UnsupportedOperation:
877
raise tests.TestNotApplicable('Branch cannot hold references.')
878
self.assertIs(None, path)
879
self.assertIs(None, loc)
881
def test_set_reference_info(self):
882
branch = self.make_branch('branch')
884
branch.set_reference_info('file-id', 'path/to/location',
886
except errors.UnsupportedOperation:
887
raise tests.TestNotApplicable('Branch cannot hold references.')
889
def test_set_get_reference_info(self):
890
branch = self.make_branch('branch')
892
branch.set_reference_info('file-id', 'path/to/file',
894
except errors.UnsupportedOperation:
895
raise tests.TestNotApplicable('Branch cannot hold references.')
896
# Create a new instance to ensure storage is permanent
897
branch = _mod_branch.Branch.open('branch')
898
tree_path, branch_location = branch.get_reference_info('file-id')
899
self.assertEqual('path/to/location', branch_location)
901
def test_set_null_reference_info(self):
902
branch = self.make_branch('branch')
904
branch.set_reference_info('file-id', 'path/to/file',
906
except errors.UnsupportedOperation:
907
raise tests.TestNotApplicable('Branch cannot hold references.')
908
branch.set_reference_info('file-id', None, None)
909
tree_path, branch_location = branch.get_reference_info('file-id')
910
self.assertIs(None, tree_path)
911
self.assertIs(None, branch_location)
913
def test_set_null_reference_info_when_null(self):
914
branch = self.make_branch('branch')
916
tree_path, branch_location = branch.get_reference_info('file-id')
917
except errors.UnsupportedOperation:
918
raise tests.TestNotApplicable('Branch cannot hold references.')
919
self.assertIs(None, tree_path)
920
self.assertIs(None, branch_location)
921
branch.set_reference_info('file-id', None, None)
923
def test_set_null_requires_two_nones(self):
924
branch = self.make_branch('branch')
926
e = self.assertRaises(ValueError, branch.set_reference_info,
927
'file-id', 'path', None)
928
except errors.UnsupportedOperation:
929
raise tests.TestNotApplicable('Branch cannot hold references.')
930
self.assertEqual('tree_path must be None when branch_location is'
932
e = self.assertRaises(ValueError, branch.set_reference_info,
933
'file-id', None, 'location')
934
self.assertEqual('branch_location must be None when tree_path is'
937
def make_branch_with_reference(self, location, reference_location,
939
branch = self.make_branch(location)
941
branch.set_reference_info(file_id, 'path/to/file',
943
except errors.UnsupportedOperation:
944
raise tests.TestNotApplicable('Branch cannot hold references.')
947
def test_reference_parent_from_reference_info_(self):
948
referenced_branch = self.make_branch('reference_branch')
949
branch = self.make_branch_with_reference('branch',
950
referenced_branch.base)
951
parent = branch.reference_parent('file-id', 'path/to/file')
952
self.assertEqual(parent.base, referenced_branch.base)
954
def test_branch_relative_reference_location(self):
955
branch = self.make_branch('branch')
957
branch.set_reference_info('file-id', 'path/to/file',
958
'../reference_branch')
959
except errors.UnsupportedOperation:
960
raise tests.TestNotApplicable('Branch cannot hold references.')
961
referenced_branch = self.make_branch('reference_branch')
962
parent = branch.reference_parent('file-id', 'path/to/file')
963
self.assertEqual(parent.base, referenced_branch.base)
965
def test_sprout_copies_reference_location(self):
966
branch = self.make_branch_with_reference('branch', '../reference')
967
new_branch = branch.bzrdir.sprout('new-branch').open_branch()
968
self.assertEqual('../reference',
969
new_branch.get_reference_info('file-id')[1])
971
def test_clone_copies_reference_location(self):
972
branch = self.make_branch_with_reference('branch', '../reference')
973
new_branch = branch.bzrdir.clone('new-branch').open_branch()
974
self.assertEqual('../reference',
975
new_branch.get_reference_info('file-id')[1])
977
def test_copied_locations_are_rebased(self):
978
branch = self.make_branch_with_reference('branch', 'reference')
979
new_branch = branch.bzrdir.sprout('branch/new-branch').open_branch()
980
self.assertEqual('../reference',
981
new_branch.get_reference_info('file-id')[1])
983
def test_update_references_retains_old_references(self):
984
branch = self.make_branch_with_reference('branch', 'reference')
985
new_branch = self.make_branch_with_reference(
986
'new_branch', 'reference', 'file-id2')
987
new_branch.update_references(branch)
988
self.assertEqual('reference',
989
branch.get_reference_info('file-id')[1])
991
def test_update_references_retains_known_references(self):
992
branch = self.make_branch_with_reference('branch', 'reference')
993
new_branch = self.make_branch_with_reference(
994
'new_branch', 'reference2')
995
new_branch.update_references(branch)
996
self.assertEqual('reference',
997
branch.get_reference_info('file-id')[1])
999
def test_update_references_skips_known_references(self):
1000
branch = self.make_branch_with_reference('branch', 'reference')
1001
new_branch = branch.bzrdir.sprout('branch/new-branch').open_branch()
1002
new_branch.set_reference_info('file-id', '../foo', '../foo')
1003
new_branch.update_references(branch)
1004
self.assertEqual('reference',
1005
branch.get_reference_info('file-id')[1])
1007
def test_pull_updates_references(self):
1008
branch = self.make_branch_with_reference('branch', 'reference')
1009
new_branch = branch.bzrdir.sprout('branch/new-branch').open_branch()
1010
new_branch.set_reference_info('file-id2', '../foo', '../foo')
1011
branch.pull(new_branch)
1012
self.assertEqual('foo',
1013
branch.get_reference_info('file-id2')[1])
1015
def test_push_updates_references(self):
1016
branch = self.make_branch_with_reference('branch', 'reference')
1017
new_branch = branch.bzrdir.sprout('branch/new-branch').open_branch()
1018
new_branch.set_reference_info('file-id2', '../foo', '../foo')
1019
new_branch.push(branch)
1020
self.assertEqual('foo',
1021
branch.get_reference_info('file-id2')[1])
1023
def test_merge_updates_references(self):
1024
branch = self.make_branch_with_reference('branch', 'reference')
1025
tree = self.make_branch_and_tree('tree')
1027
branch.pull(tree.branch)
1028
checkout = branch.create_checkout('checkout', lightweight=True)
1029
checkout.commit('bar')
1031
self.addCleanup(tree.unlock)
1032
merger = merge.Merger.from_revision_ids(None, tree,
1033
branch.last_revision(),
1034
other_branch=branch)
1035
merger.merge_type = merge.Merge3Merger
1037
self.assertEqual('../branch/reference',
1038
tree.branch.get_reference_info('file-id')[1])
1041
class TestBranchControlComponent(per_branch.TestCaseWithBranch):
1042
"""Branch implementations adequately implement ControlComponent."""
1044
def test_urls(self):
1045
br = self.make_branch('branch')
1046
self.assertIsInstance(br.user_url, str)
1047
self.assertEqual(br.user_url, br.user_transport.base)
1048
# for all current bzrdir implementations the user dir must be
1049
# above the control dir but we might need to relax that?
1050
self.assertEqual(br.control_url.find(br.user_url), 0)
1051
self.assertEqual(br.control_url, br.control_transport.base)
193
class TestBranchTransaction(TestCaseInTempDir):
196
super(TestBranchTransaction, self).setUp()
197
self.branch = Branch.initialize('.')
199
def test_default_get_transaction(self):
200
"""branch.get_transaction on a new branch should give a PassThrough."""
201
self.failUnless(isinstance(self.branch.get_transaction(),
202
transactions.PassThroughTransaction))
204
def test__set_new_transaction(self):
205
self.branch._set_transaction(transactions.ReadOnlyTransaction())
207
def test__set_over_existing_transaction_raises(self):
208
self.branch._set_transaction(transactions.ReadOnlyTransaction())
209
self.assertRaises(errors.LockError,
210
self.branch._set_transaction,
211
transactions.ReadOnlyTransaction())
213
def test_finish_no_transaction_raises(self):
214
self.assertRaises(errors.LockError, self.branch._finish_transaction)
216
def test_finish_readonly_transaction_works(self):
217
self.branch._set_transaction(transactions.ReadOnlyTransaction())
218
self.branch._finish_transaction()
219
self.assertEqual(None, self.branch._transaction)
221
def test_unlock_calls_finish(self):
222
self.branch.lock_read()
223
transaction = InstrumentedTransaction()
224
self.branch._transaction = transaction
226
self.assertEqual(['finish'], transaction.calls)
228
def test_lock_read_acquires_ro_transaction(self):
229
self.branch.lock_read()
230
self.failUnless(isinstance(self.branch.get_transaction(),
231
transactions.ReadOnlyTransaction))
234
def test_lock_write_acquires_passthrough_transaction(self):
235
self.branch.lock_write()
236
# cannot use get_transaction as its magic
237
self.failUnless(isinstance(self.branch._transaction,
238
transactions.PassThroughTransaction))