1
# Copyright (C) 2005, 2006 Canonical Ltd
1
# (C) 2005 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for branch implementations - tests a branch format."""
31
20
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
32
from bzrlib.delta import TreeDelta
33
from bzrlib.errors import (FileExists,
36
UninitializableFormat,
21
from bzrlib.clone import copy_branch
22
from bzrlib.commit import commit
23
import bzrlib.errors as errors
24
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
39
26
from bzrlib.osutils import getcwd
40
import bzrlib.revision
41
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
42
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
27
from bzrlib.tests import TestCase, TestCaseInTempDir
28
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
43
29
from bzrlib.trace import mutter
44
from bzrlib.transport import get_transport
45
from bzrlib.transport.http import HttpServer
46
from bzrlib.transport.memory import MemoryServer
47
from bzrlib.upgrade import upgrade
48
from bzrlib.workingtree import WorkingTree
30
import bzrlib.transactions as transactions
31
from bzrlib.revision import NULL_REVISION
51
33
# TODO: Make a branch using basis branch, and check that it
52
34
# doesn't request any files that could have been avoided, by
53
35
# hooking into the Transport.
56
class TestCaseWithBranch(TestCaseWithBzrDir):
59
super(TestCaseWithBranch, self).setUp()
63
if self.branch is None:
64
self.branch = self.make_branch('')
67
def make_branch(self, relpath, format=None):
68
repo = self.make_repository(relpath, format=format)
69
# fixme RBC 20060210 this isnt necessarily a fixable thing,
70
# Skipped is the wrong exception to raise.
72
return self.branch_format.initialize(repo.bzrdir)
73
except errors.UninitializableFormat:
74
raise TestSkipped('Uninitializable branch format')
76
def make_repository(self, relpath, shared=False, format=None):
77
made_control = self.make_bzrdir(relpath, format=format)
78
return made_control.create_repository(shared=shared)
81
class TestBranch(TestCaseWithBranch):
37
class TestBranch(TestCaseInTempDir):
83
39
def test_append_revisions(self):
84
40
"""Test appending more than one revision"""
85
br = self.get_branch()
41
br = Branch.initialize(u".")
86
42
br.append_revision("rev1")
87
43
self.assertEquals(br.revision_history(), ["rev1",])
88
44
br.append_revision("rev2", "rev3")
91
47
def test_fetch_revisions(self):
92
48
"""Test fetch-revision operation."""
93
get_transport(self.get_url()).mkdir('b1')
94
get_transport(self.get_url()).mkdir('b2')
95
wt = self.make_branch_and_tree('b1')
97
b2 = self.make_branch('b2')
49
from bzrlib.fetch import Fetcher
52
b1 = Branch.initialize('b1')
53
b2 = Branch.initialize('b2')
98
54
file('b1/foo', 'w').write('hello')
99
wt.add(['foo'], ['foo-id'])
100
wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
55
b1.working_tree().add(['foo'], ['foo-id'])
56
b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
102
58
mutter('start fetch')
103
self.assertEqual((1, []), b2.fetch(b1))
105
rev = b2.repository.get_revision('revision-1')
106
tree = b2.repository.revision_tree('revision-1')
107
self.assertEqual(tree.get_file_text('foo-id'), 'hello')
109
def test_get_revision_delta(self):
110
tree_a = self.make_branch_and_tree('a')
111
self.build_tree(['a/foo'])
112
tree_a.add('foo', 'file1')
113
tree_a.commit('rev1', rev_id='rev1')
114
self.build_tree(['a/vla'])
115
tree_a.add('vla', 'file2')
116
tree_a.commit('rev2', rev_id='rev2')
118
delta = tree_a.branch.get_revision_delta(1)
119
self.assertIsInstance(delta, TreeDelta)
120
self.assertEqual([('foo', 'file1', 'file')], delta.added)
121
delta = tree_a.branch.get_revision_delta(2)
122
self.assertIsInstance(delta, TreeDelta)
123
self.assertEqual([('vla', 'file2', 'file')], delta.added)
125
def get_unbalanced_tree_pair(self):
59
f = Fetcher(from_branch=b1, to_branch=b2)
60
eq = self.assertEquals
62
eq(f.last_revision, 'revision-1')
64
rev = b2.get_revision('revision-1')
65
tree = b2.revision_tree('revision-1')
66
eq(tree.get_file_text('foo-id'), 'hello')
68
def test_revision_tree(self):
69
b1 = Branch.initialize(u'.')
70
b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
71
tree = b1.revision_tree('revision-1')
72
tree = b1.revision_tree(None)
73
self.assertEqual(len(tree.list_files()), 0)
74
tree = b1.revision_tree(NULL_REVISION)
75
self.assertEqual(len(tree.list_files()), 0)
77
def get_unbalanced_branch_pair(self):
126
78
"""Return two branches, a and b, with one file in a."""
127
get_transport(self.get_url()).mkdir('a')
128
tree_a = self.make_branch_and_tree('a')
80
br_a = Branch.initialize("a")
129
81
file('a/b', 'wb').write('b')
131
tree_a.commit("silly commit", rev_id='A')
133
get_transport(self.get_url()).mkdir('b')
134
tree_b = self.make_branch_and_tree('b')
135
return tree_a, tree_b
82
br_a.working_tree().add('b')
83
commit(br_a, "silly commit", rev_id='A')
85
br_b = Branch.initialize("b")
137
88
def get_balanced_branch_pair(self):
138
89
"""Returns br_a, br_b as with one commit in a, and b has a's stores."""
139
tree_a, tree_b = self.get_unbalanced_tree_pair()
140
tree_b.branch.repository.fetch(tree_a.branch.repository)
141
return tree_a, tree_b
143
def test_clone_branch(self):
144
"""Copy the stores from one branch to another"""
145
tree_a, tree_b = self.get_balanced_branch_pair()
146
tree_b.commit("silly commit")
90
br_a, br_b = self.get_unbalanced_branch_pair()
91
br_a.push_stores(br_b)
94
def test_push_stores(self):
95
"""Copy the stores from one branch to another"""
96
br_a, br_b = self.get_unbalanced_branch_pair()
97
# ensure the revision is missing.
98
self.assertRaises(NoSuchRevision, br_b.get_revision,
99
br_a.revision_history()[0])
100
br_a.push_stores(br_b)
101
# check that b now has all the data from a's first commit.
102
rev = br_b.get_revision(br_a.revision_history()[0])
103
tree = br_b.revision_tree(br_a.revision_history()[0])
105
if tree.inventory[file_id].kind == "file":
106
tree.get_file(file_id).read()
109
def test_copy_branch(self):
110
"""Copy the stores from one branch to another"""
111
br_a, br_b = self.get_balanced_branch_pair()
112
commit(br_b, "silly commit")
148
# this fails to test that the history from a was not used.
149
dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
150
self.assertEqual(tree_a.branch.revision_history(),
151
dir_c.open_branch().revision_history())
114
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
115
self.assertEqual(br_a.revision_history(), br_c.revision_history())
153
def test_clone_partial(self):
117
def test_copy_partial(self):
154
118
"""Copy only part of the history of a branch."""
155
# TODO: RBC 20060208 test with a revision not on revision-history.
156
# what should that behaviour be ? Emailed the list.
157
wt_a = self.make_branch_and_tree('a')
158
self.build_tree(['a/one'])
160
wt_a.commit('commit one', rev_id='1')
161
self.build_tree(['a/two'])
163
wt_a.commit('commit two', rev_id='2')
164
repo_b = self.make_repository('b')
165
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
166
br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
167
self.assertEqual(br_b.last_revision(), '1')
169
def test_sprout_partial(self):
170
# test sprouting with a prefix of the revision-history.
171
# also needs not-on-revision-history behaviour defined.
172
wt_a = self.make_branch_and_tree('a')
173
self.build_tree(['a/one'])
175
wt_a.commit('commit one', rev_id='1')
176
self.build_tree(['a/two'])
178
wt_a.commit('commit two', rev_id='2')
179
repo_b = self.make_repository('b')
180
wt_a.bzrdir.open_repository().copy_content_into(repo_b)
181
br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
182
self.assertEqual(br_b.last_revision(), '1')
184
def get_parented_branch(self):
185
wt_a = self.make_branch_and_tree('a')
186
self.build_tree(['a/one'])
188
wt_a.commit('commit one', rev_id='1')
190
branch_b = wt_a.bzrdir.sprout('b', revision_id='1').open_branch()
191
self.assertEqual(wt_a.branch.base, branch_b.get_parent())
194
def test_clone_branch_nickname(self):
195
# test the nick name is preserved always
196
raise TestSkipped('XXX branch cloning is not yet tested..')
198
def test_clone_branch_parent(self):
199
# test the parent is preserved always
200
branch_b = self.get_parented_branch()
201
repo_c = self.make_repository('c')
202
branch_b.repository.copy_content_into(repo_c)
203
branch_c = branch_b.clone(repo_c.bzrdir)
204
self.assertNotEqual(None, branch_c.get_parent())
205
self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
207
# We can also set a specific parent, and it should be honored
208
random_parent = 'http://bazaar-vcs.org/path/to/branch'
209
branch_b.set_parent(random_parent)
210
repo_d = self.make_repository('d')
211
branch_b.repository.copy_content_into(repo_d)
212
branch_d = branch_b.clone(repo_d.bzrdir)
213
self.assertEqual(random_parent, branch_d.get_parent())
215
def test_sprout_branch_nickname(self):
216
# test the nick name is reset always
217
raise TestSkipped('XXX branch sprouting is not yet tested..')
219
def test_sprout_branch_parent(self):
220
source = self.make_branch('source')
221
target = source.bzrdir.sprout(self.get_url('target')).open_branch()
222
self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
224
def test_submit_branch(self):
225
"""Submit location can be queried and set"""
226
branch = self.make_branch('branch')
227
self.assertEqual(branch.get_submit_branch(), None)
228
branch.set_submit_branch('sftp://example.com')
229
self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
230
branch.set_submit_branch('sftp://example.net')
231
self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
119
self.build_tree(['a/', 'a/one'])
120
br_a = Branch.initialize('a')
121
br_a.working_tree().add(['one'])
122
br_a.working_tree().commit('commit one', rev_id='u@d-1')
123
self.build_tree(['a/two'])
124
br_a.working_tree().add(['two'])
125
br_a.working_tree().commit('commit two', rev_id='u@d-2')
126
br_b = copy_branch(br_a, 'b', revision='u@d-1')
127
self.assertEqual(br_b.last_revision(), 'u@d-1')
128
self.assertTrue(os.path.exists('b/one'))
129
self.assertFalse(os.path.exists('b/two'))
233
131
def test_record_initial_ghost_merge(self):
234
132
"""A pending merge with no revision present is still a merge."""
235
wt = self.make_branch_and_tree('.')
237
wt.add_pending_merge('non:existent@rev--ision--0--2')
238
wt.commit('pretend to merge nonexistent-revision', rev_id='first')
239
rev = branch.repository.get_revision(branch.last_revision())
133
branch = Branch.initialize(u'.')
134
branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
135
branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
136
rev = branch.get_revision(branch.last_revision())
240
137
self.assertEqual(len(rev.parent_ids), 1)
241
138
# parent_sha1s is not populated now, WTF. rbc 20051003
242
139
self.assertEqual(len(rev.parent_sha1s), 0)
243
140
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
245
142
def test_bad_revision(self):
246
self.assertRaises(errors.InvalidRevisionId,
247
self.get_branch().repository.get_revision,
143
branch = Branch.initialize(u'.')
144
self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
250
146
# TODO 20051003 RBC:
251
147
# compare the gpg-to-sign info for a commit with a ghost and
276
172
self.assertEquals(wt.pending_merges(), [])
278
174
def test_sign_existing_revision(self):
279
wt = self.make_branch_and_tree('.')
281
wt.commit("base", allow_pointless=True, rev_id='A')
175
branch = Branch.initialize(u'.')
176
branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
282
177
from bzrlib.testament import Testament
283
strategy = gpg.LoopbackGPGStrategy(None)
284
branch.repository.sign_revision('A', strategy)
285
self.assertEqual(Testament.from_revision(branch.repository,
286
'A').as_short_text(),
287
branch.repository.get_signature_text('A'))
178
branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
179
self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
180
branch.revision_store.get('A', 'sig').read())
289
182
def test_store_signature(self):
290
wt = self.make_branch_and_tree('.')
292
branch.repository.store_revision_signature(
293
gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
294
self.assertRaises(errors.NoSuchRevision,
295
branch.repository.has_signature_for_revision_id,
297
wt.commit("base", allow_pointless=True, rev_id='A')
298
self.assertEqual('FOO',
299
branch.repository.get_signature_text('A'))
183
branch = Branch.initialize(u'.')
184
branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
186
self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
301
def test_branch_keeps_signatures(self):
302
wt = self.make_branch_and_tree('source')
303
wt.commit('A', allow_pointless=True, rev_id='A')
304
wt.branch.repository.sign_revision('A',
305
gpg.LoopbackGPGStrategy(None))
306
#FIXME: clone should work to urls,
307
# wt.clone should work to disks.
308
self.build_tree(['target/'])
309
d2 = wt.bzrdir.clone('target')
310
self.assertEqual(wt.branch.repository.get_signature_text('A'),
311
d2.open_repository().get_signature_text('A'))
188
def test__relcontrolfilename(self):
189
branch = Branch.initialize(u'.')
190
self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
192
def test__relcontrolfilename_empty(self):
193
branch = Branch.initialize(u'.')
194
self.assertEqual('.bzr', branch._rel_controlfilename(''))
313
196
def test_nicks(self):
314
197
"""Branch nicknames"""
315
t = get_transport(self.get_url())
317
branch = self.make_branch('bzr.dev')
199
branch = Branch.initialize('bzr.dev')
318
200
self.assertEqual(branch.nick, 'bzr.dev')
319
t.move('bzr.dev', 'bzr.ab')
320
branch = Branch.open(self.get_url('bzr.ab'))
201
os.rename('bzr.dev', 'bzr.ab')
202
branch = Branch.open('bzr.ab')
321
203
self.assertEqual(branch.nick, 'bzr.ab')
322
204
branch.nick = "Aaron's branch"
323
205
branch.nick = "Aaron's branch"
327
branch.control_files.controlfilename("branch.conf")
206
self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
331
207
self.assertEqual(branch.nick, "Aaron's branch")
332
t.move('bzr.ab', 'integration')
333
branch = Branch.open(self.get_url('integration'))
208
os.rename('bzr.ab', 'integration')
209
branch = Branch.open('integration')
334
210
self.assertEqual(branch.nick, "Aaron's branch")
335
211
branch.nick = u"\u1234"
336
212
self.assertEqual(branch.nick, u"\u1234")
338
214
def test_commit_nicks(self):
339
215
"""Nicknames are committed to the revision"""
340
get_transport(self.get_url()).mkdir('bzr.dev')
341
wt = self.make_branch_and_tree('bzr.dev')
217
branch = Branch.initialize('bzr.dev')
343
218
branch.nick = "My happy branch"
344
wt.commit('My commit respect da nick.')
345
committed = branch.repository.get_revision(branch.last_revision())
219
branch.working_tree().commit('My commit respect da nick.')
220
committed = branch.get_revision(branch.last_revision())
346
221
self.assertEqual(committed.properties["branch-nick"],
347
222
"My happy branch")
349
def test_create_open_branch_uses_repository(self):
351
repo = self.make_repository('.', shared=True)
352
except errors.IncompatibleFormat:
354
repo.bzrdir.root_transport.mkdir('child')
355
child_dir = self.bzrdir_format.initialize('child')
357
child_branch = self.branch_format.initialize(child_dir)
358
except errors.UninitializableFormat:
359
# branch references are not default init'able.
361
self.assertEqual(repo.bzrdir.root_transport.base,
362
child_branch.repository.bzrdir.root_transport.base)
363
child_branch = branch.Branch.open(self.get_url('child'))
364
self.assertEqual(repo.bzrdir.root_transport.base,
365
child_branch.repository.bzrdir.root_transport.base)
367
def test_format_description(self):
368
tree = self.make_branch_and_tree('tree')
369
text = tree.branch._format.get_format_description()
370
self.failUnless(len(text))
372
def test_check_branch_report_results(self):
373
"""Checking a branch produces results which can be printed"""
374
branch = self.make_branch('.')
375
result = branch.check()
376
# reports results through logging
377
result.report_results(verbose=True)
378
result.report_results(verbose=False)
380
def test_get_commit_builder(self):
381
self.assertIsInstance(self.make_branch(".").get_commit_builder([]),
382
repository.CommitBuilder)
384
def test_generate_revision_history(self):
385
"""Create a fake revision history easily."""
386
tree = self.make_branch_and_tree('.')
387
rev1 = tree.commit('foo')
388
orig_history = tree.branch.revision_history()
389
rev2 = tree.commit('bar', allow_pointless=True)
390
tree.branch.generate_revision_history(rev1)
391
self.assertEqual(orig_history, tree.branch.revision_history())
393
def test_generate_revision_history_NULL_REVISION(self):
394
tree = self.make_branch_and_tree('.')
395
rev1 = tree.commit('foo')
396
tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
397
self.assertEqual([], tree.branch.revision_history())
399
def test_create_checkout(self):
400
tree_a = self.make_branch_and_tree('a')
401
branch_a = tree_a.branch
402
checkout_b = branch_a.create_checkout('b')
403
checkout_b.commit('rev1', rev_id='rev1')
404
self.assertEqual('rev1', branch_a.last_revision())
405
self.assertNotEqual(checkout_b.branch.base, branch_a.base)
407
checkout_c = branch_a.create_checkout('c', lightweight=True)
408
checkout_c.commit('rev2', rev_id='rev2')
409
self.assertEqual('rev2', branch_a.last_revision())
410
self.assertEqual(checkout_c.branch.base, branch_a.base)
413
checkout_d = branch_a.create_checkout('d', lightweight=True)
415
checkout_e = branch_a.create_checkout('e')
418
class ChrootedTests(TestCaseWithBranch):
419
"""A support class that provides readonly urls outside the local namespace.
421
This is done by checking if self.transport_server is a MemoryServer. if it
422
is then we are chrooted already, if it is not then an HttpServer is used
427
super(ChrootedTests, self).setUp()
428
if not self.transport_server == MemoryServer:
429
self.transport_readonly_server = HttpServer
225
class TestRemote(TestCaseWithWebserver):
431
227
def test_open_containing(self):
432
228
self.assertRaises(NotBranchError, Branch.open_containing,
433
self.get_readonly_url(''))
229
self.get_remote_url(''))
434
230
self.assertRaises(NotBranchError, Branch.open_containing,
435
self.get_readonly_url('g/p/q'))
436
branch = self.make_branch('.')
437
branch, relpath = Branch.open_containing(self.get_readonly_url(''))
231
self.get_remote_url('g/p/q'))
232
b = Branch.initialize(u'.')
233
branch, relpath = Branch.open_containing(self.get_remote_url(''))
438
234
self.assertEqual('', relpath)
439
branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
235
branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
440
236
self.assertEqual('g/p/q', relpath)
238
# TODO: rewrite this as a regular unittest, without relying on the displayed output
239
# >>> from bzrlib.commit import commit
240
# >>> bzrlib.trace.silent = True
241
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
242
# >>> br1.working_tree().add('foo')
243
# >>> br1.working_tree().add('bar')
244
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
245
# >>> br2 = ScratchBranch()
246
# >>> br2.update_revisions(br1)
248
# Added 1 inventories.
250
# >>> br2.revision_history()
252
# >>> br2.update_revisions(br1)
254
# >>> br1.text_store.total_size() == br2.text_store.total_size()
443
257
class InstrumentedTransaction(object):
503
317
self.assertEqual(['lw', 'ul'], branch._calls)
506
class TestBranchTransaction(TestCaseWithBranch):
320
class TestBranchTransaction(TestCaseInTempDir):
509
323
super(TestBranchTransaction, self).setUp()
324
self.branch = Branch.initialize(u'.')
512
326
def test_default_get_transaction(self):
513
327
"""branch.get_transaction on a new branch should give a PassThrough."""
514
self.failUnless(isinstance(self.get_branch().get_transaction(),
328
self.failUnless(isinstance(self.branch.get_transaction(),
515
329
transactions.PassThroughTransaction))
517
331
def test__set_new_transaction(self):
518
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
332
self.branch._set_transaction(transactions.ReadOnlyTransaction())
520
334
def test__set_over_existing_transaction_raises(self):
521
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
335
self.branch._set_transaction(transactions.ReadOnlyTransaction())
522
336
self.assertRaises(errors.LockError,
523
self.get_branch()._set_transaction,
337
self.branch._set_transaction,
524
338
transactions.ReadOnlyTransaction())
526
340
def test_finish_no_transaction_raises(self):
527
self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
341
self.assertRaises(errors.LockError, self.branch._finish_transaction)
529
343
def test_finish_readonly_transaction_works(self):
530
self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
531
self.get_branch()._finish_transaction()
532
self.assertEqual(None, self.get_branch().control_files._transaction)
344
self.branch._set_transaction(transactions.ReadOnlyTransaction())
345
self.branch._finish_transaction()
346
self.assertEqual(None, self.branch._transaction)
534
348
def test_unlock_calls_finish(self):
535
self.get_branch().lock_read()
349
self.branch.lock_read()
536
350
transaction = InstrumentedTransaction()
537
self.get_branch().control_files._transaction = transaction
538
self.get_branch().unlock()
351
self.branch._transaction = transaction
539
353
self.assertEqual(['finish'], transaction.calls)
541
355
def test_lock_read_acquires_ro_transaction(self):
542
self.get_branch().lock_read()
543
self.failUnless(isinstance(self.get_branch().get_transaction(),
356
self.branch.lock_read()
357
self.failUnless(isinstance(self.branch.get_transaction(),
544
358
transactions.ReadOnlyTransaction))
545
self.get_branch().unlock()
547
def test_lock_write_acquires_write_transaction(self):
548
self.get_branch().lock_write()
361
def test_lock_write_acquires_passthrough_transaction(self):
362
self.branch.lock_write()
549
363
# cannot use get_transaction as its magic
550
self.failUnless(isinstance(self.get_branch().control_files._transaction,
551
transactions.WriteTransaction))
552
self.get_branch().unlock()
555
class TestBranchPushLocations(TestCaseWithBranch):
364
self.failUnless(isinstance(self.branch._transaction,
365
transactions.PassThroughTransaction))
369
class TestBranchPushLocations(TestCaseInTempDir):
372
super(TestBranchPushLocations, self).setUp()
373
self.branch = Branch.initialize(u'.')
557
375
def test_get_push_location_unset(self):
558
self.assertEqual(None, self.get_branch().get_push_location())
376
self.assertEqual(None, self.branch.get_push_location())
560
378
def test_get_push_location_exact(self):
561
from bzrlib.config import (locations_config_filename,
379
from bzrlib.config import (branches_config_filename,
562
380
ensure_config_dir_exists)
563
381
ensure_config_dir_exists()
564
fn = locations_config_filename()
382
fn = branches_config_filename()
565
383
print >> open(fn, 'wt'), ("[%s]\n"
566
384
"push_location=foo" %
567
self.get_branch().base[:-1])
568
self.assertEqual("foo", self.get_branch().get_push_location())
386
self.assertEqual("foo", self.branch.get_push_location())
570
388
def test_set_push_location(self):
571
from bzrlib.config import (locations_config_filename,
389
from bzrlib.config import (branches_config_filename,
572
390
ensure_config_dir_exists)
573
391
ensure_config_dir_exists()
574
fn = locations_config_filename()
575
branch = self.get_branch()
576
branch.set_push_location('foo')
577
local_path = urlutils.local_path_from_url(branch.base[:-1])
392
fn = branches_config_filename()
393
self.branch.set_push_location('foo')
578
394
self.assertFileEqual("[%s]\n"
579
"push_location = foo" % local_path,
395
"push_location = foo" % getcwd(),
582
398
# TODO RBC 20051029 test getting a push location from a branch in a
583
399
# recursive section - that is, it appends the branch name.
586
class TestFormat(TestCaseWithBranch):
587
"""Tests for the format itself."""
589
def test_format_initialize_find_open(self):
590
# loopback test to check the current format initializes to itself.
591
if not self.branch_format.is_supported():
592
# unsupported formats are not loopback testable
593
# because the default open will not open them and
594
# they may not be initializable.
596
# supported formats must be able to init and open
597
t = get_transport(self.get_url())
598
readonly_t = get_transport(self.get_readonly_url())
599
made_branch = self.make_branch('.')
600
self.failUnless(isinstance(made_branch, branch.Branch))
602
# find it via bzrdir opening:
603
opened_control = bzrdir.BzrDir.open(readonly_t.base)
604
direct_opened_branch = opened_control.open_branch()
605
self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
606
self.assertEqual(opened_control, direct_opened_branch.bzrdir)
607
self.failUnless(isinstance(direct_opened_branch._format,
608
self.branch_format.__class__))
610
# find it via Branch.open
611
opened_branch = branch.Branch.open(readonly_t.base)
612
self.failUnless(isinstance(opened_branch, made_branch.__class__))
613
self.assertEqual(made_branch._format.__class__,
614
opened_branch._format.__class__)
615
# if it has a unique id string, can we probe for it ?
617
self.branch_format.get_format_string()
618
except NotImplementedError:
620
self.assertEqual(self.branch_format,
621
branch.BranchFormat.find_format(opened_control))