~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

- rules for using destructors

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# (C) 2005 Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
import os
 
18
 
 
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
 
20
from bzrlib.clone import copy_branch
 
21
from bzrlib.commit import commit
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
24
import bzrlib.gpg
 
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
 
27
from bzrlib.trace import mutter
 
28
import bzrlib.transactions as transactions
 
29
 
 
30
# TODO: Make a branch using basis branch, and check that it 
 
31
# doesn't request any files that could have been avoided, by 
 
32
# hooking into the Transport.
 
33
 
 
34
class TestBranch(TestCaseInTempDir):
    """Exercise basic Branch operations on branches created on disk.

    Every test creates its branches with relative paths ('.', 'a', 'b', ...).
    """

    def test_append_revisions(self):
        """Test appending more than one revision"""
        br = Branch.initialize(".")
        br.append_revision("rev1")
        self.assertEqual(br.revision_history(), ["rev1"])
        br.append_revision("rev2", "rev3")
        self.assertEqual(br.revision_history(), ["rev1", "rev2", "rev3"])

    def test_fetch_revisions(self):
        """Test fetch-revision operation."""
        from bzrlib.fetch import Fetcher
        os.mkdir('b1')
        os.mkdir('b2')
        b1 = Branch.initialize('b1')
        b2 = Branch.initialize('b2')
        # Close the handle explicitly so the content is flushed before the
        # file is added and committed, instead of relying on garbage
        # collection to do it.
        foo_file = open(os.sep.join(['b1', 'foo']), 'w')
        try:
            foo_file.write('hello')
        finally:
            foo_file.close()
        b1.add(['foo'], ['foo-id'])
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)

        mutter('start fetch')
        f = Fetcher(from_branch=b1, to_branch=b2)
        self.assertEqual(f.count_copied, 1)
        self.assertEqual(f.last_revision, 'revision-1')

        # Called only for its lookup: it raises if the revision was not
        # copied into b2.
        b2.get_revision('revision-1')
        tree = b2.revision_tree('revision-1')
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')

    def get_unbalanced_branch_pair(self):
        """Return two branches, a and b, with one file in a."""
        os.mkdir('a')
        br_a = Branch.initialize("a")
        # Explicit close so the data is on disk before the commit below.
        b_file = open('a/b', 'wb')
        try:
            b_file.write('b')
        finally:
            b_file.close()
        br_a.add('b')
        commit(br_a, "silly commit", rev_id='A')
        os.mkdir('b')
        br_b = Branch.initialize("b")
        return br_a, br_b

    def get_balanced_branch_pair(self):
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
        br_a, br_b = self.get_unbalanced_branch_pair()
        br_a.push_stores(br_b)
        return br_a, br_b

    def test_push_stores(self):
        """Copy the stores from one branch to another"""
        br_a, br_b = self.get_unbalanced_branch_pair()
        # ensure the revision is missing.
        self.assertRaises(NoSuchRevision, br_b.get_revision,
                          br_a.revision_history()[0])
        br_a.push_stores(br_b)
        # check that b now has all the data from a's first commit.
        # get_revision raises NoSuchRevision if the data did not arrive.
        br_b.get_revision(br_a.revision_history()[0])
        tree = br_b.revision_tree(br_a.revision_history()[0])
        for file_id in tree:
            if tree.inventory[file_id].kind == "file":
                # Reading the text proves it is retrievable from b's stores.
                tree.get_file(file_id).read()
        # Return value retained from the original implementation; the test
        # runner itself does not use it.
        return br_a, br_b

    def test_copy_branch(self):
        """Copy a branch, passing another branch as the basis."""
        br_a, br_b = self.get_balanced_branch_pair()
        commit(br_b, "silly commit")
        os.mkdir('c')
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
        self.assertEqual(br_a.revision_history(), br_c.revision_history())

    def test_copy_partial(self):
        """Copy only part of the history of a branch."""
        self.build_tree(['a/', 'a/one'])
        br_a = Branch.initialize('a')
        br_a.add(['one'])
        br_a.commit('commit one', rev_id='u@d-1')
        self.build_tree(['a/two'])
        br_a.add(['two'])
        br_a.commit('commit two', rev_id='u@d-2')
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
        self.assertEqual(br_b.last_revision(), 'u@d-1')
        self.assertTrue(os.path.exists('b/one'))
        self.assertFalse(os.path.exists('b/two'))

    def test_record_initial_ghost_merge(self):
        """A pending merge with no revision present is still a merge."""
        branch = Branch.initialize('.')
        branch.add_pending_merge('non:existent@rev--ision--0--2')
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
        rev = branch.get_revision(branch.last_revision())
        self.assertEqual(len(rev.parent_ids), 1)
        # parent_sha1s is unexpectedly not populated now. rbc 20051003
        self.assertEqual(len(rev.parent_sha1s), 0)
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')

    # TODO 20051003 RBC:
    # compare the gpg-to-sign info for a commit with a ghost and
    #     an identical tree without a ghost
    # fetch missing should rewrite the TOC of weaves to list newly available
    # parents.

    def test_pending_merges(self):
        """Tracking pending-merged revisions."""
        b = Branch.initialize('.')

        self.assertEqual(b.pending_merges(), [])
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEqual(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        # Adding the same revision again must not create a duplicate entry.
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
        self.assertEqual(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
        self.assertEqual(b.pending_merges(),
                         ['foo@azkhazan-123123-abcabc',
                          'wibble@fofof--20050401--1928390812'])
        b.commit("commit from base with two merges")
        rev = b.get_revision(b.revision_history()[0])
        self.assertEqual(len(rev.parent_ids), 2)
        self.assertEqual(rev.parent_ids[0],
                         'foo@azkhazan-123123-abcabc')
        self.assertEqual(rev.parent_ids[1],
                         'wibble@fofof--20050401--1928390812')
        # list should be cleared when we do a commit
        self.assertEqual(b.pending_merges(), [])

    def test_sign_existing_revision(self):
        """Signing a revision stores its short testament as the 'sig' entry."""
        branch = Branch.initialize('.')
        branch.commit("base", allow_pointless=True, rev_id='A')
        from bzrlib.testament import Testament
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
                         branch.revision_store.get('A', 'sig').read())

    def test_store_signature(self):
        """store_revision_signature stores the given text under 'sig'."""
        branch = Branch.initialize('.')
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
                                        'FOO', 'A')
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())

    def test__relcontrolfilename(self):
        """_rel_controlfilename escapes '%' as '%25' beneath '.bzr'."""
        branch = Branch.initialize('.')
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))

    def test__relcontrolfilename_empty(self):
        """_rel_controlfilename('') yields the bare control directory name."""
        branch = Branch.initialize('.')
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
 
179
 
 
180
 
 
181
class TestRemote(TestCaseWithWebserver):
    """Check Branch.open_containing against URLs from get_remote_url."""

    def test_open_containing(self):
        """open_containing raises before a branch exists, then gives relpaths.

        The same two paths ('' and 'g/p/q') are probed before and after a
        branch is initialized in the current directory.
        """
        probe_paths = ('', 'g/p/q')

        # No branch exists yet, so every lookup has to fail.
        for path in probe_paths:
            self.assertRaises(NotBranchError, Branch.open_containing,
                              self.get_remote_url(path))

        Branch.initialize('.')

        # With a branch in place, the returned relpath must equal the path
        # that was requested.
        for path in probe_paths:
            url = self.get_remote_url(path)
            found_branch, found_relpath = Branch.open_containing(url)
            self.assertEqual(path, found_relpath)
 
193
        
 
194
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
195
#         >>> from bzrlib.commit import commit
 
196
#         >>> bzrlib.trace.silent = True
 
197
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
198
#         >>> br1.add('foo')
 
199
#         >>> br1.add('bar')
 
200
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
201
#         >>> br2 = ScratchBranch()
 
202
#         >>> br2.update_revisions(br1)
 
203
#         Added 2 texts.
 
204
#         Added 1 inventories.
 
205
#         Added 1 revisions.
 
206
#         >>> br2.revision_history()
 
207
#         [u'REVISION-ID-1']
 
208
#         >>> br2.update_revisions(br1)
 
209
#         Added 0 revisions.
 
210
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
211
#         True
 
212
 
 
213
class InstrumentedTransaction(object):
    """Transaction stand-in that records each call made to finish()."""

    def __init__(self):
        # Names of the methods invoked on this object, in call order.
        self.calls = list()

    def finish(self):
        """Record that finish() was called."""
        self.calls.append('finish')
 
220
 
 
221
 
 
222
class TestDecorator(object):
    """Fake lockable object used to test the needs_*_lock decorators.

    Each lock method appends a short tag to ``_calls`` ('lr', 'lw', 'ul') so
    tests can check which lock calls were made and in what order.
    """

    def __init__(self):
        self._calls = []

    def _record(self, tag):
        """Append tag to the list of recorded lock calls."""
        self._calls.append(tag)

    def lock_read(self):
        """Record a read-lock request as 'lr'."""
        self._record('lr')

    def lock_write(self):
        """Record a write-lock request as 'lw'."""
        self._record('lw')

    def unlock(self):
        """Record an unlock request as 'ul'."""
        self._record('ul')

    @needs_read_lock
    def do_with_read(self):
        """Return 1 from a method decorated with needs_read_lock."""
        return 1

    @needs_read_lock
    def except_with_read(self):
        """Raise RuntimeError from a method decorated with needs_read_lock."""
        raise RuntimeError

    @needs_write_lock
    def do_with_write(self):
        """Return 2 from a method decorated with needs_write_lock."""
        return 2

    @needs_write_lock
    def except_with_write(self):
        """Raise RuntimeError from a method decorated with needs_write_lock."""
        raise RuntimeError
 
251
 
 
252
 
 
253
class TestDecorators(TestCase):
    """Check that the lock decorators lock before and unlock after a call."""

    def test_needs_read_lock(self):
        """A read-decorated call returns its value between 'lr' and 'ul'."""
        target = TestDecorator()
        result = target.do_with_read()
        self.assertEqual(1, result)
        self.assertEqual(['lr', 'ul'], target._calls)

    def test_excepts_in_read_lock(self):
        """An exception in a read-decorated call still records 'ul'."""
        target = TestDecorator()
        failing_call = target.except_with_read
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lr', 'ul'], target._calls)

    def test_needs_write_lock(self):
        """A write-decorated call returns its value between 'lw' and 'ul'."""
        target = TestDecorator()
        result = target.do_with_write()
        self.assertEqual(2, result)
        self.assertEqual(['lw', 'ul'], target._calls)

    def test_excepts_in_write_lock(self):
        """An exception in a write-decorated call still records 'ul'."""
        target = TestDecorator()
        failing_call = target.except_with_write
        self.assertRaises(RuntimeError, failing_call)
        self.assertEqual(['lw', 'ul'], target._calls)
 
274
 
 
275
 
 
276
class TestBranchTransaction(TestCaseInTempDir):
    """Tests for how a Branch sets, exposes and finishes its transaction."""

    def setUp(self):
        """Create a fresh branch in the current directory for each test."""
        super(TestBranchTransaction, self).setUp()
        self.branch = Branch.initialize('.')

    def test_default_get_transaction(self):
        """branch.get_transaction on a new branch should give a PassThrough."""
        self.assertTrue(isinstance(self.branch.get_transaction(),
                                   transactions.PassThroughTransaction))

    def test__set_new_transaction(self):
        """Setting a transaction when none is active must not raise."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())

    def test__set_over_existing_transaction_raises(self):
        """Setting a second transaction over an active one is a LockError."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.assertRaises(errors.LockError,
                          self.branch._set_transaction,
                          transactions.ReadOnlyTransaction())

    def test_finish_no_transaction_raises(self):
        """Finishing when no transaction was set is a LockError."""
        self.assertRaises(errors.LockError, self.branch._finish_transaction)

    def test_finish_readonly_transaction_works(self):
        """Finishing a read-only transaction resets _transaction to None."""
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.branch._finish_transaction()
        self.assertEqual(None, self.branch._transaction)

    def test_unlock_calls_finish(self):
        """unlock() calls finish() on the branch's current transaction."""
        self.branch.lock_read()
        transaction = InstrumentedTransaction()
        # Swap in the instrumented object directly so unlock() acts on it.
        self.branch._transaction = transaction
        self.branch.unlock()
        self.assertEqual(['finish'], transaction.calls)

    def test_lock_read_acquires_ro_transaction(self):
        """lock_read() makes get_transaction() return a ReadOnlyTransaction."""
        self.branch.lock_read()
        try:
            self.assertTrue(isinstance(self.branch.get_transaction(),
                                       transactions.ReadOnlyTransaction))
        finally:
            # Release the lock even if the assertion fails, so a failure
            # here does not leave the branch locked.
            self.branch.unlock()

    def test_lock_write_acquires_passthrough_transaction(self):
        """lock_write() sets _transaction to a PassThroughTransaction."""
        self.branch.lock_write()
        try:
            # cannot use get_transaction as its magic
            self.assertTrue(isinstance(self.branch._transaction,
                                       transactions.PassThroughTransaction))
        finally:
            # Release the lock even if the assertion fails.
            self.branch.unlock()