14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
23
import bzrlib.bzrdir as bzrdir
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
25
from bzrlib.commit import commit
26
import bzrlib.errors as errors
27
from bzrlib.errors import (FileExists,
30
UninitializableFormat,
34
from bzrlib.osutils import getcwd
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
36
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
20
from bzrlib.selftest import TestCaseInTempDir
21
from bzrlib.branch import Branch
37
22
from bzrlib.trace import mutter
38
import bzrlib.transactions as transactions
39
from bzrlib.transport import get_transport
40
from bzrlib.transport.http import HttpServer
41
from bzrlib.transport.memory import MemoryServer
42
from bzrlib.upgrade import upgrade
43
from bzrlib.workingtree import WorkingTree
46
# TODO: Make a branch using basis branch, and check that it
47
# doesn't request any files that could have been avoided, by
48
# hooking into the Transport.
51
class TestCaseWithBranch(TestCaseWithBzrDir):
54
super(TestCaseWithBranch, self).setUp()
58
if self.branch is None:
59
self.branch = self.make_branch('')
62
def make_branch(self, relpath, format=None):
    """Create a branch in the format under test at relpath.

    A repository is created first through make_repository(); the branch is
    then initialized in that repository's bzrdir.

    :param relpath: location handed to make_repository().
    :param format: optional format forwarded to make_repository().
    :return: whatever self.branch_format.initialize() returns.
    :raises TestSkipped: if the branch format reports that it cannot be
        initialized.
    """
    repo = self.make_repository(relpath, format=format)
    try:
        return self.branch_format.initialize(repo.bzrdir)
    except errors.UninitializableFormat:
        # FIXME RBC 20060210: this isn't necessarily a fixable thing;
        # Skipped is the wrong exception to raise.
        raise TestSkipped('Uninitializable branch format')
71
def make_repository(self, relpath, shared=False, format=None):
    """Create a control directory at relpath and a repository inside it.

    :param relpath: location handed to make_bzrdir().
    :param shared: forwarded to create_repository().
    :param format: optional format forwarded to make_bzrdir().
    :return: the repository created in the new control directory.
    """
    control_dir = self.make_bzrdir(relpath, format=format)
    repository = control_dir.create_repository(shared=shared)
    return repository
76
class TestBranch(TestCaseWithBranch):
25
class TestAppendRevisions(TestCaseInTempDir):
26
"""Test appending more than one revision"""
78
27
def test_append_revisions(self):
    """Test appending more than one revision"""
    # Use the branch supplied by the test case so the format under test is
    # the one exercised; do not rebind it to a freshly constructed Branch.
    br = self.get_branch()
    br.append_revision("rev1")
    self.assertEquals(br.revision_history(), ["rev1",])
    # Several revision ids may be appended in a single call.
    br.append_revision("rev2", "rev3")
    self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
37
class TestFetch(TestCaseInTempDir):
86
38
def test_fetch_revisions(self):
    """Test fetch-revision operation."""
    get_transport(self.get_url()).mkdir('b1')
    get_transport(self.get_url()).mkdir('b2')
    wt = self.make_branch_and_tree('b1')
    # Fetch from the branch that owns the working tree; creating a second
    # branch object at 'b1' and committing 'revision-1' again is wrong.
    b1 = wt.branch
    b2 = self.make_branch('b2')
    # Close the file explicitly so the content is on disk before commit.
    foo_file = open('b1/foo', 'w')
    try:
        foo_file.write('hello')
    finally:
        foo_file.close()
    wt.add(['foo'], ['foo-id'])
    wt.commit('lala!', rev_id='revision-1', allow_pointless=False)

    mutter('start fetch')
    self.assertEqual((1, []), b2.fetch(b1))

    # The revision lookup is kept for its side effect: it must not raise.
    b2.repository.get_revision('revision-1')
    tree = b2.repository.revision_tree('revision-1')
    self.assertEqual(tree.get_file_text('foo-id'), 'hello')
104
def get_unbalanced_tree_pair(self):
    """Return two branches, a and b, with one file in a.

    The values returned are the working trees created at 'a' and 'b';
    only 'a' receives a commit (rev_id 'A').
    """
    get_transport(self.get_url()).mkdir('a')
    tree_a = self.make_branch_and_tree('a')
    # Close the file explicitly so the content is on disk before commit.
    content = open('a/b', 'wb')
    try:
        content.write('b')
    finally:
        content.close()
    # Version the file; otherwise the commit would not contain it and the
    # "one file in a" contract above would not hold.
    tree_a.add(['b'])
    tree_a.commit("silly commit", rev_id='A')

    get_transport(self.get_url()).mkdir('b')
    tree_b = self.make_branch_and_tree('b')
    return tree_a, tree_b
116
def get_balanced_branch_pair(self):
    """Returns br_a, br_b as with one commit in a, and b has a's stores.

    The two values are the working trees from get_unbalanced_tree_pair(),
    returned after b's repository has fetched from a's repository.
    """
    source_tree, target_tree = self.get_unbalanced_tree_pair()
    target_repo = target_tree.branch.repository
    source_repo = source_tree.branch.repository
    target_repo.fetch(source_repo)
    return source_tree, target_tree
122
def test_clone_branch(self):
    """Copy the stores from one branch to another"""
    source, basis = self.get_balanced_branch_pair()
    basis.commit("silly commit")

    # this fails to test that the history from a was not used.
    cloned_dir = source.bzrdir.clone('c', basis=basis.bzrdir)
    expected_history = source.branch.revision_history()
    cloned_history = cloned_dir.open_branch().revision_history()
    self.assertEqual(expected_history, cloned_history)
132
def test_clone_partial(self):
    """Copy only part of the history of a branch."""
    # TODO: RBC 20060208 test with a revision not on revision-history.
    #       what should that behaviour be ? Emailed the list.
    source_tree = self.make_branch_and_tree('a')
    # Two commits, so that cloning at '1' really is a partial history.
    self.build_tree(['a/one'])
    source_tree.commit('commit one', rev_id='1')
    self.build_tree(['a/two'])
    source_tree.commit('commit two', rev_id='2')

    target_repo = self.make_repository('b')
    source_repo = source_tree.bzrdir.open_repository()
    source_repo.copy_content_into(target_repo)
    source_branch = source_tree.bzrdir.open_branch()
    cloned = source_branch.clone(target_repo.bzrdir, revision_id='1')
    self.assertEqual(cloned.last_revision(), '1')
148
def test_sprout_partial(self):
    # test sprouting with a prefix of the revision-history.
    # also needs not-on-revision-history behaviour defined.
    source_tree = self.make_branch_and_tree('a')
    # Two commits, so that sprouting at '1' really is a prefix.
    self.build_tree(['a/one'])
    source_tree.commit('commit one', rev_id='1')
    self.build_tree(['a/two'])
    source_tree.commit('commit two', rev_id='2')

    target_repo = self.make_repository('b')
    source_repo = source_tree.bzrdir.open_repository()
    source_repo.copy_content_into(target_repo)
    source_branch = source_tree.bzrdir.open_branch()
    sprouted = source_branch.sprout(target_repo.bzrdir, revision_id='1')
    self.assertEqual(sprouted.last_revision(), '1')
163
def test_clone_branch_nickname(self):
    # Placeholder: should test that the nick name is always preserved by
    # a clone. Reported as skipped until it is written.
    reason = 'XXX branch cloning is not yet tested..'
    raise TestSkipped(reason)
167
def test_clone_branch_parent(self):
    # Placeholder: should test that the parent is always preserved by a
    # clone. Reported as skipped until it is written.
    reason = 'XXX branch cloning is not yet tested..'
    raise TestSkipped(reason)
171
def test_sprout_branch_nickname(self):
    # Placeholder: should test that the nick name is always reset by a
    # sprout. Reported as skipped until it is written.
    reason = 'XXX branch sprouting is not yet tested..'
    raise TestSkipped(reason)
175
def test_sprout_branch_parent(self):
    # A sprouted branch must record the branch it was sprouted from as
    # its parent.
    source = self.make_branch('source')
    target_dir = source.bzrdir.sprout(self.get_url('target'))
    target = target_dir.open_branch()
    expected_parent = source.bzrdir.root_transport.base
    self.assertEqual(expected_parent, target.get_parent())
180
def test_record_initial_ghost_merge(self):
    """A pending merge with no revision present is still a merge."""
    wt = self.make_branch_and_tree('.')
    # Bind the branch explicitly; it is needed to read back the commit.
    branch = wt.branch
    wt.add_pending_merge('non:existent@rev--ision--0--2')
    wt.commit('pretend to merge nonexistent-revision', rev_id='first')
    rev = branch.repository.get_revision(branch.last_revision())
    self.assertEqual(len(rev.parent_ids), 1)
    # parent_sha1s is not populated now. rbc 20051003
    self.assertEqual(len(rev.parent_sha1s), 0)
    self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
192
def test_bad_revision(self):
193
self.assertRaises(errors.InvalidRevisionId,
194
self.get_branch().repository.get_revision,
198
# compare the gpg-to-sign info for a commit with a ghost and
199
# an identical tree without a ghost
200
# fetch missing should rewrite the TOC of weaves to list newly available parents.
202
def test_pending_merges(self):
    """Tracking pending-merged revisions."""
    wt = self.make_branch_and_tree('.')
    # Bind the branch explicitly; it is needed to read back the commit.
    b = wt.branch
    self.assertEquals(wt.pending_merges(), [])
    wt.add_pending_merge('foo@azkhazan-123123-abcabc')
    self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
    # Adding the same revision a second time must not duplicate it.
    wt.add_pending_merge('foo@azkhazan-123123-abcabc')
    self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
    wt.add_pending_merge('wibble@fofof--20050401--1928390812')
    self.assertEquals(wt.pending_merges(),
                      ['foo@azkhazan-123123-abcabc',
                       'wibble@fofof--20050401--1928390812'])
    wt.commit("commit from base with two merges")
    rev = b.repository.get_revision(b.revision_history()[0])
    self.assertEquals(len(rev.parent_ids), 2)
    self.assertEquals(rev.parent_ids[0],
                      'foo@azkhazan-123123-abcabc')
    self.assertEquals(rev.parent_ids[1],
                      'wibble@fofof--20050401--1928390812')
    # list should be cleared when we do a commit
    self.assertEquals(wt.pending_merges(), [])
225
def test_sign_existing_revision(self):
    # Signing an already committed revision must store the short
    # testament text of that revision as its signature text.
    wt = self.make_branch_and_tree('.')
    # Bind the branch explicitly; its repository is used below.
    branch = wt.branch
    wt.commit("base", allow_pointless=True, rev_id='A')
    from bzrlib.testament import Testament
    strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
    branch.repository.sign_revision('A', strategy)
    self.assertEqual(Testament.from_revision(branch.repository,
                                             'A').as_short_text(),
                     branch.repository.get_signature_text('A'))
236
def test_store_signature(self):
237
wt = self.make_branch_and_tree('.')
239
branch.repository.store_revision_signature(
240
bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
241
self.assertRaises(errors.NoSuchRevision,
242
branch.repository.has_signature_for_revision_id,
244
wt.commit("base", allow_pointless=True, rev_id='A')
245
self.assertEqual('FOO',
246
branch.repository.get_signature_text('A'))
248
def test_branch_keeps_signatures(self):
    # A signature stored in the source repository must be readable,
    # unchanged, from the repository of a clone.
    source_tree = self.make_branch_and_tree('source')
    source_tree.commit('A', allow_pointless=True, rev_id='A')
    source_repo = source_tree.branch.repository
    strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
    source_repo.sign_revision('A', strategy)
    #FIXME: clone should work to urls,
    # wt.clone should work to disks.
    self.build_tree(['target/'])
    cloned_dir = source_tree.bzrdir.clone('target')
    original_signature = source_tree.branch.repository.get_signature_text('A')
    cloned_signature = cloned_dir.open_repository().get_signature_text('A')
    self.assertEqual(original_signature, cloned_signature)
260
def test_nicks(self):
    """Branch nicknames"""
    transport = get_transport(self.get_url())
    nicked = self.make_branch('bzr.dev')
    # With no explicit nick set, the nick follows the directory name.
    self.assertEqual(nicked.nick, 'bzr.dev')
    transport.move('bzr.dev', 'bzr.ab')
    nicked = Branch.open(self.get_url('bzr.ab'))
    self.assertEqual(nicked.nick, 'bzr.ab')

    explicit_nick = "Aaron's branch"
    nicked.nick = explicit_nick
    nicked.nick = explicit_nick
    # NOTE(review): the result of this call is discarded here; it looks
    # like it was meant to feed an existence check - confirm.
    nicked.control_files.controlfilename("branch.conf")
    self.assertEqual(nicked.nick, "Aaron's branch")

    # Once set explicitly, the nick must survive a move of the branch.
    transport.move('bzr.ab', 'integration')
    nicked = Branch.open(self.get_url('integration'))
    self.assertEqual(nicked.nick, "Aaron's branch")
    unicode_nick = u"\u1234"
    nicked.nick = unicode_nick
    self.assertEqual(nicked.nick, u"\u1234")
285
def test_commit_nicks(self):
    """Nicknames are committed to the revision"""
    get_transport(self.get_url()).mkdir('bzr.dev')
    wt = self.make_branch_and_tree('bzr.dev')
    # Bind the branch explicitly; the nick is set on it and the commit is
    # read back through its repository.
    branch = wt.branch
    branch.nick = "My happy branch"
    wt.commit('My commit respect da nick.')
    committed = branch.repository.get_revision(branch.last_revision())
    # The revision property must carry the nick that was set above.
    self.assertEqual(committed.properties["branch-nick"],
                     "My happy branch")
296
def test_create_open_branch_uses_repository(self):
298
repo = self.make_repository('.', shared=True)
299
except errors.IncompatibleFormat:
301
repo.bzrdir.root_transport.mkdir('child')
302
child_dir = self.bzrdir_format.initialize('child')
304
child_branch = self.branch_format.initialize(child_dir)
305
except errors.UninitializableFormat:
306
# branch references are not default init'able.
308
self.assertEqual(repo.bzrdir.root_transport.base,
309
child_branch.repository.bzrdir.root_transport.base)
310
child_branch = bzrlib.branch.Branch.open(self.get_url('child'))
311
self.assertEqual(repo.bzrdir.root_transport.base,
312
child_branch.repository.bzrdir.root_transport.base)
314
def test_format_description(self):
    # Every branch format must supply a non-empty description.
    branch_format = self.make_branch_and_tree('tree').branch._format
    description = branch_format.get_format_description()
    description_length = len(description)
    self.failUnless(description_length)
319
def test_check_branch_report_results(self):
    """Checking a branch produces results which can be printed"""
    check_result = self.make_branch('.').check()
    # reports results through logging; exercise both verbosity levels.
    for verbose in (True, False):
        check_result.report_results(verbose=verbose)
327
def test_get_commit_builder(self):
    # get_commit_builder() with no parents must hand back a CommitBuilder.
    new_branch = self.make_branch(".")
    builder = new_branch.get_commit_builder([])
    self.assertIsInstance(builder, bzrlib.repository.CommitBuilder)
332
class ChrootedTests(TestCaseWithBranch):
333
"""A support class that provides readonly urls outside the local namespace.
335
This is done by checking if self.transport_server is a MemoryServer. if it
336
is then we are chrooted already, if it is not then an HttpServer is used
341
super(ChrootedTests, self).setUp()
342
if not self.transport_server == MemoryServer:
343
self.transport_readonly_server = HttpServer
345
def test_open_containing(self):
    # Before any branch exists, neither the root nor a nested path can be
    # opened.
    for location in ('', 'g/p/q'):
        self.assertRaises(NotBranchError, Branch.open_containing,
                          self.get_readonly_url(location))
    self.make_branch('.')
    # With a branch at the root, both locations resolve to it and the
    # remainder of the path comes back as the relative path.
    for location in ('', 'g/p/q'):
        opened, relpath = Branch.open_containing(
            self.get_readonly_url(location))
        self.assertEqual(location, relpath)
50
f = Fetcher(from_branch=b1, to_branch=b2)
51
eq = self.assertEquals
53
eq(f.last_revision, 'revision-1')
55
rev = b2.get_revision('revision-1')
56
tree = b2.revision_tree('revision-1')
57
eq(tree.get_file_text('foo-id'), 'hello')
356
60
# TODO: rewrite this as a regular unittest, without relying on the displayed output
357
61
# >>> from bzrlib.commit import commit
358
62
# >>> bzrlib.trace.silent = True
359
63
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
360
# >>> br1.working_tree().add('foo')
361
# >>> br1.working_tree().add('bar')
362
66
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
363
67
# >>> br2 = ScratchBranch()
364
68
# >>> br2.update_revisions(br1)
371
75
# Added 0 revisions.
372
76
# >>> br1.text_store.total_size() == br2.text_store.total_size()
375
class InstrumentedTransaction(object):
378
self.calls.append('finish')
384
class TestDecorator(object):
390
self._calls.append('lr')
392
def lock_write(self):
393
self._calls.append('lw')
396
self._calls.append('ul')
399
def do_with_read(self):
403
def except_with_read(self):
407
def do_with_write(self):
411
def except_with_write(self):
415
class TestDecorators(TestCase):
    # Exercises the TestDecorator call recorder: each wrapped call must
    # record exactly one lock call followed by one unlock ('ul'), whether
    # the wrapped method returns or raises.

    def test_needs_read_lock(self):
        recorder = TestDecorator()
        outcome = recorder.do_with_read()
        self.assertEqual(1, outcome)
        self.assertEqual(['lr', 'ul'], recorder._calls)

    def test_excepts_in_read_lock(self):
        recorder = TestDecorator()
        failing_call = recorder.except_with_read
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lr', 'ul'], recorder._calls)

    def test_needs_write_lock(self):
        recorder = TestDecorator()
        outcome = recorder.do_with_write()
        self.assertEqual(2, outcome)
        self.assertEqual(['lw', 'ul'], recorder._calls)

    def test_excepts_in_write_lock(self):
        recorder = TestDecorator()
        failing_call = recorder.except_with_write
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lw', 'ul'], recorder._calls)
438
class TestBranchTransaction(TestCaseWithBranch):
441
super(TestBranchTransaction, self).setUp()
444
def test_default_get_transaction(self):
    """branch.get_transaction on a new branch should give a PassThrough."""
    current = self.get_branch().get_transaction()
    is_pass_through = isinstance(current,
                                 transactions.PassThroughTransaction)
    self.failUnless(is_pass_through)
449
def test__set_new_transaction(self):
    # Installing a transaction on a branch that has none must succeed.
    branch = self.get_branch()
    read_only = transactions.ReadOnlyTransaction()
    branch._set_transaction(read_only)
452
def test__set_over_existing_transaction_raises(self):
    # A second transaction may not replace one that is already installed.
    first = transactions.ReadOnlyTransaction()
    self.get_branch()._set_transaction(first)
    setter = self.get_branch()._set_transaction
    second = transactions.ReadOnlyTransaction()
    self.assertRaises(errors.LockError, setter, second)
458
def test_finish_no_transaction_raises(self):
    # Finishing when no transaction is installed is a locking error.
    finisher = self.get_branch()._finish_transaction
    self.assertRaises(errors.LockError, finisher)
461
def test_finish_readonly_transaction_works(self):
    # Finishing an installed read-only transaction must clear the
    # transaction slot on the control files.
    read_only = transactions.ReadOnlyTransaction()
    self.get_branch()._set_transaction(read_only)
    self.get_branch()._finish_transaction()
    remaining = self.get_branch().control_files._transaction
    self.assertEqual(None, remaining)
466
def test_unlock_calls_finish(self):
    # Unlocking must call finish() exactly once on the installed
    # transaction; an instrumented one records the call.
    self.get_branch().lock_read()
    recorder = InstrumentedTransaction()
    self.get_branch().control_files._transaction = recorder
    self.get_branch().unlock()
    recorded_calls = recorder.calls
    self.assertEqual(['finish'], recorded_calls)
473
def test_lock_read_acquires_ro_transaction(self):
    # Taking a read lock must install a read-only transaction.
    self.get_branch().lock_read()
    try:
        self.failUnless(isinstance(self.get_branch().get_transaction(),
                                   transactions.ReadOnlyTransaction))
    finally:
        # Release the lock even when the assertion fails, so a failure
        # here cannot leave the branch locked.
        self.get_branch().unlock()
479
def test_lock_write_acquires_write_transaction(self):
    # Taking a write lock must install a write transaction.
    self.get_branch().lock_write()
    try:
        # cannot use get_transaction as its magic
        self.failUnless(
            isinstance(self.get_branch().control_files._transaction,
                       transactions.WriteTransaction))
    finally:
        # Release the lock even when the assertion fails, so a failure
        # here cannot leave the branch locked.
        self.get_branch().unlock()
487
class TestBranchPushLocations(TestCaseWithBranch):
489
def test_get_push_location_unset(self):
    # With nothing configured, the push location must be None.
    configured = self.get_branch().get_push_location()
    self.assertEqual(None, configured)
492
def test_get_push_location_exact(self):
    # A push_location stored in a section named exactly after the branch
    # base (trailing slash dropped) must be returned for that branch.
    from bzrlib.config import (branches_config_filename,
                               ensure_config_dir_exists)
    ensure_config_dir_exists()
    fn = branches_config_filename()
    # Close the file explicitly so the section is on disk before the
    # branch reads it. The trailing newline is the one "print" used to
    # append.
    config_file = open(fn, 'wt')
    try:
        config_file.write("[%s]\n"
                          "push_location=foo\n" %
                          self.get_branch().base[:-1])
    finally:
        config_file.close()
    self.assertEqual("foo", self.get_branch().get_push_location())
502
def test_set_push_location(self):
    # Setting a push location must write it to the branches config file
    # in a section named after the branch base (trailing slash dropped).
    from bzrlib.config import (branches_config_filename,
                               ensure_config_dir_exists)
    ensure_config_dir_exists()
    fn = branches_config_filename()
    self.get_branch().set_push_location('foo')
    # Compare against the config file itself.
    self.assertFileEqual("[%s]\n"
                         "push_location = foo" % self.get_branch().base[:-1],
                         fn)
512
# TODO RBC 20051029 test getting a push location from a branch in a
513
# recursive section - that is, it appends the branch name.
516
class TestFormat(TestCaseWithBranch):
517
"""Tests for the format itself."""
519
def test_format_initialize_find_open(self):
520
# loopback test to check the current format initializes to itself.
521
if not self.branch_format.is_supported():
522
# unsupported formats are not loopback testable
523
# because the default open will not open them and
524
# they may not be initializable.
526
# supported formats must be able to init and open
527
t = get_transport(self.get_url())
528
readonly_t = get_transport(self.get_readonly_url())
529
made_branch = self.make_branch('.')
530
self.failUnless(isinstance(made_branch, bzrlib.branch.Branch))
532
# find it via bzrdir opening:
533
opened_control = bzrdir.BzrDir.open(readonly_t.base)
534
direct_opened_branch = opened_control.open_branch()
535
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
536
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
537
self.failUnless(isinstance(direct_opened_branch._format,
538
self.branch_format.__class__))
540
# find it via Branch.open
541
opened_branch = bzrlib.branch.Branch.open(readonly_t.base)
542
self.failUnless(isinstance(opened_branch, made_branch.__class__))
543
self.assertEqual(made_branch._format.__class__,
544
opened_branch._format.__class__)
545
# if it has a unique id string, can we probe for it ?
547
self.branch_format.get_format_string()
548
except NotImplementedError:
550
self.assertEqual(self.branch_format,
551
bzrlib.branch.BranchFormat.find_format(opened_control))