~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch_implementations.py

Implement BranchTestProviderAdapter, so tests now run across all branch formats.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# (C) 2005 Canonical Ltd
 
1
# (C) 2005, 2006 Canonical Ltd
2
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
 
17
"""Tests for branch implementations - tests a branch format."""
 
18
 
17
19
import os
18
20
import sys
19
21
 
22
24
from bzrlib.clone import copy_branch
23
25
from bzrlib.commit import commit
24
26
import bzrlib.errors as errors
25
 
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
27
from bzrlib.errors import (NoSuchRevision,
 
28
                           UninitializableFormat,
 
29
                           UnlistableBranch,
 
30
                           NotBranchError,
 
31
                           )
26
32
import bzrlib.gpg
27
33
from bzrlib.osutils import getcwd
28
 
from bzrlib.tests import TestCase, TestCaseInTempDir
 
34
from bzrlib.tests import TestCase, TestCaseInTempDir, TestSkipped
29
35
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
30
36
from bzrlib.trace import mutter
31
37
import bzrlib.transactions as transactions
35
41
# doesn't request any files that could have been avoided, by 
36
42
# hooking into the Transport.
37
43
 
38
 
class TestBranch(TestCaseInTempDir):
 
44
 
 
45
class TestCaseWithBranch(TestCaseInTempDir):
 
46
 
 
47
    def setUp(self):
 
48
        super(TestCaseWithBranch, self).setUp()
 
49
        self.branch = None
 
50
 
 
51
    def get_branch(self):
 
52
        if self.branch is None:
 
53
            self.branch = self.make_branch('.')
 
54
        return self.branch
 
55
 
 
56
    def make_branch(self, relpath):
 
57
        try:
 
58
            return self.branch_format.initialize(relpath)
 
59
        except UninitializableFormat:
 
60
            raise TestSkipped("Format %s is not initializable.")
 
61
 
 
62
 
 
63
class TestBranch(TestCaseWithBranch):
39
64
 
40
65
    def test_append_revisions(self):
41
66
        """Test appending more than one revision"""
42
 
        br = Branch.initialize(u".")
 
67
        br = self.get_branch()
43
68
        br.append_revision("rev1")
44
69
        self.assertEquals(br.revision_history(), ["rev1",])
45
70
        br.append_revision("rev2", "rev3")
50
75
        from bzrlib.fetch import Fetcher
51
76
        os.mkdir('b1')
52
77
        os.mkdir('b2')
53
 
        b1 = Branch.initialize('b1')
54
 
        b2 = Branch.initialize('b2')
 
78
        b1 = self.make_branch('b1')
 
79
        b2 = self.make_branch('b2')
55
80
        file('b1/foo', 'w').write('hello')
56
81
        b1.working_tree().add(['foo'], ['foo-id'])
57
82
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
67
92
        eq(tree.get_file_text('foo-id'), 'hello')
68
93
 
69
94
    def test_revision_tree(self):
70
 
        b1 = Branch.initialize(u'.')
 
95
        b1 = self.get_branch()
71
96
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
72
97
        tree = b1.revision_tree('revision-1')
73
98
        tree = b1.revision_tree(None)
78
103
    def get_unbalanced_branch_pair(self):
79
104
        """Return two branches, a and b, with one file in a."""
80
105
        os.mkdir('a')
81
 
        br_a = Branch.initialize("a")
 
106
        br_a = self.make_branch('a')
82
107
        file('a/b', 'wb').write('b')
83
108
        br_a.working_tree().add('b')
84
109
        commit(br_a, "silly commit", rev_id='A')
85
110
        os.mkdir('b')
86
 
        br_b = Branch.initialize("b")
 
111
        br_b = self.make_branch('b')
87
112
        return br_a, br_b
88
113
 
89
114
    def get_balanced_branch_pair(self):
118
143
    def test_copy_partial(self):
119
144
        """Copy only part of the history of a branch."""
120
145
        self.build_tree(['a/', 'a/one'])
121
 
        br_a = Branch.initialize('a')
 
146
        br_a = self.make_branch('a')
122
147
        br_a.working_tree().add(['one'])
123
148
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
124
149
        self.build_tree(['a/two'])
131
156
        
132
157
    def test_record_initial_ghost_merge(self):
133
158
        """A pending merge with no revision present is still a merge."""
134
 
        branch = Branch.initialize(u'.')
 
159
        branch = self.get_branch()
135
160
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
136
161
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
137
162
        rev = branch.get_revision(branch.last_revision())
141
166
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
142
167
 
143
168
    def test_bad_revision(self):
144
 
        branch = Branch.initialize(u'.')
145
 
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
 
169
        self.assertRaises(errors.InvalidRevisionId, self.get_branch().get_revision, None)
146
170
 
147
171
# TODO 20051003 RBC:
148
172
# compare the gpg-to-sign info for a commit with a ghost and 
151
175
        
152
176
    def test_pending_merges(self):
153
177
        """Tracking pending-merged revisions."""
154
 
        b = Branch.initialize(u'.')
 
178
        b = self.get_branch()
155
179
        wt = b.working_tree()
156
180
        self.assertEquals(wt.pending_merges(), [])
157
181
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
173
197
        self.assertEquals(wt.pending_merges(), [])
174
198
 
175
199
    def test_sign_existing_revision(self):
176
 
        branch = Branch.initialize(u'.')
 
200
        branch = self.get_branch()
177
201
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
178
202
        from bzrlib.testament import Testament
179
203
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
181
205
                         branch.revision_store.get('A', 'sig').read())
182
206
 
183
207
    def test_store_signature(self):
184
 
        branch = Branch.initialize(u'.')
 
208
        branch = self.get_branch()
185
209
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
186
210
                                        'FOO', 'A')
187
211
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
188
212
 
189
213
    def test__relcontrolfilename(self):
190
 
        branch = Branch.initialize(u'.')
191
 
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
 
214
        self.assertEqual('.bzr/%25', self.get_branch()._rel_controlfilename('%'))
192
215
        
193
216
    def test__relcontrolfilename_empty(self):
194
 
        branch = Branch.initialize(u'.')
195
 
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
 
217
        self.assertEqual('.bzr', self.get_branch()._rel_controlfilename(''))
196
218
 
197
219
    def test_nicks(self):
198
220
        """Branch nicknames"""
199
221
        os.mkdir('bzr.dev')
200
 
        branch = Branch.initialize('bzr.dev')
 
222
        branch = self.make_branch('bzr.dev')
201
223
        self.assertEqual(branch.nick, 'bzr.dev')
202
224
        os.rename('bzr.dev', 'bzr.ab')
203
225
        branch = Branch.open('bzr.ab')
215
237
    def test_commit_nicks(self):
216
238
        """Nicknames are committed to the revision"""
217
239
        os.mkdir('bzr.dev')
218
 
        branch = Branch.initialize('bzr.dev')
 
240
        branch = self.get_branch()
219
241
        branch.nick = "My happy branch"
220
242
        branch.working_tree().commit('My commit respect da nick.')
221
243
        committed = branch.get_revision(branch.last_revision())
230
252
                          self.get_remote_url(''))
231
253
        self.assertRaises(NotBranchError, Branch.open_containing,
232
254
                          self.get_remote_url('g/p/q'))
233
 
        b = Branch.initialize(u'.')
 
255
        try:
 
256
            branch = self.branch_format.initialize('.')
 
257
        except UninitializableFormat:
 
258
            raise TestSkipped("Format %s is not initializable.")
234
259
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
235
260
        self.assertEqual('', relpath)
236
261
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
318
343
        self.assertEqual(['lw', 'ul'], branch._calls)
319
344
 
320
345
 
321
 
class TestBranchTransaction(TestCaseInTempDir):
 
346
class TestBranchTransaction(TestCaseWithBranch):
322
347
 
323
348
    def setUp(self):
324
349
        super(TestBranchTransaction, self).setUp()
325
 
        self.branch = Branch.initialize(u'.')
326
 
        
 
350
        self.branch = None
 
351
 
327
352
    def test_default_get_transaction(self):
328
353
        """branch.get_transaction on a new branch should give a PassThrough."""
329
 
        self.failUnless(isinstance(self.branch.get_transaction(),
 
354
        self.failUnless(isinstance(self.get_branch().get_transaction(),
330
355
                                   transactions.PassThroughTransaction))
331
356
 
332
357
    def test__set_new_transaction(self):
333
 
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
358
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
334
359
 
335
360
    def test__set_over_existing_transaction_raises(self):
336
 
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
361
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
337
362
        self.assertRaises(errors.LockError,
338
 
                          self.branch._set_transaction,
 
363
                          self.get_branch()._set_transaction,
339
364
                          transactions.ReadOnlyTransaction())
340
365
 
341
366
    def test_finish_no_transaction_raises(self):
342
 
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
 
367
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
343
368
 
344
369
    def test_finish_readonly_transaction_works(self):
345
 
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
346
 
        self.branch._finish_transaction()
347
 
        self.assertEqual(None, self.branch._transaction)
 
370
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
371
        self.get_branch()._finish_transaction()
 
372
        self.assertEqual(None, self.get_branch()._transaction)
348
373
 
349
374
    def test_unlock_calls_finish(self):
350
 
        self.branch.lock_read()
 
375
        self.get_branch().lock_read()
351
376
        transaction = InstrumentedTransaction()
352
 
        self.branch._transaction = transaction
353
 
        self.branch.unlock()
 
377
        self.get_branch()._transaction = transaction
 
378
        self.get_branch().unlock()
354
379
        self.assertEqual(['finish'], transaction.calls)
355
380
 
356
381
    def test_lock_read_acquires_ro_transaction(self):
357
 
        self.branch.lock_read()
358
 
        self.failUnless(isinstance(self.branch.get_transaction(),
 
382
        self.get_branch().lock_read()
 
383
        self.failUnless(isinstance(self.get_branch().get_transaction(),
359
384
                                   transactions.ReadOnlyTransaction))
360
 
        self.branch.unlock()
 
385
        self.get_branch().unlock()
361
386
        
362
387
    def test_lock_write_acquires_passthrough_transaction(self):
363
 
        self.branch.lock_write()
 
388
        self.get_branch().lock_write()
364
389
        # cannot use get_transaction as its magic
365
 
        self.failUnless(isinstance(self.branch._transaction,
 
390
        self.failUnless(isinstance(self.get_branch()._transaction,
366
391
                                   transactions.PassThroughTransaction))
367
 
        self.branch.unlock()
368
 
 
369
 
 
370
 
class TestBranchPushLocations(TestCaseInTempDir):
371
 
 
372
 
    def setUp(self):
373
 
        super(TestBranchPushLocations, self).setUp()
374
 
        self.branch = Branch.initialize(u'.')
375
 
        
 
392
        self.get_branch().unlock()
 
393
 
 
394
 
 
395
class TestBranchPushLocations(TestCaseWithBranch):
 
396
 
376
397
    def test_get_push_location_unset(self):
377
 
        self.assertEqual(None, self.branch.get_push_location())
 
398
        self.assertEqual(None, self.get_branch().get_push_location())
378
399
 
379
400
    def test_get_push_location_exact(self):
380
401
        from bzrlib.config import (branches_config_filename,
384
405
        print >> open(fn, 'wt'), ("[%s]\n"
385
406
                                  "push_location=foo" %
386
407
                                  getcwd())
387
 
        self.assertEqual("foo", self.branch.get_push_location())
 
408
        self.assertEqual("foo", self.get_branch().get_push_location())
388
409
 
389
410
    def test_set_push_location(self):
390
411
        from bzrlib.config import (branches_config_filename,
391
412
                                   ensure_config_dir_exists)
392
413
        ensure_config_dir_exists()
393
414
        fn = branches_config_filename()
394
 
        self.branch.set_push_location('foo')
 
415
        self.get_branch().set_push_location('foo')
395
416
        self.assertFileEqual("[%s]\n"
396
417
                             "push_location = foo" % getcwd(),
397
418
                             fn)
400
421
    # recursive section - that is, it appends the branch name.
401
422
 
402
423
 
403
 
class TestDefaultFormat(TestCase):
404
 
 
405
 
    def test_get_set_default_initializer(self):
406
 
        old_initializer = Branch.get_default_initializer()
407
 
        # default is BzrBranch._initialize
408
 
        self.assertEqual(branch.BzrBranch._initialize, old_initializer)
409
 
        def recorder(url):
410
 
            return "a branch %s" % url
411
 
        Branch.set_default_initializer(recorder)
412
 
        try:
413
 
            b = Branch.initialize("memory:/")
414
 
            self.assertEqual("a branch memory:/", b)
415
 
        finally:
416
 
            Branch.set_default_initializer(old_initializer)
417
 
        self.assertEqual(old_initializer, Branch.get_default_initializer())
418
 
 
419
 
 
420
 
class TestBzrBranchFormat(TestCaseInTempDir):
421
 
    """Tests for the BzrBranchFormat facility."""
422
 
 
423
 
    def test_find_format(self):
424
 
        # is the right format object found for a branch?
425
 
        # create a branch with a known format object
426
 
        self.build_tree(["foo/", "bar/"])
427
 
        def check_format(format, url):
428
 
            format.initialize(url)
429
 
            found_format = branch.BzrBranchFormat.find_format(url)
430
 
            self.failUnless(isinstance(found_format, format.__class__))
431
 
        check_format(branch.BzrBranchFormat5(), "foo")
432
 
        check_format(branch.BzrBranchFormat6(), "bar")
433
 
 
434
 
    def test_initialize_returns_branch(self):
435
 
        self.failUnless(isinstance(branch.BzrBranchFormat5().initialize("."), branch.Branch))
 
424
class TestFormat(TestCaseWithBranch):
 
425
    """Tests for the format itself."""
 
426
 
 
427
    def test_format_initialize_find_open(self):
 
428
        # loopback test to check the current format initializes to itself.
 
429
        made_branch = self.make_branch('.')
 
430
        self.assertEqual(self.branch_format,
 
431
                         branch.BzrBranchFormat.find_format('.'))
 
432
        opened_branch = branch.Branch.open('.')
 
433
        self.assertEqual(made_branch._branch_format, opened_branch._branch_format)