~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Jelmer Vernooij
  • Date: 2011-12-05 14:12:23 UTC
  • mto: This revision was merged to the branch mainline in revision 6348.
  • Revision ID: jelmer@samba.org-20111205141223-8qxae4h37satlzgq
Move more functionality to vf_search.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
19
 
For interface tests see tests/per_branch/*.py.
 
19
For interface tests see `tests/per_branch/*.py`.
20
20
 
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
29
29
    bzrdir,
30
30
    config,
31
31
    errors,
 
32
    symbol_versioning,
32
33
    tests,
33
34
    trace,
34
 
    transport,
35
35
    urlutils,
36
36
    )
37
37
 
40
40
 
41
41
    def test_default_format(self):
42
42
        # update this if you change the default branch format
43
 
        self.assertIsInstance(_mod_branch.BranchFormat.get_default_format(),
 
43
        self.assertIsInstance(_mod_branch.format_registry.get_default(),
44
44
                _mod_branch.BzrBranchFormat7)
45
45
 
46
46
    def test_default_format_is_same_as_bzrdir_default(self):
48
48
        # set, but at the moment that's not true -- mbp 20070814 --
49
49
        # https://bugs.launchpad.net/bzr/+bug/132376
50
50
        self.assertEqual(
51
 
            _mod_branch.BranchFormat.get_default_format(),
 
51
            _mod_branch.format_registry.get_default(),
52
52
            bzrdir.BzrDirFormat.get_default_format().get_branch_format())
53
53
 
54
54
    def test_get_set_default_format(self):
55
55
        # set the format and then set it back again
56
 
        old_format = _mod_branch.BranchFormat.get_default_format()
57
 
        _mod_branch.BranchFormat.set_default_format(SampleBranchFormat())
 
56
        old_format = _mod_branch.format_registry.get_default()
 
57
        _mod_branch.format_registry.set_default(SampleBranchFormat())
58
58
        try:
59
59
            # the default branch format is used by the meta dir format
60
60
            # which is not the default bzrdir format at this point
62
62
            result = dir.create_branch()
63
63
            self.assertEqual(result, 'A branch')
64
64
        finally:
65
 
            _mod_branch.BranchFormat.set_default_format(old_format)
 
65
            _mod_branch.format_registry.set_default(old_format)
66
66
        self.assertEqual(old_format,
67
 
                         _mod_branch.BranchFormat.get_default_format())
 
67
                         _mod_branch.format_registry.get_default())
68
68
 
69
69
 
70
70
class TestBranchFormat5(tests.TestCaseWithTransport):
74
74
        url = self.get_url()
75
75
        bdir = bzrdir.BzrDirMetaFormat1().initialize(url)
76
76
        bdir.create_repository()
77
 
        branch = bdir.create_branch()
 
77
        branch = _mod_branch.BzrBranchFormat5().initialize(bdir)
78
78
        t = self.get_transport()
79
79
        self.log("branch instance is %r" % branch)
80
80
        self.assert_(isinstance(branch, _mod_branch.BzrBranch5))
86
86
        self.assertIsDirectory('.bzr/branch/lock/held', t)
87
87
 
88
88
    def test_set_push_location(self):
89
 
        from bzrlib.config import (locations_config_filename,
90
 
                                   ensure_config_dir_exists)
91
 
        ensure_config_dir_exists()
92
 
        fn = locations_config_filename()
93
 
        # write correct newlines to locations.conf
94
 
        # by default ConfigObj uses native line-endings for new files
95
 
        # but uses already existing line-endings if file is not empty
96
 
        f = open(fn, 'wb')
97
 
        try:
98
 
            f.write('# comment\n')
99
 
        finally:
100
 
            f.close()
 
89
        conf = config.LocationConfig.from_string('# comment\n', '.', save=True)
101
90
 
102
91
        branch = self.make_branch('.', format='knit')
103
92
        branch.set_push_location('foo')
106
95
                             "[%s]\n"
107
96
                             "push_location = foo\n"
108
97
                             "push_location:policy = norecurse\n" % local_path,
109
 
                             fn)
 
98
                             config.locations_config_filename())
110
99
 
111
100
    # TODO RBC 20051029 test getting a push location from a branch in a
112
101
    # recursive section - that is, it appends the branch name.
123
112
        """See BzrBranchFormat.get_format_string()."""
124
113
        return "Sample branch format."
125
114
 
126
 
    def initialize(self, a_bzrdir, name=None):
 
115
    def initialize(self, a_bzrdir, name=None, repository=None,
 
116
                   append_revisions_only=None):
127
117
        """Format 4 branches cannot be created."""
128
118
        t = a_bzrdir.get_branch_transport(self, name=name)
129
119
        t.put_bytes('format', self.get_format_string())
132
122
    def is_supported(self):
133
123
        return False
134
124
 
135
 
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
 
125
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False,
 
126
             possible_transports=None):
136
127
        return "opened branch."
137
128
 
138
129
 
 
130
# Demonstrating how lazy loading is often implemented:
 
131
# A constant string is created.
 
132
SampleSupportedBranchFormatString = "Sample supported branch format."
 
133
 
 
134
# And the format class can then reference the constant to avoid skew.
 
135
class SampleSupportedBranchFormat(_mod_branch.BranchFormat):
 
136
    """A sample supported format."""
 
137
 
 
138
    def get_format_string(self):
 
139
        """See BzrBranchFormat.get_format_string()."""
 
140
        return SampleSupportedBranchFormatString
 
141
 
 
142
    def initialize(self, a_bzrdir, name=None, append_revisions_only=None):
 
143
        t = a_bzrdir.get_branch_transport(self, name=name)
 
144
        t.put_bytes('format', self.get_format_string())
 
145
        return 'A branch'
 
146
 
 
147
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False,
 
148
             possible_transports=None):
 
149
        return "opened supported branch."
 
150
 
 
151
 
 
152
class SampleExtraBranchFormat(_mod_branch.BranchFormat):
 
153
    """A sample format that is not usable in a metadir."""
 
154
 
 
155
    def get_format_string(self):
 
156
        # This format is not usable in a metadir.
 
157
        return None
 
158
 
 
159
    def network_name(self):
 
160
        # Network name always has to be provided.
 
161
        return "extra"
 
162
 
 
163
    def initialize(self, a_bzrdir, name=None):
 
164
        raise NotImplementedError(self.initialize)
 
165
 
 
166
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False,
 
167
             possible_transports=None):
 
168
        raise NotImplementedError(self.open)
 
169
 
 
170
 
139
171
class TestBzrBranchFormat(tests.TestCaseWithTransport):
140
172
    """Tests for the BzrBranchFormat facility."""
141
173
 
149
181
            dir.create_repository()
150
182
            format.initialize(dir)
151
183
            found_format = _mod_branch.BranchFormat.find_format(dir)
152
 
            self.failUnless(isinstance(found_format, format.__class__))
 
184
            self.assertIsInstance(found_format, format.__class__)
153
185
        check_format(_mod_branch.BzrBranchFormat5(), "bar")
154
186
 
 
187
    def test_find_format_factory(self):
 
188
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
189
        SampleSupportedBranchFormat().initialize(dir)
 
190
        factory = _mod_branch.MetaDirBranchFormatFactory(
 
191
            SampleSupportedBranchFormatString,
 
192
            "bzrlib.tests.test_branch", "SampleSupportedBranchFormat")
 
193
        _mod_branch.format_registry.register(factory)
 
194
        self.addCleanup(_mod_branch.format_registry.remove, factory)
 
195
        b = _mod_branch.Branch.open(self.get_url())
 
196
        self.assertEqual(b, "opened supported branch.")
 
197
 
155
198
    def test_find_format_not_branch(self):
156
199
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
157
200
        self.assertRaises(errors.NotBranchError,
166
209
                          dir)
167
210
 
168
211
    def test_register_unregister_format(self):
 
212
        # Test the deprecated format registration functions
169
213
        format = SampleBranchFormat()
170
214
        # make a control dir
171
215
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
172
216
        # make a branch
173
217
        format.initialize(dir)
174
218
        # register a format for it.
175
 
        _mod_branch.BranchFormat.register_format(format)
 
219
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
220
            _mod_branch.BranchFormat.register_format, format)
176
221
        # which branch.Open will refuse (not supported)
177
222
        self.assertRaises(errors.UnsupportedFormatError,
178
223
                          _mod_branch.Branch.open, self.get_url())
182
227
            format.open(dir),
183
228
            bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
184
229
        # unregister the format
185
 
        _mod_branch.BranchFormat.unregister_format(format)
 
230
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
231
            _mod_branch.BranchFormat.unregister_format, format)
186
232
        self.make_branch_and_tree('bar')
187
233
 
188
234
 
 
235
class TestBranchFormatRegistry(tests.TestCase):
 
236
 
 
237
    def setUp(self):
 
238
        super(TestBranchFormatRegistry, self).setUp()
 
239
        self.registry = _mod_branch.BranchFormatRegistry()
 
240
 
 
241
    def test_default(self):
 
242
        self.assertIs(None, self.registry.get_default())
 
243
        format = SampleBranchFormat()
 
244
        self.registry.set_default(format)
 
245
        self.assertEquals(format, self.registry.get_default())
 
246
 
 
247
    def test_register_unregister_format(self):
 
248
        format = SampleBranchFormat()
 
249
        self.registry.register(format)
 
250
        self.assertEquals(format,
 
251
            self.registry.get("Sample branch format."))
 
252
        self.registry.remove(format)
 
253
        self.assertRaises(KeyError, self.registry.get,
 
254
            "Sample branch format.")
 
255
 
 
256
    def test_get_all(self):
 
257
        format = SampleBranchFormat()
 
258
        self.assertEquals([], self.registry._get_all())
 
259
        self.registry.register(format)
 
260
        self.assertEquals([format], self.registry._get_all())
 
261
 
 
262
    def test_register_extra(self):
 
263
        format = SampleExtraBranchFormat()
 
264
        self.assertEquals([], self.registry._get_all())
 
265
        self.registry.register_extra(format)
 
266
        self.assertEquals([format], self.registry._get_all())
 
267
 
 
268
    def test_register_extra_lazy(self):
 
269
        self.assertEquals([], self.registry._get_all())
 
270
        self.registry.register_extra_lazy("bzrlib.tests.test_branch",
 
271
            "SampleExtraBranchFormat")
 
272
        formats = self.registry._get_all()
 
273
        self.assertEquals(1, len(formats))
 
274
        self.assertIsInstance(formats[0], SampleExtraBranchFormat)
 
275
 
 
276
 
 
277
# Used by TestMetaDirBranchFormatFactory
 
278
FakeLazyFormat = None
 
279
 
 
280
 
 
281
class TestMetaDirBranchFormatFactory(tests.TestCase):
 
282
 
 
283
    def test_get_format_string_does_not_load(self):
 
284
        """Formats have a static format string."""
 
285
        factory = _mod_branch.MetaDirBranchFormatFactory("yo", None, None)
 
286
        self.assertEqual("yo", factory.get_format_string())
 
287
 
 
288
    def test_call_loads(self):
 
289
        # __call__ is used by the network_format_registry interface to get a
 
290
        # Format.
 
291
        global FakeLazyFormat
 
292
        del FakeLazyFormat
 
293
        factory = _mod_branch.MetaDirBranchFormatFactory(None,
 
294
            "bzrlib.tests.test_branch", "FakeLazyFormat")
 
295
        self.assertRaises(AttributeError, factory)
 
296
 
 
297
    def test_call_returns_call_of_referenced_object(self):
 
298
        global FakeLazyFormat
 
299
        FakeLazyFormat = lambda:'called'
 
300
        factory = _mod_branch.MetaDirBranchFormatFactory(None,
 
301
            "bzrlib.tests.test_branch", "FakeLazyFormat")
 
302
        self.assertEqual('called', factory())
 
303
 
 
304
 
189
305
class TestBranch67(object):
190
306
    """Common tests for both branch 6 and 7 which are mostly the same."""
191
307
 
210
326
 
211
327
    def test_layout(self):
212
328
        branch = self.make_branch('a', format=self.get_format_name())
213
 
        self.failUnlessExists('a/.bzr/branch/last-revision')
214
 
        self.failIfExists('a/.bzr/branch/revision-history')
215
 
        self.failIfExists('a/.bzr/branch/references')
 
329
        self.assertPathExists('a/.bzr/branch/last-revision')
 
330
        self.assertPathDoesNotExist('a/.bzr/branch/revision-history')
 
331
        self.assertPathDoesNotExist('a/.bzr/branch/references')
216
332
 
217
333
    def test_config(self):
218
334
        """Ensure that all configuration data is stored in the branch"""
219
335
        branch = self.make_branch('a', format=self.get_format_name())
220
 
        branch.set_parent('http://bazaar-vcs.org')
221
 
        self.failIfExists('a/.bzr/branch/parent')
222
 
        self.assertEqual('http://bazaar-vcs.org', branch.get_parent())
223
 
        branch.set_push_location('sftp://bazaar-vcs.org')
 
336
        branch.set_parent('http://example.com')
 
337
        self.assertPathDoesNotExist('a/.bzr/branch/parent')
 
338
        self.assertEqual('http://example.com', branch.get_parent())
 
339
        branch.set_push_location('sftp://example.com')
224
340
        config = branch.get_config()._get_branch_data_config()
225
 
        self.assertEqual('sftp://bazaar-vcs.org',
 
341
        self.assertEqual('sftp://example.com',
226
342
                         config.get_user_option('push_location'))
227
 
        branch.set_bound_location('ftp://bazaar-vcs.org')
228
 
        self.failIfExists('a/.bzr/branch/bound')
229
 
        self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
 
343
        branch.set_bound_location('ftp://example.com')
 
344
        self.assertPathDoesNotExist('a/.bzr/branch/bound')
 
345
        self.assertEqual('ftp://example.com', branch.get_bound_location())
230
346
 
231
347
    def test_set_revision_history(self):
232
348
        builder = self.make_branch_builder('.', format=self.get_format_name())
237
353
        branch = builder.get_branch()
238
354
        branch.lock_write()
239
355
        self.addCleanup(branch.unlock)
240
 
        branch.set_revision_history(['foo', 'bar'])
241
 
        branch.set_revision_history(['foo'])
 
356
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
357
            branch.set_revision_history, ['foo', 'bar'])
 
358
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
359
                branch.set_revision_history, ['foo'])
242
360
        self.assertRaises(errors.NotLefthandHistory,
243
 
                          branch.set_revision_history, ['bar'])
 
361
            self.applyDeprecated, symbol_versioning.deprecated_in((2, 4, 0)),
 
362
            branch.set_revision_history, ['bar'])
244
363
 
245
364
    def do_checkout_test(self, lightweight=False):
246
365
        tree = self.make_branch_and_tree('source',
259
378
        subtree.commit('a subtree file')
260
379
        subsubtree.commit('a subsubtree file')
261
380
        tree.branch.create_checkout('target', lightweight=lightweight)
262
 
        self.failUnlessExists('target')
263
 
        self.failUnlessExists('target/subtree')
264
 
        self.failUnlessExists('target/subtree/file')
265
 
        self.failUnlessExists('target/subtree/subsubtree/file')
 
381
        self.assertPathExists('target')
 
382
        self.assertPathExists('target/subtree')
 
383
        self.assertPathExists('target/subtree/file')
 
384
        self.assertPathExists('target/subtree/subsubtree/file')
266
385
        subbranch = _mod_branch.Branch.open('target/subtree/subsubtree')
267
386
        if lightweight:
268
387
            self.assertEndsWith(subbranch.base, 'source/subtree/subsubtree/')
425
544
        self.assertEqual(('path3', 'location3'),
426
545
                         branch.get_reference_info('file-id'))
427
546
 
 
547
    def _recordParentMapCalls(self, repo):
 
548
        self._parent_map_calls = []
 
549
        orig_get_parent_map = repo.revisions.get_parent_map
 
550
        def get_parent_map(q):
 
551
            q = list(q)
 
552
            self._parent_map_calls.extend([e[0] for e in q])
 
553
            return orig_get_parent_map(q)
 
554
        repo.revisions.get_parent_map = get_parent_map
 
555
 
 
556
 
428
557
class TestBranchReference(tests.TestCaseWithTransport):
429
558
    """Tests for the branch reference facility."""
430
559
 
431
560
    def test_create_open_reference(self):
432
561
        bzrdirformat = bzrdir.BzrDirMetaFormat1()
433
 
        t = transport.get_transport(self.get_url('.'))
 
562
        t = self.get_transport()
434
563
        t.mkdir('repo')
435
564
        dir = bzrdirformat.initialize(self.get_url('repo'))
436
565
        dir.create_repository()
492
621
        self.assertTrue(hasattr(params, 'bzrdir'))
493
622
        self.assertTrue(hasattr(params, 'branch'))
494
623
 
 
624
    def test_post_branch_init_hook_repr(self):
 
625
        param_reprs = []
 
626
        _mod_branch.Branch.hooks.install_named_hook('post_branch_init',
 
627
            lambda params: param_reprs.append(repr(params)), None)
 
628
        branch = self.make_branch('a')
 
629
        self.assertLength(1, param_reprs)
 
630
        param_repr = param_reprs[0]
 
631
        self.assertStartsWith(param_repr, '<BranchInitHookParams of ')
 
632
 
495
633
    def test_post_switch_hook(self):
496
634
        from bzrlib import switch
497
635
        calls = []
528
666
        if value is not None:
529
667
            self.config.set_user_option('append_revisions_only', value)
530
668
        self.assertEqual(expected_value,
531
 
                         self.branch._get_append_revisions_only())
 
669
                         self.branch.get_append_revisions_only())
532
670
 
533
671
    def test_valid_append_revisions_only(self):
534
672
        self.assertEquals(None,
564
702
        # this usage of results is not recommended for new code (because it
565
703
        # doesn't describe very well what happened), but for api stability
566
704
        # it's still supported
567
 
        a = "%d revisions pulled" % r
568
 
        self.assertEqual(a, "10 revisions pulled")
 
705
        self.assertEqual(self.applyDeprecated(
 
706
            symbol_versioning.deprecated_in((2, 3, 0)),
 
707
            r.__int__),
 
708
            10)
569
709
 
570
710
    def test_report_changed(self):
571
711
        r = _mod_branch.PullResult()
583
723
        r.new_revid = "same-revid"
584
724
        f = StringIO()
585
725
        r.report(f)
586
 
        self.assertEqual("No revisions to pull.\n", f.getvalue())
587
 
 
588
 
 
589
 
class _StubLockable(object):
590
 
    """Helper for TestRunWithWriteLockedTarget."""
591
 
 
592
 
    def __init__(self, calls, unlock_exc=None):
593
 
        self.calls = calls
594
 
        self.unlock_exc = unlock_exc
595
 
 
596
 
    def lock_write(self):
597
 
        self.calls.append('lock_write')
598
 
 
599
 
    def unlock(self):
600
 
        self.calls.append('unlock')
601
 
        if self.unlock_exc is not None:
602
 
            raise self.unlock_exc
603
 
 
604
 
 
605
 
class _ErrorFromCallable(Exception):
606
 
    """Helper for TestRunWithWriteLockedTarget."""
607
 
 
608
 
 
609
 
class _ErrorFromUnlock(Exception):
610
 
    """Helper for TestRunWithWriteLockedTarget."""
611
 
 
612
 
 
613
 
class TestRunWithWriteLockedTarget(tests.TestCase):
614
 
    """Tests for _run_with_write_locked_target."""
615
 
 
616
 
    def setUp(self):
617
 
        tests.TestCase.setUp(self)
618
 
        self._calls = []
619
 
 
620
 
    def func_that_returns_ok(self):
621
 
        self._calls.append('func called')
622
 
        return 'ok'
623
 
 
624
 
    def func_that_raises(self):
625
 
        self._calls.append('func called')
626
 
        raise _ErrorFromCallable()
627
 
 
628
 
    def test_success_unlocks(self):
629
 
        lockable = _StubLockable(self._calls)
630
 
        result = _mod_branch._run_with_write_locked_target(
631
 
            lockable, self.func_that_returns_ok)
632
 
        self.assertEqual('ok', result)
633
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
634
 
 
635
 
    def test_exception_unlocks_and_propagates(self):
636
 
        lockable = _StubLockable(self._calls)
637
 
        self.assertRaises(_ErrorFromCallable,
638
 
                          _mod_branch._run_with_write_locked_target,
639
 
                          lockable, self.func_that_raises)
640
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
641
 
 
642
 
    def test_callable_succeeds_but_error_during_unlock(self):
643
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
644
 
        self.assertRaises(_ErrorFromUnlock,
645
 
                          _mod_branch._run_with_write_locked_target,
646
 
                          lockable, self.func_that_returns_ok)
647
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
648
 
 
649
 
    def test_error_during_unlock_does_not_mask_original_error(self):
650
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
651
 
        self.assertRaises(_ErrorFromCallable,
652
 
                          _mod_branch._run_with_write_locked_target,
653
 
                          lockable, self.func_that_raises)
654
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
655
 
 
 
726
        self.assertEqual("No revisions or tags to pull.\n", f.getvalue())
656
727