1
# Copyright (C) 2005, 2006 Canonical Ltd
1
# (C) 2005 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
32
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
33
from bzrlib.delta import TreeDelta
34
from bzrlib.errors import (FileExists,
37
UninitializableFormat,
40
from bzrlib.osutils import getcwd
41
import bzrlib.revision
42
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
43
from bzrlib.tests.branch_implementations import TestCaseWithBranch
44
from bzrlib.tests.HttpServer import HttpServer
18
from bzrlib.branch import Branch
19
from bzrlib.clone import copy_branch
20
from bzrlib.commit import commit
21
import bzrlib.errors as errors
22
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
23
from bzrlib.selftest import TestCaseInTempDir
45
24
from bzrlib.trace import mutter
46
from bzrlib.transport import get_transport
47
from bzrlib.transport.memory import MemoryServer
48
from bzrlib.upgrade import upgrade
49
from bzrlib.workingtree import WorkingTree
52
class TestBranch(TestCaseWithBranch):
25
import bzrlib.transactions as transactions
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
28
class TestBranch(TestCaseInTempDir):
54
30
def test_append_revisions(self):
55
31
"""Test appending more than one revision"""
56
wt = self.make_branch_and_tree('tree')
57
wt.commit('f', rev_id='rev1')
58
wt.commit('f', rev_id='rev2')
59
wt.commit('f', rev_id='rev3')
61
br = self.get_branch()
32
br = Branch.initialize(".")
63
33
br.append_revision("rev1")
64
34
self.assertEquals(br.revision_history(), ["rev1",])
65
35
br.append_revision("rev2", "rev3")
66
36
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
67
self.assertRaises(errors.ReservedId, br.append_revision, 'current:')
69
def test_create_tree_with_merge(self):
70
tree = self.create_tree_with_merge()
71
ancestry_graph = tree.branch.repository.get_revision_graph('rev-3')
72
self.assertEqual({'rev-1':[],
74
'rev-1.1.1':['rev-1'],
75
'rev-3':['rev-2', 'rev-1.1.1'],
78
def test_revision_ids_are_utf8(self):
79
wt = self.make_branch_and_tree('tree')
80
wt.commit('f', rev_id='rev1')
81
wt.commit('f', rev_id='rev2')
82
wt.commit('f', rev_id='rev3')
84
br = self.get_branch()
86
br.set_revision_history(['rev1', 'rev2', 'rev3'])
87
rh = br.revision_history()
88
self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
89
for revision_id in rh:
90
self.assertIsInstance(revision_id, str)
91
last = br.last_revision()
92
self.assertEqual('rev3', last)
93
self.assertIsInstance(last, str)
94
revno, last = br.last_revision_info()
95
self.assertEqual(3, revno)
96
self.assertEqual('rev3', last)
97
self.assertIsInstance(last, str)
99
38
def test_fetch_revisions(self):
100
39
"""Test fetch-revision operation."""
101
wt = self.make_branch_and_tree('b1')
103
self.build_tree_contents([('b1/foo', 'hello')])
104
wt.add(['foo'], ['foo-id'])
105
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
107
b2 = self.make_branch('b2')
108
self.assertEqual((1, []), b2.fetch(b1))
110
rev = b2.repository.get_revision('revision-1')
111
tree = b2.repository.revision_tree('revision-1')
112
self.assertEqual(tree.get_file_text('foo-id'), 'hello')
114
def test_get_revision_delta(self):
115
tree_a = self.make_branch_and_tree('a')
116
self.build_tree(['a/foo'])
117
tree_a.add('foo', 'file1')
118
tree_a.commit('rev1', rev_id='rev1')
119
self.build_tree(['a/vla'])
120
tree_a.add('vla', 'file2')
121
tree_a.commit('rev2', rev_id='rev2')
123
delta = tree_a.branch.get_revision_delta(1)
124
self.assertIsInstance(delta, TreeDelta)
125
self.assertEqual([('foo', 'file1', 'file')], delta.added)
126
delta = tree_a.branch.get_revision_delta(2)
127
self.assertIsInstance(delta, TreeDelta)
128
self.assertEqual([('vla', 'file2', 'file')], delta.added)
130
def get_unbalanced_tree_pair(self):
131
"""Return two branches, a and b, with one file in a."""
132
tree_a = self.make_branch_and_tree('a')
133
self.build_tree_contents([('a/b', 'b')])
135
tree_a.commit("silly commit", rev_id='A')
137
tree_b = self.make_branch_and_tree('b')
138
return tree_a, tree_b
140
def get_balanced_branch_pair(self):
141
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
142
tree_a, tree_b = self.get_unbalanced_tree_pair()
143
tree_b.branch.repository.fetch(tree_a.branch.repository)
144
return tree_a, tree_b
146
def test_clone_partial(self):
40
from bzrlib.fetch import Fetcher
43
b1 = Branch.initialize('b1')
44
b2 = Branch.initialize('b2')
45
file(os.sep.join(['b1', 'foo']), 'w').write('hello')
46
b1.add(['foo'], ['foo-id'])
47
b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
50
f = Fetcher(from_branch=b1, to_branch=b2)
51
eq = self.assertEquals
53
eq(f.last_revision, 'revision-1')
55
rev = b2.get_revision('revision-1')
56
tree = b2.revision_tree('revision-1')
57
eq(tree.get_file_text('foo-id'), 'hello')
59
def test_push_stores(self):
60
"""Copy the stores from one branch to another"""
62
br_a = Branch.initialize("a")
63
file('a/b', 'wb').write('b')
65
commit(br_a, "silly commit")
68
br_b = Branch.initialize("b")
69
self.assertRaises(NoSuchRevision, br_b.get_revision,
70
br_a.revision_history()[0])
71
br_a.push_stores(br_b)
72
rev = br_b.get_revision(br_a.revision_history()[0])
73
tree = br_b.revision_tree(br_a.revision_history()[0])
75
if tree.inventory[file_id].kind == "file":
76
tree.get_file(file_id).read()
79
def test_copy_branch(self):
80
"""Copy the stores from one branch to another"""
81
br_a, br_b = self.test_push_stores()
82
commit(br_b, "silly commit")
84
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
85
self.assertEqual(br_a.revision_history(), br_c.revision_history())
86
## # basis branches currently disabled for weave format
87
## self.assertFalse(br_b.last_revision() in br_c.revision_history())
88
## br_c.get_revision(br_b.last_revision())
90
def test_copy_partial(self):
147
91
"""Copy only part of the history of a branch."""
148
# TODO: RBC 20060208 test with a revision not on revision-history.
149
# what should that behaviour be ? Emailed the list.
150
# First, make a branch with two commits.
151
wt_a = self.make_branch_and_tree('a')
152
self.build_tree(['a/one'])
154
wt_a.commit('commit one', rev_id='1')
92
self.build_tree(['a/', 'a/one'])
93
br_a = Branch.initialize('a')
95
br_a.commit('commit one', rev_id='u@d-1')
155
96
self.build_tree(['a/two'])
157
wt_a.commit('commit two', rev_id='2')
158
# Now make a copy of the repository.
159
repo_b = self.make_repository('b')
160
wt_a.branch.repository.copy_content_into(repo_b)
161
# wt_a might be a lightweight checkout, so get a hold of the actual
162
# branch (because you can't do a partial clone of a lightweight
164
branch = wt_a.branch.bzrdir.open_branch()
165
# Then make a branch where the new repository is, but specify a revision
166
# ID. The new branch's history will stop at the specified revision.
167
br_b = branch.clone(repo_b.bzrdir, revision_id='1')
168
self.assertEqual('1', br_b.last_revision())
170
def get_parented_branch(self):
171
wt_a = self.make_branch_and_tree('a')
172
self.build_tree(['a/one'])
174
wt_a.commit('commit one', rev_id='1')
176
branch_b = wt_a.bzrdir.sprout('b', revision_id='1').open_branch()
177
self.assertEqual(wt_a.branch.base, branch_b.get_parent())
180
def test_clone_branch_nickname(self):
181
# test the nick name is preserved always
182
raise TestSkipped('XXX branch cloning is not yet tested..')
184
def test_clone_branch_parent(self):
185
# test the parent is preserved always
186
branch_b = self.get_parented_branch()
187
repo_c = self.make_repository('c')
188
branch_b.repository.copy_content_into(repo_c)
189
branch_c = branch_b.clone(repo_c.bzrdir)
190
self.assertNotEqual(None, branch_c.get_parent())
191
self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
193
# We can also set a specific parent, and it should be honored
194
random_parent = 'http://bazaar-vcs.org/path/to/branch'
195
branch_b.set_parent(random_parent)
196
repo_d = self.make_repository('d')
197
branch_b.repository.copy_content_into(repo_d)
198
branch_d = branch_b.clone(repo_d.bzrdir)
199
self.assertEqual(random_parent, branch_d.get_parent())
201
def test_submit_branch(self):
202
"""Submit location can be queried and set"""
203
branch = self.make_branch('branch')
204
self.assertEqual(branch.get_submit_branch(), None)
205
branch.set_submit_branch('sftp://example.com')
206
self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
207
branch.set_submit_branch('sftp://example.net')
208
self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
98
br_a.commit('commit two', rev_id='u@d-2')
99
br_b = copy_branch(br_a, 'b', revision='u@d-1')
100
self.assertEqual(br_b.last_revision(), 'u@d-1')
101
self.assertTrue(os.path.exists('b/one'))
102
self.assertFalse(os.path.exists('b/two'))
210
def test_public_branch(self):
211
"""public location can be queried and set"""
212
branch = self.make_branch('branch')
213
self.assertEqual(branch.get_public_branch(), None)
214
branch.set_public_branch('sftp://example.com')
215
self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
216
branch.set_public_branch('sftp://example.net')
217
self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
218
branch.set_public_branch(None)
219
self.assertEqual(branch.get_public_branch(), None)
221
def test_record_initial_ghost(self):
222
"""Branches should support having ghosts."""
223
wt = self.make_branch_and_tree('.')
224
wt.set_parent_ids(['non:existent@rev--ision--0--2'],
225
allow_leftmost_as_ghost=True)
226
rev_id = wt.commit('commit against a ghost first parent.')
227
rev = wt.branch.repository.get_revision(rev_id)
228
self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
105
def test_record_initial_ghost_merge(self):
106
"""A pending merge with no revision present is still a merge."""
107
branch = Branch.initialize('.')
108
branch.add_pending_merge('non:existent@rev--ision--0--2')
109
branch.commit('pretend to merge nonexistent-revision', rev_id='first')
110
rev = branch.get_revision(branch.last_revision())
111
self.assertEqual(len(rev.parent_ids), 1)
229
112
# parent_sha1s is not populated now, WTF. rbc 20051003
230
113
self.assertEqual(len(rev.parent_sha1s), 0)
232
def test_record_two_ghosts(self):
233
"""Recording with all ghosts works."""
234
wt = self.make_branch_and_tree('.')
236
'foo@azkhazan-123123-abcabc',
237
'wibble@fofof--20050401--1928390812',
239
allow_leftmost_as_ghost=True)
240
rev_id = wt.commit("commit from ghost base with one merge")
241
# the revision should have been committed with two parents
242
rev = wt.branch.repository.get_revision(rev_id)
243
self.assertEqual(['foo@azkhazan-123123-abcabc',
244
'wibble@fofof--20050401--1928390812'],
247
def test_bad_revision(self):
248
self.assertRaises(errors.InvalidRevisionId,
249
self.get_branch().repository.get_revision,
114
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
252
116
# TODO 20051003 RBC:
253
117
# compare the gpg-to-sign info for a commit with a ghost and
254
118
# an identical tree without a ghost
255
119
# fetch missing should rewrite the TOC of weaves to list newly available parents.
257
def test_sign_existing_revision(self):
258
wt = self.make_branch_and_tree('.')
260
wt.commit("base", allow_pointless=True, rev_id='A')
261
from bzrlib.testament import Testament
262
strategy = gpg.LoopbackGPGStrategy(None)
263
branch.repository.sign_revision('A', strategy)
264
self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
265
Testament.from_revision(branch.repository,
266
'A').as_short_text() +
267
'-----END PSEUDO-SIGNED CONTENT-----\n',
268
branch.repository.get_signature_text('A'))
270
def test_store_signature(self):
271
wt = self.make_branch_and_tree('.')
273
branch.repository.store_revision_signature(
274
gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
275
self.assertRaises(errors.NoSuchRevision,
276
branch.repository.has_signature_for_revision_id,
278
wt.commit("base", allow_pointless=True, rev_id='A')
279
self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
280
'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
281
branch.repository.get_signature_text('A'))
283
def test_branch_keeps_signatures(self):
284
wt = self.make_branch_and_tree('source')
285
wt.commit('A', allow_pointless=True, rev_id='A')
286
repo = wt.branch.repository
287
repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
288
#FIXME: clone should work to urls,
289
# wt.clone should work to disks.
290
self.build_tree(['target/'])
291
d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
292
self.assertEqual(repo.get_signature_text('A'),
293
d2.open_repository().get_signature_text('A'))
295
def test_nicks(self):
296
"""Test explicit and implicit branch nicknames.
298
Nicknames are implicitly the name of the branch's directory, unless an
299
explicit nickname is set. That is, an explicit nickname always
300
overrides the implicit one.
302
t = get_transport(self.get_url())
303
branch = self.make_branch('bzr.dev')
304
# The nick will be 'bzr.dev', because there is no explicit nick set.
305
self.assertEqual(branch.nick, 'bzr.dev')
306
# Move the branch to a different directory, 'bzr.ab'. Now that branch
307
# will report its nick as 'bzr.ab'.
308
t.move('bzr.dev', 'bzr.ab')
309
branch = Branch.open(self.get_url('bzr.ab'))
310
self.assertEqual(branch.nick, 'bzr.ab')
311
# Set the branch nick explicitly. This will ensure there's a branch
312
# config file in the branch.
313
branch.nick = "Aaron's branch"
314
branch.nick = "Aaron's branch"
315
if not isinstance(branch, remote.RemoteBranch):
316
controlfilename = branch.control_files.controlfilename
317
self.failUnless(t.has(t.relpath(controlfilename("branch.conf"))))
318
# Because the nick has been set explicitly, the nick is now always
319
# "Aaron's branch", regardless of directory name.
320
self.assertEqual(branch.nick, "Aaron's branch")
321
t.move('bzr.ab', 'integration')
322
branch = Branch.open(self.get_url('integration'))
323
self.assertEqual(branch.nick, "Aaron's branch")
324
branch.nick = u"\u1234"
325
self.assertEqual(branch.nick, u"\u1234")
327
def test_commit_nicks(self):
328
"""Nicknames are committed to the revision"""
329
wt = self.make_branch_and_tree('bzr.dev')
331
branch.nick = "My happy branch"
332
wt.commit('My commit respect da nick.')
333
committed = branch.repository.get_revision(branch.last_revision())
334
self.assertEqual(committed.properties["branch-nick"],
337
def test_create_open_branch_uses_repository(self):
339
repo = self.make_repository('.', shared=True)
340
except errors.IncompatibleFormat:
342
child_transport = repo.bzrdir.root_transport.clone('child')
343
child_transport.mkdir('.')
344
child_dir = self.bzrdir_format.initialize_on_transport(child_transport)
346
child_branch = self.branch_format.initialize(child_dir)
347
except errors.UninitializableFormat:
348
# branch references are not default init'able.
350
self.assertEqual(repo.bzrdir.root_transport.base,
351
child_branch.repository.bzrdir.root_transport.base)
352
child_branch = branch.Branch.open(self.get_url('child'))
353
self.assertEqual(repo.bzrdir.root_transport.base,
354
child_branch.repository.bzrdir.root_transport.base)
356
def test_format_description(self):
357
tree = self.make_branch_and_tree('tree')
358
text = tree.branch._format.get_format_description()
359
self.failUnless(len(text))
361
def test_check_branch_report_results(self):
362
"""Checking a branch produces results which can be printed"""
363
branch = self.make_branch('.')
364
result = branch.check()
365
# reports results through logging
366
result.report_results(verbose=True)
367
result.report_results(verbose=False)
369
def test_get_commit_builder(self):
370
self.assertIsInstance(self.make_branch(".").get_commit_builder([]),
371
repository.CommitBuilder)
373
def test_generate_revision_history(self):
374
"""Create a fake revision history easily."""
375
tree = self.make_branch_and_tree('.')
376
rev1 = tree.commit('foo')
377
orig_history = tree.branch.revision_history()
378
rev2 = tree.commit('bar', allow_pointless=True)
379
tree.branch.generate_revision_history(rev1)
380
self.assertEqual(orig_history, tree.branch.revision_history())
382
def test_generate_revision_history_NULL_REVISION(self):
383
tree = self.make_branch_and_tree('.')
384
rev1 = tree.commit('foo')
385
tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
386
self.assertEqual([], tree.branch.revision_history())
388
def test_create_checkout(self):
389
tree_a = self.make_branch_and_tree('a')
390
branch_a = tree_a.branch
391
checkout_b = branch_a.create_checkout('b')
392
self.assertEqual(None, checkout_b.last_revision())
393
checkout_b.commit('rev1', rev_id='rev1')
394
self.assertEqual('rev1', branch_a.last_revision())
395
self.assertNotEqual(checkout_b.branch.base, branch_a.base)
397
checkout_c = branch_a.create_checkout('c', lightweight=True)
398
self.assertEqual('rev1', checkout_c.last_revision())
399
checkout_c.commit('rev2', rev_id='rev2')
400
self.assertEqual('rev2', branch_a.last_revision())
401
self.assertEqual(checkout_c.branch.base, branch_a.base)
404
checkout_d = branch_a.create_checkout('d', lightweight=True)
405
self.assertEqual('rev2', checkout_d.last_revision())
407
checkout_e = branch_a.create_checkout('e')
408
self.assertEqual('rev2', checkout_e.last_revision())
410
def test_create_anonymous_lightweight_checkout(self):
411
"""A lightweight checkout from a readonly branch should succeed."""
412
tree_a = self.make_branch_and_tree('a')
413
rev_id = tree_a.commit('put some content in the branch')
414
# open the branch via a readonly transport
415
source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
416
# sanity check that the test will be valid
417
self.assertRaises((errors.LockError, errors.TransportNotPossible),
418
source_branch.lock_write)
419
checkout = source_branch.create_checkout('c', lightweight=True)
420
self.assertEqual(rev_id, checkout.last_revision())
422
def test_create_anonymous_heavyweight_checkout(self):
423
"""A regular checkout from a readonly branch should succeed."""
424
tree_a = self.make_branch_and_tree('a')
425
rev_id = tree_a.commit('put some content in the branch')
426
# open the branch via a readonly transport
427
source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
428
# sanity check that the test will be valid
429
self.assertRaises((errors.LockError, errors.TransportNotPossible),
430
source_branch.lock_write)
431
checkout = source_branch.create_checkout('c')
432
self.assertEqual(rev_id, checkout.last_revision())
434
def test_set_revision_history(self):
435
tree = self.make_branch_and_tree('a')
436
tree.commit('a commit', rev_id='rev1')
438
br.set_revision_history(["rev1"])
439
self.assertEquals(br.revision_history(), ["rev1"])
440
br.set_revision_history([])
441
self.assertEquals(br.revision_history(), [])
444
class ChrootedTests(TestCaseWithBranch):
445
"""A support class that provides readonly urls outside the local namespace.
447
This is done by checking if self.transport_server is a MemoryServer. if it
448
is then we are chrooted already, if it is not then an HttpServer is used
453
super(ChrootedTests, self).setUp()
454
if not self.vfs_transport_factory == MemoryServer:
455
self.transport_readonly_server = HttpServer
121
def test_pending_merges(self):
122
"""Tracking pending-merged revisions."""
123
b = Branch.initialize('.')
125
self.assertEquals(b.pending_merges(), [])
126
b.add_pending_merge('foo@azkhazan-123123-abcabc')
127
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
128
b.add_pending_merge('foo@azkhazan-123123-abcabc')
129
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
130
b.add_pending_merge('wibble@fofof--20050401--1928390812')
131
self.assertEquals(b.pending_merges(),
132
['foo@azkhazan-123123-abcabc',
133
'wibble@fofof--20050401--1928390812'])
134
b.commit("commit from base with two merges")
135
rev = b.get_revision(b.revision_history()[0])
136
self.assertEquals(len(rev.parent_ids), 2)
137
self.assertEquals(rev.parent_ids[0],
138
'foo@azkhazan-123123-abcabc')
139
self.assertEquals(rev.parent_ids[1],
140
'wibble@fofof--20050401--1928390812')
141
# list should be cleared when we do a commit
142
self.assertEquals(b.pending_merges(), [])
145
class TestRemote(TestCaseWithWebserver):
457
147
def test_open_containing(self):
458
148
self.assertRaises(NotBranchError, Branch.open_containing,
459
self.get_readonly_url(''))
149
self.get_remote_url(''))
460
150
self.assertRaises(NotBranchError, Branch.open_containing,
461
self.get_readonly_url('g/p/q'))
462
branch = self.make_branch('.')
463
branch, relpath = Branch.open_containing(self.get_readonly_url(''))
464
self.assertEqual('', relpath)
465
branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
466
self.assertEqual('g/p/q', relpath)
151
self.get_remote_url('g/p/q'))
152
b = Branch.initialize('.')
153
Branch.open_containing(self.get_remote_url(''))
154
Branch.open_containing(self.get_remote_url('g/p/q'))
156
# TODO: rewrite this as a regular unittest, without relying on the displayed output
157
# >>> from bzrlib.commit import commit
158
# >>> bzrlib.trace.silent = True
159
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
162
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
163
# >>> br2 = ScratchBranch()
164
# >>> br2.update_revisions(br1)
166
# Added 1 inventories.
168
# >>> br2.revision_history()
170
# >>> br2.update_revisions(br1)
172
# >>> br1.text_store.total_size() == br2.text_store.total_size()
469
175
class InstrumentedTransaction(object):
478
class TestDecorator(object):
484
self._calls.append('lr')
486
def lock_write(self):
487
self._calls.append('lw')
490
self._calls.append('ul')
493
def do_with_read(self):
497
def except_with_read(self):
501
def do_with_write(self):
505
def except_with_write(self):
509
class TestDecorators(TestCase):
511
def test_needs_read_lock(self):
512
branch = TestDecorator()
513
self.assertEqual(1, branch.do_with_read())
514
self.assertEqual(['lr', 'ul'], branch._calls)
516
def test_excepts_in_read_lock(self):
517
branch = TestDecorator()
518
self.assertRaises(RuntimeError, branch.except_with_read)
519
self.assertEqual(['lr', 'ul'], branch._calls)
521
def test_needs_write_lock(self):
522
branch = TestDecorator()
523
self.assertEqual(2, branch.do_with_write())
524
self.assertEqual(['lw', 'ul'], branch._calls)
526
def test_excepts_in_write_lock(self):
527
branch = TestDecorator()
528
self.assertRaises(RuntimeError, branch.except_with_write)
529
self.assertEqual(['lw', 'ul'], branch._calls)
532
class TestBranchPushLocations(TestCaseWithBranch):
534
def test_get_push_location_unset(self):
535
self.assertEqual(None, self.get_branch().get_push_location())
537
def test_get_push_location_exact(self):
538
from bzrlib.config import (locations_config_filename,
539
ensure_config_dir_exists)
540
ensure_config_dir_exists()
541
fn = locations_config_filename()
542
print >> open(fn, 'wt'), ("[%s]\n"
543
"push_location=foo" %
544
self.get_branch().base[:-1])
545
self.assertEqual("foo", self.get_branch().get_push_location())
547
def test_set_push_location(self):
548
branch = self.get_branch()
549
branch.set_push_location('foo')
550
self.assertEqual('foo', branch.get_push_location())
553
class TestFormat(TestCaseWithBranch):
554
"""Tests for the format itself."""
556
def test_get_reference(self):
557
"""get_reference on all regular branches should return None."""
558
if not self.branch_format.is_supported():
559
# unsupported formats are not loopback testable
560
# because the default open will not open them and
561
# they may not be initializable.
563
made_branch = self.make_branch('.')
564
self.assertEqual(None,
565
made_branch._format.get_reference(made_branch.bzrdir))
567
def test_format_initialize_find_open(self):
568
# loopback test to check the current format initializes to itself.
569
if not self.branch_format.is_supported():
570
# unsupported formats are not loopback testable
571
# because the default open will not open them and
572
# they may not be initializable.
574
# supported formats must be able to init and open
575
t = get_transport(self.get_url())
576
readonly_t = get_transport(self.get_readonly_url())
577
made_branch = self.make_branch('.')
578
self.failUnless(isinstance(made_branch, branch.Branch))
580
# find it via bzrdir opening:
581
opened_control = bzrdir.BzrDir.open(readonly_t.base)
582
direct_opened_branch = opened_control.open_branch()
583
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
584
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
585
self.failUnless(isinstance(direct_opened_branch._format,
586
self.branch_format.__class__))
588
# find it via Branch.open
589
opened_branch = branch.Branch.open(readonly_t.base)
590
self.failUnless(isinstance(opened_branch, made_branch.__class__))
591
self.assertEqual(made_branch._format.__class__,
592
opened_branch._format.__class__)
593
# if it has a unique id string, can we probe for it ?
595
self.branch_format.get_format_string()
596
except NotImplementedError:
598
self.assertEqual(self.branch_format,
599
opened_control.find_branch_format())
602
class TestBound(TestCaseWithBranch):
604
def test_bind_unbind(self):
605
branch = self.make_branch('1')
606
branch2 = self.make_branch('2')
609
except errors.UpgradeRequired:
610
raise TestSkipped('Format does not support binding')
611
self.assertTrue(branch.unbind())
612
self.assertFalse(branch.unbind())
613
self.assertIs(None, branch.get_bound_location())
615
def test_old_bound_location(self):
616
branch = self.make_branch('branch1')
618
self.assertIs(None, branch.get_old_bound_location())
619
except errors.UpgradeRequired:
620
raise TestSkipped('Format does not store old bound locations')
621
branch2 = self.make_branch('branch2')
623
self.assertIs(None, branch.get_old_bound_location())
625
self.assertContainsRe(branch.get_old_bound_location(), '\/branch2\/$')
628
class TestStrict(TestCaseWithBranch):
630
def test_strict_history(self):
631
tree1 = self.make_branch_and_tree('tree1')
633
tree1.branch.set_append_revisions_only(True)
634
except errors.UpgradeRequired:
635
raise TestSkipped('Format does not support strict history')
636
tree1.commit('empty commit')
637
tree2 = tree1.bzrdir.sprout('tree2').open_workingtree()
638
tree2.commit('empty commit 2')
639
tree1.pull(tree2.branch)
640
tree1.commit('empty commit 3')
641
tree2.commit('empty commit 4')
642
self.assertRaises(errors.DivergedBranches, tree1.pull, tree2.branch)
643
tree2.merge_from_branch(tree1.branch)
644
tree2.commit('empty commit 5')
645
self.assertRaises(errors.AppendRevisionsOnlyViolation, tree1.pull,
647
tree3 = tree1.bzrdir.sprout('tree3').open_workingtree()
648
tree3.merge_from_branch(tree2.branch)
649
tree3.commit('empty commit 6')
650
tree2.pull(tree3.branch)
184
class TestBranchTransaction(TestCaseInTempDir):
187
super(TestBranchTransaction, self).setUp()
188
self.branch = Branch.initialize('.')
190
def test_default_get_transaction(self):
191
"""branch.get_transaction on a new branch should give a PassThrough."""
192
self.failUnless(isinstance(self.branch.get_transaction(),
193
transactions.PassThroughTransaction))
195
def test__set_new_transaction(self):
196
self.branch._set_transaction(transactions.ReadOnlyTransaction())
198
def test__set_over_existing_transaction_raises(self):
199
self.branch._set_transaction(transactions.ReadOnlyTransaction())
200
self.assertRaises(errors.LockError,
201
self.branch._set_transaction,
202
transactions.ReadOnlyTransaction())
204
def test_finish_no_transaction_raises(self):
205
self.assertRaises(errors.LockError, self.branch._finish_transaction)
207
def test_finish_readonly_transaction_works(self):
208
self.branch._set_transaction(transactions.ReadOnlyTransaction())
209
self.branch._finish_transaction()
210
self.assertEqual(None, self.branch._transaction)
212
def test_unlock_calls_finish(self):
213
self.branch.lock_read()
214
transaction = InstrumentedTransaction()
215
self.branch._transaction = transaction
217
self.assertEqual(['finish'], transaction.calls)
219
def test_lock_read_acquires_ro_transaction(self):
220
self.branch.lock_read()
221
self.failUnless(isinstance(self.branch.get_transaction(),
222
transactions.ReadOnlyTransaction))
225
def test_lock_write_acquires_passthrough_transaction(self):
226
self.branch.lock_write()
227
# cannot use get_transaction as its magic
228
self.failUnless(isinstance(self.branch._transaction,
229
transactions.PassThroughTransaction))