~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Robert Collins
  • Date: 2010-06-28 02:41:22 UTC
  • mto: This revision was merged to the branch mainline in revision 5324.
  • Revision ID: robertc@robertcollins.net-20100628024122-g951fzp74f3u6wst
Sanity check that new_trace_file in pop_log_file is valid, and also fix a test that monkey patched get_terminal_encoding.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2006-2011 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
19
 
For interface tests see `tests/per_branch/*.py`.
 
19
For interface tests see tests/per_branch/*.py.
20
20
 
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
29
29
    bzrdir,
30
30
    config,
31
31
    errors,
32
 
    symbol_versioning,
33
32
    tests,
34
33
    trace,
 
34
    transport,
35
35
    urlutils,
36
36
    )
37
37
 
40
40
 
41
41
    def test_default_format(self):
42
42
        # update this if you change the default branch format
43
 
        self.assertIsInstance(_mod_branch.format_registry.get_default(),
 
43
        self.assertIsInstance(_mod_branch.BranchFormat.get_default_format(),
44
44
                _mod_branch.BzrBranchFormat7)
45
45
 
46
46
    def test_default_format_is_same_as_bzrdir_default(self):
48
48
        # set, but at the moment that's not true -- mbp 20070814 --
49
49
        # https://bugs.launchpad.net/bzr/+bug/132376
50
50
        self.assertEqual(
51
 
            _mod_branch.format_registry.get_default(),
 
51
            _mod_branch.BranchFormat.get_default_format(),
52
52
            bzrdir.BzrDirFormat.get_default_format().get_branch_format())
53
53
 
54
54
    def test_get_set_default_format(self):
55
55
        # set the format and then set it back again
56
 
        old_format = _mod_branch.format_registry.get_default()
57
 
        _mod_branch.format_registry.set_default(SampleBranchFormat())
 
56
        old_format = _mod_branch.BranchFormat.get_default_format()
 
57
        _mod_branch.BranchFormat.set_default_format(SampleBranchFormat())
58
58
        try:
59
59
            # the default branch format is used by the meta dir format
60
60
            # which is not the default bzrdir format at this point
62
62
            result = dir.create_branch()
63
63
            self.assertEqual(result, 'A branch')
64
64
        finally:
65
 
            _mod_branch.format_registry.set_default(old_format)
 
65
            _mod_branch.BranchFormat.set_default_format(old_format)
66
66
        self.assertEqual(old_format,
67
 
                         _mod_branch.format_registry.get_default())
 
67
                         _mod_branch.BranchFormat.get_default_format())
68
68
 
69
69
 
70
70
class TestBranchFormat5(tests.TestCaseWithTransport):
74
74
        url = self.get_url()
75
75
        bdir = bzrdir.BzrDirMetaFormat1().initialize(url)
76
76
        bdir.create_repository()
77
 
        branch = _mod_branch.BzrBranchFormat5().initialize(bdir)
 
77
        branch = bdir.create_branch()
78
78
        t = self.get_transport()
79
79
        self.log("branch instance is %r" % branch)
80
80
        self.assert_(isinstance(branch, _mod_branch.BzrBranch5))
86
86
        self.assertIsDirectory('.bzr/branch/lock/held', t)
87
87
 
88
88
    def test_set_push_location(self):
89
 
        conf = config.LocationConfig.from_string('# comment\n', '.', save=True)
 
89
        from bzrlib.config import (locations_config_filename,
 
90
                                   ensure_config_dir_exists)
 
91
        ensure_config_dir_exists()
 
92
        fn = locations_config_filename()
 
93
        # write correct newlines to locations.conf
 
94
        # by default ConfigObj uses native line-endings for new files
 
95
        # but uses already existing line-endings if file is not empty
 
96
        f = open(fn, 'wb')
 
97
        try:
 
98
            f.write('# comment\n')
 
99
        finally:
 
100
            f.close()
90
101
 
91
102
        branch = self.make_branch('.', format='knit')
92
103
        branch.set_push_location('foo')
95
106
                             "[%s]\n"
96
107
                             "push_location = foo\n"
97
108
                             "push_location:policy = norecurse\n" % local_path,
98
 
                             config.locations_config_filename())
 
109
                             fn)
99
110
 
100
111
    # TODO RBC 20051029 test getting a push location from a branch in a
101
112
    # recursive section - that is, it appends the branch name.
112
123
        """See BzrBranchFormat.get_format_string()."""
113
124
        return "Sample branch format."
114
125
 
115
 
    def initialize(self, a_bzrdir, name=None, repository=None):
 
126
    def initialize(self, a_bzrdir, name=None):
116
127
        """Format 4 branches cannot be created."""
117
128
        t = a_bzrdir.get_branch_transport(self, name=name)
118
129
        t.put_bytes('format', self.get_format_string())
146
157
        return "opened supported branch."
147
158
 
148
159
 
149
 
class SampleExtraBranchFormat(_mod_branch.BranchFormat):
150
 
    """A sample format that is not usable in a metadir."""
151
 
 
152
 
    def get_format_string(self):
153
 
        # This format is not usable in a metadir.
154
 
        return None
155
 
 
156
 
    def network_name(self):
157
 
        # Network name always has to be provided.
158
 
        return "extra"
159
 
 
160
 
    def initialize(self, a_bzrdir, name=None):
161
 
        raise NotImplementedError(self.initialize)
162
 
 
163
 
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
164
 
        raise NotImplementedError(self.open)
165
 
 
166
 
 
167
160
class TestBzrBranchFormat(tests.TestCaseWithTransport):
168
161
    """Tests for the BzrBranchFormat facility."""
169
162
 
177
170
            dir.create_repository()
178
171
            format.initialize(dir)
179
172
            found_format = _mod_branch.BranchFormat.find_format(dir)
180
 
            self.assertIsInstance(found_format, format.__class__)
 
173
            self.failUnless(isinstance(found_format, format.__class__))
181
174
        check_format(_mod_branch.BzrBranchFormat5(), "bar")
182
175
 
183
176
    def test_find_format_factory(self):
186
179
        factory = _mod_branch.MetaDirBranchFormatFactory(
187
180
            SampleSupportedBranchFormatString,
188
181
            "bzrlib.tests.test_branch", "SampleSupportedBranchFormat")
189
 
        _mod_branch.format_registry.register(factory)
190
 
        self.addCleanup(_mod_branch.format_registry.remove, factory)
 
182
        _mod_branch.BranchFormat.register_format(factory)
 
183
        self.addCleanup(_mod_branch.BranchFormat.unregister_format, factory)
191
184
        b = _mod_branch.Branch.open(self.get_url())
192
185
        self.assertEqual(b, "opened supported branch.")
193
186
 
205
198
                          dir)
206
199
 
207
200
    def test_register_unregister_format(self):
208
 
        # Test the deprecated format registration functions
209
201
        format = SampleBranchFormat()
210
202
        # make a control dir
211
203
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
212
204
        # make a branch
213
205
        format.initialize(dir)
214
206
        # register a format for it.
215
 
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
216
 
            _mod_branch.BranchFormat.register_format, format)
 
207
        _mod_branch.BranchFormat.register_format(format)
217
208
        # which branch.Open will refuse (not supported)
218
209
        self.assertRaises(errors.UnsupportedFormatError,
219
210
                          _mod_branch.Branch.open, self.get_url())
223
214
            format.open(dir),
224
215
            bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
225
216
        # unregister the format
226
 
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
227
 
            _mod_branch.BranchFormat.unregister_format, format)
 
217
        _mod_branch.BranchFormat.unregister_format(format)
228
218
        self.make_branch_and_tree('bar')
229
219
 
230
220
 
231
 
class TestBranchFormatRegistry(tests.TestCase):
232
 
 
233
 
    def setUp(self):
234
 
        super(TestBranchFormatRegistry, self).setUp()
235
 
        self.registry = _mod_branch.BranchFormatRegistry()
236
 
 
237
 
    def test_default(self):
238
 
        self.assertIs(None, self.registry.get_default())
239
 
        format = SampleBranchFormat()
240
 
        self.registry.set_default(format)
241
 
        self.assertEquals(format, self.registry.get_default())
242
 
 
243
 
    def test_register_unregister_format(self):
244
 
        format = SampleBranchFormat()
245
 
        self.registry.register(format)
246
 
        self.assertEquals(format,
247
 
            self.registry.get("Sample branch format."))
248
 
        self.registry.remove(format)
249
 
        self.assertRaises(KeyError, self.registry.get,
250
 
            "Sample branch format.")
251
 
 
252
 
    def test_get_all(self):
253
 
        format = SampleBranchFormat()
254
 
        self.assertEquals([], self.registry._get_all())
255
 
        self.registry.register(format)
256
 
        self.assertEquals([format], self.registry._get_all())
257
 
 
258
 
    def test_register_extra(self):
259
 
        format = SampleExtraBranchFormat()
260
 
        self.assertEquals([], self.registry._get_all())
261
 
        self.registry.register_extra(format)
262
 
        self.assertEquals([format], self.registry._get_all())
263
 
 
264
 
    def test_register_extra_lazy(self):
265
 
        self.assertEquals([], self.registry._get_all())
266
 
        self.registry.register_extra_lazy("bzrlib.tests.test_branch",
267
 
            "SampleExtraBranchFormat")
268
 
        formats = self.registry._get_all()
269
 
        self.assertEquals(1, len(formats))
270
 
        self.assertIsInstance(formats[0], SampleExtraBranchFormat)
271
 
 
272
 
 
273
221
#Used by TestMetaDirBranchFormatFactory 
274
222
FakeLazyFormat = None
275
223
 
322
270
 
323
271
    def test_layout(self):
324
272
        branch = self.make_branch('a', format=self.get_format_name())
325
 
        self.assertPathExists('a/.bzr/branch/last-revision')
326
 
        self.assertPathDoesNotExist('a/.bzr/branch/revision-history')
327
 
        self.assertPathDoesNotExist('a/.bzr/branch/references')
 
273
        self.failUnlessExists('a/.bzr/branch/last-revision')
 
274
        self.failIfExists('a/.bzr/branch/revision-history')
 
275
        self.failIfExists('a/.bzr/branch/references')
328
276
 
329
277
    def test_config(self):
330
278
        """Ensure that all configuration data is stored in the branch"""
331
279
        branch = self.make_branch('a', format=self.get_format_name())
332
 
        branch.set_parent('http://example.com')
333
 
        self.assertPathDoesNotExist('a/.bzr/branch/parent')
334
 
        self.assertEqual('http://example.com', branch.get_parent())
335
 
        branch.set_push_location('sftp://example.com')
 
280
        branch.set_parent('http://bazaar-vcs.org')
 
281
        self.failIfExists('a/.bzr/branch/parent')
 
282
        self.assertEqual('http://bazaar-vcs.org', branch.get_parent())
 
283
        branch.set_push_location('sftp://bazaar-vcs.org')
336
284
        config = branch.get_config()._get_branch_data_config()
337
 
        self.assertEqual('sftp://example.com',
 
285
        self.assertEqual('sftp://bazaar-vcs.org',
338
286
                         config.get_user_option('push_location'))
339
 
        branch.set_bound_location('ftp://example.com')
340
 
        self.assertPathDoesNotExist('a/.bzr/branch/bound')
341
 
        self.assertEqual('ftp://example.com', branch.get_bound_location())
 
287
        branch.set_bound_location('ftp://bazaar-vcs.org')
 
288
        self.failIfExists('a/.bzr/branch/bound')
 
289
        self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
342
290
 
343
291
    def test_set_revision_history(self):
344
292
        builder = self.make_branch_builder('.', format=self.get_format_name())
349
297
        branch = builder.get_branch()
350
298
        branch.lock_write()
351
299
        self.addCleanup(branch.unlock)
352
 
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
353
 
            branch.set_revision_history, ['foo', 'bar'])
354
 
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
355
 
                branch.set_revision_history, ['foo'])
 
300
        branch.set_revision_history(['foo', 'bar'])
 
301
        branch.set_revision_history(['foo'])
356
302
        self.assertRaises(errors.NotLefthandHistory,
357
 
            self.applyDeprecated, symbol_versioning.deprecated_in((2, 4, 0)),
358
 
            branch.set_revision_history, ['bar'])
 
303
                          branch.set_revision_history, ['bar'])
359
304
 
360
305
    def do_checkout_test(self, lightweight=False):
361
306
        tree = self.make_branch_and_tree('source',
374
319
        subtree.commit('a subtree file')
375
320
        subsubtree.commit('a subsubtree file')
376
321
        tree.branch.create_checkout('target', lightweight=lightweight)
377
 
        self.assertPathExists('target')
378
 
        self.assertPathExists('target/subtree')
379
 
        self.assertPathExists('target/subtree/file')
380
 
        self.assertPathExists('target/subtree/subsubtree/file')
 
322
        self.failUnlessExists('target')
 
323
        self.failUnlessExists('target/subtree')
 
324
        self.failUnlessExists('target/subtree/file')
 
325
        self.failUnlessExists('target/subtree/subsubtree/file')
381
326
        subbranch = _mod_branch.Branch.open('target/subtree/subsubtree')
382
327
        if lightweight:
383
328
            self.assertEndsWith(subbranch.base, 'source/subtree/subsubtree/')
545
490
 
546
491
    def test_create_open_reference(self):
547
492
        bzrdirformat = bzrdir.BzrDirMetaFormat1()
548
 
        t = self.get_transport()
 
493
        t = transport.get_transport(self.get_url('.'))
549
494
        t.mkdir('repo')
550
495
        dir = bzrdirformat.initialize(self.get_url('repo'))
551
496
        dir.create_repository()
607
552
        self.assertTrue(hasattr(params, 'bzrdir'))
608
553
        self.assertTrue(hasattr(params, 'branch'))
609
554
 
610
 
    def test_post_branch_init_hook_repr(self):
611
 
        param_reprs = []
612
 
        _mod_branch.Branch.hooks.install_named_hook('post_branch_init',
613
 
            lambda params: param_reprs.append(repr(params)), None)
614
 
        branch = self.make_branch('a')
615
 
        self.assertLength(1, param_reprs)
616
 
        param_repr = param_reprs[0]
617
 
        self.assertStartsWith(param_repr, '<BranchInitHookParams of ')
618
 
 
619
555
    def test_post_switch_hook(self):
620
556
        from bzrlib import switch
621
557
        calls = []
688
624
        # this usage of results is not recommended for new code (because it
689
625
        # doesn't describe very well what happened), but for api stability
690
626
        # it's still supported
691
 
        self.assertEqual(self.applyDeprecated(
692
 
            symbol_versioning.deprecated_in((2, 3, 0)),
693
 
            r.__int__),
694
 
            10)
 
627
        a = "%d revisions pulled" % r
 
628
        self.assertEqual(a, "10 revisions pulled")
695
629
 
696
630
    def test_report_changed(self):
697
631
        r = _mod_branch.PullResult()
711
645
        r.report(f)
712
646
        self.assertEqual("No revisions to pull.\n", f.getvalue())
713
647
 
 
648
 
 
649
class _StubLockable(object):
 
650
    """Helper for TestRunWithWriteLockedTarget."""
 
651
 
 
652
    def __init__(self, calls, unlock_exc=None):
 
653
        self.calls = calls
 
654
        self.unlock_exc = unlock_exc
 
655
 
 
656
    def lock_write(self):
 
657
        self.calls.append('lock_write')
 
658
 
 
659
    def unlock(self):
 
660
        self.calls.append('unlock')
 
661
        if self.unlock_exc is not None:
 
662
            raise self.unlock_exc
 
663
 
 
664
 
 
665
class _ErrorFromCallable(Exception):
 
666
    """Helper for TestRunWithWriteLockedTarget."""
 
667
 
 
668
 
 
669
class _ErrorFromUnlock(Exception):
 
670
    """Helper for TestRunWithWriteLockedTarget."""
 
671
 
 
672
 
 
673
class TestRunWithWriteLockedTarget(tests.TestCase):
 
674
    """Tests for _run_with_write_locked_target."""
 
675
 
 
676
    def setUp(self):
 
677
        tests.TestCase.setUp(self)
 
678
        self._calls = []
 
679
 
 
680
    def func_that_returns_ok(self):
 
681
        self._calls.append('func called')
 
682
        return 'ok'
 
683
 
 
684
    def func_that_raises(self):
 
685
        self._calls.append('func called')
 
686
        raise _ErrorFromCallable()
 
687
 
 
688
    def test_success_unlocks(self):
 
689
        lockable = _StubLockable(self._calls)
 
690
        result = _mod_branch._run_with_write_locked_target(
 
691
            lockable, self.func_that_returns_ok)
 
692
        self.assertEqual('ok', result)
 
693
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
694
 
 
695
    def test_exception_unlocks_and_propagates(self):
 
696
        lockable = _StubLockable(self._calls)
 
697
        self.assertRaises(_ErrorFromCallable,
 
698
                          _mod_branch._run_with_write_locked_target,
 
699
                          lockable, self.func_that_raises)
 
700
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
701
 
 
702
    def test_callable_succeeds_but_error_during_unlock(self):
 
703
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
704
        self.assertRaises(_ErrorFromUnlock,
 
705
                          _mod_branch._run_with_write_locked_target,
 
706
                          lockable, self.func_that_returns_ok)
 
707
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
708
 
 
709
    def test_error_during_unlock_does_not_mask_original_error(self):
 
710
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
711
        self.assertRaises(_ErrorFromCallable,
 
712
                          _mod_branch._run_with_write_locked_target,
 
713
                          lockable, self.func_that_raises)
 
714
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
715
 
 
716