1
# (C) 2005 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
20
from bzrlib.clone import copy_branch
21
from bzrlib.commit import commit
22
import bzrlib.errors as errors
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
27
from bzrlib.trace import mutter
28
import bzrlib.transactions as transactions
30
# TODO: Make a branch using basis branch, and check that it
31
# doesn't request any files that could have been avoided, by
32
# hooking into the Transport.
34
class TestBranch(TestCaseInTempDir):
36
def test_append_revisions(self):
37
"""Test appending more than one revision"""
38
br = Branch.initialize(".")
39
br.append_revision("rev1")
40
self.assertEquals(br.revision_history(), ["rev1",])
41
br.append_revision("rev2", "rev3")
42
self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
44
def test_fetch_revisions(self):
45
"""Test fetch-revision operation."""
46
from bzrlib.fetch import Fetcher
49
b1 = Branch.initialize('b1')
50
b2 = Branch.initialize('b2')
51
file(os.sep.join(['b1', 'foo']), 'w').write('hello')
52
b1.add(['foo'], ['foo-id'])
53
b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
56
f = Fetcher(from_branch=b1, to_branch=b2)
57
eq = self.assertEquals
59
eq(f.last_revision, 'revision-1')
61
rev = b2.get_revision('revision-1')
62
tree = b2.revision_tree('revision-1')
63
eq(tree.get_file_text('foo-id'), 'hello')
65
def test_push_stores(self):
66
"""Copy the stores from one branch to another"""
68
br_a = Branch.initialize("a")
69
file('a/b', 'wb').write('b')
71
commit(br_a, "silly commit")
74
br_b = Branch.initialize("b")
75
self.assertRaises(NoSuchRevision, br_b.get_revision,
76
br_a.revision_history()[0])
77
br_a.push_stores(br_b)
78
rev = br_b.get_revision(br_a.revision_history()[0])
79
tree = br_b.revision_tree(br_a.revision_history()[0])
81
if tree.inventory[file_id].kind == "file":
82
tree.get_file(file_id).read()
85
def test_copy_branch(self):
86
"""Copy the stores from one branch to another"""
87
br_a, br_b = self.test_push_stores()
88
commit(br_b, "silly commit")
90
br_c = copy_branch(br_a, 'c', basis_branch=br_b)
91
self.assertEqual(br_a.revision_history(), br_c.revision_history())
93
def test_copy_partial(self):
94
"""Copy only part of the history of a branch."""
95
self.build_tree(['a/', 'a/one'])
96
br_a = Branch.initialize('a')
98
br_a.commit('commit one', rev_id='u@d-1')
99
self.build_tree(['a/two'])
101
br_a.commit('commit two', rev_id='u@d-2')
102
br_b = copy_branch(br_a, 'b', revision='u@d-1')
103
self.assertEqual(br_b.last_revision(), 'u@d-1')
104
self.assertTrue(os.path.exists('b/one'))
105
self.assertFalse(os.path.exists('b/two'))
108
def test_record_initial_ghost_merge(self):
109
"""A pending merge with no revision present is still a merge."""
110
branch = Branch.initialize('.')
111
branch.add_pending_merge('non:existent@rev--ision--0--2')
112
branch.commit('pretend to merge nonexistent-revision', rev_id='first')
113
rev = branch.get_revision(branch.last_revision())
114
self.assertEqual(len(rev.parent_ids), 1)
115
# parent_sha1s is not populated now, WTF. rbc 20051003
116
self.assertEqual(len(rev.parent_sha1s), 0)
117
self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
120
# compare the gpg-to-sign info for a commit with a ghost and
121
# an identical tree without a ghost
122
# fetch missing should rewrite the TOC of weaves to list newly available parents.
124
def test_pending_merges(self):
125
"""Tracking pending-merged revisions."""
126
b = Branch.initialize('.')
128
self.assertEquals(b.pending_merges(), [])
129
b.add_pending_merge('foo@azkhazan-123123-abcabc')
130
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
131
b.add_pending_merge('foo@azkhazan-123123-abcabc')
132
self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
133
b.add_pending_merge('wibble@fofof--20050401--1928390812')
134
self.assertEquals(b.pending_merges(),
135
['foo@azkhazan-123123-abcabc',
136
'wibble@fofof--20050401--1928390812'])
137
b.commit("commit from base with two merges")
138
rev = b.get_revision(b.revision_history()[0])
139
self.assertEquals(len(rev.parent_ids), 2)
140
self.assertEquals(rev.parent_ids[0],
141
'foo@azkhazan-123123-abcabc')
142
self.assertEquals(rev.parent_ids[1],
143
'wibble@fofof--20050401--1928390812')
144
# list should be cleared when we do a commit
145
self.assertEquals(b.pending_merges(), [])
147
def test_sign_existing_revision(self):
148
branch = Branch.initialize('.')
149
branch.commit("base", allow_pointless=True, rev_id='A')
150
from bzrlib.testament import Testament
151
branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
152
self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
153
branch.revision_store.get('A', 'sig').read())
155
def test_store_signature(self):
156
branch = Branch.initialize('.')
157
branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
159
self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
162
class TestRemote(TestCaseWithWebserver):
164
def test_open_containing(self):
165
self.assertRaises(NotBranchError, Branch.open_containing,
166
self.get_remote_url(''))
167
self.assertRaises(NotBranchError, Branch.open_containing,
168
self.get_remote_url('g/p/q'))
169
b = Branch.initialize('.')
170
branch, relpath = Branch.open_containing(self.get_remote_url(''))
171
self.assertEqual('', relpath)
172
branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
173
self.assertEqual('g/p/q', relpath)
175
# TODO: rewrite this as a regular unittest, without relying on the displayed output
176
# >>> from bzrlib.commit import commit
177
# >>> bzrlib.trace.silent = True
178
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
181
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
182
# >>> br2 = ScratchBranch()
183
# >>> br2.update_revisions(br1)
185
# Added 1 inventories.
187
# >>> br2.revision_history()
189
# >>> br2.update_revisions(br1)
191
# >>> br1.text_store.total_size() == br2.text_store.total_size()
194
class InstrumentedTransaction(object):
197
self.calls.append('finish')
203
class TestDecorator(object):
209
self._calls.append('lr')
211
def lock_write(self):
212
self._calls.append('lw')
215
self._calls.append('ul')
218
def do_with_read(self):
222
def except_with_read(self):
226
def do_with_write(self):
230
def except_with_write(self):
234
class TestDecorators(TestCase):
236
def test_needs_read_lock(self):
237
branch = TestDecorator()
238
self.assertEqual(1, branch.do_with_read())
239
self.assertEqual(['lr', 'ul'], branch._calls)
241
def test_excepts_in_read_lock(self):
242
branch = TestDecorator()
243
self.assertRaises(RuntimeError, branch.except_with_read)
244
self.assertEqual(['lr', 'ul'], branch._calls)
246
def test_needs_write_lock(self):
247
branch = TestDecorator()
248
self.assertEqual(2, branch.do_with_write())
249
self.assertEqual(['lw', 'ul'], branch._calls)
251
def test_excepts_in_write_lock(self):
252
branch = TestDecorator()
253
self.assertRaises(RuntimeError, branch.except_with_write)
254
self.assertEqual(['lw', 'ul'], branch._calls)
257
class TestBranchTransaction(TestCaseInTempDir):
260
super(TestBranchTransaction, self).setUp()
261
self.branch = Branch.initialize('.')
263
def test_default_get_transaction(self):
264
"""branch.get_transaction on a new branch should give a PassThrough."""
265
self.failUnless(isinstance(self.branch.get_transaction(),
266
transactions.PassThroughTransaction))
268
def test__set_new_transaction(self):
269
self.branch._set_transaction(transactions.ReadOnlyTransaction())
271
def test__set_over_existing_transaction_raises(self):
272
self.branch._set_transaction(transactions.ReadOnlyTransaction())
273
self.assertRaises(errors.LockError,
274
self.branch._set_transaction,
275
transactions.ReadOnlyTransaction())
277
def test_finish_no_transaction_raises(self):
278
self.assertRaises(errors.LockError, self.branch._finish_transaction)
280
def test_finish_readonly_transaction_works(self):
281
self.branch._set_transaction(transactions.ReadOnlyTransaction())
282
self.branch._finish_transaction()
283
self.assertEqual(None, self.branch._transaction)
285
def test_unlock_calls_finish(self):
286
self.branch.lock_read()
287
transaction = InstrumentedTransaction()
288
self.branch._transaction = transaction
290
self.assertEqual(['finish'], transaction.calls)
292
def test_lock_read_acquires_ro_transaction(self):
293
self.branch.lock_read()
294
self.failUnless(isinstance(self.branch.get_transaction(),
295
transactions.ReadOnlyTransaction))
298
def test_lock_write_acquires_passthrough_transaction(self):
299
self.branch.lock_write()
300
# cannot use get_transaction as its magic
301
self.failUnless(isinstance(self.branch._transaction,
302
transactions.PassThroughTransaction))