14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
22
from bzrlib import branch, bzrdir, errors, gpg, transactions, repository
23
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
24
from bzrlib.delta import TreeDelta
25
from bzrlib.errors import (FileExists,
28
UninitializableFormat,
31
from bzrlib.osutils import getcwd
32
import bzrlib.revision
33
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
34
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
35
from bzrlib.trace import mutter
36
from bzrlib.transport import get_transport
37
from bzrlib.transport.http import HttpServer
38
from bzrlib.transport.memory import MemoryServer
39
from bzrlib.upgrade import upgrade
40
from bzrlib.workingtree import WorkingTree
43
# TODO: Make a branch using basis branch, and check that it
44
# doesn't request any files that could have been avoided, by
45
# hooking into the Transport.
48
# Shared fixture for the branch tests in this module; builds on TestCaseWithBzrDir.
class TestCaseWithBranch(TestCaseWithBzrDir):
51
# NOTE(review): the `def` line that owns this call is not visible in this
# view; presumably it is setUp() -- confirm against the full file.
super(TestCaseWithBranch, self).setUp()
55
# Create a branch at '' only when self.branch is still None.
# NOTE(review): the method containing this check is not visible here.
if self.branch is None:
56
self.branch = self.make_branch('')
59
# Create a repository at relpath, then initialise a branch of
# self.branch_format in that repository's bzrdir and return it.
# UninitializableFormat is translated into TestSkipped.
def make_branch(self, relpath, format=None):
60
repo = self.make_repository(relpath, format=format)
61
# fixme RBC 20060210 this isnt necessarily a fixable thing,
62
# Skipped is the wrong exception to raise.
64
# NOTE(review): the `try:` that pairs with the `except` below is not
# visible in this view -- confirm against the full file.
return self.branch_format.initialize(repo.bzrdir)
65
except errors.UninitializableFormat:
66
raise TestSkipped('Uninitializable branch format')
68
def make_repository(self, relpath, shared=False, format=None):
    """Create a control directory at relpath and a repository inside it.

    :param relpath: location handed to make_bzrdir.
    :param shared: passed through to create_repository.
    :param format: passed through to make_bzrdir.
    :return: the object returned by create_repository.
    """
    control_dir = self.make_bzrdir(relpath, format=format)
    new_repo = control_dir.create_repository(shared=shared)
    return new_repo
73
class TestBranch(TestCaseWithBranch):
75
def test_append_revisions(self):
    """Test appending more than one revision"""
    # Use the fixture's branch. The stale lines that defined an
    # InTempDir-based TestAppendRevisions class and rebound `br` to
    # Branch(".", init=True) were leftovers from the pre-refactor version
    # of this test and are dropped.
    br = self.get_branch()
    br.append_revision("rev1")
    self.assertEquals(br.revision_history(), ["rev1",])
    # append_revision accepts several revision ids in one call.
    br.append_revision("rev2", "rev3")
    self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
83
def test_fetch_revisions(self):
    """Test fetch-revision operation."""
    get_transport(self.get_url()).mkdir('b1')
    get_transport(self.get_url()).mkdir('b2')
    wt = self.make_branch_and_tree('b1')
    # `b1` was used below without ever being bound; it is the branch that
    # backs the working tree created in 'b1'.
    b1 = wt.branch
    b2 = self.make_branch('b2')
    # Close the file explicitly rather than relying on garbage collection.
    f = file('b1/foo', 'w')
    try:
        f.write('hello')
    finally:
        f.close()
    wt.add(['foo'], ['foo-id'])
    wt.commit('lala!', rev_id='revision-1', allow_pointless=False)

    self.assertEqual((1, []), b2.fetch(b1))

    # The result is unused; the lookup itself is the check.
    # NOTE(review): presumably this raises when the revision is absent.
    b2.repository.get_revision('revision-1')
    tree = b2.repository.revision_tree('revision-1')
    self.assertEqual(tree.get_file_text('foo-id'), 'hello')
101
def test_get_revision_delta(self):
    # Commit two revisions that each add one file, then check that the
    # delta for each revno lists exactly that file as added.
    work_tree = self.make_branch_and_tree('a')
    for name, file_id, rev_id in [('foo', 'file1', 'rev1'),
                                  ('vla', 'file2', 'rev2')]:
        self.build_tree(['a/' + name])
        work_tree.add(name, file_id)
        work_tree.commit(rev_id, rev_id=rev_id)

    for revno, added_entry in [(1, ('foo', 'file1', 'file')),
                               (2, ('vla', 'file2', 'file'))]:
        delta = work_tree.branch.get_revision_delta(revno)
        self.assertIsInstance(delta, TreeDelta)
        self.assertEqual([added_entry], delta.added)
117
def get_unbalanced_tree_pair(self):
    """Return two branches, a and b, with one file in a.

    :return: a (tree_a, tree_b) tuple of working trees rooted at 'a'
        and 'b'.
    """
    get_transport(self.get_url()).mkdir('a')
    tree_a = self.make_branch_and_tree('a')
    # Close the handle explicitly instead of leaving it to the garbage
    # collector.
    f = file('a/b', 'wb')
    try:
        f.write('b')
    finally:
        f.close()
    # NOTE(review): no add() call for 'b' is visible before this commit;
    # confirm whether the file is meant to be versioned.
    tree_a.commit("silly commit", rev_id='A')

    get_transport(self.get_url()).mkdir('b')
    tree_b = self.make_branch_and_tree('b')
    return tree_a, tree_b
129
def get_balanced_branch_pair(self):
    """Returns br_a, br_b as with one commit in a, and b has a's stores."""
    source_tree, target_tree = self.get_unbalanced_tree_pair()
    # Fetch a's repository contents into b's repository.
    target_repo = target_tree.branch.repository
    target_repo.fetch(source_tree.branch.repository)
    return source_tree, target_tree
135
def test_clone_branch(self):
    """Copy the stores from one branch to another"""
    source_tree, basis_tree = self.get_balanced_branch_pair()
    basis_tree.commit("silly commit")

    # this fails to test that the history from a was not used.
    cloned_dir = source_tree.bzrdir.clone('c', basis=basis_tree.bzrdir)
    expected_history = source_tree.branch.revision_history()
    cloned_history = cloned_dir.open_branch().revision_history()
    self.assertEqual(expected_history, cloned_history)
145
def test_clone_partial(self):
    """Copy only part of the history of a branch."""
    # TODO: RBC 20060208 test with a revision not on revision-history.
    #       what should that behaviour be ? Emailed the list.
    source_tree = self.make_branch_and_tree('a')
    # NOTE(review): no add() calls are visible between build_tree and
    # commit -- confirm against the full file.
    self.build_tree(['a/one'])
    source_tree.commit('commit one', rev_id='1')
    self.build_tree(['a/two'])
    source_tree.commit('commit two', rev_id='2')

    target_repo = self.make_repository('b')
    source_tree.bzrdir.open_repository().copy_content_into(target_repo)
    source_branch = source_tree.bzrdir.open_branch()
    cloned = source_branch.clone(target_repo.bzrdir, revision_id='1')
    self.assertEqual(cloned.last_revision(), '1')
161
def test_sprout_partial(self):
    # test sprouting with a prefix of the revision-history.
    # also needs not-on-revision-history behaviour defined.
    source_tree = self.make_branch_and_tree('a')
    # NOTE(review): no add() calls are visible between build_tree and
    # commit -- confirm against the full file.
    self.build_tree(['a/one'])
    source_tree.commit('commit one', rev_id='1')
    self.build_tree(['a/two'])
    source_tree.commit('commit two', rev_id='2')

    target_repo = self.make_repository('b')
    source_tree.bzrdir.open_repository().copy_content_into(target_repo)
    source_branch = source_tree.bzrdir.open_branch()
    sprouted = source_branch.sprout(target_repo.bzrdir, revision_id='1')
    self.assertEqual(sprouted.last_revision(), '1')
176
def test_clone_branch_nickname(self):
    # test the nick name is preserved always
    # Placeholder: skipped until the test is written.
    skip_reason = 'XXX branch cloning is not yet tested..'
    raise TestSkipped(skip_reason)
180
def test_clone_branch_parent(self):
    # test the parent is preserved always
    # Placeholder: skipped until the test is written.
    skip_reason = 'XXX branch cloning is not yet tested..'
    raise TestSkipped(skip_reason)
184
def test_sprout_branch_nickname(self):
    # test the nick name is reset always
    # Placeholder: skipped until the test is written.
    skip_reason = 'XXX branch sprouting is not yet tested..'
    raise TestSkipped(skip_reason)
188
def test_sprout_branch_parent(self):
    # The branch opened from the sprouted bzrdir must report the source
    # location as its parent.
    source = self.make_branch('source')
    sprouted_dir = source.bzrdir.sprout(self.get_url('target'))
    target = sprouted_dir.open_branch()
    expected_parent = source.bzrdir.root_transport.base
    self.assertEqual(expected_parent, target.get_parent())
193
def test_submit_branch(self):
    """Submit location can be queried and set"""
    br = self.make_branch('branch')
    # Nothing is set on a fresh branch.
    self.assertEqual(br.get_submit_branch(), None)
    # Each new value replaces the previous one.
    for location in ('sftp://example.com', 'sftp://example.net'):
        br.set_submit_branch(location)
        self.assertEqual(br.get_submit_branch(), location)
202
def test_record_initial_ghost_merge(self):
    """A pending merge with no revision present is still a merge."""
    wt = self.make_branch_and_tree('.')
    # Bind the tree's own branch. The bare name `branch` was never assigned
    # here, so it resolved to the imported bzrlib.branch module, which has
    # no `repository` attribute.
    tree_branch = wt.branch
    ghost_id = 'non:existent@rev--ision--0--2'
    wt.add_pending_merge(ghost_id)
    wt.commit('pretend to merge nonexistent-revision', rev_id='first')
    rev = tree_branch.repository.get_revision(tree_branch.last_revision())
    self.assertEqual(len(rev.parent_ids), 1)
    # parent_sha1s is not populated now, WTF. rbc 20051003
    self.assertEqual(len(rev.parent_sha1s), 0)
    self.assertEqual(rev.parent_ids[0], ghost_id)
214
# Expects get_revision on the fixture branch's repository to raise
# errors.InvalidRevisionId.
# NOTE(review): the argument passed to get_revision and the closing
# parenthesis are not visible in this view -- confirm against the full file.
def test_bad_revision(self):
215
self.assertRaises(errors.InvalidRevisionId,
216
self.get_branch().repository.get_revision,
220
# compare the gpg-to-sign info for a commit with a ghost and
221
# an identical tree without a ghost
222
# fetch missing should rewrite the TOC of weaves to list newly available parents.
224
def test_pending_merges(self):
    """Tracking pending-merged revisions."""
    wt = self.make_branch_and_tree('.')
    # `b` was used below without ever being bound; it is the branch that
    # backs the working tree.
    b = wt.branch
    first_merge = 'foo@azkhazan-123123-abcabc'
    second_merge = 'wibble@fofof--20050401--1928390812'

    self.assertEquals(wt.pending_merges(), [])
    wt.add_pending_merge(first_merge)
    self.assertEquals(wt.pending_merges(), [first_merge])
    # Adding the same revision twice must not create a duplicate entry.
    wt.add_pending_merge(first_merge)
    self.assertEquals(wt.pending_merges(), [first_merge])
    wt.add_pending_merge(second_merge)
    self.assertEquals(wt.pending_merges(), [first_merge, second_merge])

    wt.commit("commit from base with two merges")
    rev = b.repository.get_revision(b.revision_history()[0])
    self.assertEquals(len(rev.parent_ids), 2)
    self.assertEquals(rev.parent_ids[0], first_merge)
    self.assertEquals(rev.parent_ids[1], second_merge)
    # list should be cleared when we do a commit
    self.assertEquals(wt.pending_merges(), [])
247
def test_sign_existing_revision(self):
    # Signing a committed revision must store the short testament text of
    # that revision as its signature (LoopbackGPGStrategy is used).
    from bzrlib.testament import Testament
    wt = self.make_branch_and_tree('.')
    # Use the tree's own branch. The bare name `branch` was never assigned
    # here, so it resolved to the imported bzrlib.branch module.
    repo = wt.branch.repository
    wt.commit("base", allow_pointless=True, rev_id='A')
    strategy = gpg.LoopbackGPGStrategy(None)
    repo.sign_revision('A', strategy)
    self.assertEqual(Testament.from_revision(repo, 'A').as_short_text(),
                     repo.get_signature_text('A'))
258
# Stores the signature text 'FOO' for revision id 'A', commits revision 'A',
# then checks that get_signature_text('A') returns 'FOO'.
# NOTE(review): `branch` is not assigned in the visible lines (it would
# resolve to the imported bzrlib.branch module); a binding line appears to be
# missing from this view.
def test_store_signature(self):
259
wt = self.make_branch_and_tree('.')
261
branch.repository.store_revision_signature(
262
gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
263
# NOTE(review): the final argument and closing parenthesis of this
# assertRaises call are not visible in this view.
self.assertRaises(errors.NoSuchRevision,
264
branch.repository.has_signature_for_revision_id,
266
wt.commit("base", allow_pointless=True, rev_id='A')
267
self.assertEqual('FOO',
268
branch.repository.get_signature_text('A'))
270
def test_branch_keeps_signatures(self):
    # A signature stored in the source repository must be readable from
    # the repository of a cloned bzrdir.
    source_tree = self.make_branch_and_tree('source')
    source_tree.commit('A', allow_pointless=True, rev_id='A')
    signer = gpg.LoopbackGPGStrategy(None)
    source_tree.branch.repository.sign_revision('A', signer)
    #FIXME: clone should work to urls,
    # wt.clone should work to disks.
    self.build_tree(['target/'])
    cloned_dir = source_tree.bzrdir.clone('target')
    source_signature = source_tree.branch.repository.get_signature_text('A')
    cloned_signature = cloned_dir.open_repository().get_signature_text('A')
    self.assertEqual(source_signature, cloned_signature)
282
# Checks branch.nick before and after moving the branch directory, after
# setting it explicitly, and with a non-ASCII value.
def test_nicks(self):
283
"""Branch nicknames"""
284
t = get_transport(self.get_url())
286
# The first two asserts expect the nick to equal the directory name.
branch = self.make_branch('bzr.dev')
287
self.assertEqual(branch.nick, 'bzr.dev')
288
t.move('bzr.dev', 'bzr.ab')
289
branch = Branch.open(self.get_url('bzr.ab'))
290
self.assertEqual(branch.nick, 'bzr.ab')
291
branch.nick = "Aaron's branch"
292
branch.nick = "Aaron's branch"
296
# NOTE(review): this expression looks like the innermost argument of a
# larger call whose surrounding lines are not visible in this view.
branch.control_files.controlfilename("branch.conf")
300
self.assertEqual(branch.nick, "Aaron's branch")
301
# After an explicit assignment the asserts expect the nick to survive a move.
t.move('bzr.ab', 'integration')
302
branch = Branch.open(self.get_url('integration'))
303
self.assertEqual(branch.nick, "Aaron's branch")
304
branch.nick = u"\u1234"
305
self.assertEqual(branch.nick, u"\u1234")
307
# Sets a nick, commits, and inspects the "branch-nick" property of the
# committed revision.
# NOTE(review): `branch` is not assigned in the visible lines; a binding line
# appears to be missing from this view.
def test_commit_nicks(self):
308
"""Nicknames are committed to the revision"""
309
get_transport(self.get_url()).mkdir('bzr.dev')
310
wt = self.make_branch_and_tree('bzr.dev')
312
branch.nick = "My happy branch"
313
wt.commit('My commit respect da nick.')
314
committed = branch.repository.get_revision(branch.last_revision())
315
# NOTE(review): the expected value and closing parenthesis of this
# assertion are not visible in this view.
self.assertEqual(committed.properties["branch-nick"],
318
# Creates a shared repository at '.', initialises a branch in a 'child'
# control directory, and asserts that the child branch's repository has the
# same root transport base as the shared repository -- both for the freshly
# created branch and after re-opening it.
# NOTE(review): the `try:` lines and the bodies of both `except` clauses are
# not visible in this view -- confirm against the full file.
def test_create_open_branch_uses_repository(self):
320
repo = self.make_repository('.', shared=True)
321
except errors.IncompatibleFormat:
323
repo.bzrdir.root_transport.mkdir('child')
324
child_dir = self.bzrdir_format.initialize('child')
326
child_branch = self.branch_format.initialize(child_dir)
327
except errors.UninitializableFormat:
328
# branch references are not default init'able.
330
self.assertEqual(repo.bzrdir.root_transport.base,
331
child_branch.repository.bzrdir.root_transport.base)
332
# Same check again on a branch re-opened from its URL.
child_branch = branch.Branch.open(self.get_url('child'))
333
self.assertEqual(repo.bzrdir.root_transport.base,
334
child_branch.repository.bzrdir.root_transport.base)
336
def test_format_description(self):
    # The branch format must report a non-empty description.
    branch_format = self.make_branch_and_tree('tree').branch._format
    description = branch_format.get_format_description()
    self.failUnless(len(description))
341
def test_check_branch_report_results(self):
    """Checking a branch produces results which can be printed"""
    check_result = self.make_branch('.').check()
    # reports results through logging
    # Verbose first, then terse, as in the original.
    for verbosity in (True, False):
        check_result.report_results(verbose=verbosity)
349
def test_get_commit_builder(self):
    # get_commit_builder([]) must return a repository.CommitBuilder.
    builder = self.make_branch(".").get_commit_builder([])
    self.assertIsInstance(builder, repository.CommitBuilder)
353
def test_generate_revision_history(self):
    """Create a fake revision history easily."""
    work_tree = self.make_branch_and_tree('.')
    first_rev = work_tree.commit('foo')
    history_after_first = work_tree.branch.revision_history()
    # The second commit's id is not needed.
    work_tree.commit('bar', allow_pointless=True)
    # Regenerating from the first revision should give back the history
    # recorded before the second commit.
    work_tree.branch.generate_revision_history(first_rev)
    self.assertEqual(history_after_first,
                     work_tree.branch.revision_history())
362
def test_generate_revision_history_NULL_REVISION(self):
    # Generating history from NULL_REVISION must leave it empty.
    work_tree = self.make_branch_and_tree('.')
    work_tree.commit('foo')
    null_rev = bzrlib.revision.NULL_REVISION
    work_tree.branch.generate_revision_history(null_rev)
    self.assertEqual([], work_tree.branch.revision_history())
369
# NOTE(review): in this view the class docstring below is unterminated and the
# `def` line owning the super().setUp() call is missing -- confirm against the
# full file. The visible body sets transport_readonly_server to HttpServer
# whenever transport_server is not equal to MemoryServer.
class ChrootedTests(TestCaseWithBranch):
370
"""A support class that provides readonly urls outside the local namespace.
372
This is done by checking if self.transport_server is a MemoryServer. if it
373
is then we are chrooted already, if it is not then an HttpServer is used
378
super(ChrootedTests, self).setUp()
379
if not self.transport_server == MemoryServer:
380
self.transport_readonly_server = HttpServer
382
def test_open_containing(self):
    # Before any branch exists, open_containing must fail for both the
    # root and a nested path.
    for subpath in ('', 'g/p/q'):
        self.assertRaises(NotBranchError, Branch.open_containing,
                          self.get_readonly_url(subpath))
    self.make_branch('.')
    # Once the branch exists at the root, the returned relpath is the
    # part of the path below the branch.
    for subpath in ('', 'g/p/q'):
        readonly_url = self.get_readonly_url(subpath)
        opened, relpath = Branch.open_containing(readonly_url)
        self.assertEqual(subpath, relpath)
394
# Records the string 'finish' in self.calls.
# NOTE(review): the method definitions (including where self.calls is created)
# are not visible in this view; test_unlock_calls_finish reads `.calls`.
class InstrumentedTransaction(object):
397
self.calls.append('finish')
403
# Appends 'lr', 'lw' and 'ul' markers to self._calls; the TestDecorators
# tests compare against that list.
# NOTE(review): several `def` lines, any decorators and the bodies of the
# do_with_* / except_with_* methods are not visible in this view.
class TestDecorator(object):
409
self._calls.append('lr')
411
def lock_write(self):
412
self._calls.append('lw')
415
self._calls.append('ul')
418
def do_with_read(self):
422
def except_with_read(self):
426
def do_with_write(self):
430
def except_with_write(self):
434
class TestDecorators(TestCase):
    # Each test drives one TestDecorator method and compares the markers
    # collected in its _calls list.

    def test_needs_read_lock(self):
        decorated = TestDecorator()
        outcome = decorated.do_with_read()
        self.assertEqual(1, outcome)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        decorated = TestDecorator()
        failing_call = decorated.except_with_read
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        decorated = TestDecorator()
        outcome = decorated.do_with_write()
        self.assertEqual(2, outcome)
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        decorated = TestDecorator()
        failing_call = decorated.except_with_write
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lw', 'ul'], decorated._calls)
457
# Tests for the transaction handling of the fixture branch.
class TestBranchTransaction(TestCaseWithBranch):
460
# NOTE(review): the `def` line owning this call is not visible in this view;
# presumably it is setUp() -- confirm against the full file.
super(TestBranchTransaction, self).setUp()
463
def test_default_get_transaction(self):
    """branch.get_transaction on a new branch should give a PassThrough."""
    current = self.get_branch().get_transaction()
    is_pass_through = isinstance(current,
                                 transactions.PassThroughTransaction)
    self.failUnless(is_pass_through)
468
def test__set_new_transaction(self):
    # Installing a transaction on a fresh branch must not raise.
    target = self.get_branch()
    target._set_transaction(transactions.ReadOnlyTransaction())
471
def test__set_over_existing_transaction_raises(self):
    # A second _set_transaction while one is active must raise LockError.
    first = transactions.ReadOnlyTransaction()
    self.get_branch()._set_transaction(first)
    setter = self.get_branch()._set_transaction
    second = transactions.ReadOnlyTransaction()
    self.assertRaises(errors.LockError, setter, second)
477
def test_finish_no_transaction_raises(self):
    # Finishing when no transaction was started must raise LockError.
    finisher = self.get_branch()._finish_transaction
    self.assertRaises(errors.LockError, finisher)
480
def test_finish_readonly_transaction_works(self):
    # After finishing, control_files must hold no transaction.
    read_only = transactions.ReadOnlyTransaction()
    self.get_branch()._set_transaction(read_only)
    self.get_branch()._finish_transaction()
    leftover = self.get_branch().control_files._transaction
    self.assertEqual(None, leftover)
485
def test_unlock_calls_finish(self):
    # unlock() must call finish() on whichever transaction is installed.
    self.get_branch().lock_read()
    recorder = InstrumentedTransaction()
    control_files = self.get_branch().control_files
    control_files._transaction = recorder
    self.get_branch().unlock()
    self.assertEqual(['finish'], recorder.calls)
492
def test_lock_read_acquires_ro_transaction(self):
    # Taking a read lock must install a ReadOnlyTransaction.
    self.get_branch().lock_read()
    try:
        self.failUnless(isinstance(self.get_branch().get_transaction(),
                                   transactions.ReadOnlyTransaction))
    finally:
        # Release the lock even when the assertion fails, so a failure
        # here does not leave the branch locked.
        self.get_branch().unlock()
498
def test_lock_write_acquires_write_transaction(self):
    # Taking a write lock must install a WriteTransaction.
    self.get_branch().lock_write()
    try:
        # cannot use get_transaction as its magic
        self.failUnless(isinstance(
            self.get_branch().control_files._transaction,
            transactions.WriteTransaction))
    finally:
        # Release the lock even when the assertion fails, so a failure
        # here does not leave the branch locked.
        self.get_branch().unlock()
506
class TestBranchPushLocations(TestCaseWithBranch):
508
def test_get_push_location_unset(self):
    # With nothing configured the push location is None.
    push_location = self.get_branch().get_push_location()
    self.assertEqual(None, push_location)
511
def test_get_push_location_exact(self):
    # A push_location written under a section named after the branch base
    # (minus its trailing character) must be returned by
    # get_push_location().
    from bzrlib.config import (locations_config_filename,
                               ensure_config_dir_exists)
    ensure_config_dir_exists()
    fn = locations_config_filename()
    # Close the config file explicitly so the contents are flushed to disk
    # before they are read back, instead of relying on garbage collection.
    config_file = open(fn, 'wt')
    try:
        print >> config_file, ("[%s]\n"
                               "push_location=foo" %
                               self.get_branch().base[:-1])
    finally:
        config_file.close()
    self.assertEqual("foo", self.get_branch().get_push_location())
521
# Sets the push location to 'foo' and compares a file against the expected
# "[<branch base minus trailing character>]" section text.
def test_set_push_location(self):
522
from bzrlib.config import (locations_config_filename,
523
ensure_config_dir_exists)
524
ensure_config_dir_exists()
525
fn = locations_config_filename()
526
self.get_branch().set_push_location('foo')
527
# NOTE(review): the second argument of assertFileEqual and its closing
# parenthesis are not visible in this view; presumably the file is `fn`.
self.assertFileEqual("[%s]\n"
528
"push_location = foo" % self.get_branch().base[:-1],
531
# TODO RBC 20051029 test getting a push location from a branch in a
532
# recursive section - that is, it appends the branch name.
535
class TestFormat(TestCaseWithBranch):
536
"""Tests for the format itself."""
538
# Creates a branch of the format under test, then re-opens it through
# BzrDir.open and Branch.open over the readonly transport and compares the
# classes and formats with those of the branch that was created.
# NOTE(review): the statement under the is_supported() check, the `try:` line
# and the body of the `except NotImplementedError` clause are not visible in
# this view -- confirm against the full file.
def test_format_initialize_find_open(self):
539
# loopback test to check the current format initializes to itself.
540
if not self.branch_format.is_supported():
541
# unsupported formats are not loopback testable
542
# because the default open will not open them and
543
# they may not be initializable.
545
# supported formats must be able to init and open
546
t = get_transport(self.get_url())
547
readonly_t = get_transport(self.get_readonly_url())
548
made_branch = self.make_branch('.')
549
self.failUnless(isinstance(made_branch, branch.Branch))
551
# find it via bzrdir opening:
552
opened_control = bzrdir.BzrDir.open(readonly_t.base)
553
direct_opened_branch = opened_control.open_branch()
554
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
555
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
556
self.failUnless(isinstance(direct_opened_branch._format,
557
self.branch_format.__class__))
559
# find it via Branch.open
560
opened_branch = branch.Branch.open(readonly_t.base)
561
self.failUnless(isinstance(opened_branch, made_branch.__class__))
562
self.assertEqual(made_branch._format.__class__,
563
opened_branch._format.__class__)
564
# if it has a unique id string, can we probe for it ?
566
self.branch_format.get_format_string()
567
except NotImplementedError:
569
# The format found by probing the opened control dir must equal the
# format under test.
self.assertEqual(self.branch_format,
570
branch.BranchFormat.find_format(opened_control))