14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
23
import bzrlib.bzrdir as bzrdir
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
18
from bzrlib.branch import Branch
19
from bzrlib.clone import copy_branch
25
20
from bzrlib.commit import commit
26
21
import bzrlib.errors as errors
27
from bzrlib.errors import (FileExists,
30
UninitializableFormat,
34
from bzrlib.osutils import getcwd
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
36
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
22
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
23
from bzrlib.selftest import TestCaseInTempDir
37
24
from bzrlib.trace import mutter
38
25
import bzrlib.transactions as transactions
39
from bzrlib.transport import get_transport
40
from bzrlib.transport.http import HttpServer
41
from bzrlib.transport.memory import MemoryServer
42
from bzrlib.upgrade import upgrade
43
from bzrlib.workingtree import WorkingTree
46
# TODO: Make a branch using basis branch, and check that it
47
# doesn't request any files that could have been avoided, by
48
# hooking into the Transport.
51
class TestCaseWithBranch(TestCaseWithBzrDir):
54
super(TestCaseWithBranch, self).setUp()
58
if self.branch is None:
59
self.branch = self.make_branch('')
62
def make_branch(self, relpath, format=None):
63
repo = self.make_repository(relpath, format=format)
64
# fixme RBC 20060210 this isnt necessarily a fixable thing,
65
# Skipped is the wrong exception to raise.
67
return self.branch_format.initialize(repo.bzrdir)
68
except errors.UninitializableFormat:
69
raise TestSkipped('Uninitializable branch format')
71
def make_repository(self, relpath, shared=False, format=None):
    """Make a control directory at relpath and create a repository in it.

    relpath and format are passed through to make_bzrdir(); shared is
    passed through to create_repository(), whose result is returned.
    """
    control_dir = self.make_bzrdir(relpath, format=format)
    new_repository = control_dir.create_repository(shared=shared)
    return new_repository
76
class TestBranch(TestCaseWithBranch):
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
28
class TestBranch(TestCaseInTempDir):
78
30
def test_append_revisions(self):
79
31
"""Test appending more than one revision"""
80
br = self.get_branch()
32
br = Branch.initialize(".")
81
33
br.append_revision("rev1")
82
34
self.assertEquals(br.revision_history(), ["rev1",])
83
35
br.append_revision("rev2", "rev3")
86
38
def test_fetch_revisions(self):
87
39
"""Test fetch-revision operation."""
88
get_transport(self.get_url()).mkdir('b1')
89
get_transport(self.get_url()).mkdir('b2')
90
wt = self.make_branch_and_tree('b1')
92
b2 = self.make_branch('b2')
93
file('b1/foo', 'w').write('hello')
94
wt.add(['foo'], ['foo-id'])
95
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
40
from bzrlib.fetch import Fetcher
43
b1 = Branch.initialize('b1')
44
b2 = Branch.initialize('b2')
45
file(os.sep.join(['b1', 'foo']), 'w').write('hello')
46
b1.add(['foo'], ['foo-id'])
47
b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
97
49
mutter('start fetch')
98
self.assertEqual((1, []), b2.fetch(b1))
100
rev = b2.repository.get_revision('revision-1')
101
tree = b2.repository.revision_tree('revision-1')
102
self.assertEqual(tree.get_file_text('foo-id'), 'hello')
104
def get_unbalanced_tree_pair(self):
105
"""Return two branches, a and b, with one file in a."""
106
get_transport(self.get_url()).mkdir('a')
107
tree_a = self.make_branch_and_tree('a')
50
f = Fetcher(from_branch=b1, to_branch=b2)
51
eq = self.assertEquals
53
eq(f.last_revision, 'revision-1')
55
rev = b2.get_revision('revision-1')
56
tree = b2.revision_tree('revision-1')
57
eq(tree.get_file_text('foo-id'), 'hello')
59
def test_push_stores(self):
60
"""Copy the stores from one branch to another"""
62
br_a = Branch.initialize("a")
108
63
file('a/b', 'wb').write('b')
110
tree_a.commit("silly commit", rev_id='A')
112
get_transport(self.get_url()).mkdir('b')
113
tree_b = self.make_branch_and_tree('b')
114
return tree_a, tree_b
116
def get_balanced_branch_pair(self):
    """Return (tree_a, tree_b) once b's repository has fetched from a's.

    The two trees are those built by get_unbalanced_tree_pair(); the only
    extra step here is fetching a's repository into b's repository.
    """
    source_tree, target_tree = self.get_unbalanced_tree_pair()
    target_repository = target_tree.branch.repository
    target_repository.fetch(source_tree.branch.repository)
    return source_tree, target_tree
122
def test_clone_branch(self):
65
commit(br_a, "silly commit")
68
br_b = Branch.initialize("b")
69
self.assertRaises(NoSuchRevision, br_b.get_revision,
70
br_a.revision_history()[0])
71
br_a.push_stores(br_b)
72
rev = br_b.get_revision(br_a.revision_history()[0])
73
tree = br_b.revision_tree(br_a.revision_history()[0])
75
if tree.inventory[file_id].kind == "file":
76
tree.get_file(file_id).read()
79
def test_copy_branch(self):
123
80
"""Copy the stores from one branch to another"""
124
tree_a, tree_b = self.get_balanced_branch_pair()
125
tree_b.commit("silly commit")
81
br_a, br_b = self.test_push_stores()
82
commit(br_b, "silly commit")
127
# this fails to test that the history from a was not used.
128
dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
129
self.assertEqual(tree_a.branch.revision_history(),
130
dir_c.open_branch().revision_history())
84
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
85
self.assertEqual(br_a.revision_history(), br_c.revision_history())
86
## # basis branches currently disabled for weave format
87
## self.assertFalse(br_b.last_revision() in br_c.revision_history())
88
## br_c.get_revision(br_b.last_revision())
132
def test_clone_partial(self):
90
def test_copy_partial(self):
133
91
"""Copy only part of the history of a branch."""
134
# TODO: RBC 20060208 test with a revision not on revision-history.
135
# what should that behaviour be ? Emailed the list.
136
wt_a = self.make_branch_and_tree('a')
137
self.build_tree(['a/one'])
139
wt_a.commit('commit one', rev_id='1')
140
self.build_tree(['a/two'])
142
wt_a.commit('commit two', rev_id='2')
143
repo_b = self.make_repository('b')
144
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
145
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
146
self.assertEqual(br_b.last_revision(), '1')
148
def test_sprout_partial(self):
149
# test sprouting with a prefix of the revision-history.
150
# also needs not-on-revision-history behaviour defined.
151
wt_a = self.make_branch_and_tree('a')
152
self.build_tree(['a/one'])
154
wt_a.commit('commit one', rev_id='1')
155
self.build_tree(['a/two'])
157
wt_a.commit('commit two', rev_id='2')
158
repo_b = self.make_repository('b')
159
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
160
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
161
self.assertEqual(br_b.last_revision(), '1')
163
def test_clone_branch_nickname(self):
    """Placeholder: cloning should always preserve the nick name."""
    # No assertions exist yet, so the test reports itself as skipped.
    raise TestSkipped('XXX branch cloning is not yet tested..')
167
def test_clone_branch_parent(self):
    """Placeholder: cloning should always preserve the parent."""
    # No assertions exist yet, so the test reports itself as skipped.
    raise TestSkipped('XXX branch cloning is not yet tested..')
171
def test_sprout_branch_nickname(self):
    """Placeholder: sprouting should always reset the nick name."""
    # No assertions exist yet, so the test reports itself as skipped.
    raise TestSkipped('XXX branch sprouting is not yet tested..')
175
def test_sprout_branch_parent(self):
    """The parent of a sprouted branch is the base of the source bzrdir.

    Sprouts the control directory of a branch made at 'source' to the
    'target' url and compares the new branch's get_parent() with the
    source control directory's root transport base.
    """
    origin = self.make_branch('source')
    sprouted_dir = origin.bzrdir.sprout(self.get_url('target'))
    sprouted = sprouted_dir.open_branch()
    expected_parent = origin.bzrdir.root_transport.base
    self.assertEqual(expected_parent, sprouted.get_parent())
92
self.build_tree(['a/', 'a/one'])
93
br_a = Branch.initialize('a')
95
br_a.commit('commit one', rev_id='u@d-1')
96
self.build_tree(['a/two'])
98
br_a.commit('commit two', rev_id='u@d-2')
99
br_b = copy_branch(br_a, 'b', revision='u@d-1')
100
self.assertEqual(br_b.last_revision(), 'u@d-1')
101
self.assertTrue(os.path.exists('b/one'))
102
self.assertFalse(os.path.exists('b/two'))
180
105
def test_record_initial_ghost_merge(self):
181
106
"""A pending merge with no revision present is still a merge."""
182
wt = self.make_branch_and_tree('.')
184
wt.add_pending_merge('non:existent@rev--ision--0--2')
185
wt.commit('pretend to merge nonexistent-revision', rev_id='first')
186
rev = branch.repository.get_revision(branch.last_revision())
107
branch = Branch.initialize('.')
108
branch.add_pending_merge('non:existent@rev--ision--0--2')
109
branch.commit('pretend to merge nonexistent-revision', rev_id='first')
110
rev = branch.get_revision(branch.last_revision())
187
111
self.assertEqual(len(rev.parent_ids), 1)
188
112
# parent_sha1s is not populated now, WTF. rbc 20051003
189
113
self.assertEqual(len(rev.parent_sha1s), 0)
190
114
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
192
def test_bad_revision(self):
193
self.assertRaises(errors.InvalidRevisionId,
194
self.get_branch().repository.get_revision,
197
116
# TODO 20051003 RBC:
198
117
# compare the gpg-to-sign info for a commit with a ghost and
199
118
# an identical tree without a ghost
202
121
def test_pending_merges(self):
203
122
"""Tracking pending-merged revisions."""
204
wt = self.make_branch_and_tree('.')
206
self.assertEquals(wt.pending_merges(), [])
207
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
208
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
209
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
210
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
211
wt.add_pending_merge('wibble@fofof--20050401--1928390812')
212
self.assertEquals(wt.pending_merges(),
123
b = Branch.initialize('.')
125
self.assertEquals(b.pending_merges(), [])
126
b.add_pending_merge('foo@azkhazan-123123-abcabc')
127
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
128
b.add_pending_merge('foo@azkhazan-123123-abcabc')
129
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
130
b.add_pending_merge('wibble@fofof--20050401--1928390812')
131
self.assertEquals(b.pending_merges(),
213
132
['foo@azkhazan-123123-abcabc',
214
133
'wibble@fofof--20050401--1928390812'])
215
wt.commit("commit from base with two merges")
216
rev = b.repository.get_revision(b.revision_history()[0])
134
b.commit("commit from base with two merges")
135
rev = b.get_revision(b.revision_history()[0])
217
136
self.assertEquals(len(rev.parent_ids), 2)
218
137
self.assertEquals(rev.parent_ids[0],
219
138
'foo@azkhazan-123123-abcabc')
220
139
self.assertEquals(rev.parent_ids[1],
221
140
'wibble@fofof--20050401--1928390812')
222
141
# list should be cleared when we do a commit
223
self.assertEquals(wt.pending_merges(), [])
225
def test_sign_existing_revision(self):
226
wt = self.make_branch_and_tree('.')
228
wt.commit("base", allow_pointless=True, rev_id='A')
229
from bzrlib.testament import Testament
230
strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
231
branch.repository.sign_revision('A', strategy)
232
self.assertEqual(Testament.from_revision(branch.repository,
233
'A').as_short_text(),
234
branch.repository.get_signature_text('A'))
236
def test_store_signature(self):
237
wt = self.make_branch_and_tree('.')
239
branch.repository.store_revision_signature(
240
bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
241
self.assertRaises(errors.NoSuchRevision,
242
branch.repository.has_signature_for_revision_id,
244
wt.commit("base", allow_pointless=True, rev_id='A')
245
self.assertEqual('FOO',
246
branch.repository.get_signature_text('A'))
248
def test_branch_keeps_signatures(self):
    """A cloned control directory carries the source's revision signature.

    Commits and signs revision 'A' in 'source', clones the control
    directory to 'target', and checks that both repositories return the
    same signature text for 'A'.
    """
    source_tree = self.make_branch_and_tree('source')
    source_tree.commit('A', allow_pointless=True, rev_id='A')
    signing_strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
    source_tree.branch.repository.sign_revision('A', signing_strategy)
    #FIXME: clone should work to urls,
    # wt.clone should work to disks.
    self.build_tree(['target/'])
    cloned_dir = source_tree.bzrdir.clone('target')
    source_signature = source_tree.branch.repository.get_signature_text('A')
    cloned_signature = cloned_dir.open_repository().get_signature_text('A')
    self.assertEqual(source_signature, cloned_signature)
260
def test_nicks(self):
261
"""Branch nicknames"""
262
t = get_transport(self.get_url())
264
branch = self.make_branch('bzr.dev')
265
self.assertEqual(branch.nick, 'bzr.dev')
266
t.move('bzr.dev', 'bzr.ab')
267
branch = Branch.open(self.get_url('bzr.ab'))
268
self.assertEqual(branch.nick, 'bzr.ab')
269
branch.nick = "Aaron's branch"
270
branch.nick = "Aaron's branch"
274
branch.control_files.controlfilename("branch.conf")
278
self.assertEqual(branch.nick, "Aaron's branch")
279
t.move('bzr.ab', 'integration')
280
branch = Branch.open(self.get_url('integration'))
281
self.assertEqual(branch.nick, "Aaron's branch")
282
branch.nick = u"\u1234"
283
self.assertEqual(branch.nick, u"\u1234")
285
def test_commit_nicks(self):
286
"""Nicknames are committed to the revision"""
287
get_transport(self.get_url()).mkdir('bzr.dev')
288
wt = self.make_branch_and_tree('bzr.dev')
290
branch.nick = "My happy branch"
291
wt.commit('My commit respect da nick.')
292
committed = branch.repository.get_revision(branch.last_revision())
293
self.assertEqual(committed.properties["branch-nick"],
296
def test_create_open_branch_uses_repository(self):
298
repo = self.make_repository('.', shared=True)
299
except errors.IncompatibleFormat:
301
repo.bzrdir.root_transport.mkdir('child')
302
child_dir = self.bzrdir_format.initialize('child')
304
child_branch = self.branch_format.initialize(child_dir)
305
except errors.UninitializableFormat:
306
# branch references are not default init'able.
308
self.assertEqual(repo.bzrdir.root_transport.base,
309
child_branch.repository.bzrdir.root_transport.base)
310
child_branch = bzrlib.branch.Branch.open(self.get_url('child'))
311
self.assertEqual(repo.bzrdir.root_transport.base,
312
child_branch.repository.bzrdir.root_transport.base)
314
def test_format_description(self):
    """The branch format returns a non-empty description."""
    branch_format = self.make_branch_and_tree('tree').branch._format
    description = branch_format.get_format_description()
    # A zero-length description is falsy and fails the check.
    self.failUnless(len(description))
320
class ChrootedTests(TestCaseWithBranch):
321
"""A support class that provides readonly urls outside the local namespace.
323
This is done by checking if self.transport_server is a MemoryServer. if it
324
is then we are chrooted already, if it is not then an HttpServer is used
329
super(ChrootedTests, self).setUp()
330
if not self.transport_server == MemoryServer:
331
self.transport_readonly_server = HttpServer
142
self.assertEquals(b.pending_merges(), [])
145
class TestRemote(TestCaseWithWebserver):
333
147
def test_open_containing(self):
334
148
self.assertRaises(NotBranchError, Branch.open_containing,
335
self.get_readonly_url(''))
149
self.get_remote_url(''))
336
150
self.assertRaises(NotBranchError, Branch.open_containing,
337
self.get_readonly_url('g/p/q'))
338
branch = self.make_branch('.')
339
branch, relpath = Branch.open_containing(self.get_readonly_url(''))
340
self.assertEqual('', relpath)
341
branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
342
self.assertEqual('g/p/q', relpath)
151
self.get_remote_url('g/p/q'))
152
b = Branch.initialize('.')
153
Branch.open_containing(self.get_remote_url(''))
154
Branch.open_containing(self.get_remote_url('g/p/q'))
344
156
# TODO: rewrite this as a regular unittest, without relying on the displayed output
345
157
# >>> from bzrlib.commit import commit
346
158
# >>> bzrlib.trace.silent = True
347
159
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
348
# >>> br1.working_tree().add('foo')
349
# >>> br1.working_tree().add('bar')
350
162
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
351
163
# >>> br2 = ScratchBranch()
352
164
# >>> br2.update_revisions(br1)
372
class TestDecorator(object):
378
self._calls.append('lr')
380
def lock_write(self):
381
self._calls.append('lw')
384
self._calls.append('ul')
387
def do_with_read(self):
391
def except_with_read(self):
395
def do_with_write(self):
399
def except_with_write(self):
403
class TestDecorators(TestCase):
    """Check the calls that TestDecorator records around its methods.

    Each test uses a fresh TestDecorator and inspects its _calls list:
    'lw' is recorded by lock_write; 'lr' and 'ul' presumably come from
    lock_read and unlock (their definitions are not fully visible here).
    """

    def test_needs_read_lock(self):
        """do_with_read returns 1 and records 'lr' then 'ul'."""
        decorated = TestDecorator()
        self.assertEqual(1, decorated.do_with_read())
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        """except_with_read raises RuntimeError yet still records 'ul'."""
        decorated = TestDecorator()
        self.assertRaises(RuntimeError, decorated.except_with_read)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        """do_with_write returns 2 and records 'lw' then 'ul'."""
        decorated = TestDecorator()
        self.assertEqual(2, decorated.do_with_write())
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        """except_with_write raises RuntimeError yet still records 'ul'."""
        decorated = TestDecorator()
        self.assertRaises(RuntimeError, decorated.except_with_write)
        self.assertEqual(['lw', 'ul'], decorated._calls)
426
class TestBranchTransaction(TestCaseWithBranch):
184
class TestBranchTransaction(TestCaseInTempDir):
429
187
super(TestBranchTransaction, self).setUp()
188
self.branch = Branch.initialize('.')
432
190
def test_default_get_transaction(self):
433
191
"""branch.get_transaction on a new branch should give a PassThrough."""
434
self.failUnless(isinstance(self.get_branch().get_transaction(),
192
self.failUnless(isinstance(self.branch.get_transaction(),
435
193
transactions.PassThroughTransaction))
437
195
def test__set_new_transaction(self):
438
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
196
self.branch._set_transaction(transactions.ReadOnlyTransaction())
440
198
def test__set_over_existing_transaction_raises(self):
441
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
199
self.branch._set_transaction(transactions.ReadOnlyTransaction())
442
200
self.assertRaises(errors.LockError,
443
self.get_branch()._set_transaction,
201
self.branch._set_transaction,
444
202
transactions.ReadOnlyTransaction())
446
204
def test_finish_no_transaction_raises(self):
447
self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
205
self.assertRaises(errors.LockError, self.branch._finish_transaction)
449
207
def test_finish_readonly_transaction_works(self):
450
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
451
self.get_branch()._finish_transaction()
452
self.assertEqual(None, self.get_branch().control_files._transaction)
208
self.branch._set_transaction(transactions.ReadOnlyTransaction())
209
self.branch._finish_transaction()
210
self.assertEqual(None, self.branch._transaction)
454
212
def test_unlock_calls_finish(self):
455
self.get_branch().lock_read()
213
self.branch.lock_read()
456
214
transaction = InstrumentedTransaction()
457
self.get_branch().control_files._transaction = transaction
458
self.get_branch().unlock()
215
self.branch._transaction = transaction
459
217
self.assertEqual(['finish'], transaction.calls)
461
219
def test_lock_read_acquires_ro_transaction(self):
462
self.get_branch().lock_read()
463
self.failUnless(isinstance(self.get_branch().get_transaction(),
220
self.branch.lock_read()
221
self.failUnless(isinstance(self.branch.get_transaction(),
464
222
transactions.ReadOnlyTransaction))
465
self.get_branch().unlock()
467
def test_lock_write_acquires_write_transaction(self):
468
self.get_branch().lock_write()
225
def test_lock_write_acquires_passthrough_transaction(self):
226
self.branch.lock_write()
469
227
# cannot use get_transaction as its magic
470
self.failUnless(isinstance(self.get_branch().control_files._transaction,
471
transactions.WriteTransaction))
472
self.get_branch().unlock()
475
class TestBranchPushLocations(TestCaseWithBranch):
477
def test_get_push_location_unset(self):
    """get_push_location() returns None on the branch from get_branch()."""
    push_location = self.get_branch().get_push_location()
    self.assertEqual(None, push_location)
480
def test_get_push_location_exact(self):
    """get_push_location() returns the value stored for this exact branch.

    Writes a section named after the branch base (minus its last
    character) with push_location=foo into the file named by
    branches_config_filename(), then expects "foo" back from the branch.
    """
    from bzrlib.config import (branches_config_filename,
                               ensure_config_dir_exists)
    ensure_config_dir_exists()
    fn = branches_config_filename()
    # Build the text before opening the file so a failure in get_branch()
    # cannot leave a truncated config file behind.  The trailing newline
    # is the one the former print statement appended.
    section = ("[%s]\n"
               "push_location=foo\n" %
               self.get_branch().base[:-1])
    config_file = open(fn, 'wt')
    try:
        config_file.write(section)
    finally:
        # Close explicitly: the data must be flushed to disk before the
        # branch looks up its push location (presumably from this file),
        # and must not depend on the handle being garbage-collected.
        config_file.close()
    self.assertEqual("foo", self.get_branch().get_push_location())
490
def test_set_push_location(self):
491
from bzrlib.config import (branches_config_filename,
492
ensure_config_dir_exists)
493
ensure_config_dir_exists()
494
fn = branches_config_filename()
495
self.get_branch().set_push_location('foo')
496
self.assertFileEqual("[%s]\n"
497
"push_location = foo" % self.get_branch().base[:-1],
500
# TODO RBC 20051029 test getting a push location from a branch in a
501
# recursive section - that is, it appends the branch name.
504
class TestFormat(TestCaseWithBranch):
505
"""Tests for the format itself."""
507
def test_format_initialize_find_open(self):
508
# loopback test to check the current format initializes to itself.
509
if not self.branch_format.is_supported():
510
# unsupported formats are not loopback testable
511
# because the default open will not open them and
512
# they may not be initializable.
514
# supported formats must be able to init and open
515
t = get_transport(self.get_url())
516
readonly_t = get_transport(self.get_readonly_url())
517
made_branch = self.make_branch('.')
518
self.failUnless(isinstance(made_branch, bzrlib.branch.Branch))
520
# find it via bzrdir opening:
521
opened_control = bzrdir.BzrDir.open(readonly_t.base)
522
direct_opened_branch = opened_control.open_branch()
523
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
524
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
525
self.failUnless(isinstance(direct_opened_branch._format,
526
self.branch_format.__class__))
528
# find it via Branch.open
529
opened_branch = bzrlib.branch.Branch.open(readonly_t.base)
530
self.failUnless(isinstance(opened_branch, made_branch.__class__))
531
self.assertEqual(made_branch._format.__class__,
532
opened_branch._format.__class__)
533
# if it has a unique id string, can we probe for it ?
535
self.branch_format.get_format_string()
536
except NotImplementedError:
538
self.assertEqual(self.branch_format,
539
bzrlib.branch.BranchFormat.find_format(opened_control))
228
self.failUnless(isinstance(self.branch._transaction,
229
transactions.PassThroughTransaction))