14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
23
import bzrlib.bzrdir as bzrdir
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
25
from bzrlib.commit import commit
26
import bzrlib.errors as errors
27
from bzrlib.errors import (FileExists,
30
UninitializableFormat,
34
from bzrlib.osutils import getcwd
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
36
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
37
from bzrlib.trace import mutter
38
import bzrlib.transactions as transactions
39
from bzrlib.transport import get_transport
40
from bzrlib.transport.http import HttpServer
41
from bzrlib.transport.memory import MemoryServer
42
from bzrlib.upgrade import upgrade
43
from bzrlib.workingtree import WorkingTree
46
# TODO: Make a branch using basis branch, and check that it
47
# doesn't request any files that could have been avoided, by
48
# hooking into the Transport.
51
class TestCaseWithBranch(TestCaseWithBzrDir):
# Shared base for the branch test cases in this file; extends
# TestCaseWithBzrDir.
# NOTE(review): no `def` line is visible for the statements that follow, so
# which method(s) they belong to cannot be told from this view -- confirm
# against the full file.
54
super(TestCaseWithBranch, self).setUp()
58
# Create the branch on first use only; an already-set self.branch is kept.
if self.branch is None:
59
self.branch = self.make_branch('')
62
def make_branch(self, relpath, format=None):
# Make a repository at relpath, then initialize a branch of
# self.branch_format in that repository's bzrdir and return it.
# A format that raises UninitializableFormat is reported as TestSkipped.
# NOTE(review): the `try:` that pairs with the `except` below is not
# visible in this view -- presumably on a line missing here; confirm.
63
repo = self.make_repository(relpath, format=format)
64
# fixme RBC 20060210 this isnt necessarily a fixable thing,
65
# Skipped is the wrong exception to raise.
67
return self.branch_format.initialize(repo.bzrdir)
68
except errors.UninitializableFormat:
69
raise TestSkipped('Uninitializable branch format')
71
def make_repository(self, relpath, shared=False, format=None):
    """Create a repository inside a freshly made control directory.

    The control directory is obtained from
    self.make_bzrdir(relpath, format=format); `shared` is passed straight
    through to its create_repository() call, whose result is returned.
    """
    return self.make_bzrdir(relpath, format=format).create_repository(
        shared=shared)
76
class TestBranch(TestCaseWithBranch):
78
def test_append_revisions(self):
79
"""Test appending more than one revision"""
# NOTE(review): this span interleaves two variants of the same test -- one
# that obtains the branch via self.get_branch() and one built around
# `class TestAppendRevisions(InTempDir)` / `Branch(".", init=True)`.
# Presumably only one of them is live code; confirm against the real file.
# Both variants append "rev1", then "rev2" and "rev3" in one call, and check
# revision_history() after each append.
80
br = self.get_branch()
17
from bzrlib.selftest import InTempDir
21
class TestAppendRevisions(InTempDir):
22
"""Test appending more than one revision"""
24
from bzrlib.branch import Branch
25
br = Branch(".", init=True)
81
26
br.append_revision("rev1")
82
27
self.assertEquals(br.revision_history(), ["rev1",])
83
28
br.append_revision("rev2", "rev3")
84
29
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
86
def test_fetch_revisions(self):
87
"""Test fetch-revision operation."""
# Commits one file ('foo', id 'foo-id') as 'revision-1' in the tree at b1,
# fetches into the branch at b2, and checks that b2's repository can then
# produce the revision and the file text.
# NOTE(review): `b1` is passed to b2.fetch() below but is not bound in the
# visible lines -- presumably assigned on a line missing from this view.
88
get_transport(self.get_url()).mkdir('b1')
89
get_transport(self.get_url()).mkdir('b2')
90
wt = self.make_branch_and_tree('b1')
92
b2 = self.make_branch('b2')
93
file('b1/foo', 'w').write('hello')
94
wt.add(['foo'], ['foo-id'])
95
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
98
# fetch() is expected to return (1, []) here.
self.assertEqual((1, []), b2.fetch(b1))
100
rev = b2.repository.get_revision('revision-1')
101
tree = b2.repository.revision_tree('revision-1')
102
self.assertEqual(tree.get_file_text('foo-id'), 'hello')
104
def get_unbalanced_tree_pair(self):
105
"""Return two branches, a and b, with one file in a."""
# Returns the pair (tree_a, tree_b) created by make_branch_and_tree();
# tree_a gets a commit with rev_id 'A', tree_b is left without commits.
# NOTE(review): no `add` of 'a/b' is visible before the commit --
# presumably on a line missing from this view; confirm.
106
get_transport(self.get_url()).mkdir('a')
107
tree_a = self.make_branch_and_tree('a')
108
file('a/b', 'wb').write('b')
110
tree_a.commit("silly commit", rev_id='A')
112
get_transport(self.get_url()).mkdir('b')
113
tree_b = self.make_branch_and_tree('b')
114
return tree_a, tree_b
116
def get_balanced_branch_pair(self):
    """Return (tree_a, tree_b) after b's repository has fetched from a's.

    The pair comes from get_unbalanced_tree_pair(); the only extra step is
    a repository-level fetch from tree_a's branch into tree_b's branch.
    """
    pair = self.get_unbalanced_tree_pair()
    source_tree, target_tree = pair
    target_tree.branch.repository.fetch(source_tree.branch.repository)
    return source_tree, target_tree
122
def test_clone_branch(self):
123
"""Copy the stores from one branch to another"""
# Clones tree_a's bzrdir to 'c' with tree_b's bzrdir as the basis, after
# giving tree_b an extra commit, and checks that the clone's revision
# history equals tree_a's.
124
tree_a, tree_b = self.get_balanced_branch_pair()
125
tree_b.commit("silly commit")
127
# this fails to test that the history from a was not used.
128
dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
129
self.assertEqual(tree_a.branch.revision_history(),
130
dir_c.open_branch().revision_history())
132
def test_clone_partial(self):
133
"""Copy only part of the history of a branch."""
134
# TODO: RBC 20060208 test with a revision not on revision-history.
135
# what should that behaviour be ? Emailed the list.
# Makes two commits ('1', '2') in a, copies a's repository content into a
# new repository b, then clones a's branch into b's bzrdir limited to
# revision '1' and checks that the clone's last revision is '1'.
# NOTE(review): no `add` calls are visible between build_tree and commit --
# presumably on lines missing from this view; confirm.
136
wt_a = self.make_branch_and_tree('a')
137
self.build_tree(['a/one'])
139
wt_a.commit('commit one', rev_id='1')
140
self.build_tree(['a/two'])
142
wt_a.commit('commit two', rev_id='2')
143
repo_b = self.make_repository('b')
144
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
145
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
146
self.assertEqual(br_b.last_revision(), '1')
148
def test_sprout_partial(self):
149
# test sprouting with a prefix of the revision-history.
150
# also needs not-on-revision-history behaviour defined.
# Same setup as test_clone_partial, but uses Branch.sprout() instead of
# clone(); the sprouted branch's last revision must be '1'.
# NOTE(review): no `add` calls are visible between build_tree and commit --
# presumably on lines missing from this view; confirm.
151
wt_a = self.make_branch_and_tree('a')
152
self.build_tree(['a/one'])
154
wt_a.commit('commit one', rev_id='1')
155
self.build_tree(['a/two'])
157
wt_a.commit('commit two', rev_id='2')
158
repo_b = self.make_repository('b')
159
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
160
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
161
self.assertEqual(br_b.last_revision(), '1')
163
def test_clone_branch_nickname(self):
    # Placeholder for "the nick name is preserved always" when cloning;
    # nothing is exercised yet, so the test reports itself as skipped.
    reason = 'XXX branch cloning is not yet tested..'
    raise TestSkipped(reason)
167
def test_clone_branch_parent(self):
    # Placeholder for "the parent is preserved always" when cloning;
    # nothing is exercised yet, so the test reports itself as skipped.
    reason = 'XXX branch cloning is not yet tested..'
    raise TestSkipped(reason)
171
def test_sprout_branch_nickname(self):
    # Placeholder for "the nick name is reset always" when sprouting;
    # nothing is exercised yet, so the test reports itself as skipped.
    reason = 'XXX branch sprouting is not yet tested..'
    raise TestSkipped(reason)
175
def test_sprout_branch_parent(self):
    # The parent recorded on a sprouted branch must be the base URL of the
    # root transport of the bzrdir it was sprouted from.
    source = self.make_branch('source')
    sprouted_dir = source.bzrdir.sprout(self.get_url('target'))
    target = sprouted_dir.open_branch()
    expected_parent = source.bzrdir.root_transport.base
    self.assertEqual(expected_parent, target.get_parent())
180
def test_record_initial_ghost_merge(self):
181
"""A pending merge with no revision present is still a merge."""
# Adds a pending merge of a revision id that is never created in this
# test, commits, and checks that the committed revision lists that id as
# its only parent.
# NOTE(review): `branch` is used below but not bound in the visible lines --
# presumably assigned on a line missing from this view; confirm.
182
wt = self.make_branch_and_tree('.')
184
wt.add_pending_merge('non:existent@rev--ision--0--2')
185
wt.commit('pretend to merge nonexistent-revision', rev_id='first')
186
rev = branch.repository.get_revision(branch.last_revision())
187
self.assertEqual(len(rev.parent_ids), 1)
188
# parent_sha1s is not populated now, WTF. rbc 20051003
189
self.assertEqual(len(rev.parent_sha1s), 0)
190
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
192
def test_bad_revision(self):
# Expects repository.get_revision to raise errors.InvalidRevisionId.
# NOTE(review): the argument passed to get_revision and the closing
# parenthesis of assertRaises are not visible in this view -- confirm
# against the full file.
193
self.assertRaises(errors.InvalidRevisionId,
194
self.get_branch().repository.get_revision,
198
# compare the gpg-to-sign info for a commit with a ghost and
199
# an identical tree without a ghost
200
# fetch missing should rewrite the TOC of weaves to list newly available parents.
202
def test_pending_merges(self):
203
"""Tracking pending-merged revisions."""
# Checks that pending merges start empty, that adding the same id twice
# records it once, that insertion order is kept, that a commit turns the
# pending ids into the revision's parent_ids, and that the pending list is
# empty again after the commit.
# NOTE(review): `b` is used below but not bound in the visible lines --
# presumably assigned on a line missing from this view; confirm.
204
wt = self.make_branch_and_tree('.')
206
self.assertEquals(wt.pending_merges(), [])
207
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
208
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
209
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
210
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
211
wt.add_pending_merge('wibble@fofof--20050401--1928390812')
212
self.assertEquals(wt.pending_merges(),
213
['foo@azkhazan-123123-abcabc',
214
'wibble@fofof--20050401--1928390812'])
215
wt.commit("commit from base with two merges")
216
rev = b.repository.get_revision(b.revision_history()[0])
217
self.assertEquals(len(rev.parent_ids), 2)
218
self.assertEquals(rev.parent_ids[0],
219
'foo@azkhazan-123123-abcabc')
220
self.assertEquals(rev.parent_ids[1],
221
'wibble@fofof--20050401--1928390812')
222
# list should be cleared when we do a commit
223
self.assertEquals(wt.pending_merges(), [])
225
def test_sign_existing_revision(self):
# Signs committed revision 'A' with a LoopbackGPGStrategy and checks that
# the stored signature text equals the short-text Testament of 'A'.
# NOTE(review): `branch` is used below but not bound in the visible lines,
# and no `import bzrlib.gpg` is visible in this view -- presumably both are
# on lines missing here; confirm.
226
wt = self.make_branch_and_tree('.')
228
wt.commit("base", allow_pointless=True, rev_id='A')
229
from bzrlib.testament import Testament
230
strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
231
branch.repository.sign_revision('A', strategy)
232
self.assertEqual(Testament.from_revision(branch.repository,
233
'A').as_short_text(),
234
branch.repository.get_signature_text('A'))
236
def test_store_signature(self):
# Stores the signature text 'FOO' for revision id 'A', commits revision
# 'A', then checks that get_signature_text('A') returns 'FOO'.
# NOTE(review): `branch` is not bound in the visible lines, and the
# assertRaises(errors.NoSuchRevision, ...) call is not closed before the
# commit line -- presumably lines are missing from this view; confirm.
237
wt = self.make_branch_and_tree('.')
239
branch.repository.store_revision_signature(
240
bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
241
self.assertRaises(errors.NoSuchRevision,
242
branch.repository.has_signature_for_revision_id,
244
wt.commit("base", allow_pointless=True, rev_id='A')
245
self.assertEqual('FOO',
246
branch.repository.get_signature_text('A'))
248
def test_branch_keeps_signatures(self):
    # A signature stored for revision 'A' in the source repository must
    # read back identically from the repository of a clone of its bzrdir.
    source_tree = self.make_branch_and_tree('source')
    source_tree.commit('A', allow_pointless=True, rev_id='A')
    loopback = bzrlib.gpg.LoopbackGPGStrategy(None)
    source_tree.branch.repository.sign_revision('A', loopback)
    #FIXME: clone should work to urls,
    # wt.clone should work to disks.
    self.build_tree(['target/'])
    cloned_dir = source_tree.bzrdir.clone('target')
    expected_text = source_tree.branch.repository.get_signature_text('A')
    self.assertEqual(expected_text,
                     cloned_dir.open_repository().get_signature_text('A'))
260
def test_nicks(self):
261
"""Branch nicknames"""
# Before a nick is set explicitly the test expects it to follow the
# directory name (checked before and after a move); once set explicitly it
# is expected to survive a later move. A non-ASCII nick is checked last.
# NOTE(review): the original numbering jumps around the second assignment
# and the controlfilename() call below, so statements are presumably
# missing from this view -- confirm against the full file.
262
t = get_transport(self.get_url())
264
branch = self.make_branch('bzr.dev')
265
self.assertEqual(branch.nick, 'bzr.dev')
266
t.move('bzr.dev', 'bzr.ab')
267
branch = Branch.open(self.get_url('bzr.ab'))
268
self.assertEqual(branch.nick, 'bzr.ab')
269
branch.nick = "Aaron's branch"
270
branch.nick = "Aaron's branch"
274
branch.control_files.controlfilename("branch.conf")
278
self.assertEqual(branch.nick, "Aaron's branch")
279
t.move('bzr.ab', 'integration')
280
branch = Branch.open(self.get_url('integration'))
281
self.assertEqual(branch.nick, "Aaron's branch")
282
branch.nick = u"\u1234"
283
self.assertEqual(branch.nick, u"\u1234")
285
def test_commit_nicks(self):
286
"""Nicknames are committed to the revision"""
# Sets a nick, commits, and inspects the "branch-nick" property of the
# committed revision.
# NOTE(review): `branch` is not bound in the visible lines, and the final
# assertEqual is missing its expected value and closing parenthesis --
# presumably on lines missing from this view; confirm.
287
get_transport(self.get_url()).mkdir('bzr.dev')
288
wt = self.make_branch_and_tree('bzr.dev')
290
branch.nick = "My happy branch"
291
wt.commit('My commit respect da nick.')
292
committed = branch.repository.get_revision(branch.last_revision())
293
self.assertEqual(committed.properties["branch-nick"],
296
def test_create_open_branch_uses_repository(self):
# Makes a shared repository at '.', initializes a branch in a 'child'
# bzrdir beneath it, and checks -- both on the freshly initialized branch
# and after re-opening it by URL -- that the branch's repository is the one
# at the root.
# NOTE(review): the `try:` lines pairing with both `except` clauses, and
# whatever the except bodies do, are not visible in this view -- confirm
# against the full file.
298
repo = self.make_repository('.', shared=True)
299
except errors.IncompatibleFormat:
301
repo.bzrdir.root_transport.mkdir('child')
302
child_dir = self.bzrdir_format.initialize('child')
304
child_branch = self.branch_format.initialize(child_dir)
305
except errors.UninitializableFormat:
306
# branch references are not default init'able.
308
self.assertEqual(repo.bzrdir.root_transport.base,
309
child_branch.repository.bzrdir.root_transport.base)
310
child_branch = bzrlib.branch.Branch.open(self.get_url('child'))
311
self.assertEqual(repo.bzrdir.root_transport.base,
312
child_branch.repository.bzrdir.root_transport.base)
314
def test_format_description(self):
    # The branch format of a newly made tree must report a non-empty
    # description.
    branch_format = self.make_branch_and_tree('tree').branch._format
    description = branch_format.get_format_description()
    self.failUnless(len(description))
320
class ChrootedTests(TestCaseWithBranch):
# NOTE(review): the end of the class docstring and the `def setUp` line are
# not visible in this view. The visible statements call the parent setUp
# and select HttpServer as the readonly server unless transport_server is
# MemoryServer.
321
"""A support class that provides readonly urls outside the local namespace.
323
This is done by checking if self.transport_server is a MemoryServer. if it
324
is then we are chrooted already, if it is not then an HttpServer is used
329
super(ChrootedTests, self).setUp()
330
if not self.transport_server == MemoryServer:
331
self.transport_readonly_server = HttpServer
333
def test_open_containing(self):
    # Before any branch exists, open_containing must fail for both probe
    # URLs; once a branch is made at '.', each probe must open and report
    # the probed path as its relpath.
    probes = ('', 'g/p/q')
    for path in probes:
        self.assertRaises(NotBranchError, Branch.open_containing,
                          self.get_readonly_url(path))
    self.make_branch('.')
    for path in probes:
        url = self.get_readonly_url(path)
        found, relpath = Branch.open_containing(url)
        self.assertEqual(path, relpath)
344
# TODO: rewrite this as a regular unittest, without relying on the displayed output
345
# >>> from bzrlib.commit import commit
346
# >>> bzrlib.trace.silent = True
347
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
348
# >>> br1.working_tree().add('foo')
349
# >>> br1.working_tree().add('bar')
350
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
351
# >>> br2 = ScratchBranch()
352
# >>> br2.update_revisions(br1)
354
# Added 1 inventories.
356
# >>> br2.revision_history()
358
# >>> br2.update_revisions(br1)
360
# >>> br1.text_store.total_size() == br2.text_store.total_size()
363
class InstrumentedTransaction(object):
# Records 'finish' in self.calls (see test_unlock_calls_finish).
# NOTE(review): the method definitions of this class are not visible in
# this view; only the recording statement below is.
366
self.calls.append('finish')
372
class TestDecorator(object):
# Stub object that logs lock traffic in self._calls: 'lr', 'lw' and 'ul'
# are appended by the statements below.
# NOTE(review): most of this class is missing from this view -- the `def`
# lines for the 'lr' and 'ul' appends, any decorators, and the bodies of the
# do_with_* / except_with_* methods are not visible; confirm against the
# full file.
378
self._calls.append('lr')
380
def lock_write(self):
381
self._calls.append('lw')
384
self._calls.append('ul')
387
def do_with_read(self):
391
def except_with_read(self):
395
def do_with_write(self):
399
def except_with_write(self):
403
class TestDecorators(TestCase):
    # Each test drives one TestDecorator method and then inspects the call
    # log it left in _calls: a lock entry ('lr' or 'lw') followed by 'ul',
    # whether the method returned normally or raised RuntimeError.

    def test_needs_read_lock(self):
        decorated = TestDecorator()
        result = decorated.do_with_read()
        self.assertEqual(1, result)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        decorated = TestDecorator()
        failing_call = decorated.except_with_read
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        decorated = TestDecorator()
        result = decorated.do_with_write()
        self.assertEqual(2, result)
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        decorated = TestDecorator()
        failing_call = decorated.except_with_write
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lw', 'ul'], decorated._calls)
426
class TestBranchTransaction(TestCaseWithBranch):
# Tests for how a branch installs, exposes and finishes its transaction.
# NOTE(review): the `def setUp` line enclosing the call below is not
# visible in this view.
429
super(TestBranchTransaction, self).setUp()
432
def test_default_get_transaction(self):
    """branch.get_transaction on a new branch should give a PassThrough."""
    current = self.get_branch().get_transaction()
    self.failUnless(
        isinstance(current, transactions.PassThroughTransaction))
437
def test__set_new_transaction(self):
    # Installing a read-only transaction on the branch must simply
    # succeed; the test passes if no exception is raised.
    read_only = transactions.ReadOnlyTransaction()
    self.get_branch()._set_transaction(read_only)
440
def test__set_over_existing_transaction_raises(self):
    # A second _set_transaction() after one has already been installed
    # must raise LockError.
    first = transactions.ReadOnlyTransaction()
    self.get_branch()._set_transaction(first)
    setter = self.get_branch()._set_transaction
    self.assertRaises(errors.LockError, setter,
                      transactions.ReadOnlyTransaction())
446
def test_finish_no_transaction_raises(self):
    # Calling _finish_transaction() without installing one first must
    # raise LockError.
    finish = self.get_branch()._finish_transaction
    self.assertRaises(errors.LockError, finish)
449
def test_finish_readonly_transaction_works(self):
    # Finishing an installed read-only transaction must leave
    # control_files._transaction set to None.
    read_only = transactions.ReadOnlyTransaction()
    self.get_branch()._set_transaction(read_only)
    self.get_branch()._finish_transaction()
    remaining = self.get_branch().control_files._transaction
    self.assertEqual(None, remaining)
454
def test_unlock_calls_finish(self):
    # unlock() must call finish on whatever transaction is installed on
    # the control files; an InstrumentedTransaction records that call.
    self.get_branch().lock_read()
    recorder = InstrumentedTransaction()
    self.get_branch().control_files._transaction = recorder
    self.get_branch().unlock()
    self.assertEqual(['finish'], recorder.calls)
461
def test_lock_read_acquires_ro_transaction(self):
    # Taking a read lock must make get_transaction() return a
    # ReadOnlyTransaction.
    self.get_branch().lock_read()
    try:
        self.failUnless(isinstance(self.get_branch().get_transaction(),
                                   transactions.ReadOnlyTransaction))
    finally:
        # Release the lock even when the check above fails, so a failing
        # assertion does not leave the branch locked.
        self.get_branch().unlock()
467
def test_lock_write_acquires_write_transaction(self):
    # Taking a write lock must install a WriteTransaction on the control
    # files.
    self.get_branch().lock_write()
    try:
        # cannot use get_transaction as it's magic
        self.failUnless(
            isinstance(self.get_branch().control_files._transaction,
                       transactions.WriteTransaction))
    finally:
        # Release the lock even when the check above fails, so a failing
        # assertion does not leave the branch write-locked.
        self.get_branch().unlock()
475
class TestBranchPushLocations(TestCaseWithBranch):
477
def test_get_push_location_unset(self):
    # With nothing configured, get_push_location() must return None.
    location = self.get_branch().get_push_location()
    self.assertEqual(None, location)
480
def test_get_push_location_exact(self):
    # A push_location written to the branches config file, under a section
    # named after the branch base with its final character removed, must
    # be what get_push_location() returns.
    from bzrlib.config import (branches_config_filename,
                               ensure_config_dir_exists)
    ensure_config_dir_exists()
    fn = branches_config_filename()
    section = ("[%s]\n"
               "push_location=foo" %
               self.get_branch().base[:-1])
    # Write through an explicitly closed handle so the data is flushed
    # before it is read back, rather than relying on garbage collection of
    # an anonymous file object. The trailing newline matches what the
    # `print >> file` form used to emit.
    config_file = open(fn, 'wt')
    try:
        config_file.write(section + "\n")
    finally:
        config_file.close()
    self.assertEqual("foo", self.get_branch().get_push_location())
490
def test_set_push_location(self):
# Calls set_push_location('foo') on the branch and compares the expected
# "[<branch base minus its final character>]" / "push_location = foo"
# text with assertFileEqual.
# NOTE(review): the remaining assertFileEqual argument(s) and its closing
# parenthesis are not visible in this view -- presumably the file named by
# `fn`; confirm against the full file.
491
from bzrlib.config import (branches_config_filename,
492
ensure_config_dir_exists)
493
ensure_config_dir_exists()
494
fn = branches_config_filename()
495
self.get_branch().set_push_location('foo')
496
self.assertFileEqual("[%s]\n"
497
"push_location = foo" % self.get_branch().base[:-1],
500
# TODO RBC 20051029 test getting a push location from a branch in a
501
# recursive section - that is, it appends the branch name.
504
class TestFormat(TestCaseWithBranch):
505
"""Tests for the format itself."""
507
def test_format_initialize_find_open(self):
# Makes a branch at '.', then re-opens it two ways over the readonly
# transport (BzrDir.open(...).open_branch() and Branch.open) and checks
# that class and format class match the branch that was made. Finally
# compares self.branch_format with BranchFormat.find_format() on the opened
# control directory.
# NOTE(review): the body of the `if not ...is_supported()` branch, the
# `try:` pairing with `except NotImplementedError`, and that except body
# are not visible in this view -- confirm against the full file.
508
# loopback test to check the current format initializes to itself.
509
if not self.branch_format.is_supported():
510
# unsupported formats are not loopback testable
511
# because the default open will not open them and
512
# they may not be initializable.
514
# supported formats must be able to init and open
515
t = get_transport(self.get_url())
516
readonly_t = get_transport(self.get_readonly_url())
517
made_branch = self.make_branch('.')
518
self.failUnless(isinstance(made_branch, bzrlib.branch.Branch))
520
# find it via bzrdir opening:
521
opened_control = bzrdir.BzrDir.open(readonly_t.base)
522
direct_opened_branch = opened_control.open_branch()
523
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
524
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
525
self.failUnless(isinstance(direct_opened_branch._format,
526
self.branch_format.__class__))
528
# find it via Branch.open
529
opened_branch = bzrlib.branch.Branch.open(readonly_t.base)
530
self.failUnless(isinstance(opened_branch, made_branch.__class__))
531
self.assertEqual(made_branch._format.__class__,
532
opened_branch._format.__class__)
533
# if it has a unique id string, can we probe for it ?
535
self.branch_format.get_format_string()
536
except NotImplementedError:
538
self.assertEqual(self.branch_format,
539
bzrlib.branch.BranchFormat.find_format(opened_control))