~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-04-09 20:23:07 UTC
  • mfrom: (4265.1.4 bbc-merge)
  • Revision ID: pqm@pqm.ubuntu.com-20090409202307-n0depb16qepoe21o
(jam) Change _fetch_uses_deltas = False for CHK repos until we can
        write a better fix.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
27
27
from bzrlib import (
28
28
    branch as _mod_branch,
29
29
    bzrdir,
 
30
    config,
30
31
    errors,
 
32
    trace,
31
33
    urlutils,
32
34
    )
33
35
from bzrlib.branch import (
37
39
    BranchReferenceFormat,
38
40
    BzrBranch5,
39
41
    BzrBranchFormat5,
 
42
    BzrBranchFormat6,
40
43
    PullResult,
 
44
    _run_with_write_locked_target,
41
45
    )
42
 
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1, 
 
46
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1,
43
47
                           BzrDir, BzrDirFormat)
44
48
from bzrlib.errors import (NotBranchError,
45
49
                           UnknownFormatError,
50
54
from bzrlib.tests import TestCase, TestCaseWithTransport
51
55
from bzrlib.transport import get_transport
52
56
 
 
57
 
53
58
class TestDefaultFormat(TestCase):
54
59
 
 
60
    def test_default_format(self):
 
61
        # update this if you change the default branch format
 
62
        self.assertIsInstance(BranchFormat.get_default_format(),
 
63
                BzrBranchFormat6)
 
64
 
 
65
    def test_default_format_is_same_as_bzrdir_default(self):
 
66
        # XXX: it might be nice if there was only one place the default was
 
67
        # set, but at the moment that's not true -- mbp 20070814 --
 
68
        # https://bugs.launchpad.net/bzr/+bug/132376
 
69
        self.assertEqual(BranchFormat.get_default_format(),
 
70
                BzrDirFormat.get_default_format().get_branch_format())
 
71
 
55
72
    def test_get_set_default_format(self):
 
73
        # set the format and then set it back again
56
74
        old_format = BranchFormat.get_default_format()
57
 
        # default is 5
58
 
        self.assertTrue(isinstance(old_format, BzrBranchFormat5))
59
75
        BranchFormat.set_default_format(SampleBranchFormat())
60
76
        try:
61
77
            # the default branch format is used by the meta dir format
93
109
                                   ensure_config_dir_exists)
94
110
        ensure_config_dir_exists()
95
111
        fn = locations_config_filename()
 
112
        # write correct newlines to locations.conf
 
113
        # by default ConfigObj uses native line-endings for new files
 
114
        # but uses already existing line-endings if file is not empty
 
115
        f = open(fn, 'wb')
 
116
        try:
 
117
            f.write('# comment\n')
 
118
        finally:
 
119
            f.close()
 
120
 
96
121
        branch = self.make_branch('.', format='knit')
97
122
        branch.set_push_location('foo')
98
123
        local_path = urlutils.local_path_from_url(branch.base[:-1])
99
 
        self.assertFileEqual("[%s]\n"
 
124
        self.assertFileEqual("# comment\n"
 
125
                             "[%s]\n"
100
126
                             "push_location = foo\n"
101
 
                             "push_location:policy = norecurse" % local_path,
 
127
                             "push_location:policy = norecurse\n" % local_path,
102
128
                             fn)
103
129
 
104
130
    # TODO RBC 20051029 test getting a push location from a branch in a
108
134
class SampleBranchFormat(BranchFormat):
109
135
    """A sample format
110
136
 
111
 
    this format is initializable, unsupported to aid in testing the 
 
137
    this format is initializable, unsupported to aid in testing the
112
138
    open and open_downlevel routines.
113
139
    """
114
140
 
125
151
    def is_supported(self):
126
152
        return False
127
153
 
128
 
    def open(self, transport, _found=False):
 
154
    def open(self, transport, _found=False, ignore_fallbacks=False):
129
155
        return "opened branch."
130
156
 
131
157
 
135
161
    def test_find_format(self):
136
162
        # is the right format object found for a branch?
137
163
        # create a branch with a few known format objects.
138
 
        # this is not quite the same as 
 
164
        # this is not quite the same as
139
165
        self.build_tree(["foo/", "bar/"])
140
166
        def check_format(format, url):
141
167
            dir = format._matchingbzrdir.initialize(url)
144
170
            found_format = BranchFormat.find_format(dir)
145
171
            self.failUnless(isinstance(found_format, format.__class__))
146
172
        check_format(BzrBranchFormat5(), "bar")
147
 
        
 
173
 
148
174
    def test_find_format_not_branch(self):
149
175
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
150
176
        self.assertRaises(NotBranchError,
176
202
        self.make_branch_and_tree('bar')
177
203
 
178
204
 
179
 
class TestBranch6(TestCaseWithTransport):
 
205
class TestBranch67(object):
 
206
    """Common tests for both branch 6 and 7 which are mostly the same."""
 
207
 
 
208
    def get_format_name(self):
 
209
        raise NotImplementedError(self.get_format_name)
 
210
 
 
211
    def get_format_name_subtree(self):
 
212
        raise NotImplementedError(self.get_format_name)
 
213
 
 
214
    def get_class(self):
 
215
        raise NotImplementedError(self.get_class)
180
216
 
181
217
    def test_creation(self):
182
218
        format = BzrDirMetaFormat1()
183
219
        format.set_branch_format(_mod_branch.BzrBranchFormat6())
184
220
        branch = self.make_branch('a', format=format)
185
 
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
186
 
        branch = self.make_branch('b', format='dirstate-tags')
187
 
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
 
221
        self.assertIsInstance(branch, self.get_class())
 
222
        branch = self.make_branch('b', format=self.get_format_name())
 
223
        self.assertIsInstance(branch, self.get_class())
188
224
        branch = _mod_branch.Branch.open('a')
189
 
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
 
225
        self.assertIsInstance(branch, self.get_class())
190
226
 
191
227
    def test_layout(self):
192
 
        branch = self.make_branch('a', format='dirstate-tags')
 
228
        branch = self.make_branch('a', format=self.get_format_name())
193
229
        self.failUnlessExists('a/.bzr/branch/last-revision')
194
230
        self.failIfExists('a/.bzr/branch/revision-history')
195
231
 
196
232
    def test_config(self):
197
233
        """Ensure that all configuration data is stored in the branch"""
198
 
        branch = self.make_branch('a', format='dirstate-tags')
 
234
        branch = self.make_branch('a', format=self.get_format_name())
199
235
        branch.set_parent('http://bazaar-vcs.org')
200
236
        self.failIfExists('a/.bzr/branch/parent')
201
237
        self.assertEqual('http://bazaar-vcs.org', branch.get_parent())
208
244
        self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
209
245
 
210
246
    def test_set_revision_history(self):
211
 
        tree = self.make_branch_and_memory_tree('.',
212
 
            format='dirstate-tags')
213
 
        tree.lock_write()
214
 
        try:
215
 
            tree.add('.')
216
 
            tree.commit('foo', rev_id='foo')
217
 
            tree.commit('bar', rev_id='bar')
218
 
            tree.branch.set_revision_history(['foo', 'bar'])
219
 
            tree.branch.set_revision_history(['foo'])
220
 
            self.assertRaises(errors.NotLefthandHistory,
221
 
                              tree.branch.set_revision_history, ['bar'])
222
 
        finally:
223
 
            tree.unlock()
224
 
 
225
 
    def test_append_revision(self):
226
 
        tree = self.make_branch_and_tree('branch1',
227
 
            format='dirstate-tags')
228
 
        tree.lock_write()
229
 
        try:
230
 
            tree.commit('foo', rev_id='foo')
231
 
            tree.commit('bar', rev_id='bar')
232
 
            tree.commit('baz', rev_id='baz')
233
 
            tree.set_last_revision('bar')
234
 
            tree.branch.set_last_revision_info(2, 'bar')
235
 
            tree.commit('qux', rev_id='qux')
236
 
            tree.add_parent_tree_id('baz')
237
 
            tree.commit('qux', rev_id='quxx')
238
 
            tree.branch.set_last_revision_info(0, 'null:')
239
 
            self.assertRaises(errors.NotLeftParentDescendant,
240
 
                              tree.branch.append_revision, 'bar')
241
 
            tree.branch.append_revision('foo')
242
 
            self.assertRaises(errors.NotLeftParentDescendant,
243
 
                              tree.branch.append_revision, 'baz')
244
 
            tree.branch.append_revision('bar')
245
 
            tree.branch.append_revision('baz')
246
 
            self.assertRaises(errors.NotLeftParentDescendant,
247
 
                              tree.branch.append_revision, 'quxx')
248
 
        finally:
249
 
            tree.unlock()
 
247
        builder = self.make_branch_builder('.', format=self.get_format_name())
 
248
        builder.build_snapshot('foo', None,
 
249
            [('add', ('', None, 'directory', None))],
 
250
            message='foo')
 
251
        builder.build_snapshot('bar', None, [], message='bar')
 
252
        branch = builder.get_branch()
 
253
        branch.lock_write()
 
254
        self.addCleanup(branch.unlock)
 
255
        branch.set_revision_history(['foo', 'bar'])
 
256
        branch.set_revision_history(['foo'])
 
257
        self.assertRaises(errors.NotLefthandHistory,
 
258
                          branch.set_revision_history, ['bar'])
250
259
 
251
260
    def do_checkout_test(self, lightweight=False):
252
 
        tree = self.make_branch_and_tree('source', format='dirstate-with-subtree')
 
261
        tree = self.make_branch_and_tree('source',
 
262
            format=self.get_format_name_subtree())
253
263
        subtree = self.make_branch_and_tree('source/subtree',
254
 
            format='dirstate-with-subtree')
 
264
            format=self.get_format_name_subtree())
255
265
        subsubtree = self.make_branch_and_tree('source/subtree/subsubtree',
256
 
            format='dirstate-with-subtree')
 
266
            format=self.get_format_name_subtree())
257
267
        self.build_tree(['source/subtree/file',
258
268
                         'source/subtree/subsubtree/file'])
259
269
        subsubtree.add('file')
274
284
        else:
275
285
            self.assertEndsWith(subbranch.base, 'target/subtree/subsubtree/')
276
286
 
277
 
 
278
287
    def test_checkout_with_references(self):
279
288
        self.do_checkout_test()
280
289
 
281
290
    def test_light_checkout_with_references(self):
282
291
        self.do_checkout_test(lightweight=True)
283
292
 
 
293
    def test_set_push(self):
 
294
        branch = self.make_branch('source', format=self.get_format_name())
 
295
        branch.get_config().set_user_option('push_location', 'old',
 
296
            store=config.STORE_LOCATION)
 
297
        warnings = []
 
298
        def warning(*args):
 
299
            warnings.append(args[0] % args[1:])
 
300
        _warning = trace.warning
 
301
        trace.warning = warning
 
302
        try:
 
303
            branch.set_push_location('new')
 
304
        finally:
 
305
            trace.warning = _warning
 
306
        self.assertEqual(warnings[0], 'Value "new" is masked by "old" from '
 
307
                         'locations.conf')
 
308
 
 
309
 
 
310
class TestBranch6(TestBranch67, TestCaseWithTransport):
 
311
 
 
312
    def get_class(self):
 
313
        return _mod_branch.BzrBranch6
 
314
 
 
315
    def get_format_name(self):
 
316
        return "dirstate-tags"
 
317
 
 
318
    def get_format_name_subtree(self):
 
319
        return "dirstate-with-subtree"
 
320
 
 
321
    def test_set_stacked_on_url_errors(self):
 
322
        branch = self.make_branch('a', format=self.get_format_name())
 
323
        self.assertRaises(errors.UnstackableBranchFormat,
 
324
            branch.set_stacked_on_url, None)
 
325
 
 
326
    def test_default_stacked_location(self):
 
327
        branch = self.make_branch('a', format=self.get_format_name())
 
328
        self.assertRaises(errors.UnstackableBranchFormat, branch.get_stacked_on_url)
 
329
 
 
330
 
 
331
class TestBranch7(TestBranch67, TestCaseWithTransport):
 
332
 
 
333
    def get_class(self):
 
334
        return _mod_branch.BzrBranch7
 
335
 
 
336
    def get_format_name(self):
 
337
        return "1.9"
 
338
 
 
339
    def get_format_name_subtree(self):
 
340
        return "development-subtree"
 
341
 
 
342
    def test_set_stacked_on_url_unstackable_repo(self):
 
343
        repo = self.make_repository('a', format='dirstate-tags')
 
344
        control = repo.bzrdir
 
345
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
 
346
        target = self.make_branch('b')
 
347
        self.assertRaises(errors.UnstackableRepositoryFormat,
 
348
            branch.set_stacked_on_url, target.base)
 
349
 
 
350
    def test_clone_stacked_on_unstackable_repo(self):
 
351
        repo = self.make_repository('a', format='dirstate-tags')
 
352
        control = repo.bzrdir
 
353
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
 
354
        # Calling clone should not raise UnstackableRepositoryFormat.
 
355
        cloned_bzrdir = control.clone('cloned')
 
356
 
 
357
    def _test_default_stacked_location(self):
 
358
        branch = self.make_branch('a', format=self.get_format_name())
 
359
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
 
360
 
 
361
    def test_stack_and_unstack(self):
 
362
        branch = self.make_branch('a', format=self.get_format_name())
 
363
        target = self.make_branch_and_tree('b', format=self.get_format_name())
 
364
        branch.set_stacked_on_url(target.branch.base)
 
365
        self.assertEqual(target.branch.base, branch.get_stacked_on_url())
 
366
        revid = target.commit('foo')
 
367
        self.assertTrue(branch.repository.has_revision(revid))
 
368
        branch.set_stacked_on_url(None)
 
369
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
 
370
        self.assertFalse(branch.repository.has_revision(revid))
 
371
 
 
372
    def test_open_opens_stacked_reference(self):
 
373
        branch = self.make_branch('a', format=self.get_format_name())
 
374
        target = self.make_branch_and_tree('b', format=self.get_format_name())
 
375
        branch.set_stacked_on_url(target.branch.base)
 
376
        branch = branch.bzrdir.open_branch()
 
377
        revid = target.commit('foo')
 
378
        self.assertTrue(branch.repository.has_revision(revid))
 
379
 
 
380
 
284
381
class TestBranchReference(TestCaseWithTransport):
285
382
    """Tests for the branch reference facility."""
286
383
 
298
395
        opened_branch = branch_dir.open_branch()
299
396
        self.assertEqual(opened_branch.base, target_branch.base)
300
397
 
 
398
    def test_get_reference(self):
 
399
        """For a BranchReference, get_reference should reutrn the location."""
 
400
        branch = self.make_branch('target')
 
401
        checkout = branch.create_checkout('checkout', lightweight=True)
 
402
        reference_url = branch.bzrdir.root_transport.abspath('') + '/'
 
403
        # if the api for create_checkout changes to return different checkout types
 
404
        # then this file read will fail.
 
405
        self.assertFileEqual(reference_url, 'checkout/.bzr/branch/location')
 
406
        self.assertEqual(reference_url,
 
407
            _mod_branch.BranchReferenceFormat().get_reference(checkout.bzrdir))
 
408
 
301
409
 
302
410
class TestHooks(TestCase):
303
411
 
307
415
        self.assertTrue("set_rh" in hooks, "set_rh not in %s" % hooks)
308
416
        self.assertTrue("post_push" in hooks, "post_push not in %s" % hooks)
309
417
        self.assertTrue("post_commit" in hooks, "post_commit not in %s" % hooks)
 
418
        self.assertTrue("pre_commit" in hooks, "pre_commit not in %s" % hooks)
310
419
        self.assertTrue("post_pull" in hooks, "post_pull not in %s" % hooks)
311
420
        self.assertTrue("post_uncommit" in hooks, "post_uncommit not in %s" % hooks)
 
421
        self.assertTrue("post_change_branch_tip" in hooks,
 
422
                        "post_change_branch_tip not in %s" % hooks)
312
423
 
313
424
    def test_installed_hooks_are_BranchHooks(self):
314
425
        """The installed hooks object should be a BranchHooks."""
315
426
        # the installed hooks are saved in self._preserved_hooks.
316
 
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch], BranchHooks)
317
 
 
318
 
    def test_install_hook_raises_unknown_hook(self):
319
 
        """install_hook should raise UnknownHook if a hook is unknown."""
320
 
        hooks = BranchHooks()
321
 
        self.assertRaises(UnknownHook, hooks.install_hook, 'silly', None)
322
 
 
323
 
    def test_install_hook_appends_known_hook(self):
324
 
        """install_hook should append the callable for known hooks."""
325
 
        hooks = BranchHooks()
326
 
        hooks.install_hook('set_rh', None)
327
 
        self.assertEqual(hooks['set_rh'], [None])
 
427
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch][1],
 
428
            BranchHooks)
328
429
 
329
430
 
330
431
class TestPullResult(TestCase):
339
440
        # it's still supported
340
441
        a = "%d revisions pulled" % r
341
442
        self.assertEqual(a, "10 revisions pulled")
 
443
 
 
444
 
 
445
 
 
446
class _StubLockable(object):
 
447
    """Helper for TestRunWithWriteLockedTarget."""
 
448
 
 
449
    def __init__(self, calls, unlock_exc=None):
 
450
        self.calls = calls
 
451
        self.unlock_exc = unlock_exc
 
452
 
 
453
    def lock_write(self):
 
454
        self.calls.append('lock_write')
 
455
 
 
456
    def unlock(self):
 
457
        self.calls.append('unlock')
 
458
        if self.unlock_exc is not None:
 
459
            raise self.unlock_exc
 
460
 
 
461
 
 
462
class _ErrorFromCallable(Exception):
 
463
    """Helper for TestRunWithWriteLockedTarget."""
 
464
 
 
465
 
 
466
class _ErrorFromUnlock(Exception):
 
467
    """Helper for TestRunWithWriteLockedTarget."""
 
468
 
 
469
 
 
470
class TestRunWithWriteLockedTarget(TestCase):
 
471
    """Tests for _run_with_write_locked_target."""
 
472
 
 
473
    def setUp(self):
 
474
        TestCase.setUp(self)
 
475
        self._calls = []
 
476
 
 
477
    def func_that_returns_ok(self):
 
478
        self._calls.append('func called')
 
479
        return 'ok'
 
480
 
 
481
    def func_that_raises(self):
 
482
        self._calls.append('func called')
 
483
        raise _ErrorFromCallable()
 
484
 
 
485
    def test_success_unlocks(self):
 
486
        lockable = _StubLockable(self._calls)
 
487
        result = _run_with_write_locked_target(
 
488
            lockable, self.func_that_returns_ok)
 
489
        self.assertEqual('ok', result)
 
490
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
491
 
 
492
    def test_exception_unlocks_and_propagates(self):
 
493
        lockable = _StubLockable(self._calls)
 
494
        self.assertRaises(_ErrorFromCallable,
 
495
            _run_with_write_locked_target, lockable, self.func_that_raises)
 
496
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
497
 
 
498
    def test_callable_succeeds_but_error_during_unlock(self):
 
499
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
500
        self.assertRaises(_ErrorFromUnlock,
 
501
            _run_with_write_locked_target, lockable, self.func_that_returns_ok)
 
502
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
503
 
 
504
    def test_error_during_unlock_does_not_mask_original_error(self):
 
505
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
506
        self.assertRaises(_ErrorFromCallable,
 
507
            _run_with_write_locked_target, lockable, self.func_that_raises)
 
508
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
509
 
 
510