1
# Copyright (C) 2005, 2006 Canonical Ltd
1
# (C) 2005 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
31
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
32
from bzrlib.delta import TreeDelta
33
from bzrlib.errors import (FileExists,
36
UninitializableFormat,
39
from bzrlib.osutils import getcwd
40
import bzrlib.revision
41
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
42
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
43
from bzrlib.trace import mutter
44
from bzrlib.transport import get_transport
45
from bzrlib.transport.http import HttpServer
46
from bzrlib.transport.memory import MemoryServer
47
from bzrlib.upgrade import upgrade
48
from bzrlib.workingtree import WorkingTree
51
# TODO: Make a branch using basis branch, and check that it
52
# doesn't request any files that could have been avoided, by
53
# hooking into the Transport.
56
class TestCaseWithBranch(TestCaseWithBzrDir):
59
super(TestCaseWithBranch, self).setUp()
63
if self.branch is None:
64
self.branch = self.make_branch('')
67
def make_branch(self, relpath, format=None):
68
repo = self.make_repository(relpath, format=format)
69
# fixme RBC 20060210 this isnt necessarily a fixable thing,
70
# Skipped is the wrong exception to raise.
72
return self.branch_format.initialize(repo.bzrdir)
73
except errors.UninitializableFormat:
74
raise TestSkipped('Uninitializable branch format')
76
def make_repository(self, relpath, shared=False, format=None):
77
made_control = self.make_bzrdir(relpath, format=format)
78
return made_control.create_repository(shared=shared)
81
class TestBranch(TestCaseWithBranch):
83
def test_append_revisions(self):
84
"""Test appending more than one revision"""
85
br = self.get_branch()
17
from bzrlib.selftest import InTempDir
21
class TestAppendRevisions(InTempDir):
22
"""Test appending more than one revision"""
24
from bzrlib.branch import Branch
25
br = Branch(".", init=True)
86
26
br.append_revision("rev1")
87
27
self.assertEquals(br.revision_history(), ["rev1",])
88
28
br.append_revision("rev2", "rev3")
89
29
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
91
def test_fetch_revisions(self):
92
"""Test fetch-revision operation."""
93
get_transport(self.get_url()).mkdir('b1')
94
get_transport(self.get_url()).mkdir('b2')
95
wt = self.make_branch_and_tree('b1')
97
b2 = self.make_branch('b2')
98
file('b1/foo', 'w').write('hello')
99
wt.add(['foo'], ['foo-id'])
100
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
102
mutter('start fetch')
103
self.assertEqual((1, []), b2.fetch(b1))
105
rev = b2.repository.get_revision('revision-1')
106
tree = b2.repository.revision_tree('revision-1')
107
self.assertEqual(tree.get_file_text('foo-id'), 'hello')
109
def test_get_revision_delta(self):
110
tree_a = self.make_branch_and_tree('a')
111
self.build_tree(['a/foo'])
112
tree_a.add('foo', 'file1')
113
tree_a.commit('rev1', rev_id='rev1')
114
self.build_tree(['a/vla'])
115
tree_a.add('vla', 'file2')
116
tree_a.commit('rev2', rev_id='rev2')
118
delta = tree_a.branch.get_revision_delta(1)
119
self.assertIsInstance(delta, TreeDelta)
120
self.assertEqual([('foo', 'file1', 'file')], delta.added)
121
delta = tree_a.branch.get_revision_delta(2)
122
self.assertIsInstance(delta, TreeDelta)
123
self.assertEqual([('vla', 'file2', 'file')], delta.added)
125
def get_unbalanced_tree_pair(self):
126
"""Return two branches, a and b, with one file in a."""
127
get_transport(self.get_url()).mkdir('a')
128
tree_a = self.make_branch_and_tree('a')
129
file('a/b', 'wb').write('b')
131
tree_a.commit("silly commit", rev_id='A')
133
get_transport(self.get_url()).mkdir('b')
134
tree_b = self.make_branch_and_tree('b')
135
return tree_a, tree_b
137
def get_balanced_branch_pair(self):
138
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
139
tree_a, tree_b = self.get_unbalanced_tree_pair()
140
tree_b.branch.repository.fetch(tree_a.branch.repository)
141
return tree_a, tree_b
143
def test_clone_branch(self):
144
"""Copy the stores from one branch to another"""
145
tree_a, tree_b = self.get_balanced_branch_pair()
146
tree_b.commit("silly commit")
148
# this fails to test that the history from a was not used.
149
dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
150
self.assertEqual(tree_a.branch.revision_history(),
151
dir_c.open_branch().revision_history())
153
def test_clone_partial(self):
154
"""Copy only part of the history of a branch."""
155
# TODO: RBC 20060208 test with a revision not on revision-history.
156
# what should that behaviour be ? Emailed the list.
157
wt_a = self.make_branch_and_tree('a')
158
self.build_tree(['a/one'])
160
wt_a.commit('commit one', rev_id='1')
161
self.build_tree(['a/two'])
163
wt_a.commit('commit two', rev_id='2')
164
repo_b = self.make_repository('b')
165
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
166
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
167
self.assertEqual(br_b.last_revision(), '1')
169
def test_sprout_partial(self):
170
# test sprouting with a prefix of the revision-history.
171
# also needs not-on-revision-history behaviour defined.
172
wt_a = self.make_branch_and_tree('a')
173
self.build_tree(['a/one'])
175
wt_a.commit('commit one', rev_id='1')
176
self.build_tree(['a/two'])
178
wt_a.commit('commit two', rev_id='2')
179
repo_b = self.make_repository('b')
180
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
181
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
182
self.assertEqual(br_b.last_revision(), '1')
184
def test_clone_branch_nickname(self):
185
# test the nick name is preserved always
186
raise TestSkipped('XXX branch cloning is not yet tested..')
188
def test_clone_branch_parent(self):
189
# test the parent is preserved always
190
raise TestSkipped('XXX branch cloning is not yet tested..')
192
def test_sprout_branch_nickname(self):
193
# test the nick name is reset always
194
raise TestSkipped('XXX branch sprouting is not yet tested..')
196
def test_sprout_branch_parent(self):
197
source = self.make_branch('source')
198
target = source.bzrdir.sprout(self.get_url('target')).open_branch()
199
self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
201
def test_submit_branch(self):
202
"""Submit location can be queried and set"""
203
branch = self.make_branch('branch')
204
self.assertEqual(branch.get_submit_branch(), None)
205
branch.set_submit_branch('sftp://example.com')
206
self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
207
branch.set_submit_branch('sftp://example.net')
208
self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
210
def test_record_initial_ghost_merge(self):
211
"""A pending merge with no revision present is still a merge."""
212
wt = self.make_branch_and_tree('.')
214
wt.add_pending_merge('non:existent@rev--ision--0--2')
215
wt.commit('pretend to merge nonexistent-revision', rev_id='first')
216
rev = branch.repository.get_revision(branch.last_revision())
217
self.assertEqual(len(rev.parent_ids), 1)
218
# parent_sha1s is not populated now, WTF. rbc 20051003
219
self.assertEqual(len(rev.parent_sha1s), 0)
220
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
222
def test_bad_revision(self):
223
self.assertRaises(errors.InvalidRevisionId,
224
self.get_branch().repository.get_revision,
228
# compare the gpg-to-sign info for a commit with a ghost and
229
# an identical tree without a ghost
230
# fetch missing should rewrite the TOC of weaves to list newly available parents.
232
def test_pending_merges(self):
233
"""Tracking pending-merged revisions."""
234
wt = self.make_branch_and_tree('.')
236
self.assertEquals(wt.pending_merges(), [])
237
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
238
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
239
wt.add_pending_merge('foo@azkhazan-123123-abcabc')
240
self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
241
wt.add_pending_merge('wibble@fofof--20050401--1928390812')
242
self.assertEquals(wt.pending_merges(),
243
['foo@azkhazan-123123-abcabc',
244
'wibble@fofof--20050401--1928390812'])
245
wt.commit("commit from base with two merges")
246
rev = b.repository.get_revision(b.revision_history()[0])
247
self.assertEquals(len(rev.parent_ids), 2)
248
self.assertEquals(rev.parent_ids[0],
249
'foo@azkhazan-123123-abcabc')
250
self.assertEquals(rev.parent_ids[1],
251
'wibble@fofof--20050401--1928390812')
252
# list should be cleared when we do a commit
253
self.assertEquals(wt.pending_merges(), [])
255
def test_sign_existing_revision(self):
256
wt = self.make_branch_and_tree('.')
258
wt.commit("base", allow_pointless=True, rev_id='A')
259
from bzrlib.testament import Testament
260
strategy = gpg.LoopbackGPGStrategy(None)
261
branch.repository.sign_revision('A', strategy)
262
self.assertEqual(Testament.from_revision(branch.repository,
263
'A').as_short_text(),
264
branch.repository.get_signature_text('A'))
266
def test_store_signature(self):
267
wt = self.make_branch_and_tree('.')
269
branch.repository.store_revision_signature(
270
gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
271
self.assertRaises(errors.NoSuchRevision,
272
branch.repository.has_signature_for_revision_id,
274
wt.commit("base", allow_pointless=True, rev_id='A')
275
self.assertEqual('FOO',
276
branch.repository.get_signature_text('A'))
278
def test_branch_keeps_signatures(self):
279
wt = self.make_branch_and_tree('source')
280
wt.commit('A', allow_pointless=True, rev_id='A')
281
wt.branch.repository.sign_revision('A',
282
gpg.LoopbackGPGStrategy(None))
283
#FIXME: clone should work to urls,
284
# wt.clone should work to disks.
285
self.build_tree(['target/'])
286
d2 = wt.bzrdir.clone('target')
287
self.assertEqual(wt.branch.repository.get_signature_text('A'),
288
d2.open_repository().get_signature_text('A'))
290
def test_nicks(self):
291
"""Branch nicknames"""
292
t = get_transport(self.get_url())
294
branch = self.make_branch('bzr.dev')
295
self.assertEqual(branch.nick, 'bzr.dev')
296
t.move('bzr.dev', 'bzr.ab')
297
branch = Branch.open(self.get_url('bzr.ab'))
298
self.assertEqual(branch.nick, 'bzr.ab')
299
branch.nick = "Aaron's branch"
300
branch.nick = "Aaron's branch"
304
branch.control_files.controlfilename("branch.conf")
308
self.assertEqual(branch.nick, "Aaron's branch")
309
t.move('bzr.ab', 'integration')
310
branch = Branch.open(self.get_url('integration'))
311
self.assertEqual(branch.nick, "Aaron's branch")
312
branch.nick = u"\u1234"
313
self.assertEqual(branch.nick, u"\u1234")
315
def test_commit_nicks(self):
316
"""Nicknames are committed to the revision"""
317
get_transport(self.get_url()).mkdir('bzr.dev')
318
wt = self.make_branch_and_tree('bzr.dev')
320
branch.nick = "My happy branch"
321
wt.commit('My commit respect da nick.')
322
committed = branch.repository.get_revision(branch.last_revision())
323
self.assertEqual(committed.properties["branch-nick"],
326
def test_create_open_branch_uses_repository(self):
328
repo = self.make_repository('.', shared=True)
329
except errors.IncompatibleFormat:
331
repo.bzrdir.root_transport.mkdir('child')
332
child_dir = self.bzrdir_format.initialize('child')
334
child_branch = self.branch_format.initialize(child_dir)
335
except errors.UninitializableFormat:
336
# branch references are not default init'able.
338
self.assertEqual(repo.bzrdir.root_transport.base,
339
child_branch.repository.bzrdir.root_transport.base)
340
child_branch = branch.Branch.open(self.get_url('child'))
341
self.assertEqual(repo.bzrdir.root_transport.base,
342
child_branch.repository.bzrdir.root_transport.base)
344
def test_format_description(self):
345
tree = self.make_branch_and_tree('tree')
346
text = tree.branch._format.get_format_description()
347
self.failUnless(len(text))
349
def test_check_branch_report_results(self):
350
"""Checking a branch produces results which can be printed"""
351
branch = self.make_branch('.')
352
result = branch.check()
353
# reports results through logging
354
result.report_results(verbose=True)
355
result.report_results(verbose=False)
357
def test_get_commit_builder(self):
358
self.assertIsInstance(self.make_branch(".").get_commit_builder([]),
359
repository.CommitBuilder)
361
def test_generate_revision_history(self):
362
"""Create a fake revision history easily."""
363
tree = self.make_branch_and_tree('.')
364
rev1 = tree.commit('foo')
365
orig_history = tree.branch.revision_history()
366
rev2 = tree.commit('bar', allow_pointless=True)
367
tree.branch.generate_revision_history(rev1)
368
self.assertEqual(orig_history, tree.branch.revision_history())
370
def test_generate_revision_history_NULL_REVISION(self):
371
tree = self.make_branch_and_tree('.')
372
rev1 = tree.commit('foo')
373
tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
374
self.assertEqual([], tree.branch.revision_history())
377
class ChrootedTests(TestCaseWithBranch):
378
"""A support class that provides readonly urls outside the local namespace.
380
This is done by checking if self.transport_server is a MemoryServer. if it
381
is then we are chrooted already, if it is not then an HttpServer is used
386
super(ChrootedTests, self).setUp()
387
if not self.transport_server == MemoryServer:
388
self.transport_readonly_server = HttpServer
390
def test_open_containing(self):
391
self.assertRaises(NotBranchError, Branch.open_containing,
392
self.get_readonly_url(''))
393
self.assertRaises(NotBranchError, Branch.open_containing,
394
self.get_readonly_url('g/p/q'))
395
branch = self.make_branch('.')
396
branch, relpath = Branch.open_containing(self.get_readonly_url(''))
397
self.assertEqual('', relpath)
398
branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
399
self.assertEqual('g/p/q', relpath)
402
class InstrumentedTransaction(object):
405
self.calls.append('finish')
411
class TestDecorator(object):
417
self._calls.append('lr')
419
def lock_write(self):
420
self._calls.append('lw')
423
self._calls.append('ul')
426
def do_with_read(self):
430
def except_with_read(self):
434
def do_with_write(self):
438
def except_with_write(self):
442
class TestDecorators(TestCase):
444
def test_needs_read_lock(self):
445
branch = TestDecorator()
446
self.assertEqual(1, branch.do_with_read())
447
self.assertEqual(['lr', 'ul'], branch._calls)
449
def test_excepts_in_read_lock(self):
450
branch = TestDecorator()
451
self.assertRaises(RuntimeError, branch.except_with_read)
452
self.assertEqual(['lr', 'ul'], branch._calls)
454
def test_needs_write_lock(self):
455
branch = TestDecorator()
456
self.assertEqual(2, branch.do_with_write())
457
self.assertEqual(['lw', 'ul'], branch._calls)
459
def test_excepts_in_write_lock(self):
460
branch = TestDecorator()
461
self.assertRaises(RuntimeError, branch.except_with_write)
462
self.assertEqual(['lw', 'ul'], branch._calls)
465
class TestBranchTransaction(TestCaseWithBranch):
468
super(TestBranchTransaction, self).setUp()
471
def test_default_get_transaction(self):
472
"""branch.get_transaction on a new branch should give a PassThrough."""
473
self.failUnless(isinstance(self.get_branch().get_transaction(),
474
transactions.PassThroughTransaction))
476
def test__set_new_transaction(self):
477
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
479
def test__set_over_existing_transaction_raises(self):
480
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
481
self.assertRaises(errors.LockError,
482
self.get_branch()._set_transaction,
483
transactions.ReadOnlyTransaction())
485
def test_finish_no_transaction_raises(self):
486
self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
488
def test_finish_readonly_transaction_works(self):
489
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
490
self.get_branch()._finish_transaction()
491
self.assertEqual(None, self.get_branch().control_files._transaction)
493
def test_unlock_calls_finish(self):
494
self.get_branch().lock_read()
495
transaction = InstrumentedTransaction()
496
self.get_branch().control_files._transaction = transaction
497
self.get_branch().unlock()
498
self.assertEqual(['finish'], transaction.calls)
500
def test_lock_read_acquires_ro_transaction(self):
501
self.get_branch().lock_read()
502
self.failUnless(isinstance(self.get_branch().get_transaction(),
503
transactions.ReadOnlyTransaction))
504
self.get_branch().unlock()
506
def test_lock_write_acquires_write_transaction(self):
507
self.get_branch().lock_write()
508
# cannot use get_transaction as its magic
509
self.failUnless(isinstance(self.get_branch().control_files._transaction,
510
transactions.WriteTransaction))
511
self.get_branch().unlock()
514
class TestBranchPushLocations(TestCaseWithBranch):
516
def test_get_push_location_unset(self):
517
self.assertEqual(None, self.get_branch().get_push_location())
519
def test_get_push_location_exact(self):
520
from bzrlib.config import (locations_config_filename,
521
ensure_config_dir_exists)
522
ensure_config_dir_exists()
523
fn = locations_config_filename()
524
print >> open(fn, 'wt'), ("[%s]\n"
525
"push_location=foo" %
526
self.get_branch().base[:-1])
527
self.assertEqual("foo", self.get_branch().get_push_location())
529
def test_set_push_location(self):
530
from bzrlib.config import (locations_config_filename,
531
ensure_config_dir_exists)
532
ensure_config_dir_exists()
533
fn = locations_config_filename()
534
branch = self.get_branch()
535
branch.set_push_location('foo')
536
local_path = urlutils.local_path_from_url(branch.base[:-1])
537
self.assertFileEqual("[%s]\n"
538
"push_location = foo" % local_path,
541
# TODO RBC 20051029 test getting a push location from a branch in a
542
# recursive section - that is, it appends the branch name.
545
class TestFormat(TestCaseWithBranch):
546
"""Tests for the format itself."""
548
def test_format_initialize_find_open(self):
549
# loopback test to check the current format initializes to itself.
550
if not self.branch_format.is_supported():
551
# unsupported formats are not loopback testable
552
# because the default open will not open them and
553
# they may not be initializable.
555
# supported formats must be able to init and open
556
t = get_transport(self.get_url())
557
readonly_t = get_transport(self.get_readonly_url())
558
made_branch = self.make_branch('.')
559
self.failUnless(isinstance(made_branch, branch.Branch))
561
# find it via bzrdir opening:
562
opened_control = bzrdir.BzrDir.open(readonly_t.base)
563
direct_opened_branch = opened_control.open_branch()
564
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
565
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
566
self.failUnless(isinstance(direct_opened_branch._format,
567
self.branch_format.__class__))
569
# find it via Branch.open
570
opened_branch = branch.Branch.open(readonly_t.base)
571
self.failUnless(isinstance(opened_branch, made_branch.__class__))
572
self.assertEqual(made_branch._format.__class__,
573
opened_branch._format.__class__)
574
# if it has a unique id string, can we probe for it ?
576
self.branch_format.get_format_string()
577
except NotImplementedError:
579
self.assertEqual(self.branch_format,
580
branch.BranchFormat.find_format(opened_control))