14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
23
import bzrlib.bzrdir as bzrdir
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
25
from bzrlib.commit import commit
26
import bzrlib.errors as errors
27
from bzrlib.errors import (FileExists,
30
UninitializableFormat,
34
from bzrlib.osutils import getcwd
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
36
from bzrlib.trace import mutter
37
import bzrlib.transactions as transactions
38
from bzrlib.transport import get_transport
39
from bzrlib.transport.http import HttpServer
40
from bzrlib.transport.memory import MemoryServer
41
from bzrlib.upgrade import upgrade
42
from bzrlib.workingtree import WorkingTree
44
# TODO: Make a branch using basis branch, and check that it
45
# doesn't request any files that could have been avoided, by
46
# hooking into the Transport.
49
class TestCaseWithBranch(TestCaseWithTransport):
52
super(TestCaseWithBranch, self).setUp()
56
if self.branch is None:
57
self.branch = self.make_branch(None)
60
def make_branch(self, relpath):
61
repo = self.make_repository(relpath)
62
# fixme RBC 20060210 this isnt necessarily a fixable thing,
63
# Skipped is the wrong exception to raise.
65
return self.branch_format.initialize(repo.bzrdir)
66
except errors.UninitializableFormat:
67
raise TestSkipped('Uninitializable branch format')
69
def make_repository(self, relpath):
71
url = self.get_url(relpath)
72
segments = url.split('/')
73
if segments and segments[-1] not in ('', '.'):
74
parent = '/'.join(segments[:-1])
75
t = get_transport(parent)
80
made_control = self.bzrdir_format.initialize(url)
81
return made_control.create_repository()
82
except UninitializableFormat:
83
raise TestSkipped("Format %s is not initializable.")
86
class TestBranch(TestCaseWithBranch):
17
from bzrlib.selftest import InTempDir
20
class TestAppendRevisions(InTempDir):
21
"""Test appending more than one revision"""
88
22
def test_append_revisions(self):
89
"""Test appending more than one revision"""
90
br = self.get_branch()
23
from bzrlib.branch import Branch
24
br = Branch(".", init=True)
91
25
br.append_revision("rev1")
92
26
self.assertEquals(br.revision_history(), ["rev1",])
93
27
br.append_revision("rev2", "rev3")
94
28
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
96
def test_fetch_revisions(self):
97
"""Test fetch-revision operation."""
98
from bzrlib.fetch import Fetcher
99
get_transport(self.get_url()).mkdir('b1')
100
get_transport(self.get_url()).mkdir('b2')
101
wt = self.make_branch_and_tree('b1')
103
b2 = self.make_branch('b2')
104
file('b1/foo', 'w').write('hello')
105
wt.add(['foo'], ['foo-id'])
106
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
108
mutter('start fetch')
109
f = Fetcher(from_branch=b1, to_branch=b2)
110
eq = self.assertEquals
111
eq(f.count_copied, 1)
112
eq(f._last_revision, 'revision-1')
114
rev = b2.repository.get_revision('revision-1')
115
tree = b2.repository.revision_tree('revision-1')
116
eq(tree.get_file_text('foo-id'), 'hello')
118
def get_unbalanced_tree_pair(self):
119
"""Return two branches, a and b, with one file in a."""
120
get_transport(self.get_url()).mkdir('a')
121
tree_a = self.make_branch_and_tree('a')
122
file('a/b', 'wb').write('b')
124
tree_a.commit("silly commit", rev_id='A')
126
get_transport(self.get_url()).mkdir('b')
127
tree_b = self.make_branch_and_tree('b')
128
return tree_a, tree_b
130
def get_balanced_branch_pair(self):
131
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
132
tree_a, tree_b = self.get_unbalanced_tree_pair()
133
tree_b.branch.repository.fetch(tree_a.branch.repository)
134
return tree_a, tree_b
136
def test_clone_branch(self):
137
"""Copy the stores from one branch to another"""
138
tree_a, tree_b = self.get_balanced_branch_pair()
139
tree_b.commit("silly commit")
141
# this fails to test that the history from a was not used.
142
dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
143
self.assertEqual(tree_a.branch.revision_history(),
144
dir_c.open_branch().revision_history())
146
def test_clone_partial(self):
147
"""Copy only part of the history of a branch."""
148
# TODO: RBC 20060208 test with a revision not on revision-history.
149
# what should that behaviour be ? Emailed the list.
150
wt_a = self.make_branch_and_tree('a')
151
self.build_tree(['a/one'])
153
wt_a.commit('commit one', rev_id='1')
154
self.build_tree(['a/two'])
156
wt_a.commit('commit two', rev_id='2')
157
repo_b = self.make_repository('b')
158
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
159
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
160
self.assertEqual(br_b.last_revision(), '1')
162
def test_sprout_partial(self):
163
# test sprouting with a prefix of the revision-history.
164
# also needs not-on-revision-history behaviour defined.
165
wt_a = self.make_branch_and_tree('a')
166
self.build_tree(['a/one'])
168
wt_a.commit('commit one', rev_id='1')
169
self.build_tree(['a/two'])
171
wt_a.commit('commit two', rev_id='2')
172
repo_b = self.make_repository('b')
173
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
174
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
175
self.assertEqual(br_b.last_revision(), '1')
177
def test_clone_branch_nickname(self):
178
# test the nick name is preserved always
179
raise TestSkipped('XXX branch cloning is not yet tested..')
181
def test_clone_branch_parent(self):
182
# test the parent is preserved always
183
raise TestSkipped('XXX branch cloning is not yet tested..')
185
def test_sprout_branch_nickname(self):
186
# test the nick name is reset always
187
raise TestSkipped('XXX branch sprouting is not yet tested..')
189
def test_sprout_branch_parent(self):
190
source = self.make_branch('source')
191
target = source.bzrdir.sprout(self.get_url('target')).open_branch()
192
self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
194
def test_record_initial_ghost_merge(self):
195
"""A pending merge with no revision present is still a merge."""
196
wt = self.make_branch_and_tree('.')
198
wt.add_pending_merge('non:existent@rev--ision--0--2')
199
wt.commit('pretend to merge nonexistent-revision', rev_id='first')
200
rev = branch.repository.get_revision(branch.last_revision())
201
self.assertEqual(len(rev.parent_ids), 1)
202
# parent_sha1s is not populated now, WTF. rbc 20051003
203
self.assertEqual(len(rev.parent_sha1s), 0)
204
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
206
def test_bad_revision(self):
207
self.assertRaises(errors.InvalidRevisionId,
208
self.get_branch().repository.get_revision,
212
# compare the gpg-to-sign info for a commit with a ghost and
213
# an identical tree without a ghost
214
# fetch missing should rewrite the TOC of weaves to list newly available parents.
216
def test_pending_merges(self):
217
"""Tracking pending-merged revisions."""
218
wt = self.make_branch_and_tree('.')
220
self.assertEquals(wt.pending_merges(), [])
221
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
222
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
223
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
224
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
225
wt.add_pending_merge('wibble@fofof--20050401--1928390812')
226
self.assertEquals(wt.pending_merges(),
227
['foo@azkhazan-123123-abcabc',
228
'wibble@fofof--20050401--1928390812'])
229
wt.commit("commit from base with two merges")
230
rev = b.repository.get_revision(b.revision_history()[0])
231
self.assertEquals(len(rev.parent_ids), 2)
232
self.assertEquals(rev.parent_ids[0],
233
'foo@azkhazan-123123-abcabc')
234
self.assertEquals(rev.parent_ids[1],
235
'wibble@fofof--20050401--1928390812')
236
# list should be cleared when we do a commit
237
self.assertEquals(wt.pending_merges(), [])
239
def test_sign_existing_revision(self):
240
wt = self.make_branch_and_tree('.')
242
wt.commit("base", allow_pointless=True, rev_id='A')
243
from bzrlib.testament import Testament
244
strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
245
branch.repository.sign_revision('A', strategy)
246
self.assertEqual(Testament.from_revision(branch.repository,
247
'A').as_short_text(),
248
branch.repository.revision_store.get('A',
251
def test_store_signature(self):
252
branch = self.get_branch()
253
branch.repository.store_revision_signature(
254
bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
255
self.assertEqual('FOO',
256
branch.repository.revision_store.get('A',
259
def test_branch_keeps_signatures(self):
260
wt = self.make_branch_and_tree('source')
261
wt.commit('A', allow_pointless=True, rev_id='A')
262
wt.branch.repository.sign_revision('A',
263
bzrlib.gpg.LoopbackGPGStrategy(None))
264
#FIXME: clone should work to urls,
265
# wt.clone should work to disks.
266
self.build_tree(['target/'])
267
d2 = wt.bzrdir.clone('target')
268
self.assertEqual(wt.branch.repository.revision_store.get('A',
270
d2.open_repository().revision_store.get('A',
273
def test_upgrade_preserves_signatures(self):
274
# this is in the current test format
275
wt = self.make_branch_and_tree('source')
276
wt.commit('A', allow_pointless=True, rev_id='A')
277
wt.branch.repository.sign_revision('A',
278
bzrlib.gpg.LoopbackGPGStrategy(None))
279
old_signature = wt.branch.repository.revision_store.get('A',
282
wt = WorkingTree.open(wt.basedir)
283
new_signature = wt.branch.repository.revision_store.get('A',
285
self.assertEqual(old_signature, new_signature)
287
def test_nicks(self):
288
"""Branch nicknames"""
289
t = get_transport(self.get_url())
291
branch = self.make_branch('bzr.dev')
292
self.assertEqual(branch.nick, 'bzr.dev')
293
t.move('bzr.dev', 'bzr.ab')
294
branch = Branch.open(self.get_url('bzr.ab'))
295
self.assertEqual(branch.nick, 'bzr.ab')
296
branch.nick = "Aaron's branch"
297
branch.nick = "Aaron's branch"
301
branch.control_files.controlfilename("branch.conf")
305
self.assertEqual(branch.nick, "Aaron's branch")
306
t.move('bzr.ab', 'integration')
307
branch = Branch.open(self.get_url('integration'))
308
self.assertEqual(branch.nick, "Aaron's branch")
309
branch.nick = u"\u1234"
310
self.assertEqual(branch.nick, u"\u1234")
312
def test_commit_nicks(self):
313
"""Nicknames are committed to the revision"""
314
get_transport(self.get_url()).mkdir('bzr.dev')
315
wt = self.make_branch_and_tree('bzr.dev')
317
branch.nick = "My happy branch"
318
wt.commit('My commit respect da nick.')
319
committed = branch.repository.get_revision(branch.last_revision())
320
self.assertEqual(committed.properties["branch-nick"],
324
class ChrootedTests(TestCaseWithBranch):
325
"""A support class that provides readonly urls outside the local namespace.
327
This is done by checking if self.transport_server is a MemoryServer. if it
328
is then we are chrooted already, if it is not then an HttpServer is used
333
super(ChrootedTests, self).setUp()
334
if not self.transport_server == MemoryServer:
335
self.transport_readonly_server = HttpServer
337
def test_open_containing(self):
338
self.assertRaises(NotBranchError, Branch.open_containing,
339
self.get_readonly_url(''))
340
self.assertRaises(NotBranchError, Branch.open_containing,
341
self.get_readonly_url('g/p/q'))
342
branch = self.make_branch('.')
343
branch, relpath = Branch.open_containing(self.get_readonly_url(''))
344
self.assertEqual('', relpath)
345
branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
346
self.assertEqual('g/p/q', relpath)
348
# TODO: rewrite this as a regular unittest, without relying on the displayed output
349
# >>> from bzrlib.commit import commit
350
# >>> bzrlib.trace.silent = True
351
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
352
# >>> br1.working_tree().add('foo')
353
# >>> br1.working_tree().add('bar')
354
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
355
# >>> br2 = ScratchBranch()
356
# >>> br2.update_revisions(br1)
358
# Added 1 inventories.
360
# >>> br2.revision_history()
362
# >>> br2.update_revisions(br1)
364
# >>> br1.text_store.total_size() == br2.text_store.total_size()
367
class InstrumentedTransaction(object):
370
self.calls.append('finish')
376
class TestDecorator(object):
382
self._calls.append('lr')
384
def lock_write(self):
385
self._calls.append('lw')
388
self._calls.append('ul')
391
def do_with_read(self):
395
def except_with_read(self):
399
def do_with_write(self):
403
def except_with_write(self):
407
class TestDecorators(TestCase):
409
def test_needs_read_lock(self):
410
branch = TestDecorator()
411
self.assertEqual(1, branch.do_with_read())
412
self.assertEqual(['lr', 'ul'], branch._calls)
414
def test_excepts_in_read_lock(self):
415
branch = TestDecorator()
416
self.assertRaises(RuntimeError, branch.except_with_read)
417
self.assertEqual(['lr', 'ul'], branch._calls)
419
def test_needs_write_lock(self):
420
branch = TestDecorator()
421
self.assertEqual(2, branch.do_with_write())
422
self.assertEqual(['lw', 'ul'], branch._calls)
424
def test_excepts_in_write_lock(self):
425
branch = TestDecorator()
426
self.assertRaises(RuntimeError, branch.except_with_write)
427
self.assertEqual(['lw', 'ul'], branch._calls)
430
class TestBranchTransaction(TestCaseWithBranch):
433
super(TestBranchTransaction, self).setUp()
436
def test_default_get_transaction(self):
437
"""branch.get_transaction on a new branch should give a PassThrough."""
438
self.failUnless(isinstance(self.get_branch().get_transaction(),
439
transactions.PassThroughTransaction))
441
def test__set_new_transaction(self):
442
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
444
def test__set_over_existing_transaction_raises(self):
445
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
446
self.assertRaises(errors.LockError,
447
self.get_branch()._set_transaction,
448
transactions.ReadOnlyTransaction())
450
def test_finish_no_transaction_raises(self):
451
self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
453
def test_finish_readonly_transaction_works(self):
454
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
455
self.get_branch()._finish_transaction()
456
self.assertEqual(None, self.get_branch().control_files._transaction)
458
def test_unlock_calls_finish(self):
459
self.get_branch().lock_read()
460
transaction = InstrumentedTransaction()
461
self.get_branch().control_files._transaction = transaction
462
self.get_branch().unlock()
463
self.assertEqual(['finish'], transaction.calls)
465
def test_lock_read_acquires_ro_transaction(self):
466
self.get_branch().lock_read()
467
self.failUnless(isinstance(self.get_branch().get_transaction(),
468
transactions.ReadOnlyTransaction))
469
self.get_branch().unlock()
471
def test_lock_write_acquires_passthrough_transaction(self):
472
self.get_branch().lock_write()
473
# cannot use get_transaction as its magic
474
self.failUnless(isinstance(self.get_branch().control_files._transaction,
475
transactions.PassThroughTransaction))
476
self.get_branch().unlock()
479
class TestBranchPushLocations(TestCaseWithBranch):
481
def test_get_push_location_unset(self):
482
self.assertEqual(None, self.get_branch().get_push_location())
484
def test_get_push_location_exact(self):
485
from bzrlib.config import (branches_config_filename,
486
ensure_config_dir_exists)
487
ensure_config_dir_exists()
488
fn = branches_config_filename()
489
print >> open(fn, 'wt'), ("[%s]\n"
490
"push_location=foo" %
491
self.get_branch().base[:-1])
492
self.assertEqual("foo", self.get_branch().get_push_location())
494
def test_set_push_location(self):
495
from bzrlib.config import (branches_config_filename,
496
ensure_config_dir_exists)
497
ensure_config_dir_exists()
498
fn = branches_config_filename()
499
self.get_branch().set_push_location('foo')
500
self.assertFileEqual("[%s]\n"
501
"push_location = foo" % self.get_branch().base[:-1],
504
# TODO RBC 20051029 test getting a push location from a branch in a
505
# recursive section - that is, it appends the branch name.
508
class TestFormat(TestCaseWithBranch):
509
"""Tests for the format itself."""
511
def test_format_initialize_find_open(self):
512
# loopback test to check the current format initializes to itself.
513
if not self.branch_format.is_supported():
514
# unsupported formats are not loopback testable
515
# because the default open will not open them and
516
# they may not be initializable.
518
# supported formats must be able to init and open
519
t = get_transport(self.get_url())
520
readonly_t = get_transport(self.get_readonly_url())
521
made_branch = self.make_branch('.')
522
self.failUnless(isinstance(made_branch, bzrlib.branch.Branch))
524
# find it via bzrdir opening:
525
opened_control = bzrdir.BzrDir.open(readonly_t.base)
526
direct_opened_branch = opened_control.open_branch()
527
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
528
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
529
self.failUnless(isinstance(direct_opened_branch._format,
530
self.branch_format.__class__))
532
# find it via Branch.open
533
opened_branch = bzrlib.branch.Branch.open(readonly_t.base)
534
self.failUnless(isinstance(opened_branch, made_branch.__class__))
535
self.assertEqual(made_branch._format.__class__,
536
opened_branch._format.__class__)
537
# if it has a unique id string, can we probe for it ?
539
self.branch_format.get_format_string()
540
except NotImplementedError:
542
self.assertEqual(self.branch_format,
543
bzrlib.branch.BranchFormat.find_format(opened_control))