1
# Copyright (C) 2005, 2006 Canonical Ltd
1
# (C) 2005 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
31
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
32
from bzrlib.delta import TreeDelta
33
from bzrlib.errors import (FileExists,
36
UninitializableFormat,
39
from bzrlib.osutils import getcwd
40
import bzrlib.revision
41
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
42
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
43
from bzrlib.tests.HttpServer import HttpServer
20
from bzrlib.clone import copy_branch
21
from bzrlib.commit import commit
22
import bzrlib.errors as errors
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
44
27
from bzrlib.trace import mutter
45
from bzrlib.transport import get_transport
46
from bzrlib.transport.memory import MemoryServer
47
from bzrlib.upgrade import upgrade
48
from bzrlib.workingtree import WorkingTree
28
import bzrlib.transactions as transactions
51
30
# TODO: Make a branch using basis branch, and check that it
52
31
# doesn't request any files that could have been avoided, by
53
32
# hooking into the Transport.
56
class TestCaseWithBranch(TestCaseWithBzrDir):
59
super(TestCaseWithBranch, self).setUp()
63
if self.branch is None:
64
self.branch = self.make_branch('')
67
def make_branch(self, relpath, format=None):
68
repo = self.make_repository(relpath, format=format)
69
# fixme RBC 20060210 this isnt necessarily a fixable thing,
70
# Skipped is the wrong exception to raise.
72
return self.branch_format.initialize(repo.bzrdir)
73
except errors.UninitializableFormat:
74
raise TestSkipped('Uninitializable branch format')
76
def make_repository(self, relpath, shared=False, format=None):
77
made_control = self.make_bzrdir(relpath, format=format)
78
return made_control.create_repository(shared=shared)
81
class TestBranch(TestCaseWithBranch):
34
class TestBranch(TestCaseInTempDir):
83
36
def test_append_revisions(self):
84
37
"""Test appending more than one revision"""
85
br = self.get_branch()
38
br = Branch.initialize(".")
86
39
br.append_revision("rev1")
87
40
self.assertEquals(br.revision_history(), ["rev1",])
88
41
br.append_revision("rev2", "rev3")
89
42
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
90
self.assertRaises(errors.ReservedId, br.append_revision, 'current:')
92
def test_revision_ids_are_utf8(self):
93
br = self.get_branch()
94
br.set_revision_history(['rev1', 'rev2', 'rev3'])
95
rh = br.revision_history()
96
self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
97
for revision_id in rh:
98
self.assertIsInstance(revision_id, str)
99
last = br.last_revision()
100
self.assertEqual('rev3', last)
101
self.assertIsInstance(last, str)
103
44
def test_fetch_revisions(self):
104
45
"""Test fetch-revision operation."""
105
get_transport(self.get_url()).mkdir('b1')
106
get_transport(self.get_url()).mkdir('b2')
107
wt = self.make_branch_and_tree('b1')
109
b2 = self.make_branch('b2')
110
file('b1/foo', 'w').write('hello')
111
wt.add(['foo'], ['foo-id'])
112
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
46
from bzrlib.fetch import Fetcher
49
b1 = Branch.initialize('b1')
50
b2 = Branch.initialize('b2')
51
file(os.sep.join(['b1', 'foo']), 'w').write('hello')
52
b1.add(['foo'], ['foo-id'])
53
b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
114
55
mutter('start fetch')
115
self.assertEqual((1, []), b2.fetch(b1))
117
rev = b2.repository.get_revision('revision-1')
118
tree = b2.repository.revision_tree('revision-1')
119
self.assertEqual(tree.get_file_text('foo-id'), 'hello')
121
def test_get_revision_delta(self):
122
tree_a = self.make_branch_and_tree('a')
123
self.build_tree(['a/foo'])
124
tree_a.add('foo', 'file1')
125
tree_a.commit('rev1', rev_id='rev1')
126
self.build_tree(['a/vla'])
127
tree_a.add('vla', 'file2')
128
tree_a.commit('rev2', rev_id='rev2')
130
delta = tree_a.branch.get_revision_delta(1)
131
self.assertIsInstance(delta, TreeDelta)
132
self.assertEqual([('foo', 'file1', 'file')], delta.added)
133
delta = tree_a.branch.get_revision_delta(2)
134
self.assertIsInstance(delta, TreeDelta)
135
self.assertEqual([('vla', 'file2', 'file')], delta.added)
137
def get_unbalanced_tree_pair(self):
56
f = Fetcher(from_branch=b1, to_branch=b2)
57
eq = self.assertEquals
59
eq(f.last_revision, 'revision-1')
61
rev = b2.get_revision('revision-1')
62
tree = b2.revision_tree('revision-1')
63
eq(tree.get_file_text('foo-id'), 'hello')
65
def get_unbalanced_branch_pair(self):
138
66
"""Return two branches, a and b, with one file in a."""
139
get_transport(self.get_url()).mkdir('a')
140
tree_a = self.make_branch_and_tree('a')
68
br_a = Branch.initialize("a")
141
69
file('a/b', 'wb').write('b')
143
tree_a.commit("silly commit", rev_id='A')
145
get_transport(self.get_url()).mkdir('b')
146
tree_b = self.make_branch_and_tree('b')
147
return tree_a, tree_b
71
commit(br_a, "silly commit", rev_id='A')
73
br_b = Branch.initialize("b")
149
76
def get_balanced_branch_pair(self):
150
77
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
151
tree_a, tree_b = self.get_unbalanced_tree_pair()
152
tree_b.branch.repository.fetch(tree_a.branch.repository)
153
return tree_a, tree_b
155
def test_clone_branch(self):
156
"""Copy the stores from one branch to another"""
157
tree_a, tree_b = self.get_balanced_branch_pair()
158
tree_b.commit("silly commit")
78
br_a, br_b = self.get_unbalanced_branch_pair()
79
br_a.push_stores(br_b)
82
def test_push_stores(self):
83
"""Copy the stores from one branch to another"""
84
br_a, br_b = self.get_unbalanced_branch_pair()
85
# ensure the revision is missing.
86
self.assertRaises(NoSuchRevision, br_b.get_revision,
87
br_a.revision_history()[0])
88
br_a.push_stores(br_b)
89
# check that b now has all the data from a's first commit.
90
rev = br_b.get_revision(br_a.revision_history()[0])
91
tree = br_b.revision_tree(br_a.revision_history()[0])
93
if tree.inventory[file_id].kind == "file":
94
tree.get_file(file_id).read()
97
def test_copy_branch(self):
98
"""Copy the stores from one branch to another"""
99
br_a, br_b = self.get_balanced_branch_pair()
100
commit(br_b, "silly commit")
160
# this fails to test that the history from a was not used.
161
dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
162
self.assertEqual(tree_a.branch.revision_history(),
163
dir_c.open_branch().revision_history())
102
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
103
self.assertEqual(br_a.revision_history(), br_c.revision_history())
165
def test_clone_partial(self):
105
def test_copy_partial(self):
166
106
"""Copy only part of the history of a branch."""
167
# TODO: RBC 20060208 test with a revision not on revision-history.
168
# what should that behaviour be ? Emailed the list.
169
wt_a = self.make_branch_and_tree('a')
170
self.build_tree(['a/one'])
172
wt_a.commit('commit one', rev_id='1')
173
self.build_tree(['a/two'])
175
wt_a.commit('commit two', rev_id='2')
176
repo_b = self.make_repository('b')
177
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
178
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
179
self.assertEqual('1', br_b.last_revision())
181
def test_sprout_partial(self):
182
# test sprouting with a prefix of the revision-history.
183
# also needs not-on-revision-history behaviour defined.
184
wt_a = self.make_branch_and_tree('a')
185
self.build_tree(['a/one'])
187
wt_a.commit('commit one', rev_id='1')
188
self.build_tree(['a/two'])
190
wt_a.commit('commit two', rev_id='2')
191
repo_b = self.make_repository('b')
192
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
193
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
194
self.assertEqual('1', br_b.last_revision())
196
def get_parented_branch(self):
197
wt_a = self.make_branch_and_tree('a')
198
self.build_tree(['a/one'])
200
wt_a.commit('commit one', rev_id='1')
202
branch_b = wt_a.bzrdir.sprout('b', revision_id='1').open_branch()
203
self.assertEqual(wt_a.branch.base, branch_b.get_parent())
206
def test_clone_branch_nickname(self):
207
# test the nick name is preserved always
208
raise TestSkipped('XXX branch cloning is not yet tested..')
210
def test_clone_branch_parent(self):
211
# test the parent is preserved always
212
branch_b = self.get_parented_branch()
213
repo_c = self.make_repository('c')
214
branch_b.repository.copy_content_into(repo_c)
215
branch_c = branch_b.clone(repo_c.bzrdir)
216
self.assertNotEqual(None, branch_c.get_parent())
217
self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
219
# We can also set a specific parent, and it should be honored
220
random_parent = 'http://bazaar-vcs.org/path/to/branch'
221
branch_b.set_parent(random_parent)
222
repo_d = self.make_repository('d')
223
branch_b.repository.copy_content_into(repo_d)
224
branch_d = branch_b.clone(repo_d.bzrdir)
225
self.assertEqual(random_parent, branch_d.get_parent())
227
def test_sprout_branch_nickname(self):
228
# test the nick name is reset always
229
raise TestSkipped('XXX branch sprouting is not yet tested..')
231
def test_sprout_branch_parent(self):
232
source = self.make_branch('source')
233
target = source.bzrdir.sprout(self.get_url('target')).open_branch()
234
self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
236
def test_submit_branch(self):
237
"""Submit location can be queried and set"""
238
branch = self.make_branch('branch')
239
self.assertEqual(branch.get_submit_branch(), None)
240
branch.set_submit_branch('sftp://example.com')
241
self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
242
branch.set_submit_branch('sftp://example.net')
243
self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
107
self.build_tree(['a/', 'a/one'])
108
br_a = Branch.initialize('a')
110
br_a.commit('commit one', rev_id='u@d-1')
111
self.build_tree(['a/two'])
113
br_a.commit('commit two', rev_id='u@d-2')
114
br_b = copy_branch(br_a, 'b', revision='u@d-1')
115
self.assertEqual(br_b.last_revision(), 'u@d-1')
116
self.assertTrue(os.path.exists('b/one'))
117
self.assertFalse(os.path.exists('b/two'))
245
def test_record_initial_ghost(self):
246
"""Branches should support having ghosts."""
247
wt = self.make_branch_and_tree('.')
248
wt.set_parent_ids(['non:existent@rev--ision--0--2'],
249
allow_leftmost_as_ghost=True)
250
rev_id = wt.commit('commit against a ghost first parent.')
251
rev = wt.branch.repository.get_revision(rev_id)
252
self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
120
def test_record_initial_ghost_merge(self):
121
"""A pending merge with no revision present is still a merge."""
122
branch = Branch.initialize('.')
123
branch.add_pending_merge('non:existent@rev--ision--0--2')
124
branch.commit('pretend to merge nonexistent-revision', rev_id='first')
125
rev = branch.get_revision(branch.last_revision())
126
self.assertEqual(len(rev.parent_ids), 1)
253
127
# parent_sha1s is not populated now, WTF. rbc 20051003
254
128
self.assertEqual(len(rev.parent_sha1s), 0)
256
def test_record_two_ghosts(self):
257
"""Recording with all ghosts works."""
258
wt = self.make_branch_and_tree('.')
260
'foo@azkhazan-123123-abcabc',
261
'wibble@fofof--20050401--1928390812',
263
allow_leftmost_as_ghost=True)
264
rev_id = wt.commit("commit from ghost base with one merge")
265
# the revision should have been committed with two parents
266
rev = wt.branch.repository.get_revision(rev_id)
267
self.assertEqual(['foo@azkhazan-123123-abcabc',
268
'wibble@fofof--20050401--1928390812'],
271
def test_bad_revision(self):
272
self.assertRaises(errors.InvalidRevisionId,
273
self.get_branch().repository.get_revision,
129
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
276
131
# TODO 20051003 RBC:
277
132
# compare the gpg-to-sign info for a commit with a ghost and
278
133
# an identical tree without a ghost
279
134
# fetch missing should rewrite the TOC of weaves to list newly available parents.
136
def test_pending_merges(self):
137
"""Tracking pending-merged revisions."""
138
b = Branch.initialize('.')
140
self.assertEquals(b.pending_merges(), [])
141
b.add_pending_merge('foo@azkhazan-123123-abcabc')
142
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
143
b.add_pending_merge('foo@azkhazan-123123-abcabc')
144
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
145
b.add_pending_merge('wibble@fofof--20050401--1928390812')
146
self.assertEquals(b.pending_merges(),
147
['foo@azkhazan-123123-abcabc',
148
'wibble@fofof--20050401--1928390812'])
149
b.commit("commit from base with two merges")
150
rev = b.get_revision(b.revision_history()[0])
151
self.assertEquals(len(rev.parent_ids), 2)
152
self.assertEquals(rev.parent_ids[0],
153
'foo@azkhazan-123123-abcabc')
154
self.assertEquals(rev.parent_ids[1],
155
'wibble@fofof--20050401--1928390812')
156
# list should be cleared when we do a commit
157
self.assertEquals(b.pending_merges(), [])
281
159
def test_sign_existing_revision(self):
282
wt = self.make_branch_and_tree('.')
284
wt.commit("base", allow_pointless=True, rev_id='A')
160
branch = Branch.initialize('.')
161
branch.commit("base", allow_pointless=True, rev_id='A')
285
162
from bzrlib.testament import Testament
286
strategy = gpg.LoopbackGPGStrategy(None)
287
branch.repository.sign_revision('A', strategy)
288
self.assertEqual(Testament.from_revision(branch.repository,
289
'A').as_short_text(),
290
branch.repository.get_signature_text('A'))
163
branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
164
self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
165
branch.revision_store.get('A', 'sig').read())
292
167
def test_store_signature(self):
293
wt = self.make_branch_and_tree('.')
295
branch.repository.store_revision_signature(
296
gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
297
self.assertRaises(errors.NoSuchRevision,
298
branch.repository.has_signature_for_revision_id,
300
wt.commit("base", allow_pointless=True, rev_id='A')
301
self.assertEqual('FOO',
302
branch.repository.get_signature_text('A'))
304
def test_branch_keeps_signatures(self):
305
wt = self.make_branch_and_tree('source')
306
wt.commit('A', allow_pointless=True, rev_id='A')
307
wt.branch.repository.sign_revision('A',
308
gpg.LoopbackGPGStrategy(None))
309
#FIXME: clone should work to urls,
310
# wt.clone should work to disks.
311
self.build_tree(['target/'])
312
d2 = wt.bzrdir.clone('target')
313
self.assertEqual(wt.branch.repository.get_signature_text('A'),
314
d2.open_repository().get_signature_text('A'))
316
def test_nicks(self):
317
"""Branch nicknames"""
318
t = get_transport(self.get_url())
320
branch = self.make_branch('bzr.dev')
321
self.assertEqual(branch.nick, 'bzr.dev')
322
t.move('bzr.dev', 'bzr.ab')
323
branch = Branch.open(self.get_url('bzr.ab'))
324
self.assertEqual(branch.nick, 'bzr.ab')
325
branch.nick = "Aaron's branch"
326
branch.nick = "Aaron's branch"
330
branch.control_files.controlfilename("branch.conf")
334
self.assertEqual(branch.nick, "Aaron's branch")
335
t.move('bzr.ab', 'integration')
336
branch = Branch.open(self.get_url('integration'))
337
self.assertEqual(branch.nick, "Aaron's branch")
338
branch.nick = u"\u1234"
339
self.assertEqual(branch.nick, u"\u1234")
341
def test_commit_nicks(self):
342
"""Nicknames are committed to the revision"""
343
get_transport(self.get_url()).mkdir('bzr.dev')
344
wt = self.make_branch_and_tree('bzr.dev')
346
branch.nick = "My happy branch"
347
wt.commit('My commit respect da nick.')
348
committed = branch.repository.get_revision(branch.last_revision())
349
self.assertEqual(committed.properties["branch-nick"],
352
def test_create_open_branch_uses_repository(self):
354
repo = self.make_repository('.', shared=True)
355
except errors.IncompatibleFormat:
357
repo.bzrdir.root_transport.mkdir('child')
358
child_dir = self.bzrdir_format.initialize('child')
360
child_branch = self.branch_format.initialize(child_dir)
361
except errors.UninitializableFormat:
362
# branch references are not default init'able.
364
self.assertEqual(repo.bzrdir.root_transport.base,
365
child_branch.repository.bzrdir.root_transport.base)
366
child_branch = branch.Branch.open(self.get_url('child'))
367
self.assertEqual(repo.bzrdir.root_transport.base,
368
child_branch.repository.bzrdir.root_transport.base)
370
def test_format_description(self):
371
tree = self.make_branch_and_tree('tree')
372
text = tree.branch._format.get_format_description()
373
self.failUnless(len(text))
375
def test_check_branch_report_results(self):
376
"""Checking a branch produces results which can be printed"""
377
branch = self.make_branch('.')
378
result = branch.check()
379
# reports results through logging
380
result.report_results(verbose=True)
381
result.report_results(verbose=False)
383
def test_get_commit_builder(self):
384
self.assertIsInstance(self.make_branch(".").get_commit_builder([]),
385
repository.CommitBuilder)
387
def test_generate_revision_history(self):
388
"""Create a fake revision history easily."""
389
tree = self.make_branch_and_tree('.')
390
rev1 = tree.commit('foo')
391
orig_history = tree.branch.revision_history()
392
rev2 = tree.commit('bar', allow_pointless=True)
393
tree.branch.generate_revision_history(rev1)
394
self.assertEqual(orig_history, tree.branch.revision_history())
396
def test_generate_revision_history_NULL_REVISION(self):
397
tree = self.make_branch_and_tree('.')
398
rev1 = tree.commit('foo')
399
tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
400
self.assertEqual([], tree.branch.revision_history())
402
def test_create_checkout(self):
403
tree_a = self.make_branch_and_tree('a')
404
branch_a = tree_a.branch
405
checkout_b = branch_a.create_checkout('b')
406
self.assertEqual(None, checkout_b.last_revision())
407
checkout_b.commit('rev1', rev_id='rev1')
408
self.assertEqual('rev1', branch_a.last_revision())
409
self.assertNotEqual(checkout_b.branch.base, branch_a.base)
411
checkout_c = branch_a.create_checkout('c', lightweight=True)
412
self.assertEqual('rev1', checkout_c.last_revision())
413
checkout_c.commit('rev2', rev_id='rev2')
414
self.assertEqual('rev2', branch_a.last_revision())
415
self.assertEqual(checkout_c.branch.base, branch_a.base)
418
checkout_d = branch_a.create_checkout('d', lightweight=True)
419
self.assertEqual('rev2', checkout_d.last_revision())
421
checkout_e = branch_a.create_checkout('e')
422
self.assertEqual('rev2', checkout_e.last_revision())
424
def test_create_anonymous_lightweight_checkout(self):
425
"""A lightweight checkout from a readonly branch should succeed."""
426
tree_a = self.make_branch_and_tree('a')
427
rev_id = tree_a.commit('put some content in the branch')
428
source_branch = bzrlib.branch.Branch.open(
429
'readonly+' + tree_a.bzrdir.root_transport.base)
430
# sanity check that the test will be valid
431
self.assertRaises((errors.LockError, errors.TransportNotPossible),
432
source_branch.lock_write)
433
checkout = source_branch.create_checkout('c', lightweight=True)
434
self.assertEqual(rev_id, checkout.last_revision())
436
def test_create_anonymous_heavyweight_checkout(self):
437
"""A regular checkout from a readonly branch should succeed."""
438
tree_a = self.make_branch_and_tree('a')
439
rev_id = tree_a.commit('put some content in the branch')
440
source_branch = bzrlib.branch.Branch.open(
441
'readonly+' + tree_a.bzrdir.root_transport.base)
442
# sanity check that the test will be valid
443
self.assertRaises((errors.LockError, errors.TransportNotPossible),
444
source_branch.lock_write)
445
checkout = source_branch.create_checkout('c')
446
self.assertEqual(rev_id, checkout.last_revision())
449
class ChrootedTests(TestCaseWithBranch):
450
"""A support class that provides readonly urls outside the local namespace.
452
This is done by checking if self.transport_server is a MemoryServer. if it
453
is then we are chrooted already, if it is not then an HttpServer is used
458
super(ChrootedTests, self).setUp()
459
if not self.transport_server == MemoryServer:
460
self.transport_readonly_server = HttpServer
168
branch = Branch.initialize('.')
169
branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
171
self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
174
class TestRemote(TestCaseWithWebserver):
462
176
def test_open_containing(self):
463
177
self.assertRaises(NotBranchError, Branch.open_containing,
464
self.get_readonly_url(''))
178
self.get_remote_url(''))
465
179
self.assertRaises(NotBranchError, Branch.open_containing,
466
self.get_readonly_url('g/p/q'))
467
branch = self.make_branch('.')
468
branch, relpath = Branch.open_containing(self.get_readonly_url(''))
180
self.get_remote_url('g/p/q'))
181
b = Branch.initialize('.')
182
branch, relpath = Branch.open_containing(self.get_remote_url(''))
469
183
self.assertEqual('', relpath)
470
branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
184
branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
471
185
self.assertEqual('g/p/q', relpath)
187
# TODO: rewrite this as a regular unittest, without relying on the displayed output
188
# >>> from bzrlib.commit import commit
189
# >>> bzrlib.trace.silent = True
190
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
193
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
194
# >>> br2 = ScratchBranch()
195
# >>> br2.update_revisions(br1)
197
# Added 1 inventories.
199
# >>> br2.revision_history()
201
# >>> br2.update_revisions(br1)
203
# >>> br1.text_store.total_size() == br2.text_store.total_size()
474
206
class InstrumentedTransaction(object):
534
266
self.assertEqual(['lw', 'ul'], branch._calls)
537
class TestBranchTransaction(TestCaseWithBranch):
269
class TestBranchTransaction(TestCaseInTempDir):
540
272
super(TestBranchTransaction, self).setUp()
273
self.branch = Branch.initialize('.')
543
275
def test_default_get_transaction(self):
544
276
"""branch.get_transaction on a new branch should give a PassThrough."""
545
self.failUnless(isinstance(self.get_branch().get_transaction(),
277
self.failUnless(isinstance(self.branch.get_transaction(),
546
278
transactions.PassThroughTransaction))
548
280
def test__set_new_transaction(self):
549
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
281
self.branch._set_transaction(transactions.ReadOnlyTransaction())
551
283
def test__set_over_existing_transaction_raises(self):
552
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
284
self.branch._set_transaction(transactions.ReadOnlyTransaction())
553
285
self.assertRaises(errors.LockError,
554
self.get_branch()._set_transaction,
286
self.branch._set_transaction,
555
287
transactions.ReadOnlyTransaction())
557
289
def test_finish_no_transaction_raises(self):
558
self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
290
self.assertRaises(errors.LockError, self.branch._finish_transaction)
560
292
def test_finish_readonly_transaction_works(self):
561
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
562
self.get_branch()._finish_transaction()
563
self.assertEqual(None, self.get_branch().control_files._transaction)
293
self.branch._set_transaction(transactions.ReadOnlyTransaction())
294
self.branch._finish_transaction()
295
self.assertEqual(None, self.branch._transaction)
565
297
def test_unlock_calls_finish(self):
566
self.get_branch().lock_read()
298
self.branch.lock_read()
567
299
transaction = InstrumentedTransaction()
568
self.get_branch().control_files._transaction = transaction
569
self.get_branch().unlock()
300
self.branch._transaction = transaction
570
302
self.assertEqual(['finish'], transaction.calls)
572
304
def test_lock_read_acquires_ro_transaction(self):
573
self.get_branch().lock_read()
574
self.failUnless(isinstance(self.get_branch().get_transaction(),
305
self.branch.lock_read()
306
self.failUnless(isinstance(self.branch.get_transaction(),
575
307
transactions.ReadOnlyTransaction))
576
self.get_branch().unlock()
578
def test_lock_write_acquires_write_transaction(self):
579
self.get_branch().lock_write()
310
def test_lock_write_acquires_passthrough_transaction(self):
311
self.branch.lock_write()
580
312
# cannot use get_transaction as its magic
581
self.failUnless(isinstance(self.get_branch().control_files._transaction,
582
transactions.WriteTransaction))
583
self.get_branch().unlock()
586
class TestBranchPushLocations(TestCaseWithBranch):
588
def test_get_push_location_unset(self):
589
self.assertEqual(None, self.get_branch().get_push_location())
591
def test_get_push_location_exact(self):
592
from bzrlib.config import (locations_config_filename,
593
ensure_config_dir_exists)
594
ensure_config_dir_exists()
595
fn = locations_config_filename()
596
print >> open(fn, 'wt'), ("[%s]\n"
597
"push_location=foo" %
598
self.get_branch().base[:-1])
599
self.assertEqual("foo", self.get_branch().get_push_location())
601
def test_set_push_location(self):
602
from bzrlib.config import (locations_config_filename,
603
ensure_config_dir_exists)
604
ensure_config_dir_exists()
605
fn = locations_config_filename()
606
branch = self.get_branch()
607
branch.set_push_location('foo')
608
local_path = urlutils.local_path_from_url(branch.base[:-1])
609
self.assertFileEqual("[%s]\n"
610
"push_location = foo\n"
611
"push_location:policy = norecurse" % local_path,
614
# TODO RBC 20051029 test getting a push location from a branch in a
615
# recursive section - that is, it appends the branch name.
618
class TestFormat(TestCaseWithBranch):
619
"""Tests for the format itself."""
621
def test_format_initialize_find_open(self):
622
# loopback test to check the current format initializes to itself.
623
if not self.branch_format.is_supported():
624
# unsupported formats are not loopback testable
625
# because the default open will not open them and
626
# they may not be initializable.
628
# supported formats must be able to init and open
629
t = get_transport(self.get_url())
630
readonly_t = get_transport(self.get_readonly_url())
631
made_branch = self.make_branch('.')
632
self.failUnless(isinstance(made_branch, branch.Branch))
634
# find it via bzrdir opening:
635
opened_control = bzrdir.BzrDir.open(readonly_t.base)
636
direct_opened_branch = opened_control.open_branch()
637
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
638
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
639
self.failUnless(isinstance(direct_opened_branch._format,
640
self.branch_format.__class__))
642
# find it via Branch.open
643
opened_branch = branch.Branch.open(readonly_t.base)
644
self.failUnless(isinstance(opened_branch, made_branch.__class__))
645
self.assertEqual(made_branch._format.__class__,
646
opened_branch._format.__class__)
647
# if it has a unique id string, can we probe for it ?
649
self.branch_format.get_format_string()
650
except NotImplementedError:
652
self.assertEqual(self.branch_format,
653
branch.BranchFormat.find_format(opened_control))
313
self.failUnless(isinstance(self.branch._transaction,
314
transactions.PassThroughTransaction))