1
# Copyright (C) 2005, 2006 Canonical Ltd
1
# (C) 2005 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
31
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
32
from bzrlib.delta import TreeDelta
33
from bzrlib.errors import (FileExists,
36
UninitializableFormat,
39
from bzrlib.osutils import getcwd
40
import bzrlib.revision
41
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
42
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
43
from bzrlib.trace import mutter
44
from bzrlib.transport import get_transport
45
from bzrlib.transport.http import HttpServer
46
from bzrlib.transport.memory import MemoryServer
47
from bzrlib.upgrade import upgrade
48
from bzrlib.workingtree import WorkingTree
51
# TODO: Make a branch using basis branch, and check that it
52
# doesn't request any files that could have been avoided, by
53
# hooking into the Transport.
56
class TestCaseWithBranch(TestCaseWithBzrDir):
59
super(TestCaseWithBranch, self).setUp()
63
if self.branch is None:
64
self.branch = self.make_branch('')
67
def make_branch(self, relpath, format=None):
68
repo = self.make_repository(relpath, format=format)
69
# fixme RBC 20060210 this isnt necessarily a fixable thing,
70
# Skipped is the wrong exception to raise.
72
return self.branch_format.initialize(repo.bzrdir)
73
except errors.UninitializableFormat:
74
raise TestSkipped('Uninitializable branch format')
76
def make_repository(self, relpath, shared=False, format=None):
    # Create the control directory at relpath (honouring `format`), then
    # ask that directory for a repository; `shared` is passed straight
    # through to create_repository().
    control_dir = self.make_bzrdir(relpath, format=format)
    repository = control_dir.create_repository(shared=shared)
    return repository
81
class TestBranch(TestCaseWithBranch):
17
from bzrlib.selftest import TestCaseInTempDir
20
class TestAppendRevisions(TestCaseInTempDir):
21
"""Test appending more than one revision"""
83
22
def test_append_revisions(self):
84
"""Test appending more than one revision"""
85
br = self.get_branch()
23
from bzrlib.branch import Branch
24
br = Branch(".", init=True)
86
25
br.append_revision("rev1")
87
26
self.assertEquals(br.revision_history(), ["rev1",])
88
27
br.append_revision("rev2", "rev3")
89
28
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
91
def test_fetch_revisions(self):
92
"""Test fetch-revision operation."""
93
get_transport(self.get_url()).mkdir('b1')
94
get_transport(self.get_url()).mkdir('b2')
95
wt = self.make_branch_and_tree('b1')
97
b2 = self.make_branch('b2')
98
file('b1/foo', 'w').write('hello')
99
wt.add(['foo'], ['foo-id'])
100
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
102
mutter('start fetch')
103
self.assertEqual((1, []), b2.fetch(b1))
105
rev = b2.repository.get_revision('revision-1')
106
tree = b2.repository.revision_tree('revision-1')
107
self.assertEqual(tree.get_file_text('foo-id'), 'hello')
109
def test_get_revision_delta(self):
    # Commit two revisions that each add one file, then check that the
    # delta for each revno is a TreeDelta listing only that file as added.
    tree = self.make_branch_and_tree('a')
    self.build_tree(['a/foo'])
    tree.add('foo', 'file1')
    tree.commit('rev1', rev_id='rev1')
    self.build_tree(['a/vla'])
    tree.add('vla', 'file2')
    tree.commit('rev2', rev_id='rev2')

    first_delta = tree.branch.get_revision_delta(1)
    self.assertIsInstance(first_delta, TreeDelta)
    self.assertEqual([('foo', 'file1', 'file')], first_delta.added)

    second_delta = tree.branch.get_revision_delta(2)
    self.assertIsInstance(second_delta, TreeDelta)
    self.assertEqual([('vla', 'file2', 'file')], second_delta.added)
125
def get_unbalanced_tree_pair(self):
126
"""Return two branches, a and b, with one file in a."""
127
get_transport(self.get_url()).mkdir('a')
128
tree_a = self.make_branch_and_tree('a')
129
file('a/b', 'wb').write('b')
131
tree_a.commit("silly commit", rev_id='A')
133
get_transport(self.get_url()).mkdir('b')
134
tree_b = self.make_branch_and_tree('b')
135
return tree_a, tree_b
137
def get_balanced_branch_pair(self):
    """Returns br_a, br_b as with one commit in a, and b has a's stores."""
    source_tree, target_tree = self.get_unbalanced_tree_pair()
    # Fetch a's repository into b's repository before handing both back.
    target_repo = target_tree.branch.repository
    target_repo.fetch(source_tree.branch.repository)
    return source_tree, target_tree
143
def test_clone_branch(self):
    """Copy the stores from one branch to another"""
    source_tree, basis_tree = self.get_balanced_branch_pair()
    basis_tree.commit("silly commit")

    # this fails to test that the history from a was not used.
    cloned_dir = source_tree.bzrdir.clone('c', basis=basis_tree.bzrdir)
    expected_history = source_tree.branch.revision_history()
    cloned_history = cloned_dir.open_branch().revision_history()
    self.assertEqual(expected_history, cloned_history)
153
def test_clone_partial(self):
154
"""Copy only part of the history of a branch."""
155
# TODO: RBC 20060208 test with a revision not on revision-history.
156
# what should that behaviour be ? Emailed the list.
157
wt_a = self.make_branch_and_tree('a')
158
self.build_tree(['a/one'])
160
wt_a.commit('commit one', rev_id='1')
161
self.build_tree(['a/two'])
163
wt_a.commit('commit two', rev_id='2')
164
repo_b = self.make_repository('b')
165
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
166
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
167
self.assertEqual('1', br_b.last_revision())
169
def test_sprout_partial(self):
170
# test sprouting with a prefix of the revision-history.
171
# also needs not-on-revision-history behaviour defined.
172
wt_a = self.make_branch_and_tree('a')
173
self.build_tree(['a/one'])
175
wt_a.commit('commit one', rev_id='1')
176
self.build_tree(['a/two'])
178
wt_a.commit('commit two', rev_id='2')
179
repo_b = self.make_repository('b')
180
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
181
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
182
self.assertEqual('1', br_b.last_revision())
184
def get_parented_branch(self):
185
wt_a = self.make_branch_and_tree('a')
186
self.build_tree(['a/one'])
188
wt_a.commit('commit one', rev_id='1')
190
branch_b = wt_a.bzrdir.sprout('b', revision_id='1').open_branch()
191
self.assertEqual(wt_a.branch.base, branch_b.get_parent())
194
def test_clone_branch_nickname(self):
    # Placeholder: the intent is to test that the nick name is always
    # preserved by a clone; nothing is checked yet, so report a skip.
    reason = 'XXX branch cloning is not yet tested..'
    raise TestSkipped(reason)
198
def test_clone_branch_parent(self):
    # test the parent is preserved always
    parented = self.get_parented_branch()
    repo_c = self.make_repository('c')
    parented.repository.copy_content_into(repo_c)
    clone_c = parented.clone(repo_c.bzrdir)
    self.assertNotEqual(None, clone_c.get_parent())
    self.assertEqual(parented.get_parent(), clone_c.get_parent())

    # We can also set a specific parent, and it should be honored
    explicit_parent = 'http://bazaar-vcs.org/path/to/branch'
    parented.set_parent(explicit_parent)
    repo_d = self.make_repository('d')
    parented.repository.copy_content_into(repo_d)
    clone_d = parented.clone(repo_d.bzrdir)
    self.assertEqual(explicit_parent, clone_d.get_parent())
215
def test_sprout_branch_nickname(self):
    # Placeholder: the intent is to test that the nick name is always
    # reset by a sprout; nothing is checked yet, so report a skip.
    reason = 'XXX branch sprouting is not yet tested..'
    raise TestSkipped(reason)
219
def test_sprout_branch_parent(self):
    # A branch sprouted from `source` must report the source control
    # directory's transport base as its parent.
    origin = self.make_branch('source')
    sprouted_dir = origin.bzrdir.sprout(self.get_url('target'))
    sprouted = sprouted_dir.open_branch()
    self.assertEqual(origin.bzrdir.root_transport.base,
                     sprouted.get_parent())
224
def test_submit_branch(self):
    """Submit location can be queried and set"""
    branch = self.make_branch('branch')
    # Nothing is configured on a freshly made branch.
    self.assertEqual(branch.get_submit_branch(), None)
    # Each value set must be the value read back, including when it
    # replaces an earlier setting.
    for location in ('sftp://example.com', 'sftp://example.net'):
        branch.set_submit_branch(location)
        self.assertEqual(branch.get_submit_branch(), location)
233
def test_record_initial_ghost(self):
    """Branches should support having ghosts."""
    ghost_id = 'non:existent@rev--ision--0--2'
    tree = self.make_branch_and_tree('.')
    tree.set_parent_ids([ghost_id], allow_leftmost_as_ghost=True)
    new_rev_id = tree.commit('commit against a ghost first parent.')
    revision = tree.branch.repository.get_revision(new_rev_id)
    self.assertEqual(revision.parent_ids, [ghost_id])
    # parent_sha1s is not populated now (noted by rbc, 20051003), so the
    # only thing checked is that it is empty.
    self.assertEqual(len(revision.parent_sha1s), 0)
244
def test_record_two_ghosts(self):
245
"""Recording with all ghosts works."""
246
wt = self.make_branch_and_tree('.')
248
'foo@azkhazan-123123-abcabc',
249
'wibble@fofof--20050401--1928390812',
251
allow_leftmost_as_ghost=True)
252
rev_id = wt.commit("commit from ghost base with one merge")
253
# the revision should have been committed with two parents
254
rev = wt.branch.repository.get_revision(rev_id)
255
self.assertEqual(['foo@azkhazan-123123-abcabc',
256
'wibble@fofof--20050401--1928390812'],
259
def test_bad_revision(self):
260
self.assertRaises(errors.InvalidRevisionId,
261
self.get_branch().repository.get_revision,
265
# compare the gpg-to-sign info for a commit with a ghost and
266
# an identical tree without a ghost
267
# fetch missing should rewrite the TOC of weaves to list newly available parents.
269
def test_sign_existing_revision(self):
270
wt = self.make_branch_and_tree('.')
272
wt.commit("base", allow_pointless=True, rev_id='A')
273
from bzrlib.testament import Testament
274
strategy = gpg.LoopbackGPGStrategy(None)
275
branch.repository.sign_revision('A', strategy)
276
self.assertEqual(Testament.from_revision(branch.repository,
277
'A').as_short_text(),
278
branch.repository.get_signature_text('A'))
280
def test_store_signature(self):
281
wt = self.make_branch_and_tree('.')
283
branch.repository.store_revision_signature(
284
gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
285
self.assertRaises(errors.NoSuchRevision,
286
branch.repository.has_signature_for_revision_id,
288
wt.commit("base", allow_pointless=True, rev_id='A')
289
self.assertEqual('FOO',
290
branch.repository.get_signature_text('A'))
292
def test_branch_keeps_signatures(self):
    # Sign revision 'A' in the source, clone the control directory, and
    # check the clone's repository returns the same signature text.
    tree = self.make_branch_and_tree('source')
    tree.commit('A', allow_pointless=True, rev_id='A')
    tree.branch.repository.sign_revision(
        'A', gpg.LoopbackGPGStrategy(None))
    # FIXME: clone should work to urls,
    # wt.clone should work to disks.
    self.build_tree(['target/'])
    cloned_dir = tree.bzrdir.clone('target')
    source_signature = tree.branch.repository.get_signature_text('A')
    cloned_signature = cloned_dir.open_repository().get_signature_text('A')
    self.assertEqual(source_signature, cloned_signature)
304
def test_nicks(self):
305
"""Branch nicknames"""
306
t = get_transport(self.get_url())
308
branch = self.make_branch('bzr.dev')
309
self.assertEqual(branch.nick, 'bzr.dev')
310
t.move('bzr.dev', 'bzr.ab')
311
branch = Branch.open(self.get_url('bzr.ab'))
312
self.assertEqual(branch.nick, 'bzr.ab')
313
branch.nick = "Aaron's branch"
314
branch.nick = "Aaron's branch"
318
branch.control_files.controlfilename("branch.conf")
322
self.assertEqual(branch.nick, "Aaron's branch")
323
t.move('bzr.ab', 'integration')
324
branch = Branch.open(self.get_url('integration'))
325
self.assertEqual(branch.nick, "Aaron's branch")
326
branch.nick = u"\u1234"
327
self.assertEqual(branch.nick, u"\u1234")
329
def test_commit_nicks(self):
330
"""Nicknames are committed to the revision"""
331
get_transport(self.get_url()).mkdir('bzr.dev')
332
wt = self.make_branch_and_tree('bzr.dev')
334
branch.nick = "My happy branch"
335
wt.commit('My commit respect da nick.')
336
committed = branch.repository.get_revision(branch.last_revision())
337
self.assertEqual(committed.properties["branch-nick"],
340
def test_create_open_branch_uses_repository(self):
342
repo = self.make_repository('.', shared=True)
343
except errors.IncompatibleFormat:
345
repo.bzrdir.root_transport.mkdir('child')
346
child_dir = self.bzrdir_format.initialize('child')
348
child_branch = self.branch_format.initialize(child_dir)
349
except errors.UninitializableFormat:
350
# branch references are not default init'able.
352
self.assertEqual(repo.bzrdir.root_transport.base,
353
child_branch.repository.bzrdir.root_transport.base)
354
child_branch = branch.Branch.open(self.get_url('child'))
355
self.assertEqual(repo.bzrdir.root_transport.base,
356
child_branch.repository.bzrdir.root_transport.base)
358
def test_format_description(self):
    # The branch format must supply a non-empty description.
    branch_format = self.make_branch_and_tree('tree').branch._format
    description = branch_format.get_format_description()
    self.failUnless(len(description))
363
def test_check_branch_report_results(self):
    """Checking a branch produces results which can be printed"""
    check_result = self.make_branch('.').check()
    # reports results through logging; exercise the verbose form first,
    # then the terse one.
    for verbose in (True, False):
        check_result.report_results(verbose=verbose)
371
def test_get_commit_builder(self):
    # Asking a fresh branch for a commit builder with an empty parent
    # list must produce a repository.CommitBuilder.
    builder = self.make_branch(".").get_commit_builder([])
    self.assertIsInstance(builder, repository.CommitBuilder)
375
def test_generate_revision_history(self):
    """Create a fake revision history easily."""
    tree = self.make_branch_and_tree('.')
    first_rev = tree.commit('foo')
    history_after_first = tree.branch.revision_history()
    # Add a second commit, then regenerate the history from the first
    # revision; the history must match what was recorded before.
    tree.commit('bar', allow_pointless=True)
    tree.branch.generate_revision_history(first_rev)
    self.assertEqual(history_after_first, tree.branch.revision_history())
384
def test_generate_revision_history_NULL_REVISION(self):
    # Regenerating the history from NULL_REVISION must leave the branch
    # with an empty revision history, even after a commit.
    tree = self.make_branch_and_tree('.')
    tree.commit('foo')
    null_revision = bzrlib.revision.NULL_REVISION
    tree.branch.generate_revision_history(null_revision)
    self.assertEqual([], tree.branch.revision_history())
390
def test_create_checkout(self):
391
tree_a = self.make_branch_and_tree('a')
392
branch_a = tree_a.branch
393
checkout_b = branch_a.create_checkout('b')
394
self.assertEqual(None, checkout_b.last_revision())
395
checkout_b.commit('rev1', rev_id='rev1')
396
self.assertEqual('rev1', branch_a.last_revision())
397
self.assertNotEqual(checkout_b.branch.base, branch_a.base)
399
checkout_c = branch_a.create_checkout('c', lightweight=True)
400
self.assertEqual('rev1', checkout_c.last_revision())
401
checkout_c.commit('rev2', rev_id='rev2')
402
self.assertEqual('rev2', branch_a.last_revision())
403
self.assertEqual(checkout_c.branch.base, branch_a.base)
406
checkout_d = branch_a.create_checkout('d', lightweight=True)
407
self.assertEqual('rev2', checkout_d.last_revision())
409
checkout_e = branch_a.create_checkout('e')
410
self.assertEqual('rev2', checkout_e.last_revision())
412
def test_create_anonymous_lightweight_checkout(self):
    """A lightweight checkout from a readonly branch should succeed."""
    tree_a = self.make_branch_and_tree('a')
    committed_id = tree_a.commit('put some content in the branch')
    readonly_url = 'readonly+' + tree_a.bzrdir.root_transport.base
    source_branch = bzrlib.branch.Branch.open(readonly_url)
    # sanity check that the test will be valid: the source really must
    # refuse a write lock.
    write_lock_errors = (errors.LockError, errors.TransportNotPossible)
    self.assertRaises(write_lock_errors, source_branch.lock_write)
    checkout = source_branch.create_checkout('c', lightweight=True)
    self.assertEqual(committed_id, checkout.last_revision())
424
def test_create_anonymous_heavyweight_checkout(self):
    """A regular checkout from a readonly branch should succeed."""
    tree_a = self.make_branch_and_tree('a')
    committed_id = tree_a.commit('put some content in the branch')
    readonly_url = 'readonly+' + tree_a.bzrdir.root_transport.base
    source_branch = bzrlib.branch.Branch.open(readonly_url)
    # sanity check that the test will be valid: the source really must
    # refuse a write lock.
    write_lock_errors = (errors.LockError, errors.TransportNotPossible)
    self.assertRaises(write_lock_errors, source_branch.lock_write)
    checkout = source_branch.create_checkout('c')
    self.assertEqual(committed_id, checkout.last_revision())
437
class ChrootedTests(TestCaseWithBranch):
438
"""A support class that provides readonly urls outside the local namespace.
440
This is done by checking if self.transport_server is a MemoryServer. if it
441
is then we are chrooted already, if it is not then an HttpServer is used
446
super(ChrootedTests, self).setUp()
447
if not self.transport_server == MemoryServer:
448
self.transport_readonly_server = HttpServer
450
def test_open_containing(self):
    probe_paths = ('', 'g/p/q')
    # Before any branch exists, neither the root nor a nested path can
    # be opened.
    for path in probe_paths:
        self.assertRaises(NotBranchError, Branch.open_containing,
                          self.get_readonly_url(path))
    branch = self.make_branch('.')
    # Once the branch exists at the root, both URLs resolve, and the
    # relative path returned is the part of the URL below the root.
    for path in probe_paths:
        branch, relpath = Branch.open_containing(
            self.get_readonly_url(path))
        self.assertEqual(path, relpath)
462
class InstrumentedTransaction(object):
465
self.calls.append('finish')
471
class TestDecorator(object):
477
self._calls.append('lr')
479
def lock_write(self):
480
self._calls.append('lw')
483
self._calls.append('ul')
486
def do_with_read(self):
490
def except_with_read(self):
494
def do_with_write(self):
498
def except_with_write(self):
502
class TestDecorators(TestCase):
    # Each test calls one method of a fresh TestDecorator and checks
    # the lock/unlock calls recorded in its _calls list: 'lr' or 'lw'
    # for taking the lock, followed by 'ul' for releasing it.

    def test_needs_read_lock(self):
        decorated = TestDecorator()
        self.assertEqual(1, decorated.do_with_read())
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        # The unlock must still be recorded when the call raises.
        decorated = TestDecorator()
        self.assertRaises(RuntimeError, decorated.except_with_read)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        decorated = TestDecorator()
        self.assertEqual(2, decorated.do_with_write())
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        # The unlock must still be recorded when the call raises.
        decorated = TestDecorator()
        self.assertRaises(RuntimeError, decorated.except_with_write)
        self.assertEqual(['lw', 'ul'], decorated._calls)
525
class TestBranchTransaction(TestCaseWithBranch):
528
super(TestBranchTransaction, self).setUp()
531
def test_default_get_transaction(self):
    """branch.get_transaction on a new branch should give a PassThrough."""
    current = self.get_branch().get_transaction()
    is_pass_through = isinstance(current,
                                 transactions.PassThroughTransaction)
    self.failUnless(is_pass_through)
536
def test__set_new_transaction(self):
    # Installing a transaction on the test branch must not raise.
    install = self.get_branch()._set_transaction
    install(transactions.ReadOnlyTransaction())
539
def test__set_over_existing_transaction_raises(self):
    # Install one transaction, then check that installing a second one
    # on top of it is rejected with LockError.
    install = self.get_branch()._set_transaction
    install(transactions.ReadOnlyTransaction())
    install_again = self.get_branch()._set_transaction
    replacement = transactions.ReadOnlyTransaction()
    self.assertRaises(errors.LockError, install_again, replacement)
545
def test_finish_no_transaction_raises(self):
    # Finishing when no transaction was set must raise LockError.
    finish = self.get_branch()._finish_transaction
    self.assertRaises(errors.LockError, finish)
548
def test_finish_readonly_transaction_works(self):
    # After set-then-finish, control_files must hold no transaction.
    install = self.get_branch()._set_transaction
    install(transactions.ReadOnlyTransaction())
    self.get_branch()._finish_transaction()
    remaining = self.get_branch().control_files._transaction
    self.assertEqual(None, remaining)
553
def test_unlock_calls_finish(self):
    # Swap an instrumented transaction in while the branch is read
    # locked; unlocking must then record exactly one 'finish' call.
    self.get_branch().lock_read()
    probe = InstrumentedTransaction()
    self.get_branch().control_files._transaction = probe
    self.get_branch().unlock()
    self.assertEqual(['finish'], probe.calls)
560
def test_lock_read_acquires_ro_transaction(self):
    # While the branch is read locked, get_transaction() must hand back
    # a ReadOnlyTransaction.
    self.get_branch().lock_read()
    try:
        self.failUnless(isinstance(self.get_branch().get_transaction(),
                                   transactions.ReadOnlyTransaction))
    finally:
        # Release the lock even when the assertion fails, so a failing
        # check does not leave the branch locked.
        self.get_branch().unlock()
566
def test_lock_write_acquires_write_transaction(self):
    # While the branch is write locked, control_files must hold a
    # WriteTransaction.
    self.get_branch().lock_write()
    try:
        # cannot use get_transaction as its magic, so the private
        # control_files._transaction attribute is inspected directly.
        self.failUnless(
            isinstance(self.get_branch().control_files._transaction,
                       transactions.WriteTransaction))
    finally:
        # Release the lock even when the assertion fails, so a failing
        # check does not leave the branch write locked.
        self.get_branch().unlock()
574
class TestBranchPushLocations(TestCaseWithBranch):
576
def test_get_push_location_unset(self):
    # With nothing configured, the push location reads back as None.
    location = self.get_branch().get_push_location()
    self.assertEqual(None, location)
579
def test_get_push_location_exact(self):
    # Write a locations config whose section name is the branch base
    # without its final character, then check that the push_location
    # stored in that section is what the branch reports.
    from bzrlib.config import (locations_config_filename,
                               ensure_config_dir_exists)
    ensure_config_dir_exists()
    fn = locations_config_filename()
    config_file = open(fn, 'wt')
    try:
        # The trailing "\n" reproduces the newline that the previous
        # `print >>` form appended to the text.
        config_file.write("[%s]\n"
                          "push_location=foo\n" %
                          self.get_branch().base[:-1])
    finally:
        # Close explicitly so the content is flushed to disk before the
        # config is read back, instead of relying on the file object
        # being garbage collected.
        config_file.close()
    self.assertEqual("foo", self.get_branch().get_push_location())
589
def test_set_push_location(self):
590
from bzrlib.config import (locations_config_filename,
591
ensure_config_dir_exists)
592
ensure_config_dir_exists()
593
fn = locations_config_filename()
594
branch = self.get_branch()
595
branch.set_push_location('foo')
596
local_path = urlutils.local_path_from_url(branch.base[:-1])
597
self.assertFileEqual("[%s]\n"
598
"push_location = foo" % local_path,
601
# TODO RBC 20051029 test getting a push location from a branch in a
602
# recursive section - that is, it appends the branch name.
605
class TestFormat(TestCaseWithBranch):
606
"""Tests for the format itself."""
608
def test_format_initialize_find_open(self):
609
# loopback test to check the current format initializes to itself.
610
if not self.branch_format.is_supported():
611
# unsupported formats are not loopback testable
612
# because the default open will not open them and
613
# they may not be initializable.
615
# supported formats must be able to init and open
616
t = get_transport(self.get_url())
617
readonly_t = get_transport(self.get_readonly_url())
618
made_branch = self.make_branch('.')
619
self.failUnless(isinstance(made_branch, branch.Branch))
621
# find it via bzrdir opening:
622
opened_control = bzrdir.BzrDir.open(readonly_t.base)
623
direct_opened_branch = opened_control.open_branch()
624
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
625
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
626
self.failUnless(isinstance(direct_opened_branch._format,
627
self.branch_format.__class__))
629
# find it via Branch.open
630
opened_branch = branch.Branch.open(readonly_t.base)
631
self.failUnless(isinstance(opened_branch, made_branch.__class__))
632
self.assertEqual(made_branch._format.__class__,
633
opened_branch._format.__class__)
634
# if it has a unique id string, can we probe for it ?
636
self.branch_format.get_format_string()
637
except NotImplementedError:
639
self.assertEqual(self.branch_format,
640
branch.BranchFormat.find_format(opened_control))
31
# TODO: rewrite this as a regular unittest, without relying on the displayed output
32
# >>> from bzrlib.commit import commit
33
# >>> bzrlib.trace.silent = True
34
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
37
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
38
# >>> br2 = ScratchBranch()
39
# >>> br2.update_revisions(br1)
41
# Added 1 inventories.
43
# >>> br2.revision_history()
45
# >>> br2.update_revisions(br1)
47
# >>> br1.text_store.total_size() == br2.text_store.total_size()