1
# (C) 2005 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19
import os

from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
from bzrlib.clone import copy_branch
from bzrlib.commit import commit
import bzrlib.errors as errors
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
import bzrlib.gpg
from bzrlib.selftest import TestCase, TestCaseInTempDir
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
from bzrlib.trace import mutter
import bzrlib.transactions as transactions
from bzrlib.revision import NULL_REVISION
31
# TODO: Make a branch using basis branch, and check that it
32
# doesn't request any files that could have been avoided, by
33
# hooking into the Transport.
35
class TestBranch(TestCaseInTempDir):
    """Tests for basic Branch operations, run inside a scratch directory."""

    def _write_file(self, path, content, mode='w'):
        """Write content to path, closing the handle even if the write fails."""
        handle = open(path, mode)
        try:
            handle.write(content)
        finally:
            handle.close()

    def test_append_revisions(self):
        """Test appending more than one revision"""
        br = Branch.initialize(".")
        br.append_revision("rev1")
        self.assertEqual(br.revision_history(), ["rev1"])
        br.append_revision("rev2", "rev3")
        self.assertEqual(br.revision_history(), ["rev1", "rev2", "rev3"])

    def test_fetch_revisions(self):
        """Test fetch-revision operation."""
        from bzrlib.fetch import Fetcher
        # The file written below needs 'b1' to exist on disk.
        # NOTE(review): presumably Branch.initialize does not create the
        # directory itself (test_copy_partial builds 'a/' first) -- confirm.
        os.mkdir('b1')
        os.mkdir('b2')
        b1 = Branch.initialize('b1')
        b2 = Branch.initialize('b2')
        self._write_file(os.path.join('b1', 'foo'), 'hello')
        b1.add(['foo'], ['foo-id'])
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)

        f = Fetcher(from_branch=b1, to_branch=b2)
        self.assertEqual(f.last_revision, 'revision-1')

        # Called only to check that the revision can now be loaded from b2.
        b2.get_revision('revision-1')
        tree = b2.revision_tree('revision-1')
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')

    def test_revision_tree(self):
        """revision_tree accepts a real id, None and NULL_REVISION."""
        b1 = Branch.initialize('.')
        b1.commit('lala!', rev_id='revision-1', allow_pointless=True)
        # Smoke check only: the tree for the real revision is not inspected.
        b1.revision_tree('revision-1')
        tree = b1.revision_tree(None)
        self.assertEqual(len(tree.list_files()), 0)
        tree = b1.revision_tree(NULL_REVISION)
        self.assertEqual(len(tree.list_files()), 0)

    def get_unbalanced_branch_pair(self):
        """Return two branches, a and b, with one file in a."""
        os.mkdir('a')
        br_a = Branch.initialize("a")
        self._write_file('a/b', 'b', 'wb')
        # Version the file so the commit really carries "one file in a".
        # Paths are relative to the branch root, as in test_fetch_revisions.
        br_a.add(['b'])
        commit(br_a, "silly commit", rev_id='A')
        os.mkdir('b')
        br_b = Branch.initialize("b")
        # Callers unpack the result as ``br_a, br_b``.
        return br_a, br_b

    def get_balanced_branch_pair(self):
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
        br_a, br_b = self.get_unbalanced_branch_pair()
        br_a.push_stores(br_b)
        return br_a, br_b

    def test_push_stores(self):
        """Copy the stores from one branch to another"""
        br_a, br_b = self.get_unbalanced_branch_pair()
        first_rev_id = br_a.revision_history()[0]
        # ensure the revision is missing.
        self.assertRaises(NoSuchRevision, br_b.get_revision, first_rev_id)
        br_a.push_stores(br_b)
        # check that b now has all the data from a's first commit.
        br_b.get_revision(first_rev_id)
        tree = br_b.revision_tree(first_rev_id)
        # NOTE(review): assumes iterating the inventory yields file ids, in
        # line with the ``tree.inventory[file_id]`` lookup below -- confirm.
        for file_id in tree.inventory:
            if tree.inventory[file_id].kind == "file":
                tree.get_file(file_id).read()

    def test_copy_branch(self):
        """copy_branch with a basis branch reproduces the source history."""
        br_a, br_b = self.get_balanced_branch_pair()
        commit(br_b, "silly commit")
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
        self.assertEqual(br_a.revision_history(), br_c.revision_history())

    def test_copy_partial(self):
        """Copy only part of the history of a branch."""
        self.build_tree(['a/', 'a/one'])
        br_a = Branch.initialize('a')
        # The files must be versioned for the copy to materialise 'b/one'.
        br_a.add(['one'])
        br_a.commit('commit one', rev_id='u@d-1')
        self.build_tree(['a/two'])
        br_a.add(['two'])
        br_a.commit('commit two', rev_id='u@d-2')
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
        self.assertEqual(br_b.last_revision(), 'u@d-1')
        self.assertTrue(os.path.exists('b/one'))
        self.assertFalse(os.path.exists('b/two'))

    def test_record_initial_ghost_merge(self):
        """A pending merge with no revision present is still a merge."""
        branch = Branch.initialize('.')
        branch.add_pending_merge('non:existent@rev--ision--0--2')
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
        rev = branch.get_revision(branch.last_revision())
        self.assertEqual(len(rev.parent_ids), 1)
        # parent_sha1s is not populated now, WTF. rbc 20051003
        self.assertEqual(len(rev.parent_sha1s), 0)
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')

    def test_bad_revision(self):
        """get_revision(None) raises InvalidRevisionId."""
        branch = Branch.initialize('.')
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)

    # TODO: compare the gpg-to-sign info for a commit with a ghost and
    # an identical tree without a ghost.
    # TODO: fetch missing should rewrite the TOC of weaves to list newly
    # available parents.

    def test_pending_merges(self):
        """Tracking pending-merged revisions."""
        b = Branch.initialize('.')
        self.assertEqual(b.pending_merges(), [])
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEqual(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        # Adding the same revision twice must not duplicate the entry.
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEqual(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
        self.assertEqual(b.pending_merges(),
                         ['foo@azkhazan-123123-abcabc',
                          'wibble@fofof--20050401--1928390812'])
        b.commit("commit from base with two merges")
        rev = b.get_revision(b.revision_history()[0])
        self.assertEqual(len(rev.parent_ids), 2)
        self.assertEqual(rev.parent_ids[0],
                         'foo@azkhazan-123123-abcabc')
        self.assertEqual(rev.parent_ids[1],
                         'wibble@fofof--20050401--1928390812')
        # list should be cleared when we do a commit
        self.assertEqual(b.pending_merges(), [])

    def test_sign_existing_revision(self):
        """Signing revision 'A' stores its short testament text as 'sig'."""
        from bzrlib.testament import Testament
        branch = Branch.initialize('.')
        branch.commit("base", allow_pointless=True, rev_id='A')
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
                         branch.revision_store.get('A', 'sig').read())

    def test_store_signature(self):
        """A stored signature can be read back from the revision store."""
        branch = Branch.initialize('.')
        # NOTE(review): the plaintext and revision id are inferred from the
        # assertion below ('FOO' read back for 'A'); confirm argument order.
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
                                        'FOO', 'A')
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())

    def test__relcontrolfilename(self):
        """_rel_controlfilename('%') yields '.bzr/%25'."""
        branch = Branch.initialize('.')
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))

    def test__relcontrolfilename_empty(self):
        """_rel_controlfilename('') yields '.bzr'."""
        branch = Branch.initialize('.')
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
195
class TestRemote(TestCaseWithWebserver):
    """Branch.open_containing exercised through the test web server URLs."""

    def test_open_containing(self):
        """open_containing fails with no branch, then finds it with relpath."""
        probe_paths = ('', 'g/p/q')

        # Nothing has been initialised yet, so every lookup must fail.
        for path in probe_paths:
            self.assertRaises(NotBranchError, Branch.open_containing,
                              self.get_remote_url(path))

        Branch.initialize('.')

        # With a branch at the root, the probed path is returned as relpath.
        for path in probe_paths:
            branch, relpath = Branch.open_containing(self.get_remote_url(path))
            self.assertEqual(path, relpath)
208
# TODO: rewrite this as a regular unittest, without relying on the displayed output
209
# >>> from bzrlib.commit import commit
210
# >>> bzrlib.trace.silent = True
211
# >>> br1 = ScratchBranch(files=['foo', 'bar'])
214
# >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
215
# >>> br2 = ScratchBranch()
216
# >>> br2.update_revisions(br1)
218
# Added 1 inventories.
220
# >>> br2.revision_history()
222
# >>> br2.update_revisions(br1)
224
# >>> br1.text_store.total_size() == br2.text_store.total_size()
227
class InstrumentedTransaction(object):
    """Transaction stand-in that records the calls made on it.

    ``calls`` is a list of call names; ``finish`` appends ``'finish'``.
    """

    def __init__(self):
        # Call names in the order they were made.
        self.calls = []

    def finish(self):
        """Record that finish() was called."""
        self.calls.append('finish')
236
class TestDecorator(object):
    """Lockable stand-in used to observe the needs_*_lock decorators.

    Each lock method appends a short tag to ``_calls``: 'lr' for
    lock_read, 'lw' for lock_write and 'ul' for unlock.
    """

    def __init__(self):
        # Lock-related calls in the order they happened.
        self._calls = []

    def lock_read(self):
        self._calls.append('lr')

    def lock_write(self):
        self._calls.append('lw')

    def unlock(self):
        self._calls.append('ul')

    @needs_read_lock
    def do_with_read(self):
        """Return 1 from inside a read-locked call."""
        return 1

    @needs_read_lock
    def except_with_read(self):
        """Raise RuntimeError from inside a read-locked call."""
        raise RuntimeError

    @needs_write_lock
    def do_with_write(self):
        """Return 2 from inside a write-locked call."""
        return 2

    @needs_write_lock
    def except_with_write(self):
        """Raise RuntimeError from inside a write-locked call."""
        raise RuntimeError
267
class TestDecorators(TestCase):
    """Check the lock/unlock calls recorded around decorated methods."""

    def test_needs_read_lock(self):
        """A read-decorated call returns its value between 'lr' and 'ul'."""
        target = TestDecorator()
        outcome = target.do_with_read()
        self.assertEqual(1, outcome)
        self.assertEqual(['lr', 'ul'], target._calls)

    def test_excepts_in_read_lock(self):
        """An exception under the read lock still records the unlock."""
        target = TestDecorator()
        self.assertRaises(RuntimeError, target.except_with_read)
        self.assertEqual(['lr', 'ul'], target._calls)

    def test_needs_write_lock(self):
        """A write-decorated call returns its value between 'lw' and 'ul'."""
        target = TestDecorator()
        outcome = target.do_with_write()
        self.assertEqual(2, outcome)
        self.assertEqual(['lw', 'ul'], target._calls)

    def test_excepts_in_write_lock(self):
        """An exception under the write lock still records the unlock."""
        target = TestDecorator()
        self.assertRaises(RuntimeError, target.except_with_write)
        self.assertEqual(['lw', 'ul'], target._calls)
290
class TestBranchTransaction(TestCaseInTempDir):
    """Tests for the transaction a Branch holds while it is locked."""

    def setUp(self):
        """Create a fresh branch in the scratch directory for each test."""
        super(TestBranchTransaction, self).setUp()
        self.branch = Branch.initialize('.')

    def test_default_get_transaction(self):
        """branch.get_transaction on a new branch should give a PassThrough."""
        self.assertTrue(isinstance(self.branch.get_transaction(),
                                   transactions.PassThroughTransaction))

    def test__set_new_transaction(self):
        """Setting a transaction on a fresh branch does not raise."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())

    def test__set_over_existing_transaction_raises(self):
        """Setting a second transaction over an existing one is a LockError."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.assertRaises(errors.LockError,
                          self.branch._set_transaction,
                          transactions.ReadOnlyTransaction())

    def test_finish_no_transaction_raises(self):
        """Finishing when no transaction is set is a LockError."""
        self.assertRaises(errors.LockError, self.branch._finish_transaction)

    def test_finish_readonly_transaction_works(self):
        """Finishing a read-only transaction resets _transaction to None."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.branch._finish_transaction()
        self.assertEqual(None, self.branch._transaction)

    def test_unlock_calls_finish(self):
        """Unlocking the branch calls finish() on the active transaction."""
        self.branch.lock_read()
        transaction = InstrumentedTransaction()
        self.branch._transaction = transaction
        # The unlock is the action under test; without it nothing could
        # have called finish() on the instrumented transaction.
        self.branch.unlock()
        self.assertEqual(['finish'], transaction.calls)

    def test_lock_read_acquires_ro_transaction(self):
        """lock_read installs a ReadOnlyTransaction."""
        self.branch.lock_read()
        try:
            self.assertTrue(isinstance(self.branch.get_transaction(),
                                       transactions.ReadOnlyTransaction))
        finally:
            # Release the lock even when the assertion fails.
            self.branch.unlock()

    def test_lock_write_acquires_passthrough_transaction(self):
        """lock_write installs a PassThroughTransaction."""
        self.branch.lock_write()
        try:
            # cannot use get_transaction as it's magic
            self.assertTrue(isinstance(self.branch._transaction,
                                       transactions.PassThroughTransaction))
        finally:
            # Release the lock even when the assertion fails.
            self.branch.unlock()