# (C) 2005 Canonical Ltd

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
import os

from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
from bzrlib.clone import copy_branch
from bzrlib.commit import commit
import bzrlib.errors as errors
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
import bzrlib.gpg
from bzrlib.selftest import TestCase, TestCaseInTempDir
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
from bzrlib.trace import mutter
import bzrlib.transactions as transactions

# TODO: Make a branch using a basis branch, and check that it
# doesn't request any files that could have been avoided, by
# hooking into the Transport.

34
class TestBranch(TestCaseInTempDir):
    """Tests of basic Branch operations, built on TestCaseInTempDir."""

    def _write_file(self, path, content, mode='w'):
        """Write content to path, closing the handle even if the write fails."""
        handle = open(path, mode)
        try:
            handle.write(content)
        finally:
            handle.close()

    def test_append_revisions(self):
        """Test appending more than one revision"""
        br = Branch.initialize(".")
        br.append_revision("rev1")
        self.assertEqual(br.revision_history(), ["rev1"])
        br.append_revision("rev2", "rev3")
        self.assertEqual(br.revision_history(), ["rev1", "rev2", "rev3"])

    def test_fetch_revisions(self):
        """Test fetch-revision operation."""
        from bzrlib.fetch import Fetcher
        # NOTE(review): assumes Branch.initialize needs an existing directory
        # (test_copy_partial builds 'a/' before initializing it) -- confirm.
        os.mkdir('b1')
        os.mkdir('b2')
        b1 = Branch.initialize('b1')
        b2 = Branch.initialize('b2')
        self._write_file(os.path.join('b1', 'foo'), 'hello')
        b1.add(['foo'], ['foo-id'])
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)

        f = Fetcher(from_branch=b1, to_branch=b2)
        self.assertEqual(f.last_revision, 'revision-1')

        # The lookup itself is the check that the revision reached b2;
        # the returned revision object is not inspected.
        b2.get_revision('revision-1')
        tree = b2.revision_tree('revision-1')
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')

    def get_unbalanced_branch_pair(self):
        """Return two branches, a and b, with one file in a."""
        os.mkdir('a')
        br_a = Branch.initialize("a")
        self._write_file('a/b', 'b', 'wb')
        # Version the file so the commit below has something to record.
        br_a.add(['b'])
        commit(br_a, "silly commit", rev_id='A')
        os.mkdir('b')
        br_b = Branch.initialize("b")
        return br_a, br_b

    def get_balanced_branch_pair(self):
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
        br_a, br_b = self.get_unbalanced_branch_pair()
        br_a.push_stores(br_b)
        return br_a, br_b

    def test_push_stores(self):
        """Copy the stores from one branch to another"""
        br_a, br_b = self.get_unbalanced_branch_pair()
        first_rev = br_a.revision_history()[0]
        # ensure the revision is missing.
        self.assertRaises(NoSuchRevision, br_b.get_revision, first_rev)
        br_a.push_stores(br_b)
        # check that b now has all the data from a's first commit.
        br_b.get_revision(first_rev)
        tree = br_b.revision_tree(first_rev)
        # NOTE(review): assumes iterating a revision tree yields file ids
        # -- confirm against the Tree API.
        for file_id in tree:
            if tree.inventory[file_id].kind == "file":
                tree.get_file(file_id).read()

    def test_copy_branch(self):
        """A copy made with a basis branch has the source's revision history."""
        br_a, br_b = self.get_balanced_branch_pair()
        commit(br_b, "silly commit")
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
        self.assertEqual(br_a.revision_history(), br_c.revision_history())

    def test_copy_partial(self):
        """Copy only part of the history of a branch."""
        self.build_tree(['a/', 'a/one'])
        br_a = Branch.initialize('a')
        br_a.add(['one'])
        br_a.commit('commit one', rev_id='u@d-1')
        self.build_tree(['a/two'])
        br_a.add(['two'])
        br_a.commit('commit two', rev_id='u@d-2')
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
        self.assertEqual(br_b.last_revision(), 'u@d-1')
        self.assertTrue(os.path.exists('b/one'))
        self.assertFalse(os.path.exists('b/two'))

    def test_record_initial_ghost_merge(self):
        """A pending merge with no revision present is still a merge."""
        branch = Branch.initialize('.')
        branch.add_pending_merge('non:existent@rev--ision--0--2')
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
        rev = branch.get_revision(branch.last_revision())
        self.assertEqual(len(rev.parent_ids), 1)
        # parent_sha1s is not populated now, WTF. rbc 20051003
        self.assertEqual(len(rev.parent_sha1s), 0)
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')

        # TODO:
        # compare the gpg-to-sign info for a commit with a ghost and
        # an identical tree without a ghost
        # fetch missing should rewrite the TOC of weaves to list newly
        # available parents.

    def test_pending_merges(self):
        """Tracking pending-merged revisions."""
        b = Branch.initialize('.')

        self.assertEqual(b.pending_merges(), [])
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEqual(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        # Adding the same revision again must not duplicate the entry.
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEqual(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
        self.assertEqual(b.pending_merges(),
                         ['foo@azkhazan-123123-abcabc',
                          'wibble@fofof--20050401--1928390812'])
        b.commit("commit from base with two merges")
        rev = b.get_revision(b.revision_history()[0])
        self.assertEqual(len(rev.parent_ids), 2)
        self.assertEqual(rev.parent_ids[0],
                         'foo@azkhazan-123123-abcabc')
        self.assertEqual(rev.parent_ids[1],
                         'wibble@fofof--20050401--1928390812')
        # list should be cleared when we do a commit
        self.assertEqual(b.pending_merges(), [])

    def test_sign_existing_revision(self):
        """The signature stored for 'A' equals its Testament short text."""
        from bzrlib.testament import Testament
        branch = Branch.initialize('.')
        branch.commit("base", allow_pointless=True, rev_id='A')
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
                         branch.revision_store.get('A', 'sig').read())

    def test_store_signature(self):
        """Text stored as the signature of 'A' is read back from the store."""
        branch = Branch.initialize('.')
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
                                        'FOO', 'A')
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())

    def test__relcontrolfilename(self):
        """'%' is expected to come back as '%25' below '.bzr/'."""
        branch = Branch.initialize('.')
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))

    def test__relcontrolfilename_empty(self):
        """An empty name is expected to map to '.bzr' itself."""
        branch = Branch.initialize('.')
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
181
class TestRemote(TestCaseWithWebserver):
    """Branch lookup through URLs produced by get_remote_url()."""

    def test_open_containing(self):
        """open_containing raises before a branch exists, then yields relpaths."""
        probe_paths = ('', 'g/p/q')

        # No branch has been created yet, so every lookup must fail.
        for path in probe_paths:
            self.assertRaises(NotBranchError, Branch.open_containing,
                              self.get_remote_url(path))

        local_branch = Branch.initialize('.')

        # With a branch at the root, each lookup reports the path it was
        # asked for as the relative path.
        for path in probe_paths:
            found_branch, relpath = Branch.open_containing(
                self.get_remote_url(path))
            self.assertEqual(path, relpath)

# TODO: rewrite this as a regular unittest, without relying on the displayed output
#         >>> from bzrlib.commit import commit
#         >>> bzrlib.trace.silent = True
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
#         >>> br2 = ScratchBranch()
#         >>> br2.update_revisions(br1)
#         Added 1 inventories.
#         >>> br2.revision_history()
#         >>> br2.update_revisions(br1)
#         >>> br1.text_store.total_size() == br2.text_store.total_size()

213
class InstrumentedTransaction(object):
    """Transaction stand-in that records every finish() call in self.calls."""

    def __init__(self):
        # Names of the methods invoked on this object, in call order.
        self.calls = []

    def finish(self):
        """Record that the transaction was asked to finish."""
        self.calls.append('finish')
222
class TestDecorator(object):
    """Lock-recording stand-in used to exercise the needs_*_lock decorators.

    lock_read, lock_write and unlock append 'lr', 'lw' and 'ul' to
    self._calls, so a test can check which of them ran and in what order.
    """

    def __init__(self):
        # Short codes of the lock methods invoked so far, in call order.
        self._calls = []

    def lock_read(self):
        """Record a read-lock request."""
        self._calls.append('lr')

    def lock_write(self):
        """Record a write-lock request."""
        self._calls.append('lw')

    def unlock(self):
        """Record an unlock request."""
        self._calls.append('ul')

    # NOTE(review): the decorators are presumed to call lock_read() or
    # lock_write() before the body and unlock() afterwards, even when the
    # body raises; that is the sequence TestDecorators asserts -- confirm
    # against bzrlib.branch.

    @needs_read_lock
    def do_with_read(self):
        """Return 1 from inside a read lock."""
        return 1

    @needs_read_lock
    def except_with_read(self):
        """Raise RuntimeError from inside a read lock."""
        raise RuntimeError

    @needs_write_lock
    def do_with_write(self):
        """Return 2 from inside a write lock."""
        return 2

    @needs_write_lock
    def except_with_write(self):
        """Raise RuntimeError from inside a write lock."""
        raise RuntimeError
253
class TestDecorators(TestCase):
    """Check the lock/unlock sequence recorded by TestDecorator's methods."""

    def test_needs_read_lock(self):
        """do_with_read returns 1 and records 'lr' followed by 'ul'."""
        target = TestDecorator()
        outcome = target.do_with_read()
        self.assertEqual(1, outcome)
        self.assertEqual(['lr', 'ul'], target._calls)

    def test_excepts_in_read_lock(self):
        """A RuntimeError from except_with_read still records 'lr', 'ul'."""
        target = TestDecorator()
        self.assertRaises(RuntimeError, target.except_with_read)
        self.assertEqual(['lr', 'ul'], target._calls)

    def test_needs_write_lock(self):
        """do_with_write returns 2 and records 'lw' followed by 'ul'."""
        target = TestDecorator()
        outcome = target.do_with_write()
        self.assertEqual(2, outcome)
        self.assertEqual(['lw', 'ul'], target._calls)

    def test_excepts_in_write_lock(self):
        """A RuntimeError from except_with_write still records 'lw', 'ul'."""
        target = TestDecorator()
        self.assertRaises(RuntimeError, target.except_with_write)
        self.assertEqual(['lw', 'ul'], target._calls)
276
class TestBranchTransaction(TestCaseInTempDir):
    """Tests of how a Branch sets, exposes and finishes its transaction."""

    def setUp(self):
        """Create the branch under test in the current directory."""
        super(TestBranchTransaction, self).setUp()
        self.branch = Branch.initialize('.')

    def test_default_get_transaction(self):
        """branch.get_transaction on a new branch should give a PassThrough."""
        self.assertTrue(isinstance(self.branch.get_transaction(),
                                   transactions.PassThroughTransaction))

    def test__set_new_transaction(self):
        """Setting a transaction on a fresh branch must not raise."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())

    def test__set_over_existing_transaction_raises(self):
        """Setting a second transaction over an active one is a LockError."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.assertRaises(errors.LockError,
                          self.branch._set_transaction,
                          transactions.ReadOnlyTransaction())

    def test_finish_no_transaction_raises(self):
        """Finishing when no transaction was set is a LockError."""
        self.assertRaises(errors.LockError, self.branch._finish_transaction)

    def test_finish_readonly_transaction_works(self):
        """Finishing a read-only transaction resets _transaction to None."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.branch._finish_transaction()
        self.assertEqual(None, self.branch._transaction)

    def test_unlock_calls_finish(self):
        """unlock() must call finish() on the transaction in place."""
        self.branch.lock_read()
        transaction = InstrumentedTransaction()
        # Swap in the recording stand-in so the unlock below is observable.
        self.branch._transaction = transaction
        self.branch.unlock()
        self.assertEqual(['finish'], transaction.calls)

    def test_lock_read_acquires_ro_transaction(self):
        """lock_read() should put a ReadOnlyTransaction in place."""
        self.branch.lock_read()
        try:
            self.assertTrue(isinstance(self.branch.get_transaction(),
                                       transactions.ReadOnlyTransaction))
        finally:
            # Release the lock even when the assertion fails.
            self.branch.unlock()

    def test_lock_write_acquires_passthrough_transaction(self):
        """lock_write() should put a PassThroughTransaction in place."""
        self.branch.lock_write()
        try:
            # Inspect _transaction directly: get_transaction() already gives
            # a PassThroughTransaction on a branch that was never locked (see
            # test_default_get_transaction), so it could not show that
            # lock_write() installed one.
            self.assertTrue(isinstance(self.branch._transaction,
                                       transactions.PassThroughTransaction))
        finally:
            # Release the lock even when the assertion fails.
            self.branch.unlock()