14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
23
import bzrlib.bzrdir as bzrdir
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
25
from bzrlib.commit import commit
26
import bzrlib.errors as errors
27
from bzrlib.errors import (FileExists,
30
UninitializableFormat,
34
from bzrlib.osutils import getcwd
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
36
from bzrlib.trace import mutter
37
import bzrlib.transactions as transactions
38
from bzrlib.transport import get_transport
39
from bzrlib.transport.http import HttpServer
40
from bzrlib.transport.memory import MemoryServer
41
from bzrlib.upgrade import upgrade
42
from bzrlib.workingtree import WorkingTree
45
# TODO: Make a branch using basis branch, and check that it
46
# doesn't request any files that could have been avoided, by
47
# hooking into the Transport.
50
class TestCaseWithBranch(TestCaseWithTransport):
    """Base class for tests that run against one branch format.

    The helpers read ``self.branch_format`` and ``self.bzrdir_format``;
    neither attribute is assigned in this class.
    """

    def setUp(self):
        super(TestCaseWithBranch, self).setUp()
        # The shared branch is created lazily by get_branch().
        self.branch = None

    def get_branch(self):
        """Return the shared test branch, creating it on first use."""
        if self.branch is None:
            self.branch = self.make_branch(None)
        return self.branch

    def make_branch(self, relpath):
        """Create a repository at relpath and initialize a branch in it.

        :raises TestSkipped: if the branch format cannot be initialized.
        """
        repo = self.make_repository(relpath)
        # fixme RBC 20060210 this isnt necessarily a fixable thing,
        # Skipped is the wrong exception to raise.
        try:
            return self.branch_format.initialize(repo.bzrdir)
        except errors.UninitializableFormat:
            raise TestSkipped('Uninitializable branch format')

    def make_repository(self, relpath, shared=False):
        """Initialize a control directory at relpath and create a repository.

        :param relpath: path passed to self.get_url() to locate the new dir.
        :param shared: forwarded to create_repository().
        :raises TestSkipped: if the bzrdir format cannot be initialized.
        """
        url = self.get_url(relpath)
        segments = url.split('/')
        if segments and segments[-1] not in ('', '.'):
            parent = '/'.join(segments[:-1])
            t = get_transport(parent)
            # NOTE(review): the visible source built this transport and then
            # never used it; creating the final path segment (tolerating an
            # existing directory) is the presumed intent -- confirm.
            try:
                t.mkdir(segments[-1])
            except FileExists:
                pass
        try:
            made_control = self.bzrdir_format.initialize(url)
            return made_control.create_repository(shared=shared)
        except UninitializableFormat:
            # The message used to be raised with its %s left uninterpolated.
            raise TestSkipped("Format %s is not initializable."
                              % (self.bzrdir_format,))
87
class TestBranch(TestCaseWithBranch):
17
from bzrlib.selftest import TestCaseInTempDir
20
class TestAppendRevisions(TestCaseInTempDir):
21
"""Test appending more than one revision"""
89
22
def test_append_revisions(self):
90
"""Test appending more than one revision"""
91
# NOTE(review): two revisions of this test appear to be interleaved here.
# `br` is first taken from self.get_branch() and then immediately rebound
# to Branch(".", init=True), so only the second assignment takes effect.
# Confirm which variant is intended before relying on this test.
br = self.get_branch()
23
from bzrlib.branch import Branch
24
br = Branch(".", init=True)
92
25
# A single appended revision becomes the whole history.
br.append_revision("rev1")
93
26
self.assertEquals(br.revision_history(), ["rev1",])
94
27
# Several revisions passed in one call are appended in argument order.
br.append_revision("rev2", "rev3")
95
28
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
97
def test_fetch_revisions(self):
    """Test fetch-revision operation."""
    get_transport(self.get_url()).mkdir('b1')
    get_transport(self.get_url()).mkdir('b2')
    wt = self.make_branch_and_tree('b1')
    # `b1` was used below without ever being bound; the branch of the tree
    # that receives the commit is the fetch source.
    b1 = wt.branch
    b2 = self.make_branch('b2')
    # Close the handle explicitly so the content is on disk before add().
    f = open('b1/foo', 'w')
    try:
        f.write('hello')
    finally:
        f.close()
    wt.add(['foo'], ['foo-id'])
    wt.commit('lala!', rev_id='revision-1', allow_pointless=False)

    mutter('start fetch')
    self.assertEqual((1, []), b2.fetch(b1))

    # The result is unused; the call itself checks the revision is present.
    rev = b2.repository.get_revision('revision-1')
    tree = b2.repository.revision_tree('revision-1')
    self.assertEqual(tree.get_file_text('foo-id'), 'hello')
115
def get_unbalanced_tree_pair(self):
    """Create working trees 'a' and 'b' and return them as (tree_a, tree_b).

    A file is written under 'a' and a commit with rev_id 'A' is made there;
    'b' is only created.
    """
    transport_for_a = get_transport(self.get_url())
    transport_for_a.mkdir('a')
    tree_a = self.make_branch_and_tree('a')
    # NOTE(review): no add() of the new file is visible before the commit
    # -- confirm against the full file.
    file('a/b', 'wb').write('b')
    tree_a.commit("silly commit", rev_id='A')

    transport_for_b = get_transport(self.get_url())
    transport_for_b.mkdir('b')
    return tree_a, self.make_branch_and_tree('b')
127
def get_balanced_branch_pair(self):
    """Return (tree_a, tree_b) after fetching a's repository into b's.

    Starts from get_unbalanced_tree_pair(), so the only commit is in a.
    """
    source_tree, target_tree = self.get_unbalanced_tree_pair()
    target_repo = target_tree.branch.repository
    target_repo.fetch(source_tree.branch.repository)
    return source_tree, target_tree
133
def test_clone_branch(self):
    """Copy the stores from one branch to another"""
    source_tree, basis_tree = self.get_balanced_branch_pair()
    basis_tree.commit("silly commit")

    # this fails to test that the history from a was not used.
    cloned_dir = source_tree.bzrdir.clone('c', basis=basis_tree.bzrdir)
    expected_history = source_tree.branch.revision_history()
    cloned_history = cloned_dir.open_branch().revision_history()
    self.assertEqual(expected_history, cloned_history)
143
def test_clone_partial(self):
    """Copy only part of the history of a branch."""
    # TODO: RBC 20060208 test with a revision not on revision-history.
    #       what should that behaviour be ? Emailed the list.
    source_tree = self.make_branch_and_tree('a')
    # Two commits, 'one' then 'two', each preceded by building its file.
    for name, rev_id in (('one', '1'), ('two', '2')):
        self.build_tree(['a/' + name])
        source_tree.commit('commit ' + name, rev_id=rev_id)

    target_repo = self.make_repository('b')
    source_repo = source_tree.bzrdir.open_repository()
    source_repo.copy_content_into(target_repo)

    source_branch = source_tree.bzrdir.open_branch()
    cloned = source_branch.clone(target_repo.bzrdir, revision_id='1')
    self.assertEqual(cloned.last_revision(), '1')
159
def test_sprout_partial(self):
    """Sprout a branch limited to a prefix of the revision history."""
    # test sprouting with a prefix of the revision-history.
    # also needs not-on-revision-history behaviour defined.
    source_tree = self.make_branch_and_tree('a')
    # Two commits, 'one' then 'two', each preceded by building its file.
    for name, rev_id in (('one', '1'), ('two', '2')):
        self.build_tree(['a/' + name])
        source_tree.commit('commit ' + name, rev_id=rev_id)

    target_repo = self.make_repository('b')
    source_repo = source_tree.bzrdir.open_repository()
    source_repo.copy_content_into(target_repo)

    source_branch = source_tree.bzrdir.open_branch()
    sprouted = source_branch.sprout(target_repo.bzrdir, revision_id='1')
    self.assertEqual(sprouted.last_revision(), '1')
174
def test_clone_branch_nickname(self):
    """Placeholder: the nick name should always be preserved by a clone."""
    reason = 'XXX branch cloning is not yet tested..'
    raise TestSkipped(reason)
178
def test_clone_branch_parent(self):
    """Placeholder: the parent should always be preserved by a clone."""
    reason = 'XXX branch cloning is not yet tested..'
    raise TestSkipped(reason)
182
def test_sprout_branch_nickname(self):
    """Placeholder: the nick name should always be reset by a sprout."""
    reason = 'XXX branch sprouting is not yet tested..'
    raise TestSkipped(reason)
186
def test_sprout_branch_parent(self):
    """The sprouted branch's parent is the base url of the source bzrdir."""
    source = self.make_branch('source')
    sprouted_dir = source.bzrdir.sprout(self.get_url('target'))
    target = sprouted_dir.open_branch()
    expected_parent = source.bzrdir.root_transport.base
    self.assertEqual(expected_parent, target.get_parent())
191
def test_record_initial_ghost_merge(self):
    """A pending merge with no revision present is still a merge."""
    ghost_id = 'non:existent@rev--ision--0--2'
    wt = self.make_branch_and_tree('.')
    # `branch` was used below without being bound; it is the branch of the
    # tree that receives the commit.
    branch = wt.branch
    wt.add_pending_merge(ghost_id)
    wt.commit('pretend to merge nonexistent-revision', rev_id='first')
    rev = branch.repository.get_revision(branch.last_revision())
    self.assertEqual(len(rev.parent_ids), 1)
    # parent_sha1s is not populated now, WTF. rbc 20051003
    self.assertEqual(len(rev.parent_sha1s), 0)
    self.assertEqual(rev.parent_ids[0], ghost_id)
203
def test_bad_revision(self):
204
# Expects repository.get_revision to raise errors.InvalidRevisionId.
# NOTE(review): the assertRaises call is truncated in this view -- the
# revision id argument and the closing parenthesis are missing. Restore
# them from the full file; the intended argument cannot be determined here.
self.assertRaises(errors.InvalidRevisionId,
205
self.get_branch().repository.get_revision,
209
# compare the gpg-to-sign info for a commit with a ghost and
210
# an identical tree without a ghost
211
# fetch missing should rewrite the TOC of weaves to list newly available parents.
213
def test_pending_merges(self):
    """Tracking pending-merged revisions."""
    first_merge = 'foo@azkhazan-123123-abcabc'
    second_merge = 'wibble@fofof--20050401--1928390812'
    wt = self.make_branch_and_tree('.')
    # `b` was used below without being bound; it is the branch of the tree
    # that receives the commit.
    b = wt.branch
    self.assertEquals(wt.pending_merges(), [])
    wt.add_pending_merge(first_merge)
    self.assertEquals(wt.pending_merges(), [first_merge])
    # Adding the same revision again must not create a duplicate entry.
    wt.add_pending_merge(first_merge)
    self.assertEquals(wt.pending_merges(), [first_merge])
    wt.add_pending_merge(second_merge)
    self.assertEquals(wt.pending_merges(), [first_merge, second_merge])
    wt.commit("commit from base with two merges")
    rev = b.repository.get_revision(b.revision_history()[0])
    self.assertEquals(len(rev.parent_ids), 2)
    self.assertEquals(rev.parent_ids[0], first_merge)
    self.assertEquals(rev.parent_ids[1], second_merge)
    # list should be cleared when we do a commit
    self.assertEquals(wt.pending_merges(), [])
236
def test_sign_existing_revision(self):
    """Signing a committed revision stores its short testament text."""
    # Imported locally (like Testament) so the test does not depend on a
    # module-level `bzrlib` name, which is not among the visible imports.
    from bzrlib.gpg import LoopbackGPGStrategy
    from bzrlib.testament import Testament
    wt = self.make_branch_and_tree('.')
    # `branch` was used below without being bound.
    branch = wt.branch
    wt.commit("base", allow_pointless=True, rev_id='A')
    strategy = LoopbackGPGStrategy(None)
    branch.repository.sign_revision('A', strategy)
    expected = Testament.from_revision(branch.repository,
                                       'A').as_short_text()
    self.assertEqual(expected, branch.repository.get_signature_text('A'))
247
def test_store_signature(self):
    """A signature stored before the commit is readable after it."""
    # Imported locally so the test does not depend on a module-level
    # `bzrlib` name, which is not among the visible imports.
    from bzrlib.gpg import LoopbackGPGStrategy
    wt = self.make_branch_and_tree('.')
    # `branch` was used below without being bound.
    branch = wt.branch
    branch.repository.store_revision_signature(
        LoopbackGPGStrategy(None), 'FOO', 'A')
    # NOTE(review): the revision id argument was missing from the visible
    # call; 'A' is the only id this test stores a signature for -- confirm.
    self.assertRaises(errors.NoSuchRevision,
                      branch.repository.has_signature_for_revision_id,
                      'A')
    wt.commit("base", allow_pointless=True, rev_id='A')
    self.assertEqual('FOO',
                     branch.repository.get_signature_text('A'))
259
def test_branch_keeps_signatures(self):
    """A cloned bzrdir's repository has the same signature text for 'A'."""
    wt = self.make_branch_and_tree('source')
    wt.commit('A', allow_pointless=True, rev_id='A')
    source_repo = wt.branch.repository
    source_repo.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
    #FIXME: clone should work to urls,
    # wt.clone should work to disks.
    self.build_tree(['target/'])
    cloned_dir = wt.bzrdir.clone('target')
    original_text = wt.branch.repository.get_signature_text('A')
    cloned_text = cloned_dir.open_repository().get_signature_text('A')
    self.assertEqual(original_text, cloned_text)
271
def test_nicks(self):
    """Branch nicknames"""
    transport = get_transport(self.get_url())
    explicit_nick = "Aaron's branch"
    unicode_nick = u"\u1234"

    # Without an explicit nick, it follows the directory name across a move.
    branch = self.make_branch('bzr.dev')
    self.assertEqual(branch.nick, 'bzr.dev')
    transport.move('bzr.dev', 'bzr.ab')
    branch = Branch.open(self.get_url('bzr.ab'))
    self.assertEqual(branch.nick, 'bzr.ab')

    # The nick is assigned twice, exactly as in the visible source.
    branch.nick = explicit_nick
    branch.nick = explicit_nick
    # NOTE(review): the result of this call is discarded; it looks like the
    # remains of a truncated assertion -- confirm against the full file.
    branch.control_files.controlfilename("branch.conf")
    self.assertEqual(branch.nick, explicit_nick)

    # An explicit nick survives moving the branch directory.
    transport.move('bzr.ab', 'integration')
    branch = Branch.open(self.get_url('integration'))
    self.assertEqual(branch.nick, explicit_nick)

    branch.nick = unicode_nick
    self.assertEqual(branch.nick, unicode_nick)
296
def test_commit_nicks(self):
    """Nicknames are committed to the revision"""
    nick = "My happy branch"
    get_transport(self.get_url()).mkdir('bzr.dev')
    wt = self.make_branch_and_tree('bzr.dev')
    # `branch` was used below without being bound.
    branch = wt.branch
    branch.nick = nick
    wt.commit('My commit respect da nick.')
    committed = branch.repository.get_revision(branch.last_revision())
    # The expected value was missing from the visible call; it is the nick
    # assigned above.
    self.assertEqual(committed.properties["branch-nick"], nick)
307
def test_create_open_branch_uses_repository(self):
    """A branch made under a shared repository uses that repository.

    Checked both on the freshly initialized branch and after reopening it.
    """
    try:
        repo = self.make_repository('.', shared=True)
    except errors.IncompatibleFormat:
        # NOTE(review): the handler body is absent from the visible source;
        # returning early is the presumed intent -- confirm.
        return
    repo.bzrdir.root_transport.mkdir('child')
    child_dir = self.bzrdir_format.initialize('child')
    try:
        child_branch = self.branch_format.initialize(child_dir)
    except errors.UninitializableFormat:
        # branch references are not default init'able.
        # NOTE(review): handler body absent from the visible source;
        # returning early is presumed -- confirm.
        return
    self.assertEqual(repo.bzrdir.root_transport.base,
                     child_branch.repository.bzrdir.root_transport.base)
    # Branch is imported at module level; use it directly instead of going
    # through a `bzrlib` name that is not among the visible imports.
    child_branch = Branch.open(self.get_url('child'))
    self.assertEqual(repo.bzrdir.root_transport.base,
                     child_branch.repository.bzrdir.root_transport.base)
326
class ChrootedTests(TestCaseWithBranch):
    """A support class that provides readonly urls outside the local namespace.

    This is done by checking if self.transport_server is a MemoryServer. If it
    is then we are chrooted already; if it is not then an HttpServer is used
    as the readonly server.
    """

    def setUp(self):
        super(ChrootedTests, self).setUp()
        if not self.transport_server == MemoryServer:
            self.transport_readonly_server = HttpServer

    def test_open_containing(self):
        """open_containing finds the branch at or above a readonly url."""
        # Before any branch exists, neither location can be opened.
        # errors.NotBranchError is spelled through the imported `errors`
        # module; a bare NotBranchError is not among the visible imports.
        self.assertRaises(errors.NotBranchError, Branch.open_containing,
                          self.get_readonly_url(''))
        self.assertRaises(errors.NotBranchError, Branch.open_containing,
                          self.get_readonly_url('g/p/q'))
        branch = self.make_branch('.')
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
        self.assertEqual('', relpath)
        # A path below the branch yields the remainder as relpath.
        branch, relpath = Branch.open_containing(
            self.get_readonly_url('g/p/q'))
        self.assertEqual('g/p/q', relpath)
350
31
# TODO: rewrite this as a regular unittest, without relying on the displayed output
351
32
# >>> from bzrlib.commit import commit
352
33
# >>> bzrlib.trace.silent = True
353
34
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
354
# >>> br1.working_tree().add('foo')
355
# >>> br1.working_tree().add('bar')
356
37
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
357
38
# >>> br2 = ScratchBranch()
358
39
# >>> br2.update_revisions(br1)
365
46
# Added 0 revisions.
366
47
# >>> br1.text_store.total_size() == br2.text_store.total_size()
369
class InstrumentedTransaction(object):
    """Transaction stand-in that records the calls made on it.

    ``calls`` is a list of method names, appended in call order.
    """

    def __init__(self):
        # Starts empty so a test can compare it against the expected calls.
        self.calls = []

    def finish(self):
        """Record that finish() was called."""
        self.calls.append('finish')
378
class TestDecorator(object):
    """Lock-recording object used to exercise the needs_*_lock decorators.

    Each lock method appends a short tag to ``_calls``: 'lr' for lock_read,
    'lw' for lock_write and 'ul' for unlock.

    NOTE(review): only fragments of this class were visible. The method
    names, return values and raised exception below were rebuilt from the
    assertions in TestDecorators -- confirm against the full file.
    """

    def __init__(self):
        self._calls = []

    def lock_read(self):
        self._calls.append('lr')

    def lock_write(self):
        self._calls.append('lw')

    def unlock(self):
        self._calls.append('ul')

    @needs_read_lock
    def do_with_read(self):
        return 1

    @needs_read_lock
    def except_with_read(self):
        raise RuntimeError

    @needs_write_lock
    def do_with_write(self):
        return 2

    @needs_write_lock
    def except_with_write(self):
        raise RuntimeError
409
class TestDecorators(TestCase):
    """Check the lock calls recorded by a TestDecorator instance."""

    def test_needs_read_lock(self):
        decorated = TestDecorator()
        result = decorated.do_with_read()
        self.assertEqual(1, result)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        decorated = TestDecorator()
        failing_call = decorated.except_with_read
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        decorated = TestDecorator()
        result = decorated.do_with_write()
        self.assertEqual(2, result)
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        decorated = TestDecorator()
        failing_call = decorated.except_with_write
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lw', 'ul'], decorated._calls)
432
class TestBranchTransaction(TestCaseWithBranch):
    """Tests for how a branch sets, reports and finishes its transaction."""

    def setUp(self):
        super(TestBranchTransaction, self).setUp()

    def test_default_get_transaction(self):
        """branch.get_transaction on a new branch should give a PassThrough."""
        self.failUnless(isinstance(self.get_branch().get_transaction(),
                                   transactions.PassThroughTransaction))

    def test__set_new_transaction(self):
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())

    def test__set_over_existing_transaction_raises(self):
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
        self.assertRaises(errors.LockError,
                          self.get_branch()._set_transaction,
                          transactions.ReadOnlyTransaction())

    def test_finish_no_transaction_raises(self):
        self.assertRaises(errors.LockError,
                          self.get_branch()._finish_transaction)

    def test_finish_readonly_transaction_works(self):
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
        self.get_branch()._finish_transaction()
        self.assertEqual(None, self.get_branch().control_files._transaction)

    def test_unlock_calls_finish(self):
        self.get_branch().lock_read()
        transaction = InstrumentedTransaction()
        self.get_branch().control_files._transaction = transaction
        self.get_branch().unlock()
        self.assertEqual(['finish'], transaction.calls)

    def test_lock_read_acquires_ro_transaction(self):
        self.get_branch().lock_read()
        # Unlock even when the assertion fails, so the lock is not leaked.
        try:
            self.failUnless(isinstance(self.get_branch().get_transaction(),
                                       transactions.ReadOnlyTransaction))
        finally:
            self.get_branch().unlock()

    def test_lock_write_acquires_write_transaction(self):
        self.get_branch().lock_write()
        # Unlock even when the assertion fails, so the lock is not leaked.
        try:
            # cannot use get_transaction as its magic
            self.failUnless(
                isinstance(self.get_branch().control_files._transaction,
                           transactions.WriteTransaction))
        finally:
            self.get_branch().unlock()
481
class TestBranchPushLocations(TestCaseWithBranch):
    """Tests for reading and writing a branch's push location."""

    def test_get_push_location_unset(self):
        self.assertEqual(None, self.get_branch().get_push_location())

    def test_get_push_location_exact(self):
        from bzrlib.config import (branches_config_filename,
                                   ensure_config_dir_exists)
        ensure_config_dir_exists()
        fn = branches_config_filename()
        # Write through an explicitly closed handle so the section is on
        # disk before it is read back. The trailing newline matches what
        # the former `print >>` statement emitted.
        f = open(fn, 'wt')
        try:
            f.write("[%s]\n"
                    "push_location=foo\n" %
                    self.get_branch().base[:-1])
        finally:
            f.close()
        self.assertEqual("foo", self.get_branch().get_push_location())

    def test_set_push_location(self):
        from bzrlib.config import (branches_config_filename,
                                   ensure_config_dir_exists)
        ensure_config_dir_exists()
        fn = branches_config_filename()
        self.get_branch().set_push_location('foo')
        # NOTE(review): the file argument was missing from the visible call;
        # `fn` is the only candidate in scope -- confirm.
        self.assertFileEqual("[%s]\n"
                             "push_location = foo" %
                             self.get_branch().base[:-1],
                             fn)

    # TODO RBC 20051029 test getting a push location from a branch in a
    # recursive section - that is, it appends the branch name.
510
class TestFormat(TestCaseWithBranch):
    """Tests for the format itself."""

    def test_format_initialize_find_open(self):
        """A branch made with the format reopens as the same format."""
        # Imported locally: BranchFormat is not among the visible
        # module-level imports, and neither is a bare `bzrlib` name.
        from bzrlib.branch import BranchFormat
        # loopback test to check the current format initializes to itself.
        if not self.branch_format.is_supported():
            # unsupported formats are not loopback testable
            # because the default open will not open them and
            # they may not be initializable.
            # NOTE(review): the statement under this `if` was missing from
            # the visible source; returning early is presumed -- confirm.
            return
        # supported formats must be able to init and open
        t = get_transport(self.get_url())
        readonly_t = get_transport(self.get_readonly_url())
        made_branch = self.make_branch('.')
        self.failUnless(isinstance(made_branch, Branch))

        # find it via bzrdir opening:
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
        direct_opened_branch = opened_control.open_branch()
        self.assertEqual(direct_opened_branch.__class__,
                         made_branch.__class__)
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
        self.failUnless(isinstance(direct_opened_branch._format,
                                   self.branch_format.__class__))

        # find it via Branch.open
        opened_branch = Branch.open(readonly_t.base)
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
        self.assertEqual(made_branch._format.__class__,
                         opened_branch._format.__class__)
        # if it has a unique id string, can we probe for it ?
        try:
            self.branch_format.get_format_string()
        except NotImplementedError:
            # NOTE(review): handler body missing from the visible source;
            # formats without a format string are presumed to end the test
            # here -- confirm.
            return
        self.assertEqual(self.branch_format,
                         BranchFormat.find_format(opened_control))