~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

  • Committer: Robert Collins
  • Date: 2005-10-19 10:11:57 UTC
  • mfrom: (1185.16.78)
  • mto: This revision was merged to the branch mainline in revision 1470.
  • Revision ID: robertc@robertcollins.net-20051019101157-17438d311e746b4f
mergeĀ fromĀ upstream

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
import os
18
 
from bzrlib.branch import Branch
 
18
 
 
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
19
20
from bzrlib.clone import copy_branch
20
21
from bzrlib.commit import commit
21
 
from bzrlib.errors import NoSuchRevision, UnlistableBranch
22
 
from bzrlib.selftest import TestCaseInTempDir
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
24
import bzrlib.gpg
 
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
23
27
from bzrlib.trace import mutter
 
28
import bzrlib.transactions as transactions
24
29
 
 
30
# TODO: Make a branch using basis branch, and check that it 
 
31
# doesn't request any files that could have been avoided, by 
 
32
# hooking into the Transport.
25
33
 
26
34
class TestBranch(TestCaseInTempDir):
27
35
 
54
62
        tree = b2.revision_tree('revision-1')
55
63
        eq(tree.get_file_text('foo-id'), 'hello')
56
64
 
57
 
    def test_push_stores(self):
58
 
        """Copy the stores from one branch to another"""
 
65
    def get_unbalanced_branch_pair(self):
 
66
        """Return two branches, a and b, with one file in a."""
59
67
        os.mkdir('a')
60
68
        br_a = Branch.initialize("a")
61
69
        file('a/b', 'wb').write('b')
62
70
        br_a.add('b')
63
 
        commit(br_a, "silly commit")
64
 
 
 
71
        commit(br_a, "silly commit", rev_id='A')
65
72
        os.mkdir('b')
66
73
        br_b = Branch.initialize("b")
 
74
        return br_a, br_b
 
75
 
 
76
    def get_balanced_branch_pair(self):
 
77
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
 
78
        br_a, br_b = self.get_unbalanced_branch_pair()
 
79
        br_a.push_stores(br_b)
 
80
        return br_a, br_b
 
81
 
 
82
    def test_push_stores(self):
 
83
        """Copy the stores from one branch to another"""
 
84
        br_a, br_b = self.get_unbalanced_branch_pair()
 
85
        # ensure the revision is missing.
67
86
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
68
87
                          br_a.revision_history()[0])
69
88
        br_a.push_stores(br_b)
 
89
        # check that b now has all the data from a's first commit.
70
90
        rev = br_b.get_revision(br_a.revision_history()[0])
71
91
        tree = br_b.revision_tree(br_a.revision_history()[0])
72
92
        for file_id in tree:
76
96
 
77
97
    def test_copy_branch(self):
78
98
        """Copy the stores from one branch to another"""
79
 
        br_a, br_b = self.test_push_stores()
 
99
        br_a, br_b = self.get_balanced_branch_pair()
80
100
        commit(br_b, "silly commit")
81
101
        os.mkdir('c')
82
102
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
83
103
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
84
 
        ## # basis branches currently disabled for weave format
85
 
        ## self.assertFalse(br_b.last_revision() in br_c.revision_history())
86
 
        ## br_c.get_revision(br_b.last_revision())
87
104
 
88
105
    def test_copy_partial(self):
89
106
        """Copy only part of the history of a branch."""
138
155
                           'wibble@fofof--20050401--1928390812')
139
156
        # list should be cleared when we do a commit
140
157
        self.assertEquals(b.pending_merges(), [])
141
 
 
142
 
 
 
158
 
 
159
    def test_sign_existing_revision(self):
 
160
        branch = Branch.initialize('.')
 
161
        branch.commit("base", allow_pointless=True, rev_id='A')
 
162
        from bzrlib.testament import Testament
 
163
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
164
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
165
                         branch.revision_store.get('A', 'sig').read())
 
166
 
 
167
    def test_store_signature(self):
 
168
        branch = Branch.initialize('.')
 
169
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
 
170
                                        'FOO', 'A')
 
171
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
 
172
 
 
173
 
 
174
class TestRemote(TestCaseWithWebserver):
 
175
 
 
176
    def test_open_containing(self):
 
177
        self.assertRaises(NotBranchError, Branch.open_containing,
 
178
                          self.get_remote_url(''))
 
179
        self.assertRaises(NotBranchError, Branch.open_containing,
 
180
                          self.get_remote_url('g/p/q'))
 
181
        b = Branch.initialize('.')
 
182
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
 
183
        self.assertEqual('', relpath)
 
184
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
 
185
        self.assertEqual('g/p/q', relpath)
 
186
        
143
187
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
144
188
#         >>> from bzrlib.commit import commit
145
189
#         >>> bzrlib.trace.silent = True
158
202
#         Added 0 revisions.
159
203
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
160
204
#         True
 
205
 
 
206
class InstrumentedTransaction(object):
 
207
 
 
208
    def finish(self):
 
209
        self.calls.append('finish')
 
210
 
 
211
    def __init__(self):
 
212
        self.calls = []
 
213
 
 
214
 
 
215
class TestDecorator(object):
 
216
 
 
217
    def __init__(self):
 
218
        self._calls = []
 
219
 
 
220
    def lock_read(self):
 
221
        self._calls.append('lr')
 
222
 
 
223
    def lock_write(self):
 
224
        self._calls.append('lw')
 
225
 
 
226
    def unlock(self):
 
227
        self._calls.append('ul')
 
228
 
 
229
    @needs_read_lock
 
230
    def do_with_read(self):
 
231
        return 1
 
232
 
 
233
    @needs_read_lock
 
234
    def except_with_read(self):
 
235
        raise RuntimeError
 
236
 
 
237
    @needs_write_lock
 
238
    def do_with_write(self):
 
239
        return 2
 
240
 
 
241
    @needs_write_lock
 
242
    def except_with_write(self):
 
243
        raise RuntimeError
 
244
 
 
245
 
 
246
class TestDecorators(TestCase):
 
247
 
 
248
    def test_needs_read_lock(self):
 
249
        branch = TestDecorator()
 
250
        self.assertEqual(1, branch.do_with_read())
 
251
        self.assertEqual(['lr', 'ul'], branch._calls)
 
252
 
 
253
    def test_excepts_in_read_lock(self):
 
254
        branch = TestDecorator()
 
255
        self.assertRaises(RuntimeError, branch.except_with_read)
 
256
        self.assertEqual(['lr', 'ul'], branch._calls)
 
257
 
 
258
    def test_needs_write_lock(self):
 
259
        branch = TestDecorator()
 
260
        self.assertEqual(2, branch.do_with_write())
 
261
        self.assertEqual(['lw', 'ul'], branch._calls)
 
262
 
 
263
    def test_excepts_in_write_lock(self):
 
264
        branch = TestDecorator()
 
265
        self.assertRaises(RuntimeError, branch.except_with_write)
 
266
        self.assertEqual(['lw', 'ul'], branch._calls)
 
267
 
 
268
 
 
269
class TestBranchTransaction(TestCaseInTempDir):
 
270
 
 
271
    def setUp(self):
 
272
        super(TestBranchTransaction, self).setUp()
 
273
        self.branch = Branch.initialize('.')
 
274
        
 
275
    def test_default_get_transaction(self):
 
276
        """branch.get_transaction on a new branch should give a PassThrough."""
 
277
        self.failUnless(isinstance(self.branch.get_transaction(),
 
278
                                   transactions.PassThroughTransaction))
 
279
 
 
280
    def test__set_new_transaction(self):
 
281
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
282
 
 
283
    def test__set_over_existing_transaction_raises(self):
 
284
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
285
        self.assertRaises(errors.LockError,
 
286
                          self.branch._set_transaction,
 
287
                          transactions.ReadOnlyTransaction())
 
288
 
 
289
    def test_finish_no_transaction_raises(self):
 
290
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
 
291
 
 
292
    def test_finish_readonly_transaction_works(self):
 
293
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
294
        self.branch._finish_transaction()
 
295
        self.assertEqual(None, self.branch._transaction)
 
296
 
 
297
    def test_unlock_calls_finish(self):
 
298
        self.branch.lock_read()
 
299
        transaction = InstrumentedTransaction()
 
300
        self.branch._transaction = transaction
 
301
        self.branch.unlock()
 
302
        self.assertEqual(['finish'], transaction.calls)
 
303
 
 
304
    def test_lock_read_acquires_ro_transaction(self):
 
305
        self.branch.lock_read()
 
306
        self.failUnless(isinstance(self.branch.get_transaction(),
 
307
                                   transactions.ReadOnlyTransaction))
 
308
        self.branch.unlock()
 
309
        
 
310
    def test_lock_write_acquires_passthrough_transaction(self):
 
311
        self.branch.lock_write()
 
312
        # cannot use get_transaction as its magic
 
313
        self.failUnless(isinstance(self.branch._transaction,
 
314
                                   transactions.PassThroughTransaction))
 
315
        self.branch.unlock()