~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-05-18 13:02:52 UTC
  • mfrom: (5830.3.6 i18n-msgfmt)
  • Revision ID: pqm@pqm.ubuntu.com-20110518130252-ky96qcvzt6o0zg3f
(mbp) add build_mo command to setup.py (INADA Naoki)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2012, 2016 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
19
 
For interface tests see `tests/per_branch/*.py`.
 
19
For interface tests see tests/per_branch/*.py.
20
20
 
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
28
28
    branch as _mod_branch,
29
29
    bzrdir,
30
30
    config,
31
 
    controldir,
32
31
    errors,
 
32
    symbol_versioning,
33
33
    tests,
34
34
    trace,
35
35
    urlutils,
36
36
    )
37
 
from bzrlib.branchfmt.fullhistory import (
38
 
    BzrBranch5,
39
 
    BzrBranchFormat5,
40
 
    )
41
37
 
42
38
 
43
39
class TestDefaultFormat(tests.TestCase):
78
74
        url = self.get_url()
79
75
        bdir = bzrdir.BzrDirMetaFormat1().initialize(url)
80
76
        bdir.create_repository()
81
 
        branch = BzrBranchFormat5().initialize(bdir)
 
77
        branch = _mod_branch.BzrBranchFormat5().initialize(bdir)
82
78
        t = self.get_transport()
83
79
        self.log("branch instance is %r" % branch)
84
 
        self.assertTrue(isinstance(branch, BzrBranch5))
 
80
        self.assert_(isinstance(branch, _mod_branch.BzrBranch5))
85
81
        self.assertIsDirectory('.', t)
86
82
        self.assertIsDirectory('.bzr/branch', t)
87
83
        self.assertIsDirectory('.bzr/branch/lock', t)
105
101
    # recursive section - that is, it appends the branch name.
106
102
 
107
103
 
108
 
class SampleBranchFormat(_mod_branch.BranchFormatMetadir):
 
104
class SampleBranchFormat(_mod_branch.BranchFormat):
109
105
    """A sample format
110
106
 
111
107
    this format is initializable, unsupported to aid in testing the
112
108
    open and open_downlevel routines.
113
109
    """
114
110
 
115
 
    @classmethod
116
 
    def get_format_string(cls):
 
111
    def get_format_string(self):
117
112
        """See BzrBranchFormat.get_format_string()."""
118
113
        return "Sample branch format."
119
114
 
120
 
    def initialize(self, a_bzrdir, name=None, repository=None,
121
 
                   append_revisions_only=None):
 
115
    def initialize(self, a_bzrdir, name=None, repository=None):
122
116
        """Format 4 branches cannot be created."""
123
117
        t = a_bzrdir.get_branch_transport(self, name=name)
124
118
        t.put_bytes('format', self.get_format_string())
127
121
    def is_supported(self):
128
122
        return False
129
123
 
130
 
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False,
131
 
             possible_transports=None):
 
124
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
132
125
        return "opened branch."
133
126
 
134
127
 
137
130
SampleSupportedBranchFormatString = "Sample supported branch format."
138
131
 
139
132
# And the format class can then reference the constant to avoid skew.
140
 
class SampleSupportedBranchFormat(_mod_branch.BranchFormatMetadir):
 
133
class SampleSupportedBranchFormat(_mod_branch.BranchFormat):
141
134
    """A sample supported format."""
142
135
 
143
 
    @classmethod
144
 
    def get_format_string(cls):
 
136
    def get_format_string(self):
145
137
        """See BzrBranchFormat.get_format_string()."""
146
138
        return SampleSupportedBranchFormatString
147
139
 
148
 
    def initialize(self, a_bzrdir, name=None, append_revisions_only=None):
 
140
    def initialize(self, a_bzrdir, name=None):
149
141
        t = a_bzrdir.get_branch_transport(self, name=name)
150
142
        t.put_bytes('format', self.get_format_string())
151
143
        return 'A branch'
152
144
 
153
 
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False,
154
 
             possible_transports=None):
 
145
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
155
146
        return "opened supported branch."
156
147
 
157
148
 
169
160
    def initialize(self, a_bzrdir, name=None):
170
161
        raise NotImplementedError(self.initialize)
171
162
 
172
 
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False,
173
 
             possible_transports=None):
 
163
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
174
164
        raise NotImplementedError(self.open)
175
165
 
176
166
 
186
176
            dir = format._matchingbzrdir.initialize(url)
187
177
            dir.create_repository()
188
178
            format.initialize(dir)
189
 
            found_format = _mod_branch.BranchFormatMetadir.find_format(dir)
 
179
            found_format = _mod_branch.BranchFormat.find_format(dir)
190
180
            self.assertIsInstance(found_format, format.__class__)
191
 
        check_format(BzrBranchFormat5(), "bar")
 
181
        check_format(_mod_branch.BzrBranchFormat5(), "bar")
192
182
 
193
183
    def test_find_format_factory(self):
194
184
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
201
191
        b = _mod_branch.Branch.open(self.get_url())
202
192
        self.assertEqual(b, "opened supported branch.")
203
193
 
204
 
    def test_from_string(self):
205
 
        self.assertIsInstance(
206
 
            SampleBranchFormat.from_string("Sample branch format."),
207
 
            SampleBranchFormat)
208
 
        self.assertRaises(AssertionError,
209
 
            SampleBranchFormat.from_string, "Different branch format.")
210
 
 
211
194
    def test_find_format_not_branch(self):
212
195
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
213
196
        self.assertRaises(errors.NotBranchError,
214
 
                          _mod_branch.BranchFormatMetadir.find_format,
 
197
                          _mod_branch.BranchFormat.find_format,
215
198
                          dir)
216
199
 
217
200
    def test_find_format_unknown_format(self):
218
201
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
219
202
        SampleBranchFormat().initialize(dir)
220
203
        self.assertRaises(errors.UnknownFormatError,
221
 
                          _mod_branch.BranchFormatMetadir.find_format,
 
204
                          _mod_branch.BranchFormat.find_format,
222
205
                          dir)
223
206
 
224
 
    def test_find_format_with_features(self):
225
 
        tree = self.make_branch_and_tree('.', format='2a')
226
 
        tree.branch.update_feature_flags({"name": "optional"})
227
 
        found_format = _mod_branch.BranchFormatMetadir.find_format(tree.bzrdir)
228
 
        self.assertIsInstance(found_format, _mod_branch.BranchFormatMetadir)
229
 
        self.assertEqual(found_format.features.get("name"), "optional")
230
 
        tree.branch.update_feature_flags({"name": None})
231
 
        branch = _mod_branch.Branch.open('.')
232
 
        self.assertEqual(branch._format.features, {})
 
207
    def test_register_unregister_format(self):
 
208
        # Test the deprecated format registration functions
 
209
        format = SampleBranchFormat()
 
210
        # make a control dir
 
211
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
212
        # make a branch
 
213
        format.initialize(dir)
 
214
        # register a format for it.
 
215
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
216
            _mod_branch.BranchFormat.register_format, format)
 
217
        # which branch.Open will refuse (not supported)
 
218
        self.assertRaises(errors.UnsupportedFormatError,
 
219
                          _mod_branch.Branch.open, self.get_url())
 
220
        self.make_branch_and_tree('foo')
 
221
        # but open_downlevel will work
 
222
        self.assertEqual(
 
223
            format.open(dir),
 
224
            bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
 
225
        # unregister the format
 
226
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
227
            _mod_branch.BranchFormat.unregister_format, format)
 
228
        self.make_branch_and_tree('bar')
233
229
 
234
230
 
235
231
class TestBranchFormatRegistry(tests.TestCase):
242
238
        self.assertIs(None, self.registry.get_default())
243
239
        format = SampleBranchFormat()
244
240
        self.registry.set_default(format)
245
 
        self.assertEqual(format, self.registry.get_default())
 
241
        self.assertEquals(format, self.registry.get_default())
246
242
 
247
243
    def test_register_unregister_format(self):
248
244
        format = SampleBranchFormat()
249
245
        self.registry.register(format)
250
 
        self.assertEqual(format,
 
246
        self.assertEquals(format,
251
247
            self.registry.get("Sample branch format."))
252
248
        self.registry.remove(format)
253
249
        self.assertRaises(KeyError, self.registry.get,
255
251
 
256
252
    def test_get_all(self):
257
253
        format = SampleBranchFormat()
258
 
        self.assertEqual([], self.registry._get_all())
 
254
        self.assertEquals([], self.registry._get_all())
259
255
        self.registry.register(format)
260
 
        self.assertEqual([format], self.registry._get_all())
 
256
        self.assertEquals([format], self.registry._get_all())
261
257
 
262
258
    def test_register_extra(self):
263
259
        format = SampleExtraBranchFormat()
264
 
        self.assertEqual([], self.registry._get_all())
 
260
        self.assertEquals([], self.registry._get_all())
265
261
        self.registry.register_extra(format)
266
 
        self.assertEqual([format], self.registry._get_all())
 
262
        self.assertEquals([format], self.registry._get_all())
267
263
 
268
264
    def test_register_extra_lazy(self):
269
 
        self.assertEqual([], self.registry._get_all())
 
265
        self.assertEquals([], self.registry._get_all())
270
266
        self.registry.register_extra_lazy("bzrlib.tests.test_branch",
271
267
            "SampleExtraBranchFormat")
272
268
        formats = self.registry._get_all()
273
 
        self.assertEqual(1, len(formats))
 
269
        self.assertEquals(1, len(formats))
274
270
        self.assertIsInstance(formats[0], SampleExtraBranchFormat)
275
271
 
276
272
 
337
333
        self.assertPathDoesNotExist('a/.bzr/branch/parent')
338
334
        self.assertEqual('http://example.com', branch.get_parent())
339
335
        branch.set_push_location('sftp://example.com')
340
 
        conf = branch.get_config_stack()
341
 
        self.assertEqual('sftp://example.com', conf.get('push_location'))
 
336
        config = branch.get_config()._get_branch_data_config()
 
337
        self.assertEqual('sftp://example.com',
 
338
                         config.get_user_option('push_location'))
342
339
        branch.set_bound_location('ftp://example.com')
343
340
        self.assertPathDoesNotExist('a/.bzr/branch/bound')
344
341
        self.assertEqual('ftp://example.com', branch.get_bound_location())
345
342
 
 
343
    def test_set_revision_history(self):
 
344
        builder = self.make_branch_builder('.', format=self.get_format_name())
 
345
        builder.build_snapshot('foo', None,
 
346
            [('add', ('', None, 'directory', None))],
 
347
            message='foo')
 
348
        builder.build_snapshot('bar', None, [], message='bar')
 
349
        branch = builder.get_branch()
 
350
        branch.lock_write()
 
351
        self.addCleanup(branch.unlock)
 
352
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
353
            branch.set_revision_history, ['foo', 'bar'])
 
354
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
355
                branch.set_revision_history, ['foo'])
 
356
        self.assertRaises(errors.NotLefthandHistory,
 
357
            self.applyDeprecated, symbol_versioning.deprecated_in((2, 4, 0)),
 
358
            branch.set_revision_history, ['bar'])
 
359
 
346
360
    def do_checkout_test(self, lightweight=False):
347
361
        tree = self.make_branch_and_tree('source',
348
362
            format=self.get_format_name_subtree())
376
390
    def test_light_checkout_with_references(self):
377
391
        self.do_checkout_test(lightweight=True)
378
392
 
 
393
    def test_set_push(self):
 
394
        branch = self.make_branch('source', format=self.get_format_name())
 
395
        branch.get_config().set_user_option('push_location', 'old',
 
396
            store=config.STORE_LOCATION)
 
397
        warnings = []
 
398
        def warning(*args):
 
399
            warnings.append(args[0] % args[1:])
 
400
        _warning = trace.warning
 
401
        trace.warning = warning
 
402
        try:
 
403
            branch.set_push_location('new')
 
404
        finally:
 
405
            trace.warning = _warning
 
406
        self.assertEqual(warnings[0], 'Value "new" is masked by "old" from '
 
407
                         'locations.conf')
 
408
 
379
409
 
380
410
class TestBranch6(TestBranch67, tests.TestCaseWithTransport):
381
411
 
452
482
 
453
483
    def make_branch(self, location, format=None):
454
484
        if format is None:
455
 
            format = controldir.format_registry.make_bzrdir('1.9')
 
485
            format = bzrdir.format_registry.make_bzrdir('1.9')
456
486
            format.set_branch_format(_mod_branch.BzrBranchFormat8())
457
487
        return tests.TestCaseWithTransport.make_branch(
458
488
            self, location, format=format)
510
540
        self.assertEqual(('path3', 'location3'),
511
541
                         branch.get_reference_info('file-id'))
512
542
 
513
 
    def _recordParentMapCalls(self, repo):
514
 
        self._parent_map_calls = []
515
 
        orig_get_parent_map = repo.revisions.get_parent_map
516
 
        def get_parent_map(q):
517
 
            q = list(q)
518
 
            self._parent_map_calls.extend([e[0] for e in q])
519
 
            return orig_get_parent_map(q)
520
 
        repo.revisions.get_parent_map = get_parent_map
521
 
 
522
 
 
523
543
class TestBranchReference(tests.TestCaseWithTransport):
524
544
    """Tests for the branch reference facility."""
525
545
 
539
559
        self.assertEqual(opened_branch.base, target_branch.base)
540
560
 
541
561
    def test_get_reference(self):
542
 
        """For a BranchReference, get_reference should return the location."""
 
562
        """For a BranchReference, get_reference should reutrn the location."""
543
563
        branch = self.make_branch('target')
544
564
        checkout = branch.create_checkout('checkout', lightweight=True)
545
565
        reference_url = branch.bzrdir.root_transport.abspath('') + '/'
555
575
    def test_constructor(self):
556
576
        """Check that creating a BranchHooks instance has the right defaults."""
557
577
        hooks = _mod_branch.BranchHooks()
 
578
        self.assertTrue("set_rh" in hooks, "set_rh not in %s" % hooks)
558
579
        self.assertTrue("post_push" in hooks, "post_push not in %s" % hooks)
559
580
        self.assertTrue("post_commit" in hooks, "post_commit not in %s" % hooks)
560
581
        self.assertTrue("pre_commit" in hooks, "pre_commit not in %s" % hooks)
624
645
    def setUp(self):
625
646
        super(TestBranchOptions, self).setUp()
626
647
        self.branch = self.make_branch('.')
627
 
        self.config_stack = self.branch.get_config_stack()
 
648
        self.config = self.branch.get_config()
628
649
 
629
650
    def check_append_revisions_only(self, expected_value, value=None):
630
651
        """Set append_revisions_only in config and check its interpretation."""
631
652
        if value is not None:
632
 
            self.config_stack.set('append_revisions_only', value)
 
653
            self.config.set_user_option('append_revisions_only', value)
633
654
        self.assertEqual(expected_value,
634
 
                         self.branch.get_append_revisions_only())
 
655
                         self.branch._get_append_revisions_only())
635
656
 
636
657
    def test_valid_append_revisions_only(self):
637
 
        self.assertEqual(None,
638
 
                          self.config_stack.get('append_revisions_only'))
 
658
        self.assertEquals(None,
 
659
                          self.config.get_user_option('append_revisions_only'))
639
660
        self.check_append_revisions_only(None)
640
661
        self.check_append_revisions_only(False, 'False')
641
662
        self.check_append_revisions_only(True, 'True')
653
674
        self.check_append_revisions_only(None, 'not-a-bool')
654
675
        self.assertLength(1, self.warnings)
655
676
        self.assertEqual(
656
 
            'Value "not-a-bool" is not valid for "append_revisions_only"',
 
677
            'Value "not-a-bool" is not a boolean for "append_revisions_only"',
657
678
            self.warnings[0])
658
679
 
659
 
    def test_use_fresh_values(self):
660
 
        copy = _mod_branch.Branch.open(self.branch.base)
661
 
        copy.lock_write()
662
 
        try:
663
 
            copy.get_config_stack().set('foo', 'bar')
664
 
        finally:
665
 
            copy.unlock()
666
 
        self.assertFalse(self.branch.is_locked())
667
 
        # Since the branch is locked, the option value won't be saved on disk
668
 
        # so trying to access the config of locked branch via another older
669
 
        # non-locked branch object pointing to the same branch is not supported
670
 
        self.assertEqual(None, self.branch.get_config_stack().get('foo'))
671
 
        # Using a newly created branch object works as expected
672
 
        fresh = _mod_branch.Branch.open(self.branch.base)
673
 
        self.assertEqual('bar', fresh.get_config_stack().get('foo'))
674
 
 
675
 
    def test_set_from_config_get_from_config_stack(self):
676
 
        self.branch.lock_write()
677
 
        self.addCleanup(self.branch.unlock)
678
 
        self.branch.get_config().set_user_option('foo', 'bar')
679
 
        result = self.branch.get_config_stack().get('foo')
680
 
        # https://bugs.launchpad.net/bzr/+bug/948344
681
 
        self.expectFailure('BranchStack uses cache after set_user_option',
682
 
                           self.assertEqual, 'bar', result)
683
 
 
684
 
    def test_set_from_config_stack_get_from_config(self):
685
 
        self.branch.lock_write()
686
 
        self.addCleanup(self.branch.unlock)
687
 
        self.branch.get_config_stack().set('foo', 'bar')
688
 
        # Since the branch is locked, the option value won't be saved on disk
689
 
        # so mixing get() and get_user_option() is broken by design.
690
 
        self.assertEqual(None,
691
 
                         self.branch.get_config().get_user_option('foo'))
692
 
 
693
 
    def test_set_delays_write_when_branch_is_locked(self):
694
 
        self.branch.lock_write()
695
 
        self.addCleanup(self.branch.unlock)
696
 
        self.branch.get_config_stack().set('foo', 'bar')
697
 
        copy = _mod_branch.Branch.open(self.branch.base)
698
 
        result = copy.get_config_stack().get('foo')
699
 
        # Accessing from a different branch object is like accessing from a
700
 
        # different process: the option has not been saved yet and the new
701
 
        # value cannot be seen.
702
 
        self.assertIs(None, result)
703
 
 
704
680
 
705
681
class TestPullResult(tests.TestCase):
706
682
 
 
683
    def test_pull_result_to_int(self):
 
684
        # to support old code, the pull result can be used as an int
 
685
        r = _mod_branch.PullResult()
 
686
        r.old_revno = 10
 
687
        r.new_revno = 20
 
688
        # this usage of results is not recommended for new code (because it
 
689
        # doesn't describe very well what happened), but for api stability
 
690
        # it's still supported
 
691
        self.assertEqual(self.applyDeprecated(
 
692
            symbol_versioning.deprecated_in((2, 3, 0)),
 
693
            r.__int__),
 
694
            10)
 
695
 
707
696
    def test_report_changed(self):
708
697
        r = _mod_branch.PullResult()
709
698
        r.old_revid = "old-revid"
713
702
        f = StringIO()
714
703
        r.report(f)
715
704
        self.assertEqual("Now on revision 20.\n", f.getvalue())
716
 
        self.assertEqual("Now on revision 20.\n", f.getvalue())
717
705
 
718
706
    def test_report_unchanged(self):
719
707
        r = _mod_branch.PullResult()
721
709
        r.new_revid = "same-revid"
722
710
        f = StringIO()
723
711
        r.report(f)
724
 
        self.assertEqual("No revisions or tags to pull.\n", f.getvalue())
 
712
        self.assertEqual("No revisions to pull.\n", f.getvalue())
 
713
 
 
714
 
 
715
class _StubLockable(object):
 
716
    """Helper for TestRunWithWriteLockedTarget."""
 
717
 
 
718
    def __init__(self, calls, unlock_exc=None):
 
719
        self.calls = calls
 
720
        self.unlock_exc = unlock_exc
 
721
 
 
722
    def lock_write(self):
 
723
        self.calls.append('lock_write')
 
724
 
 
725
    def unlock(self):
 
726
        self.calls.append('unlock')
 
727
        if self.unlock_exc is not None:
 
728
            raise self.unlock_exc
 
729
 
 
730
 
 
731
class _ErrorFromCallable(Exception):
 
732
    """Helper for TestRunWithWriteLockedTarget."""
 
733
 
 
734
 
 
735
class _ErrorFromUnlock(Exception):
 
736
    """Helper for TestRunWithWriteLockedTarget."""
 
737
 
 
738
 
 
739
class TestRunWithWriteLockedTarget(tests.TestCase):
 
740
    """Tests for _run_with_write_locked_target."""
 
741
 
 
742
    def setUp(self):
 
743
        tests.TestCase.setUp(self)
 
744
        self._calls = []
 
745
 
 
746
    def func_that_returns_ok(self):
 
747
        self._calls.append('func called')
 
748
        return 'ok'
 
749
 
 
750
    def func_that_raises(self):
 
751
        self._calls.append('func called')
 
752
        raise _ErrorFromCallable()
 
753
 
 
754
    def test_success_unlocks(self):
 
755
        lockable = _StubLockable(self._calls)
 
756
        result = _mod_branch._run_with_write_locked_target(
 
757
            lockable, self.func_that_returns_ok)
 
758
        self.assertEqual('ok', result)
 
759
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
760
 
 
761
    def test_exception_unlocks_and_propagates(self):
 
762
        lockable = _StubLockable(self._calls)
 
763
        self.assertRaises(_ErrorFromCallable,
 
764
                          _mod_branch._run_with_write_locked_target,
 
765
                          lockable, self.func_that_raises)
 
766
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
767
 
 
768
    def test_callable_succeeds_but_error_during_unlock(self):
 
769
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
770
        self.assertRaises(_ErrorFromUnlock,
 
771
                          _mod_branch._run_with_write_locked_target,
 
772
                          lockable, self.func_that_returns_ok)
 
773
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
774
 
 
775
    def test_error_during_unlock_does_not_mask_original_error(self):
 
776
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
777
        self.assertRaises(_ErrorFromCallable,
 
778
                          _mod_branch._run_with_write_locked_target,
 
779
                          lockable, self.func_that_raises)
 
780
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)