39
40
from bzrlib.osutils import getcwd
40
41
import bzrlib.revision
41
42
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
42
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
43
from bzrlib.tests.branch_implementations import TestCaseWithBranch
44
from bzrlib.tests.HttpServer import HttpServer
43
45
from bzrlib.trace import mutter
44
46
from bzrlib.transport import get_transport
45
from bzrlib.transport.http import HttpServer
46
47
from bzrlib.transport.memory import MemoryServer
47
48
from bzrlib.upgrade import upgrade
48
49
from bzrlib.workingtree import WorkingTree
51
# TODO: Make a branch using basis branch, and check that it
52
# doesn't request any files that could have been avoided, by
53
# hooking into the Transport.
56
class TestCaseWithBranch(TestCaseWithBzrDir):
59
super(TestCaseWithBranch, self).setUp()
63
if self.branch is None:
64
self.branch = self.make_branch('')
67
def make_branch(self, relpath, format=None):
68
repo = self.make_repository(relpath, format=format)
69
# fixme RBC 20060210 this isnt necessarily a fixable thing,
70
# Skipped is the wrong exception to raise.
72
return self.branch_format.initialize(repo.bzrdir)
73
except errors.UninitializableFormat:
74
raise TestSkipped('Uninitializable branch format')
76
def make_repository(self, relpath, shared=False, format=None):
77
made_control = self.make_bzrdir(relpath, format=format)
78
return made_control.create_repository(shared=shared)
81
52
class TestBranch(TestCaseWithBranch):
83
54
def test_append_revisions(self):
84
55
"""Test appending more than one revision"""
56
wt = self.make_branch_and_tree('tree')
57
wt.commit('f', rev_id='rev1')
58
wt.commit('f', rev_id='rev2')
59
wt.commit('f', rev_id='rev3')
85
61
br = self.get_branch()
86
63
br.append_revision("rev1")
87
64
self.assertEquals(br.revision_history(), ["rev1",])
88
65
br.append_revision("rev2", "rev3")
89
66
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
67
self.assertRaises(errors.ReservedId, br.append_revision, 'current:')
69
def test_create_tree_with_merge(self):
70
tree = self.create_tree_with_merge()
71
ancestry_graph = tree.branch.repository.get_revision_graph('rev-3')
72
self.assertEqual({'rev-1':[],
74
'rev-1.1.1':['rev-1'],
75
'rev-3':['rev-2', 'rev-1.1.1'],
78
def test_revision_ids_are_utf8(self):
79
wt = self.make_branch_and_tree('tree')
80
wt.commit('f', rev_id='rev1')
81
wt.commit('f', rev_id='rev2')
82
wt.commit('f', rev_id='rev3')
84
br = self.get_branch()
86
br.set_revision_history(['rev1', 'rev2', 'rev3'])
87
rh = br.revision_history()
88
self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
89
for revision_id in rh:
90
self.assertIsInstance(revision_id, str)
91
last = br.last_revision()
92
self.assertEqual('rev3', last)
93
self.assertIsInstance(last, str)
94
revno, last = br.last_revision_info()
95
self.assertEqual(3, revno)
96
self.assertEqual('rev3', last)
97
self.assertIsInstance(last, str)
91
99
def test_fetch_revisions(self):
92
100
"""Test fetch-revision operation."""
93
get_transport(self.get_url()).mkdir('b1')
94
get_transport(self.get_url()).mkdir('b2')
95
101
wt = self.make_branch_and_tree('b1')
97
b2 = self.make_branch('b2')
98
file('b1/foo', 'w').write('hello')
103
self.build_tree_contents([('b1/foo', 'hello')])
99
104
wt.add(['foo'], ['foo-id'])
100
105
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
102
mutter('start fetch')
107
b2 = self.make_branch('b2')
103
108
self.assertEqual((1, []), b2.fetch(b1))
105
110
rev = b2.repository.get_revision('revision-1')
140
143
tree_b.branch.repository.fetch(tree_a.branch.repository)
141
144
return tree_a, tree_b
143
def test_clone_branch(self):
144
"""Copy the stores from one branch to another"""
145
tree_a, tree_b = self.get_balanced_branch_pair()
146
tree_b.commit("silly commit")
148
# this fails to test that the history from a was not used.
149
dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
150
self.assertEqual(tree_a.branch.revision_history(),
151
dir_c.open_branch().revision_history())
153
146
def test_clone_partial(self):
154
147
"""Copy only part of the history of a branch."""
155
148
# TODO: RBC 20060208 test with a revision not on revision-history.
156
149
# what should that behaviour be ? Emailed the list.
157
wt_a = self.make_branch_and_tree('a')
158
self.build_tree(['a/one'])
160
wt_a.commit('commit one', rev_id='1')
161
self.build_tree(['a/two'])
163
wt_a.commit('commit two', rev_id='2')
164
repo_b = self.make_repository('b')
165
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
166
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
167
self.assertEqual('1', br_b.last_revision())
169
def test_sprout_partial(self):
170
# test sprouting with a prefix of the revision-history.
171
# also needs not-on-revision-history behaviour defined.
172
wt_a = self.make_branch_and_tree('a')
173
self.build_tree(['a/one'])
175
wt_a.commit('commit one', rev_id='1')
176
self.build_tree(['a/two'])
178
wt_a.commit('commit two', rev_id='2')
179
repo_b = self.make_repository('b')
180
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
181
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
150
# First, make a branch with two commits.
151
wt_a = self.make_branch_and_tree('a')
152
self.build_tree(['a/one'])
154
wt_a.commit('commit one', rev_id='1')
155
self.build_tree(['a/two'])
157
wt_a.commit('commit two', rev_id='2')
158
# Now make a copy of the repository.
159
repo_b = self.make_repository('b')
160
wt_a.branch.repository.copy_content_into(repo_b)
161
# wt_a might be a lightweight checkout, so get a hold of the actual
162
# branch (because you can't do a partial clone of a lightweight
164
branch = wt_a.branch.bzrdir.open_branch()
165
# Then make a branch where the new repository is, but specify a revision
166
# ID. The new branch's history will stop at the specified revision.
167
br_b = branch.clone(repo_b.bzrdir, revision_id='1')
182
168
self.assertEqual('1', br_b.last_revision())
184
170
def get_parented_branch(self):
230
207
branch.set_submit_branch('sftp://example.net')
231
208
self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
210
def test_public_branch(self):
211
"""public location can be queried and set"""
212
branch = self.make_branch('branch')
213
self.assertEqual(branch.get_public_branch(), None)
214
branch.set_public_branch('sftp://example.com')
215
self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
216
branch.set_public_branch('sftp://example.net')
217
self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
218
branch.set_public_branch(None)
219
self.assertEqual(branch.get_public_branch(), None)
233
221
def test_record_initial_ghost(self):
234
222
"""Branches should support having ghosts."""
235
223
wt = self.make_branch_and_tree('.')
286
276
branch.repository.has_signature_for_revision_id,
288
278
wt.commit("base", allow_pointless=True, rev_id='A')
289
self.assertEqual('FOO',
279
self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
280
'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
290
281
branch.repository.get_signature_text('A'))
292
283
def test_branch_keeps_signatures(self):
293
284
wt = self.make_branch_and_tree('source')
294
285
wt.commit('A', allow_pointless=True, rev_id='A')
295
wt.branch.repository.sign_revision('A',
296
gpg.LoopbackGPGStrategy(None))
286
repo = wt.branch.repository
287
repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
297
288
#FIXME: clone should work to urls,
298
289
# wt.clone should work to disks.
299
290
self.build_tree(['target/'])
300
d2 = wt.bzrdir.clone('target')
301
self.assertEqual(wt.branch.repository.get_signature_text('A'),
291
d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
292
self.assertEqual(repo.get_signature_text('A'),
302
293
d2.open_repository().get_signature_text('A'))
304
295
def test_nicks(self):
305
"""Branch nicknames"""
296
"""Test explicit and implicit branch nicknames.
298
Nicknames are implicitly the name of the branch's directory, unless an
299
explicit nickname is set. That is, an explicit nickname always
300
overrides the implicit one.
306
302
t = get_transport(self.get_url())
308
303
branch = self.make_branch('bzr.dev')
304
# The nick will be 'bzr.dev', because there is no explicit nick set.
309
305
self.assertEqual(branch.nick, 'bzr.dev')
306
# Move the branch to a different directory, 'bzr.ab'. Now that branch
307
# will report its nick as 'bzr.ab'.
310
308
t.move('bzr.dev', 'bzr.ab')
311
309
branch = Branch.open(self.get_url('bzr.ab'))
312
310
self.assertEqual(branch.nick, 'bzr.ab')
313
branch.nick = "Aaron's branch"
314
branch.nick = "Aaron's branch"
318
branch.control_files.controlfilename("branch.conf")
311
# Set the branch nick explicitly. This will ensure there's a branch
312
# config file in the branch.
313
branch.nick = "Aaron's branch"
314
branch.nick = "Aaron's branch"
315
if not isinstance(branch, remote.RemoteBranch):
316
controlfilename = branch.control_files.controlfilename
317
self.failUnless(t.has(t.relpath(controlfilename("branch.conf"))))
318
# Because the nick has been set explicitly, the nick is now always
319
# "Aaron's branch", regardless of directory name.
322
320
self.assertEqual(branch.nick, "Aaron's branch")
323
321
t.move('bzr.ab', 'integration')
324
322
branch = Branch.open(self.get_url('integration'))
425
423
"""A regular checkout from a readonly branch should succeed."""
426
424
tree_a = self.make_branch_and_tree('a')
427
425
rev_id = tree_a.commit('put some content in the branch')
428
source_branch = bzrlib.branch.Branch.open(
429
'readonly+' + tree_a.bzrdir.root_transport.base)
426
# open the branch via a readonly transport
427
source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
430
428
# sanity check that the test will be valid
431
429
self.assertRaises((errors.LockError, errors.TransportNotPossible),
432
430
source_branch.lock_write)
433
431
checkout = source_branch.create_checkout('c')
434
432
self.assertEqual(rev_id, checkout.last_revision())
434
def test_set_revision_history(self):
435
tree = self.make_branch_and_tree('a')
436
tree.commit('a commit', rev_id='rev1')
438
br.set_revision_history(["rev1"])
439
self.assertEquals(br.revision_history(), ["rev1"])
440
br.set_revision_history([])
441
self.assertEquals(br.revision_history(), [])
437
444
class ChrootedTests(TestCaseWithBranch):
438
445
"""A support class that provides readonly urls outside the local namespace.
522
529
self.assertEqual(['lw', 'ul'], branch._calls)
525
class TestBranchTransaction(TestCaseWithBranch):
528
super(TestBranchTransaction, self).setUp()
531
def test_default_get_transaction(self):
532
"""branch.get_transaction on a new branch should give a PassThrough."""
533
self.failUnless(isinstance(self.get_branch().get_transaction(),
534
transactions.PassThroughTransaction))
536
def test__set_new_transaction(self):
537
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
539
def test__set_over_existing_transaction_raises(self):
540
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
541
self.assertRaises(errors.LockError,
542
self.get_branch()._set_transaction,
543
transactions.ReadOnlyTransaction())
545
def test_finish_no_transaction_raises(self):
546
self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
548
def test_finish_readonly_transaction_works(self):
549
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
550
self.get_branch()._finish_transaction()
551
self.assertEqual(None, self.get_branch().control_files._transaction)
553
def test_unlock_calls_finish(self):
554
self.get_branch().lock_read()
555
transaction = InstrumentedTransaction()
556
self.get_branch().control_files._transaction = transaction
557
self.get_branch().unlock()
558
self.assertEqual(['finish'], transaction.calls)
560
def test_lock_read_acquires_ro_transaction(self):
561
self.get_branch().lock_read()
562
self.failUnless(isinstance(self.get_branch().get_transaction(),
563
transactions.ReadOnlyTransaction))
564
self.get_branch().unlock()
566
def test_lock_write_acquires_write_transaction(self):
567
self.get_branch().lock_write()
568
# cannot use get_transaction as its magic
569
self.failUnless(isinstance(self.get_branch().control_files._transaction,
570
transactions.WriteTransaction))
571
self.get_branch().unlock()
574
532
class TestBranchPushLocations(TestCaseWithBranch):
576
534
def test_get_push_location_unset(self):
587
545
self.assertEqual("foo", self.get_branch().get_push_location())
589
547
def test_set_push_location(self):
590
from bzrlib.config import (locations_config_filename,
591
ensure_config_dir_exists)
592
ensure_config_dir_exists()
593
fn = locations_config_filename()
594
548
branch = self.get_branch()
595
549
branch.set_push_location('foo')
596
local_path = urlutils.local_path_from_url(branch.base[:-1])
597
self.assertFileEqual("[%s]\n"
598
"push_location = foo" % local_path,
601
# TODO RBC 20051029 test getting a push location from a branch in a
602
# recursive section - that is, it appends the branch name.
550
self.assertEqual('foo', branch.get_push_location())
605
553
class TestFormat(TestCaseWithBranch):
606
554
"""Tests for the format itself."""
556
def test_get_reference(self):
557
"""get_reference on all regular branches should return None."""
558
if not self.branch_format.is_supported():
559
# unsupported formats are not loopback testable
560
# because the default open will not open them and
561
# they may not be initializable.
563
made_branch = self.make_branch('.')
564
self.assertEqual(None,
565
made_branch._format.get_reference(made_branch.bzrdir))
608
567
def test_format_initialize_find_open(self):
609
568
# loopback test to check the current format initializes to itself.
610
569
if not self.branch_format.is_supported():
637
596
except NotImplementedError:
639
598
self.assertEqual(self.branch_format,
640
branch.BranchFormat.find_format(opened_control))
599
opened_control.find_branch_format())
602
class TestBound(TestCaseWithBranch):
604
def test_bind_unbind(self):
605
branch = self.make_branch('1')
606
branch2 = self.make_branch('2')
609
except errors.UpgradeRequired:
610
raise TestSkipped('Format does not support binding')
611
self.assertTrue(branch.unbind())
612
self.assertFalse(branch.unbind())
613
self.assertIs(None, branch.get_bound_location())
615
def test_old_bound_location(self):
616
branch = self.make_branch('branch1')
618
self.assertIs(None, branch.get_old_bound_location())
619
except errors.UpgradeRequired:
620
raise TestSkipped('Format does not store old bound locations')
621
branch2 = self.make_branch('branch2')
623
self.assertIs(None, branch.get_old_bound_location())
625
self.assertContainsRe(branch.get_old_bound_location(), '\/branch2\/$')
628
class TestStrict(TestCaseWithBranch):
630
def test_strict_history(self):
631
tree1 = self.make_branch_and_tree('tree1')
633
tree1.branch.set_append_revisions_only(True)
634
except errors.UpgradeRequired:
635
raise TestSkipped('Format does not support strict history')
636
tree1.commit('empty commit')
637
tree2 = tree1.bzrdir.sprout('tree2').open_workingtree()
638
tree2.commit('empty commit 2')
639
tree1.pull(tree2.branch)
640
tree1.commit('empty commit 3')
641
tree2.commit('empty commit 4')
642
self.assertRaises(errors.DivergedBranches, tree1.pull, tree2.branch)
643
tree2.merge_from_branch(tree1.branch)
644
tree2.commit('empty commit 5')
645
self.assertRaises(errors.AppendRevisionsOnlyViolation, tree1.pull,
647
tree3 = tree1.bzrdir.sprout('tree3').open_workingtree()
648
tree3.merge_from_branch(tree2.branch)
649
tree3.commit('empty commit 6')
650
tree2.pull(tree3.branch)