~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

NEWS section template into a separate file

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
19
 
For interface tests see tests/branch_implementations/*.py.
 
19
For interface tests see tests/per_branch/*.py.
20
20
 
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
40
40
    BzrBranch5,
41
41
    BzrBranchFormat5,
42
42
    BzrBranchFormat6,
 
43
    BzrBranchFormat7,
43
44
    PullResult,
 
45
    _run_with_write_locked_target,
44
46
    )
45
 
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1, 
 
47
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1,
46
48
                           BzrDir, BzrDirFormat)
47
49
from bzrlib.errors import (NotBranchError,
48
50
                           UnknownFormatError,
59
61
    def test_default_format(self):
60
62
        # update this if you change the default branch format
61
63
        self.assertIsInstance(BranchFormat.get_default_format(),
62
 
                BzrBranchFormat6)
 
64
                BzrBranchFormat7)
63
65
 
64
66
    def test_default_format_is_same_as_bzrdir_default(self):
65
67
        # XXX: it might be nice if there was only one place the default was
66
 
        # set, but at the moment that's not true -- mbp 20070814 -- 
 
68
        # set, but at the moment that's not true -- mbp 20070814 --
67
69
        # https://bugs.launchpad.net/bzr/+bug/132376
68
70
        self.assertEqual(BranchFormat.get_default_format(),
69
71
                BzrDirFormat.get_default_format().get_branch_format())
133
135
class SampleBranchFormat(BranchFormat):
134
136
    """A sample format
135
137
 
136
 
    this format is initializable, unsupported to aid in testing the 
 
138
    this format is initializable, unsupported to aid in testing the
137
139
    open and open_downlevel routines.
138
140
    """
139
141
 
150
152
    def is_supported(self):
151
153
        return False
152
154
 
153
 
    def open(self, transport, _found=False):
 
155
    def open(self, transport, _found=False, ignore_fallbacks=False):
154
156
        return "opened branch."
155
157
 
156
158
 
160
162
    def test_find_format(self):
161
163
        # is the right format object found for a branch?
162
164
        # create a branch with a few known format objects.
163
 
        # this is not quite the same as 
 
165
        # this is not quite the same as
164
166
        self.build_tree(["foo/", "bar/"])
165
167
        def check_format(format, url):
166
168
            dir = format._matchingbzrdir.initialize(url)
169
171
            found_format = BranchFormat.find_format(dir)
170
172
            self.failUnless(isinstance(found_format, format.__class__))
171
173
        check_format(BzrBranchFormat5(), "bar")
172
 
        
 
174
 
173
175
    def test_find_format_not_branch(self):
174
176
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
175
177
        self.assertRaises(NotBranchError,
227
229
        branch = self.make_branch('a', format=self.get_format_name())
228
230
        self.failUnlessExists('a/.bzr/branch/last-revision')
229
231
        self.failIfExists('a/.bzr/branch/revision-history')
 
232
        self.failIfExists('a/.bzr/branch/references')
230
233
 
231
234
    def test_config(self):
232
235
        """Ensure that all configuration data is stored in the branch"""
243
246
        self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
244
247
 
245
248
    def test_set_revision_history(self):
246
 
        tree = self.make_branch_and_memory_tree('.',
247
 
            format=self.get_format_name())
248
 
        tree.lock_write()
249
 
        try:
250
 
            tree.add('.')
251
 
            tree.commit('foo', rev_id='foo')
252
 
            tree.commit('bar', rev_id='bar')
253
 
            tree.branch.set_revision_history(['foo', 'bar'])
254
 
            tree.branch.set_revision_history(['foo'])
255
 
            self.assertRaises(errors.NotLefthandHistory,
256
 
                              tree.branch.set_revision_history, ['bar'])
257
 
        finally:
258
 
            tree.unlock()
 
249
        builder = self.make_branch_builder('.', format=self.get_format_name())
 
250
        builder.build_snapshot('foo', None,
 
251
            [('add', ('', None, 'directory', None))],
 
252
            message='foo')
 
253
        builder.build_snapshot('bar', None, [], message='bar')
 
254
        branch = builder.get_branch()
 
255
        branch.lock_write()
 
256
        self.addCleanup(branch.unlock)
 
257
        branch.set_revision_history(['foo', 'bar'])
 
258
        branch.set_revision_history(['foo'])
 
259
        self.assertRaises(errors.NotLefthandHistory,
 
260
                          branch.set_revision_history, ['bar'])
259
261
 
260
262
    def do_checkout_test(self, lightweight=False):
261
263
        tree = self.make_branch_and_tree('source',
318
320
    def get_format_name_subtree(self):
319
321
        return "dirstate-with-subtree"
320
322
 
321
 
    def test_set_stacked_on_errors(self):
 
323
    def test_set_stacked_on_url_errors(self):
322
324
        branch = self.make_branch('a', format=self.get_format_name())
323
325
        self.assertRaises(errors.UnstackableBranchFormat,
324
 
            branch.set_stacked_on, None)
 
326
            branch.set_stacked_on_url, None)
325
327
 
326
328
    def test_default_stacked_location(self):
327
329
        branch = self.make_branch('a', format=self.get_format_name())
328
 
        self.assertRaises(errors.UnstackableBranchFormat, branch.get_stacked_on)
 
330
        self.assertRaises(errors.UnstackableBranchFormat, branch.get_stacked_on_url)
329
331
 
330
332
 
331
333
class TestBranch7(TestBranch67, TestCaseWithTransport):
334
336
        return _mod_branch.BzrBranch7
335
337
 
336
338
    def get_format_name(self):
337
 
        return "development"
 
339
        return "1.9"
338
340
 
339
341
    def get_format_name_subtree(self):
340
342
        return "development-subtree"
341
343
 
342
 
    def test_set_stacked_on_unstackable_repo(self):
 
344
    def test_set_stacked_on_url_unstackable_repo(self):
343
345
        repo = self.make_repository('a', format='dirstate-tags')
344
346
        control = repo.bzrdir
345
347
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
346
348
        target = self.make_branch('b')
347
349
        self.assertRaises(errors.UnstackableRepositoryFormat,
348
 
            branch.set_stacked_on, target.base)
 
350
            branch.set_stacked_on_url, target.base)
 
351
 
 
352
    def test_clone_stacked_on_unstackable_repo(self):
 
353
        repo = self.make_repository('a', format='dirstate-tags')
 
354
        control = repo.bzrdir
 
355
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
 
356
        # Calling clone should not raise UnstackableRepositoryFormat.
 
357
        cloned_bzrdir = control.clone('cloned')
349
358
 
350
359
    def _test_default_stacked_location(self):
351
360
        branch = self.make_branch('a', format=self.get_format_name())
352
 
        self.assertRaises(errors.NotStacked, branch.get_stacked_on)
 
361
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
353
362
 
354
363
    def test_stack_and_unstack(self):
355
364
        branch = self.make_branch('a', format=self.get_format_name())
356
365
        target = self.make_branch_and_tree('b', format=self.get_format_name())
357
 
        branch.set_stacked_on(target.branch.base)
358
 
        self.assertEqual(target.branch.base, branch.get_stacked_on())
 
366
        branch.set_stacked_on_url(target.branch.base)
 
367
        self.assertEqual(target.branch.base, branch.get_stacked_on_url())
359
368
        revid = target.commit('foo')
360
369
        self.assertTrue(branch.repository.has_revision(revid))
361
 
        branch.set_stacked_on(None)
362
 
        self.assertRaises(errors.NotStacked, branch.get_stacked_on)
 
370
        branch.set_stacked_on_url(None)
 
371
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
363
372
        self.assertFalse(branch.repository.has_revision(revid))
364
373
 
365
374
    def test_open_opens_stacked_reference(self):
366
375
        branch = self.make_branch('a', format=self.get_format_name())
367
376
        target = self.make_branch_and_tree('b', format=self.get_format_name())
368
 
        branch.set_stacked_on(target.branch.base)
 
377
        branch.set_stacked_on_url(target.branch.base)
369
378
        branch = branch.bzrdir.open_branch()
370
379
        revid = target.commit('foo')
371
380
        self.assertTrue(branch.repository.has_revision(revid))
372
381
 
373
382
 
 
383
class BzrBranch8(TestCaseWithTransport):
 
384
 
 
385
    def make_branch(self, location, format=None):
 
386
        if format is None:
 
387
            format = bzrdir.format_registry.make_bzrdir('1.9')
 
388
            format.set_branch_format(_mod_branch.BzrBranchFormat8())
 
389
        return TestCaseWithTransport.make_branch(self, location, format=format)
 
390
 
 
391
    def create_branch_with_reference(self):
 
392
        branch = self.make_branch('branch')
 
393
        branch._set_all_reference_info({'file-id': ('path', 'location')})
 
394
        return branch
 
395
 
 
396
    @staticmethod
 
397
    def instrument_branch(branch, gets):
 
398
        old_get = branch._transport.get
 
399
        def get(*args, **kwargs):
 
400
            gets.append((args, kwargs))
 
401
            return old_get(*args, **kwargs)
 
402
        branch._transport.get = get
 
403
 
 
404
    def test_reference_info_caching_read_locked(self):
 
405
        gets = []
 
406
        branch = self.create_branch_with_reference()
 
407
        branch.lock_read()
 
408
        self.addCleanup(branch.unlock)
 
409
        self.instrument_branch(branch, gets)
 
410
        branch.get_reference_info('file-id')
 
411
        branch.get_reference_info('file-id')
 
412
        self.assertEqual(1, len(gets))
 
413
 
 
414
    def test_reference_info_caching_read_unlocked(self):
 
415
        gets = []
 
416
        branch = self.create_branch_with_reference()
 
417
        self.instrument_branch(branch, gets)
 
418
        branch.get_reference_info('file-id')
 
419
        branch.get_reference_info('file-id')
 
420
        self.assertEqual(2, len(gets))
 
421
 
 
422
    def test_reference_info_caching_write_locked(self):
 
423
        gets = []
 
424
        branch = self.make_branch('branch')
 
425
        branch.lock_write()
 
426
        self.instrument_branch(branch, gets)
 
427
        self.addCleanup(branch.unlock)
 
428
        branch._set_all_reference_info({'file-id': ('path2', 'location2')})
 
429
        path, location = branch.get_reference_info('file-id')
 
430
        self.assertEqual(0, len(gets))
 
431
        self.assertEqual('path2', path)
 
432
        self.assertEqual('location2', location)
 
433
 
 
434
    def test_reference_info_caches_cleared(self):
 
435
        branch = self.make_branch('branch')
 
436
        branch.lock_write()
 
437
        branch.set_reference_info('file-id', 'path2', 'location2')
 
438
        branch.unlock()
 
439
        doppelganger = Branch.open('branch')
 
440
        doppelganger.set_reference_info('file-id', 'path3', 'location3')
 
441
        self.assertEqual(('path3', 'location3'),
 
442
                         branch.get_reference_info('file-id'))
 
443
 
374
444
class TestBranchReference(TestCaseWithTransport):
375
445
    """Tests for the branch reference facility."""
376
446
 
417
487
    def test_installed_hooks_are_BranchHooks(self):
418
488
        """The installed hooks object should be a BranchHooks."""
419
489
        # the installed hooks are saved in self._preserved_hooks.
420
 
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch], BranchHooks)
 
490
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch][1],
 
491
            BranchHooks)
421
492
 
422
493
 
423
494
class TestPullResult(TestCase):
432
503
        # it's still supported
433
504
        a = "%d revisions pulled" % r
434
505
        self.assertEqual(a, "10 revisions pulled")
 
506
 
 
507
 
 
508
 
 
509
class _StubLockable(object):
 
510
    """Helper for TestRunWithWriteLockedTarget."""
 
511
 
 
512
    def __init__(self, calls, unlock_exc=None):
 
513
        self.calls = calls
 
514
        self.unlock_exc = unlock_exc
 
515
 
 
516
    def lock_write(self):
 
517
        self.calls.append('lock_write')
 
518
 
 
519
    def unlock(self):
 
520
        self.calls.append('unlock')
 
521
        if self.unlock_exc is not None:
 
522
            raise self.unlock_exc
 
523
 
 
524
 
 
525
class _ErrorFromCallable(Exception):
 
526
    """Helper for TestRunWithWriteLockedTarget."""
 
527
 
 
528
 
 
529
class _ErrorFromUnlock(Exception):
 
530
    """Helper for TestRunWithWriteLockedTarget."""
 
531
 
 
532
 
 
533
class TestRunWithWriteLockedTarget(TestCase):
 
534
    """Tests for _run_with_write_locked_target."""
 
535
 
 
536
    def setUp(self):
 
537
        TestCase.setUp(self)
 
538
        self._calls = []
 
539
 
 
540
    def func_that_returns_ok(self):
 
541
        self._calls.append('func called')
 
542
        return 'ok'
 
543
 
 
544
    def func_that_raises(self):
 
545
        self._calls.append('func called')
 
546
        raise _ErrorFromCallable()
 
547
 
 
548
    def test_success_unlocks(self):
 
549
        lockable = _StubLockable(self._calls)
 
550
        result = _run_with_write_locked_target(
 
551
            lockable, self.func_that_returns_ok)
 
552
        self.assertEqual('ok', result)
 
553
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
554
 
 
555
    def test_exception_unlocks_and_propagates(self):
 
556
        lockable = _StubLockable(self._calls)
 
557
        self.assertRaises(_ErrorFromCallable,
 
558
            _run_with_write_locked_target, lockable, self.func_that_raises)
 
559
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
560
 
 
561
    def test_callable_succeeds_but_error_during_unlock(self):
 
562
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
563
        self.assertRaises(_ErrorFromUnlock,
 
564
            _run_with_write_locked_target, lockable, self.func_that_returns_ok)
 
565
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
566
 
 
567
    def test_error_during_unlock_does_not_mask_original_error(self):
 
568
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
569
        self.assertRaises(_ErrorFromCallable,
 
570
            _run_with_write_locked_target, lockable, self.func_that_raises)
 
571
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
572
 
 
573