14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
22
import bzrlib.branch as branch
23
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
24
from bzrlib.clone import copy_branch
25
from bzrlib.commit import commit
26
import bzrlib.errors as errors
27
from bzrlib.errors import (NoSuchRevision,
29
UninitializableFormat,
33
from bzrlib.osutils import getcwd
34
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
35
from bzrlib.trace import mutter
36
import bzrlib.transactions as transactions
37
from bzrlib.transport import get_transport
38
from bzrlib.transport.http import HttpServer
39
from bzrlib.transport.memory import MemoryServer
40
from bzrlib.revision import NULL_REVISION
42
# TODO: Make a branch using basis branch, and check that it
43
# doesn't request any files that could have been avoided, by
44
# hooking into the Transport.
47
class TestCaseWithBranch(TestCaseWithTransport):
50
super(TestCaseWithBranch, self).setUp()
54
if self.branch is None:
55
self.branch = self.make_branch(self.get_url())
58
def make_branch(self, relpath):
60
return self.branch_format.initialize(relpath)
61
except UninitializableFormat:
62
raise TestSkipped("Format %s is not initializable.")
65
class TestBranch(TestCaseWithBranch):
17
from bzrlib.selftest import TestCaseInTempDir
20
class TestAppendRevisions(TestCaseInTempDir):
21
"""Test appending more than one revision"""
67
22
def test_append_revisions(self):
68
"""Test appending more than one revision"""
69
br = self.get_branch()
23
from bzrlib.branch import Branch
24
br = Branch.initialize(".")
70
25
br.append_revision("rev1")
71
26
self.assertEquals(br.revision_history(), ["rev1",])
72
27
br.append_revision("rev2", "rev3")
73
28
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
75
def test_fetch_revisions(self):
76
"""Test fetch-revision operation."""
77
from bzrlib.fetch import Fetcher
80
b1 = self.make_branch('b1')
81
b2 = self.make_branch('b2')
82
file('b1/foo', 'w').write('hello')
83
b1.working_tree().add(['foo'], ['foo-id'])
84
b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
87
f = Fetcher(from_branch=b1, to_branch=b2)
88
eq = self.assertEquals
90
eq(f.last_revision, 'revision-1')
92
rev = b2.get_revision('revision-1')
93
tree = b2.revision_tree('revision-1')
94
eq(tree.get_file_text('foo-id'), 'hello')
96
def test_revision_tree(self):
    """Revision trees for None and NULL_REVISION contain no files.

    A pointless commit is made first and its revision tree is requested
    as well; that tree is only built, not inspected.
    """
    committed_branch = self.get_branch()
    committed_branch.working_tree().commit('lala!', rev_id='revision-1',
                                           allow_pointless=True)
    # Building the tree for the committed revision must simply succeed.
    committed_branch.revision_tree('revision-1')
    # Both spellings of "no revision" must yield a tree with no files.
    for empty_revision in (None, NULL_REVISION):
        empty_tree = committed_branch.revision_tree(empty_revision)
        self.assertEqual(len(empty_tree.list_files()), 0)
105
def get_unbalanced_branch_pair(self):
106
"""Return two branches, a and b, with one file in a."""
108
br_a = self.make_branch('a')
109
file('a/b', 'wb').write('b')
110
br_a.working_tree().add('b')
111
commit(br_a, "silly commit", rev_id='A')
113
br_b = self.make_branch('b')
116
def get_balanced_branch_pair(self):
117
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
118
br_a, br_b = self.get_unbalanced_branch_pair()
119
br_a.push_stores(br_b)
122
def test_push_stores(self):
123
"""Copy the stores from one branch to another"""
124
br_a, br_b = self.get_unbalanced_branch_pair()
125
# ensure the revision is missing.
126
self.assertRaises(NoSuchRevision, br_b.get_revision,
127
br_a.revision_history()[0])
128
br_a.push_stores(br_b)
129
# check that b now has all the data from a's first commit.
130
rev = br_b.get_revision(br_a.revision_history()[0])
131
tree = br_b.revision_tree(br_a.revision_history()[0])
133
if tree.inventory[file_id].kind == "file":
134
tree.get_file(file_id).read()
137
def test_copy_branch(self):
138
"""Copy the stores from one branch to another"""
139
br_a, br_b = self.get_balanced_branch_pair()
140
commit(br_b, "silly commit")
142
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
143
self.assertEqual(br_a.revision_history(), br_c.revision_history())
145
def test_copy_partial(self):
    """Copy only part of the history of a branch.

    Two revisions are committed in 'a'; copying up to the first one must
    give a branch whose last revision is 'u@d-1' and whose directory
    contains 'one' but not 'two'.
    """
    self.build_tree(['a/', 'a/one'])
    source = self.make_branch('a')
    source.working_tree().add(['one'])
    source.working_tree().commit('commit one', rev_id='u@d-1')

    self.build_tree(['a/two'])
    source.working_tree().add(['two'])
    source.working_tree().commit('commit two', rev_id='u@d-2')

    partial_copy = copy_branch(source, 'b', revision='u@d-1')
    self.assertEqual(partial_copy.last_revision(), 'u@d-1')

    first_file_present = os.path.exists('b/one')
    self.assertTrue(first_file_present)
    second_file_present = os.path.exists('b/two')
    self.assertFalse(second_file_present)
159
def test_record_initial_ghost_merge(self):
    """A pending merge with no revision present is still a merge."""
    ghost_branch = self.get_branch()
    ghost_branch.working_tree().add_pending_merge(
        'non:existent@rev--ision--0--2')
    ghost_branch.working_tree().commit(
        'pretend to merge nonexistent-revision', rev_id='first')

    tip_id = ghost_branch.last_revision()
    rev = ghost_branch.get_revision(tip_id)
    # The ghost is recorded as the only parent of the new revision.
    self.assertEqual(len(rev.parent_ids), 1)
    # parent_sha1s is currently left unpopulated (noted by rbc, 20051003).
    self.assertEqual(len(rev.parent_sha1s), 0)
    self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
170
def test_bad_revision(self):
    """get_revision(None) must raise errors.InvalidRevisionId."""
    expected_error = errors.InvalidRevisionId
    lookup = self.get_branch().get_revision
    self.assertRaises(expected_error, lookup, None)
174
# compare the gpg-to-sign info for a commit with a ghost and
175
# an identical tree without a ghost
176
# fetch missing should rewrite the TOC of weaves to list newly available parents.
178
def test_pending_merges(self):
    """Tracking pending-merged revisions.

    Pending merges start empty, ignore a duplicate addition, keep
    insertion order, become the parents of the next commit, and are
    cleared by that commit.
    """
    b = self.get_branch()
    wt = b.working_tree()
    self.assertEquals(wt.pending_merges(), [])

    # Adding the same revision twice must record it only once.
    for _attempt in (1, 2):
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEquals(wt.pending_merges(),
                          ['foo@azkhazan-123123-abcabc'])

    wt.add_pending_merge('wibble@fofof--20050401--1928390812')
    self.assertEquals(wt.pending_merges(),
                      ['foo@azkhazan-123123-abcabc',
                       'wibble@fofof--20050401--1928390812'])

    b.working_tree().commit("commit from base with two merges")
    first_revision_id = b.revision_history()[0]
    rev = b.get_revision(first_revision_id)
    self.assertEquals(len(rev.parent_ids), 2)
    self.assertEquals(rev.parent_ids[0],
                      'foo@azkhazan-123123-abcabc')
    self.assertEquals(rev.parent_ids[1],
                      'wibble@fofof--20050401--1928390812')
    # list should be cleared when we do a commit
    self.assertEquals(wt.pending_merges(), [])
201
def test_sign_existing_revision(self):
    """Signing revision 'A' stores its short testament text as the signature.

    The loopback GPG strategy is used, and the stored 'sig' entry for 'A'
    is compared with the testament's short text.
    """
    signed_branch = self.get_branch()
    signed_branch.working_tree().commit("base", allow_pointless=True,
                                        rev_id='A')
    from bzrlib.testament import Testament
    strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
    signed_branch.sign_revision('A', strategy)

    expected_text = Testament.from_revision(signed_branch, 'A').as_short_text()
    stored_text = signed_branch.revision_store.get('A', 'sig').read()
    self.assertEqual(expected_text, stored_text)
209
def test_store_signature(self):
210
branch = self.get_branch()
211
branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
213
self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
215
def test__relcontrolfilename(self):
    """_rel_controlfilename('%') must give '.bzr/%25'."""
    control_path = self.get_branch()._rel_controlfilename('%')
    self.assertEqual('.bzr/%25', control_path)
218
def test__relcontrolfilename_empty(self):
    """_rel_controlfilename('') must give the bare '.bzr' directory name."""
    control_path = self.get_branch()._rel_controlfilename('')
    self.assertEqual('.bzr', control_path)
221
def test_nicks(self):
222
"""Branch nicknames"""
224
branch = self.make_branch('bzr.dev')
225
self.assertEqual(branch.nick, 'bzr.dev')
226
os.rename('bzr.dev', 'bzr.ab')
227
branch = Branch.open('bzr.ab')
228
self.assertEqual(branch.nick, 'bzr.ab')
229
branch.nick = "Aaron's branch"
230
branch.nick = "Aaron's branch"
231
self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
232
self.assertEqual(branch.nick, "Aaron's branch")
233
os.rename('bzr.ab', 'integration')
234
branch = Branch.open('integration')
235
self.assertEqual(branch.nick, "Aaron's branch")
236
branch.nick = u"\u1234"
237
self.assertEqual(branch.nick, u"\u1234")
239
def test_commit_nicks(self):
240
"""Nicknames are committed to the revision"""
242
branch = self.get_branch()
243
branch.nick = "My happy branch"
244
branch.working_tree().commit('My commit respect da nick.')
245
committed = branch.get_revision(branch.last_revision())
246
self.assertEqual(committed.properties["branch-nick"],
250
class ChrootedTests(TestCaseWithBranch):
251
"""A support class that provides readonly urls outside the local namespace.
253
This is done by checking if self.transport_server is a MemoryServer. if it
254
is then we are chrooted already, if it is not then an HttpServer is used
259
super(ChrootedTests, self).setUp()
260
if not isinstance(self.transport_server, MemoryServer):
261
self.transport_readonly_server = HttpServer
263
def test_open_containing(self):
264
self.assertRaises(NotBranchError, Branch.open_containing,
265
self.get_readonly_url(''))
266
self.assertRaises(NotBranchError, Branch.open_containing,
267
self.get_readonly_url('g/p/q'))
269
branch = self.branch_format.initialize(self.get_url())
270
except UninitializableFormat:
271
raise TestSkipped("Format %s is not initializable.")
272
branch, relpath = Branch.open_containing(self.get_readonly_url(''))
273
self.assertEqual('', relpath)
274
branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
275
self.assertEqual('g/p/q', relpath)
277
31
# TODO: rewrite this as a regular unittest, without relying on the displayed output
278
32
# >>> from bzrlib.commit import commit
279
33
# >>> bzrlib.trace.silent = True
280
34
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
281
# >>> br1.working_tree().add('foo')
282
# >>> br1.working_tree().add('bar')
283
37
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
284
38
# >>> br2 = ScratchBranch()
285
39
# >>> br2.update_revisions(br1)
292
46
# Added 0 revisions.
293
47
# >>> br1.text_store.total_size() == br2.text_store.total_size()
296
class InstrumentedTransaction(object):
299
self.calls.append('finish')
305
class TestDecorator(object):
311
self._calls.append('lr')
313
def lock_write(self):
314
self._calls.append('lw')
317
self._calls.append('ul')
320
def do_with_read(self):
324
def except_with_read(self):
328
def do_with_write(self):
332
def except_with_write(self):
336
class TestDecorators(TestCase):
    """Check the lock calls recorded by TestDecorator's helper methods.

    Each test calls one helper on a fresh TestDecorator and then inspects
    its ``_calls`` list: 'lr' / 'lw' for taking the lock and 'ul' for
    releasing it.
    """

    def test_needs_read_lock(self):
        """do_with_read returns 1 and records a read lock then an unlock."""
        decorated = TestDecorator()
        result = decorated.do_with_read()
        self.assertEqual(1, result)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        """A RuntimeError under the read lock still records the unlock."""
        decorated = TestDecorator()
        failing_call = decorated.except_with_read
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        """do_with_write returns 2 and records a write lock then an unlock."""
        decorated = TestDecorator()
        result = decorated.do_with_write()
        self.assertEqual(2, result)
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        """A RuntimeError under the write lock still records the unlock."""
        decorated = TestDecorator()
        failing_call = decorated.except_with_write
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lw', 'ul'], decorated._calls)
359
class TestBranchTransaction(TestCaseWithBranch):
362
super(TestBranchTransaction, self).setUp()
365
def test_default_get_transaction(self):
    """branch.get_transaction on a new branch should give a PassThrough."""
    current = self.get_branch().get_transaction()
    is_passthrough = isinstance(current,
                                transactions.PassThroughTransaction)
    self.failUnless(is_passthrough)
370
def test__set_new_transaction(self):
    """Setting a ReadOnlyTransaction on the branch must not raise."""
    install = self.get_branch()._set_transaction
    install(transactions.ReadOnlyTransaction())
373
def test__set_over_existing_transaction_raises(self):
    """Setting a second transaction over an existing one raises LockError."""
    install_first = self.get_branch()._set_transaction
    install_first(transactions.ReadOnlyTransaction())

    expected_error = errors.LockError
    install_second = self.get_branch()._set_transaction
    second_transaction = transactions.ReadOnlyTransaction()
    self.assertRaises(expected_error, install_second, second_transaction)
379
def test_finish_no_transaction_raises(self):
    """Finishing when no transaction was set raises errors.LockError."""
    expected_error = errors.LockError
    finish = self.get_branch()._finish_transaction
    self.assertRaises(expected_error, finish)
382
def test_finish_readonly_transaction_works(self):
    """Finishing a ReadOnlyTransaction leaves the branch's _transaction None."""
    install = self.get_branch()._set_transaction
    install(transactions.ReadOnlyTransaction())
    self.get_branch()._finish_transaction()
    remaining = self.get_branch()._transaction
    self.assertEqual(None, remaining)
387
def test_unlock_calls_finish(self):
    """Unlocking the branch calls finish() on its current transaction.

    An InstrumentedTransaction is swapped in after taking a read lock so
    that the calls made during unlock can be inspected.
    """
    self.get_branch().lock_read()
    recorder = InstrumentedTransaction()
    self.get_branch()._transaction = recorder
    self.get_branch().unlock()
    self.assertEqual(['finish'], recorder.calls)
394
def test_lock_read_acquires_ro_transaction(self):
    """Taking a read lock must give the branch a ReadOnlyTransaction.

    The lock is released in a ``finally`` clause so that a failing
    assertion does not leave the branch read-locked afterwards.
    """
    self.get_branch().lock_read()
    try:
        self.failUnless(isinstance(self.get_branch().get_transaction(),
                                   transactions.ReadOnlyTransaction))
    finally:
        # Always release the lock, even when the assertion above fails.
        self.get_branch().unlock()
400
def test_lock_write_acquires_passthrough_transaction(self):
    """Taking a write lock must give the branch a PassThroughTransaction.

    The lock is released in a ``finally`` clause so that a failing
    assertion does not leave the branch write-locked afterwards.
    """
    self.get_branch().lock_write()
    try:
        # Inspect _transaction directly: get_transaction() is "magic"
        # (per the original comment) and cannot be used for this check.
        self.failUnless(isinstance(self.get_branch()._transaction,
                                   transactions.PassThroughTransaction))
    finally:
        # Always release the lock, even when the assertion above fails.
        self.get_branch().unlock()
408
class TestBranchPushLocations(TestCaseWithBranch):
410
def test_get_push_location_unset(self):
    """get_push_location() returns None when no location was configured."""
    location = self.get_branch().get_push_location()
    self.assertEqual(None, location)
413
def test_get_push_location_exact(self):
414
from bzrlib.config import (branches_config_filename,
415
ensure_config_dir_exists)
416
ensure_config_dir_exists()
417
fn = branches_config_filename()
418
print >> open(fn, 'wt'), ("[%s]\n"
419
"push_location=foo" %
421
self.assertEqual("foo", self.get_branch().get_push_location())
423
def test_set_push_location(self):
424
from bzrlib.config import (branches_config_filename,
425
ensure_config_dir_exists)
426
ensure_config_dir_exists()
427
fn = branches_config_filename()
428
self.get_branch().set_push_location('foo')
429
self.assertFileEqual("[%s]\n"
430
"push_location = foo" % getcwd(),
433
# TODO RBC 20051029 test getting a push location from a branch in a
434
# recursive section - that is, it appends the branch name.
437
class TestFormat(TestCaseWithBranch):
438
"""Tests for the format itself."""
440
def test_format_initialize_find_open(self):
441
# loopback test to check the current format initializes to itself.
442
if not self.branch_format.is_supported():
443
# unsupported formats are not loopback testable
444
# because the default open will not open them and
445
# they may not be initializable.
447
# supported formats must be able to init and open
448
t = get_transport(self.get_url())
449
readonly_t = get_transport(self.get_readonly_url())
450
made_branch = self.branch_format.initialize(t.base)
451
self.failUnless(isinstance(made_branch, branch.Branch))
452
self.assertEqual(self.branch_format,
453
branch.BzrBranchFormat.find_format(readonly_t))
454
direct_opened_branch = self.branch_format.open(readonly_t)
455
opened_branch = branch.Branch.open(t.base)
456
self.assertEqual(made_branch._branch_format,
457
opened_branch._branch_format)
458
self.assertEqual(direct_opened_branch._branch_format,
459
opened_branch._branch_format)
460
self.failUnless(isinstance(opened_branch, branch.Branch))
462
def test_open_not_branch(self):
    """Opening the format on the readonly URL's transport raises NoSuchFile."""
    expected_error = NoSuchFile
    open_format = self.branch_format.open
    readonly_transport = get_transport(self.get_readonly_url())
    self.assertRaises(expected_error, open_format, readonly_transport)