~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-08-27 02:27:19 UTC
  • mfrom: (4634.3.19 gc-batching)
  • Revision ID: pqm@pqm.ubuntu.com-20090827022719-bl2yoqhpj3fcfczu
(andrew) Fix #402657: 2a fetch over dumb transport reads one group at
        a time.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
19
 
For interface tests see tests/branch_implementations/*.py.
 
19
For interface tests see tests/per_branch/*.py.
20
20
 
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
40
40
    BzrBranch5,
41
41
    BzrBranchFormat5,
42
42
    BzrBranchFormat6,
 
43
    BzrBranchFormat7,
43
44
    PullResult,
 
45
    _run_with_write_locked_target,
44
46
    )
45
 
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1, 
 
47
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1,
46
48
                           BzrDir, BzrDirFormat)
47
49
from bzrlib.errors import (NotBranchError,
48
50
                           UnknownFormatError,
59
61
    def test_default_format(self):
60
62
        # update this if you change the default branch format
61
63
        self.assertIsInstance(BranchFormat.get_default_format(),
62
 
                BzrBranchFormat6)
 
64
                BzrBranchFormat7)
63
65
 
64
66
    def test_default_format_is_same_as_bzrdir_default(self):
65
67
        # XXX: it might be nice if there was only one place the default was
66
 
        # set, but at the moment that's not true -- mbp 20070814 -- 
 
68
        # set, but at the moment that's not true -- mbp 20070814 --
67
69
        # https://bugs.launchpad.net/bzr/+bug/132376
68
70
        self.assertEqual(BranchFormat.get_default_format(),
69
71
                BzrDirFormat.get_default_format().get_branch_format())
133
135
class SampleBranchFormat(BranchFormat):
134
136
    """A sample format
135
137
 
136
 
    this format is initializable, unsupported to aid in testing the 
 
138
    this format is initializable, unsupported to aid in testing the
137
139
    open and open_downlevel routines.
138
140
    """
139
141
 
150
152
    def is_supported(self):
151
153
        return False
152
154
 
153
 
    def open(self, transport, _found=False):
 
155
    def open(self, transport, _found=False, ignore_fallbacks=False):
154
156
        return "opened branch."
155
157
 
156
158
 
160
162
    def test_find_format(self):
161
163
        # is the right format object found for a branch?
162
164
        # create a branch with a few known format objects.
163
 
        # this is not quite the same as 
 
165
        # this is not quite the same as
164
166
        self.build_tree(["foo/", "bar/"])
165
167
        def check_format(format, url):
166
168
            dir = format._matchingbzrdir.initialize(url)
169
171
            found_format = BranchFormat.find_format(dir)
170
172
            self.failUnless(isinstance(found_format, format.__class__))
171
173
        check_format(BzrBranchFormat5(), "bar")
172
 
        
 
174
 
173
175
    def test_find_format_not_branch(self):
174
176
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
175
177
        self.assertRaises(NotBranchError,
227
229
        branch = self.make_branch('a', format=self.get_format_name())
228
230
        self.failUnlessExists('a/.bzr/branch/last-revision')
229
231
        self.failIfExists('a/.bzr/branch/revision-history')
 
232
        self.failIfExists('a/.bzr/branch/references')
230
233
 
231
234
    def test_config(self):
232
235
        """Ensure that all configuration data is stored in the branch"""
333
336
        return _mod_branch.BzrBranch7
334
337
 
335
338
    def get_format_name(self):
336
 
        return "development"
 
339
        return "1.9"
337
340
 
338
341
    def get_format_name_subtree(self):
339
342
        return "development-subtree"
377
380
        self.assertTrue(branch.repository.has_revision(revid))
378
381
 
379
382
 
 
383
class BzrBranch8(TestCaseWithTransport):
 
384
 
 
385
    def make_branch(self, location, format=None):
 
386
        if format is None:
 
387
            format = bzrdir.format_registry.make_bzrdir('1.9')
 
388
            format.set_branch_format(_mod_branch.BzrBranchFormat8())
 
389
        return TestCaseWithTransport.make_branch(self, location, format=format)
 
390
 
 
391
    def create_branch_with_reference(self):
 
392
        branch = self.make_branch('branch')
 
393
        branch._set_all_reference_info({'file-id': ('path', 'location')})
 
394
        return branch
 
395
 
 
396
    @staticmethod
 
397
    def instrument_branch(branch, gets):
 
398
        old_get = branch._transport.get
 
399
        def get(*args, **kwargs):
 
400
            gets.append((args, kwargs))
 
401
            return old_get(*args, **kwargs)
 
402
        branch._transport.get = get
 
403
 
 
404
    def test_reference_info_caching_read_locked(self):
 
405
        gets = []
 
406
        branch = self.create_branch_with_reference()
 
407
        branch.lock_read()
 
408
        self.addCleanup(branch.unlock)
 
409
        self.instrument_branch(branch, gets)
 
410
        branch.get_reference_info('file-id')
 
411
        branch.get_reference_info('file-id')
 
412
        self.assertEqual(1, len(gets))
 
413
 
 
414
    def test_reference_info_caching_read_unlocked(self):
 
415
        gets = []
 
416
        branch = self.create_branch_with_reference()
 
417
        self.instrument_branch(branch, gets)
 
418
        branch.get_reference_info('file-id')
 
419
        branch.get_reference_info('file-id')
 
420
        self.assertEqual(2, len(gets))
 
421
 
 
422
    def test_reference_info_caching_write_locked(self):
 
423
        gets = []
 
424
        branch = self.make_branch('branch')
 
425
        branch.lock_write()
 
426
        self.instrument_branch(branch, gets)
 
427
        self.addCleanup(branch.unlock)
 
428
        branch._set_all_reference_info({'file-id': ('path2', 'location2')})
 
429
        path, location = branch.get_reference_info('file-id')
 
430
        self.assertEqual(0, len(gets))
 
431
        self.assertEqual('path2', path)
 
432
        self.assertEqual('location2', location)
 
433
 
 
434
    def test_reference_info_caches_cleared(self):
 
435
        branch = self.make_branch('branch')
 
436
        branch.lock_write()
 
437
        branch.set_reference_info('file-id', 'path2', 'location2')
 
438
        branch.unlock()
 
439
        doppelganger = Branch.open('branch')
 
440
        doppelganger.set_reference_info('file-id', 'path3', 'location3')
 
441
        self.assertEqual(('path3', 'location3'),
 
442
                         branch.get_reference_info('file-id'))
 
443
 
380
444
class TestBranchReference(TestCaseWithTransport):
381
445
    """Tests for the branch reference facility."""
382
446
 
423
487
    def test_installed_hooks_are_BranchHooks(self):
424
488
        """The installed hooks object should be a BranchHooks."""
425
489
        # the installed hooks are saved in self._preserved_hooks.
426
 
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch], BranchHooks)
 
490
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch][1],
 
491
            BranchHooks)
427
492
 
428
493
 
429
494
class TestPullResult(TestCase):
438
503
        # it's still supported
439
504
        a = "%d revisions pulled" % r
440
505
        self.assertEqual(a, "10 revisions pulled")
 
506
 
 
507
 
 
508
 
 
509
class _StubLockable(object):
 
510
    """Helper for TestRunWithWriteLockedTarget."""
 
511
 
 
512
    def __init__(self, calls, unlock_exc=None):
 
513
        self.calls = calls
 
514
        self.unlock_exc = unlock_exc
 
515
 
 
516
    def lock_write(self):
 
517
        self.calls.append('lock_write')
 
518
 
 
519
    def unlock(self):
 
520
        self.calls.append('unlock')
 
521
        if self.unlock_exc is not None:
 
522
            raise self.unlock_exc
 
523
 
 
524
 
 
525
class _ErrorFromCallable(Exception):
 
526
    """Helper for TestRunWithWriteLockedTarget."""
 
527
 
 
528
 
 
529
class _ErrorFromUnlock(Exception):
 
530
    """Helper for TestRunWithWriteLockedTarget."""
 
531
 
 
532
 
 
533
class TestRunWithWriteLockedTarget(TestCase):
 
534
    """Tests for _run_with_write_locked_target."""
 
535
 
 
536
    def setUp(self):
 
537
        TestCase.setUp(self)
 
538
        self._calls = []
 
539
 
 
540
    def func_that_returns_ok(self):
 
541
        self._calls.append('func called')
 
542
        return 'ok'
 
543
 
 
544
    def func_that_raises(self):
 
545
        self._calls.append('func called')
 
546
        raise _ErrorFromCallable()
 
547
 
 
548
    def test_success_unlocks(self):
 
549
        lockable = _StubLockable(self._calls)
 
550
        result = _run_with_write_locked_target(
 
551
            lockable, self.func_that_returns_ok)
 
552
        self.assertEqual('ok', result)
 
553
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
554
 
 
555
    def test_exception_unlocks_and_propagates(self):
 
556
        lockable = _StubLockable(self._calls)
 
557
        self.assertRaises(_ErrorFromCallable,
 
558
            _run_with_write_locked_target, lockable, self.func_that_raises)
 
559
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
560
 
 
561
    def test_callable_succeeds_but_error_during_unlock(self):
 
562
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
563
        self.assertRaises(_ErrorFromUnlock,
 
564
            _run_with_write_locked_target, lockable, self.func_that_returns_ok)
 
565
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
566
 
 
567
    def test_error_during_unlock_does_not_mask_original_error(self):
 
568
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
569
        self.assertRaises(_ErrorFromCallable,
 
570
            _run_with_write_locked_target, lockable, self.func_that_raises)
 
571
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
572
 
 
573