~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Vincent Ladeuil
  • Date: 2012-02-14 17:22:37 UTC
  • mfrom: (6466 +trunk)
  • mto: This revision was merged to the branch mainline in revision 6468.
  • Revision ID: v.ladeuil+lp@free.fr-20120214172237-7dv7er3n4uy8d5m4
Merge trunk

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2006-2011 Canonical Ltd
 
1
# Copyright (C) 2006-2012 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
19
 
For interface tests see tests/per_branch/*.py.
 
19
For interface tests see `tests/per_branch/*.py`.
20
20
 
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
32
32
    symbol_versioning,
33
33
    tests,
34
34
    trace,
35
 
    transport,
36
35
    urlutils,
37
36
    )
38
37
 
41
40
 
42
41
    def test_default_format(self):
43
42
        # update this if you change the default branch format
44
 
        self.assertIsInstance(_mod_branch.BranchFormat.get_default_format(),
 
43
        self.assertIsInstance(_mod_branch.format_registry.get_default(),
45
44
                _mod_branch.BzrBranchFormat7)
46
45
 
47
46
    def test_default_format_is_same_as_bzrdir_default(self):
49
48
        # set, but at the moment that's not true -- mbp 20070814 --
50
49
        # https://bugs.launchpad.net/bzr/+bug/132376
51
50
        self.assertEqual(
52
 
            _mod_branch.BranchFormat.get_default_format(),
 
51
            _mod_branch.format_registry.get_default(),
53
52
            bzrdir.BzrDirFormat.get_default_format().get_branch_format())
54
53
 
55
54
    def test_get_set_default_format(self):
56
55
        # set the format and then set it back again
57
 
        old_format = _mod_branch.BranchFormat.get_default_format()
58
 
        _mod_branch.BranchFormat.set_default_format(SampleBranchFormat())
 
56
        old_format = _mod_branch.format_registry.get_default()
 
57
        _mod_branch.format_registry.set_default(SampleBranchFormat())
59
58
        try:
60
59
            # the default branch format is used by the meta dir format
61
60
            # which is not the default bzrdir format at this point
63
62
            result = dir.create_branch()
64
63
            self.assertEqual(result, 'A branch')
65
64
        finally:
66
 
            _mod_branch.BranchFormat.set_default_format(old_format)
 
65
            _mod_branch.format_registry.set_default(old_format)
67
66
        self.assertEqual(old_format,
68
 
                         _mod_branch.BranchFormat.get_default_format())
 
67
                         _mod_branch.format_registry.get_default())
69
68
 
70
69
 
71
70
class TestBranchFormat5(tests.TestCaseWithTransport):
75
74
        url = self.get_url()
76
75
        bdir = bzrdir.BzrDirMetaFormat1().initialize(url)
77
76
        bdir.create_repository()
78
 
        branch = bdir.create_branch()
 
77
        branch = _mod_branch.BzrBranchFormat5().initialize(bdir)
79
78
        t = self.get_transport()
80
79
        self.log("branch instance is %r" % branch)
81
80
        self.assert_(isinstance(branch, _mod_branch.BzrBranch5))
102
101
    # recursive section - that is, it appends the branch name.
103
102
 
104
103
 
105
 
class SampleBranchFormat(_mod_branch.BranchFormat):
 
104
class SampleBranchFormat(_mod_branch.BranchFormatMetadir):
106
105
    """A sample format
107
106
 
108
107
    this format is initializable, unsupported to aid in testing the
109
108
    open and open_downlevel routines.
110
109
    """
111
110
 
112
 
    def get_format_string(self):
 
111
    @classmethod
 
112
    def get_format_string(cls):
113
113
        """See BzrBranchFormat.get_format_string()."""
114
114
        return "Sample branch format."
115
115
 
116
 
    def initialize(self, a_bzrdir, name=None, repository=None):
 
116
    def initialize(self, a_bzrdir, name=None, repository=None,
 
117
                   append_revisions_only=None):
117
118
        """Format 4 branches cannot be created."""
118
119
        t = a_bzrdir.get_branch_transport(self, name=name)
119
120
        t.put_bytes('format', self.get_format_string())
122
123
    def is_supported(self):
123
124
        return False
124
125
 
125
 
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
 
126
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False,
 
127
             possible_transports=None):
126
128
        return "opened branch."
127
129
 
128
130
 
131
133
SampleSupportedBranchFormatString = "Sample supported branch format."
132
134
 
133
135
# And the format class can then reference the constant to avoid skew.
134
 
class SampleSupportedBranchFormat(_mod_branch.BranchFormat):
 
136
class SampleSupportedBranchFormat(_mod_branch.BranchFormatMetadir):
135
137
    """A sample supported format."""
136
138
 
137
 
    def get_format_string(self):
 
139
    @classmethod
 
140
    def get_format_string(cls):
138
141
        """See BzrBranchFormat.get_format_string()."""
139
142
        return SampleSupportedBranchFormatString
140
143
 
141
 
    def initialize(self, a_bzrdir, name=None):
 
144
    def initialize(self, a_bzrdir, name=None, append_revisions_only=None):
142
145
        t = a_bzrdir.get_branch_transport(self, name=name)
143
146
        t.put_bytes('format', self.get_format_string())
144
147
        return 'A branch'
145
148
 
146
 
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
 
149
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False,
 
150
             possible_transports=None):
147
151
        return "opened supported branch."
148
152
 
149
153
 
161
165
    def initialize(self, a_bzrdir, name=None):
162
166
        raise NotImplementedError(self.initialize)
163
167
 
164
 
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
 
168
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False,
 
169
             possible_transports=None):
165
170
        raise NotImplementedError(self.open)
166
171
 
167
172
 
177
182
            dir = format._matchingbzrdir.initialize(url)
178
183
            dir.create_repository()
179
184
            format.initialize(dir)
180
 
            found_format = _mod_branch.BranchFormat.find_format(dir)
181
 
            self.failUnless(isinstance(found_format, format.__class__))
 
185
            found_format = _mod_branch.BranchFormatMetadir.find_format(dir)
 
186
            self.assertIsInstance(found_format, format.__class__)
182
187
        check_format(_mod_branch.BzrBranchFormat5(), "bar")
183
188
 
184
 
    def test_extra_format(self):
185
 
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
186
 
        SampleSupportedBranchFormat().initialize(dir)
187
 
        format = SampleExtraBranchFormat()
188
 
        _mod_branch.BranchFormat.register_extra_format(format)
189
 
        self.addCleanup(_mod_branch.BranchFormat.unregister_extra_format,
190
 
            format)
191
 
        self.assertTrue(format in _mod_branch.BranchFormat.get_formats())
192
 
        self.assertEquals(format,
193
 
            _mod_branch.network_format_registry.get("extra"))
194
 
 
195
189
    def test_find_format_factory(self):
196
190
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
197
191
        SampleSupportedBranchFormat().initialize(dir)
198
192
        factory = _mod_branch.MetaDirBranchFormatFactory(
199
193
            SampleSupportedBranchFormatString,
200
194
            "bzrlib.tests.test_branch", "SampleSupportedBranchFormat")
201
 
        _mod_branch.BranchFormat.register_format(factory)
202
 
        self.addCleanup(_mod_branch.BranchFormat.unregister_format, factory)
 
195
        _mod_branch.format_registry.register(factory)
 
196
        self.addCleanup(_mod_branch.format_registry.remove, factory)
203
197
        b = _mod_branch.Branch.open(self.get_url())
204
198
        self.assertEqual(b, "opened supported branch.")
205
199
 
 
200
    def test_from_string(self):
 
201
        self.assertIsInstance(
 
202
            SampleBranchFormat.from_string("Sample branch format."),
 
203
            SampleBranchFormat)
 
204
        self.assertRaises(AssertionError,
 
205
            SampleBranchFormat.from_string, "Different branch format.")
 
206
 
206
207
    def test_find_format_not_branch(self):
207
208
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
208
209
        self.assertRaises(errors.NotBranchError,
209
 
                          _mod_branch.BranchFormat.find_format,
 
210
                          _mod_branch.BranchFormatMetadir.find_format,
210
211
                          dir)
211
212
 
212
213
    def test_find_format_unknown_format(self):
213
214
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
214
215
        SampleBranchFormat().initialize(dir)
215
216
        self.assertRaises(errors.UnknownFormatError,
216
 
                          _mod_branch.BranchFormat.find_format,
 
217
                          _mod_branch.BranchFormatMetadir.find_format,
217
218
                          dir)
218
219
 
 
220
    def test_find_format_with_features(self):
 
221
        tree = self.make_branch_and_tree('.', format='2a')
 
222
        tree.branch.update_feature_flags({"name": "optional"})
 
223
        found_format = _mod_branch.BranchFormatMetadir.find_format(tree.bzrdir)
 
224
        self.assertIsInstance(found_format, _mod_branch.BranchFormatMetadir)
 
225
        self.assertEquals(found_format.features.get("name"), "optional")
 
226
        tree.branch.update_feature_flags({"name": None})
 
227
        branch = _mod_branch.Branch.open('.')
 
228
        self.assertEquals(branch._format.features, {})
 
229
 
219
230
    def test_register_unregister_format(self):
 
231
        # Test the deprecated format registration functions
220
232
        format = SampleBranchFormat()
221
233
        # make a control dir
222
234
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
223
235
        # make a branch
224
236
        format.initialize(dir)
225
237
        # register a format for it.
226
 
        _mod_branch.BranchFormat.register_format(format)
 
238
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
239
            _mod_branch.BranchFormat.register_format, format)
227
240
        # which branch.Open will refuse (not supported)
228
241
        self.assertRaises(errors.UnsupportedFormatError,
229
242
                          _mod_branch.Branch.open, self.get_url())
233
246
            format.open(dir),
234
247
            bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
235
248
        # unregister the format
236
 
        _mod_branch.BranchFormat.unregister_format(format)
 
249
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
250
            _mod_branch.BranchFormat.unregister_format, format)
237
251
        self.make_branch_and_tree('bar')
238
252
 
239
253
 
 
254
class TestBranchFormatRegistry(tests.TestCase):
 
255
 
 
256
    def setUp(self):
 
257
        super(TestBranchFormatRegistry, self).setUp()
 
258
        self.registry = _mod_branch.BranchFormatRegistry()
 
259
 
 
260
    def test_default(self):
 
261
        self.assertIs(None, self.registry.get_default())
 
262
        format = SampleBranchFormat()
 
263
        self.registry.set_default(format)
 
264
        self.assertEquals(format, self.registry.get_default())
 
265
 
 
266
    def test_register_unregister_format(self):
 
267
        format = SampleBranchFormat()
 
268
        self.registry.register(format)
 
269
        self.assertEquals(format,
 
270
            self.registry.get("Sample branch format."))
 
271
        self.registry.remove(format)
 
272
        self.assertRaises(KeyError, self.registry.get,
 
273
            "Sample branch format.")
 
274
 
 
275
    def test_get_all(self):
 
276
        format = SampleBranchFormat()
 
277
        self.assertEquals([], self.registry._get_all())
 
278
        self.registry.register(format)
 
279
        self.assertEquals([format], self.registry._get_all())
 
280
 
 
281
    def test_register_extra(self):
 
282
        format = SampleExtraBranchFormat()
 
283
        self.assertEquals([], self.registry._get_all())
 
284
        self.registry.register_extra(format)
 
285
        self.assertEquals([format], self.registry._get_all())
 
286
 
 
287
    def test_register_extra_lazy(self):
 
288
        self.assertEquals([], self.registry._get_all())
 
289
        self.registry.register_extra_lazy("bzrlib.tests.test_branch",
 
290
            "SampleExtraBranchFormat")
 
291
        formats = self.registry._get_all()
 
292
        self.assertEquals(1, len(formats))
 
293
        self.assertIsInstance(formats[0], SampleExtraBranchFormat)
 
294
 
 
295
 
240
296
#Used by TestMetaDirBranchFormatFactory 
241
297
FakeLazyFormat = None
242
298
 
289
345
 
290
346
    def test_layout(self):
291
347
        branch = self.make_branch('a', format=self.get_format_name())
292
 
        self.failUnlessExists('a/.bzr/branch/last-revision')
293
 
        self.failIfExists('a/.bzr/branch/revision-history')
294
 
        self.failIfExists('a/.bzr/branch/references')
 
348
        self.assertPathExists('a/.bzr/branch/last-revision')
 
349
        self.assertPathDoesNotExist('a/.bzr/branch/revision-history')
 
350
        self.assertPathDoesNotExist('a/.bzr/branch/references')
295
351
 
296
352
    def test_config(self):
297
353
        """Ensure that all configuration data is stored in the branch"""
298
354
        branch = self.make_branch('a', format=self.get_format_name())
299
355
        branch.set_parent('http://example.com')
300
 
        self.failIfExists('a/.bzr/branch/parent')
 
356
        self.assertPathDoesNotExist('a/.bzr/branch/parent')
301
357
        self.assertEqual('http://example.com', branch.get_parent())
302
358
        branch.set_push_location('sftp://example.com')
303
 
        config = branch.get_config()._get_branch_data_config()
304
 
        self.assertEqual('sftp://example.com',
305
 
                         config.get_user_option('push_location'))
 
359
        conf = branch.get_config_stack()
 
360
        self.assertEqual('sftp://example.com', conf.get('push_location'))
306
361
        branch.set_bound_location('ftp://example.com')
307
 
        self.failIfExists('a/.bzr/branch/bound')
 
362
        self.assertPathDoesNotExist('a/.bzr/branch/bound')
308
363
        self.assertEqual('ftp://example.com', branch.get_bound_location())
309
364
 
310
365
    def test_set_revision_history(self):
316
371
        branch = builder.get_branch()
317
372
        branch.lock_write()
318
373
        self.addCleanup(branch.unlock)
319
 
        branch.set_revision_history(['foo', 'bar'])
320
 
        branch.set_revision_history(['foo'])
 
374
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
375
            branch.set_revision_history, ['foo', 'bar'])
 
376
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
377
                branch.set_revision_history, ['foo'])
321
378
        self.assertRaises(errors.NotLefthandHistory,
322
 
                          branch.set_revision_history, ['bar'])
 
379
            self.applyDeprecated, symbol_versioning.deprecated_in((2, 4, 0)),
 
380
            branch.set_revision_history, ['bar'])
323
381
 
324
382
    def do_checkout_test(self, lightweight=False):
325
383
        tree = self.make_branch_and_tree('source',
338
396
        subtree.commit('a subtree file')
339
397
        subsubtree.commit('a subsubtree file')
340
398
        tree.branch.create_checkout('target', lightweight=lightweight)
341
 
        self.failUnlessExists('target')
342
 
        self.failUnlessExists('target/subtree')
343
 
        self.failUnlessExists('target/subtree/file')
344
 
        self.failUnlessExists('target/subtree/subsubtree/file')
 
399
        self.assertPathExists('target')
 
400
        self.assertPathExists('target/subtree')
 
401
        self.assertPathExists('target/subtree/file')
 
402
        self.assertPathExists('target/subtree/subsubtree/file')
345
403
        subbranch = _mod_branch.Branch.open('target/subtree/subsubtree')
346
404
        if lightweight:
347
405
            self.assertEndsWith(subbranch.base, 'source/subtree/subsubtree/')
354
412
    def test_light_checkout_with_references(self):
355
413
        self.do_checkout_test(lightweight=True)
356
414
 
357
 
    def test_set_push(self):
358
 
        branch = self.make_branch('source', format=self.get_format_name())
359
 
        branch.get_config().set_user_option('push_location', 'old',
360
 
            store=config.STORE_LOCATION)
361
 
        warnings = []
362
 
        def warning(*args):
363
 
            warnings.append(args[0] % args[1:])
364
 
        _warning = trace.warning
365
 
        trace.warning = warning
366
 
        try:
367
 
            branch.set_push_location('new')
368
 
        finally:
369
 
            trace.warning = _warning
370
 
        self.assertEqual(warnings[0], 'Value "new" is masked by "old" from '
371
 
                         'locations.conf')
372
 
 
373
415
 
374
416
class TestBranch6(TestBranch67, tests.TestCaseWithTransport):
375
417
 
504
546
        self.assertEqual(('path3', 'location3'),
505
547
                         branch.get_reference_info('file-id'))
506
548
 
 
549
    def _recordParentMapCalls(self, repo):
 
550
        self._parent_map_calls = []
 
551
        orig_get_parent_map = repo.revisions.get_parent_map
 
552
        def get_parent_map(q):
 
553
            q = list(q)
 
554
            self._parent_map_calls.extend([e[0] for e in q])
 
555
            return orig_get_parent_map(q)
 
556
        repo.revisions.get_parent_map = get_parent_map
 
557
 
 
558
 
507
559
class TestBranchReference(tests.TestCaseWithTransport):
508
560
    """Tests for the branch reference facility."""
509
561
 
523
575
        self.assertEqual(opened_branch.base, target_branch.base)
524
576
 
525
577
    def test_get_reference(self):
526
 
        """For a BranchReference, get_reference should reutrn the location."""
 
578
        """For a BranchReference, get_reference should return the location."""
527
579
        branch = self.make_branch('target')
528
580
        checkout = branch.create_checkout('checkout', lightweight=True)
529
581
        reference_url = branch.bzrdir.root_transport.abspath('') + '/'
609
661
    def setUp(self):
610
662
        super(TestBranchOptions, self).setUp()
611
663
        self.branch = self.make_branch('.')
612
 
        self.config = self.branch.get_config()
 
664
        self.config_stack = self.branch.get_config_stack()
613
665
 
614
666
    def check_append_revisions_only(self, expected_value, value=None):
615
667
        """Set append_revisions_only in config and check its interpretation."""
616
668
        if value is not None:
617
 
            self.config.set_user_option('append_revisions_only', value)
 
669
            self.config_stack.set('append_revisions_only', value)
618
670
        self.assertEqual(expected_value,
619
 
                         self.branch._get_append_revisions_only())
 
671
                         self.branch.get_append_revisions_only())
620
672
 
621
673
    def test_valid_append_revisions_only(self):
622
674
        self.assertEquals(None,
623
 
                          self.config.get_user_option('append_revisions_only'))
 
675
                          self.config_stack.get('append_revisions_only'))
624
676
        self.check_append_revisions_only(None)
625
677
        self.check_append_revisions_only(False, 'False')
626
678
        self.check_append_revisions_only(True, 'True')
638
690
        self.check_append_revisions_only(None, 'not-a-bool')
639
691
        self.assertLength(1, self.warnings)
640
692
        self.assertEqual(
641
 
            'Value "not-a-bool" is not a boolean for "append_revisions_only"',
 
693
            'Value "not-a-bool" is not valid for "append_revisions_only"',
642
694
            self.warnings[0])
643
695
 
644
696
 
666
718
        f = StringIO()
667
719
        r.report(f)
668
720
        self.assertEqual("Now on revision 20.\n", f.getvalue())
 
721
        self.assertEqual("Now on revision 20.\n", f.getvalue())
669
722
 
670
723
    def test_report_unchanged(self):
671
724
        r = _mod_branch.PullResult()
673
726
        r.new_revid = "same-revid"
674
727
        f = StringIO()
675
728
        r.report(f)
676
 
        self.assertEqual("No revisions to pull.\n", f.getvalue())
677
 
 
678
 
 
679
 
class _StubLockable(object):
680
 
    """Helper for TestRunWithWriteLockedTarget."""
681
 
 
682
 
    def __init__(self, calls, unlock_exc=None):
683
 
        self.calls = calls
684
 
        self.unlock_exc = unlock_exc
685
 
 
686
 
    def lock_write(self):
687
 
        self.calls.append('lock_write')
688
 
 
689
 
    def unlock(self):
690
 
        self.calls.append('unlock')
691
 
        if self.unlock_exc is not None:
692
 
            raise self.unlock_exc
693
 
 
694
 
 
695
 
class _ErrorFromCallable(Exception):
696
 
    """Helper for TestRunWithWriteLockedTarget."""
697
 
 
698
 
 
699
 
class _ErrorFromUnlock(Exception):
700
 
    """Helper for TestRunWithWriteLockedTarget."""
701
 
 
702
 
 
703
 
class TestRunWithWriteLockedTarget(tests.TestCase):
704
 
    """Tests for _run_with_write_locked_target."""
705
 
 
706
 
    def setUp(self):
707
 
        tests.TestCase.setUp(self)
708
 
        self._calls = []
709
 
 
710
 
    def func_that_returns_ok(self):
711
 
        self._calls.append('func called')
712
 
        return 'ok'
713
 
 
714
 
    def func_that_raises(self):
715
 
        self._calls.append('func called')
716
 
        raise _ErrorFromCallable()
717
 
 
718
 
    def test_success_unlocks(self):
719
 
        lockable = _StubLockable(self._calls)
720
 
        result = _mod_branch._run_with_write_locked_target(
721
 
            lockable, self.func_that_returns_ok)
722
 
        self.assertEqual('ok', result)
723
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
724
 
 
725
 
    def test_exception_unlocks_and_propagates(self):
726
 
        lockable = _StubLockable(self._calls)
727
 
        self.assertRaises(_ErrorFromCallable,
728
 
                          _mod_branch._run_with_write_locked_target,
729
 
                          lockable, self.func_that_raises)
730
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
731
 
 
732
 
    def test_callable_succeeds_but_error_during_unlock(self):
733
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
734
 
        self.assertRaises(_ErrorFromUnlock,
735
 
                          _mod_branch._run_with_write_locked_target,
736
 
                          lockable, self.func_that_returns_ok)
737
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
738
 
 
739
 
    def test_error_during_unlock_does_not_mask_original_error(self):
740
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
741
 
        self.assertRaises(_ErrorFromCallable,
742
 
                          _mod_branch._run_with_write_locked_target,
743
 
                          lockable, self.func_that_raises)
744
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
745
 
 
746
 
 
 
729
        self.assertEqual("No revisions or tags to pull.\n", f.getvalue())