~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Patch Queue Manager
  • Date: 2011-09-22 14:12:18 UTC
  • mfrom: (6155.3.1 jam)
  • Revision ID: pqm@pqm.ubuntu.com-20110922141218-86s4uu6nqvourw4f
(jameinel) Cleanup comments bzrlib/smart/__init__.py (John A Meinel)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
19
 
For interface tests see tests/per_branch/*.py.
 
19
For interface tests see `tests/per_branch/*.py`.
20
20
 
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
32
32
    symbol_versioning,
33
33
    tests,
34
34
    trace,
35
 
    transport,
36
35
    urlutils,
37
36
    )
38
37
 
41
40
 
42
41
    def test_default_format(self):
43
42
        # update this if you change the default branch format
44
 
        self.assertIsInstance(_mod_branch.BranchFormat.get_default_format(),
 
43
        self.assertIsInstance(_mod_branch.format_registry.get_default(),
45
44
                _mod_branch.BzrBranchFormat7)
46
45
 
47
46
    def test_default_format_is_same_as_bzrdir_default(self):
49
48
        # set, but at the moment that's not true -- mbp 20070814 --
50
49
        # https://bugs.launchpad.net/bzr/+bug/132376
51
50
        self.assertEqual(
52
 
            _mod_branch.BranchFormat.get_default_format(),
 
51
            _mod_branch.format_registry.get_default(),
53
52
            bzrdir.BzrDirFormat.get_default_format().get_branch_format())
54
53
 
55
54
    def test_get_set_default_format(self):
56
55
        # set the format and then set it back again
57
 
        old_format = _mod_branch.BranchFormat.get_default_format()
58
 
        _mod_branch.BranchFormat.set_default_format(SampleBranchFormat())
 
56
        old_format = _mod_branch.format_registry.get_default()
 
57
        _mod_branch.format_registry.set_default(SampleBranchFormat())
59
58
        try:
60
59
            # the default branch format is used by the meta dir format
61
60
            # which is not the default bzrdir format at this point
63
62
            result = dir.create_branch()
64
63
            self.assertEqual(result, 'A branch')
65
64
        finally:
66
 
            _mod_branch.BranchFormat.set_default_format(old_format)
 
65
            _mod_branch.format_registry.set_default(old_format)
67
66
        self.assertEqual(old_format,
68
 
                         _mod_branch.BranchFormat.get_default_format())
 
67
                         _mod_branch.format_registry.get_default())
69
68
 
70
69
 
71
70
class TestBranchFormat5(tests.TestCaseWithTransport):
75
74
        url = self.get_url()
76
75
        bdir = bzrdir.BzrDirMetaFormat1().initialize(url)
77
76
        bdir.create_repository()
78
 
        branch = bdir.create_branch()
 
77
        branch = _mod_branch.BzrBranchFormat5().initialize(bdir)
79
78
        t = self.get_transport()
80
79
        self.log("branch instance is %r" % branch)
81
80
        self.assert_(isinstance(branch, _mod_branch.BzrBranch5))
113
112
        """See BzrBranchFormat.get_format_string()."""
114
113
        return "Sample branch format."
115
114
 
116
 
    def initialize(self, a_bzrdir, name=None):
 
115
    def initialize(self, a_bzrdir, name=None, repository=None,
 
116
                   append_revisions_only=None):
117
117
        """Format 4 branches cannot be created."""
118
118
        t = a_bzrdir.get_branch_transport(self, name=name)
119
119
        t.put_bytes('format', self.get_format_string())
138
138
        """See BzrBranchFormat.get_format_string()."""
139
139
        return SampleSupportedBranchFormatString
140
140
 
141
 
    def initialize(self, a_bzrdir, name=None):
 
141
    def initialize(self, a_bzrdir, name=None, append_revisions_only=None):
142
142
        t = a_bzrdir.get_branch_transport(self, name=name)
143
143
        t.put_bytes('format', self.get_format_string())
144
144
        return 'A branch'
147
147
        return "opened supported branch."
148
148
 
149
149
 
 
150
class SampleExtraBranchFormat(_mod_branch.BranchFormat):
 
151
    """A sample format that is not usable in a metadir."""
 
152
 
 
153
    def get_format_string(self):
 
154
        # This format is not usable in a metadir.
 
155
        return None
 
156
 
 
157
    def network_name(self):
 
158
        # Network name always has to be provided.
 
159
        return "extra"
 
160
 
 
161
    def initialize(self, a_bzrdir, name=None):
 
162
        raise NotImplementedError(self.initialize)
 
163
 
 
164
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
 
165
        raise NotImplementedError(self.open)
 
166
 
 
167
 
150
168
class TestBzrBranchFormat(tests.TestCaseWithTransport):
151
169
    """Tests for the BzrBranchFormat facility."""
152
170
 
160
178
            dir.create_repository()
161
179
            format.initialize(dir)
162
180
            found_format = _mod_branch.BranchFormat.find_format(dir)
163
 
            self.failUnless(isinstance(found_format, format.__class__))
 
181
            self.assertIsInstance(found_format, format.__class__)
164
182
        check_format(_mod_branch.BzrBranchFormat5(), "bar")
165
183
 
166
184
    def test_find_format_factory(self):
169
187
        factory = _mod_branch.MetaDirBranchFormatFactory(
170
188
            SampleSupportedBranchFormatString,
171
189
            "bzrlib.tests.test_branch", "SampleSupportedBranchFormat")
172
 
        _mod_branch.BranchFormat.register_format(factory)
173
 
        self.addCleanup(_mod_branch.BranchFormat.unregister_format, factory)
 
190
        _mod_branch.format_registry.register(factory)
 
191
        self.addCleanup(_mod_branch.format_registry.remove, factory)
174
192
        b = _mod_branch.Branch.open(self.get_url())
175
193
        self.assertEqual(b, "opened supported branch.")
176
194
 
188
206
                          dir)
189
207
 
190
208
    def test_register_unregister_format(self):
 
209
        # Test the deprecated format registration functions
191
210
        format = SampleBranchFormat()
192
211
        # make a control dir
193
212
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
194
213
        # make a branch
195
214
        format.initialize(dir)
196
215
        # register a format for it.
197
 
        _mod_branch.BranchFormat.register_format(format)
 
216
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
217
            _mod_branch.BranchFormat.register_format, format)
198
218
        # which branch.Open will refuse (not supported)
199
219
        self.assertRaises(errors.UnsupportedFormatError,
200
220
                          _mod_branch.Branch.open, self.get_url())
204
224
            format.open(dir),
205
225
            bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
206
226
        # unregister the format
207
 
        _mod_branch.BranchFormat.unregister_format(format)
 
227
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
228
            _mod_branch.BranchFormat.unregister_format, format)
208
229
        self.make_branch_and_tree('bar')
209
230
 
210
231
 
 
232
class TestBranchFormatRegistry(tests.TestCase):
 
233
 
 
234
    def setUp(self):
 
235
        super(TestBranchFormatRegistry, self).setUp()
 
236
        self.registry = _mod_branch.BranchFormatRegistry()
 
237
 
 
238
    def test_default(self):
 
239
        self.assertIs(None, self.registry.get_default())
 
240
        format = SampleBranchFormat()
 
241
        self.registry.set_default(format)
 
242
        self.assertEquals(format, self.registry.get_default())
 
243
 
 
244
    def test_register_unregister_format(self):
 
245
        format = SampleBranchFormat()
 
246
        self.registry.register(format)
 
247
        self.assertEquals(format,
 
248
            self.registry.get("Sample branch format."))
 
249
        self.registry.remove(format)
 
250
        self.assertRaises(KeyError, self.registry.get,
 
251
            "Sample branch format.")
 
252
 
 
253
    def test_get_all(self):
 
254
        format = SampleBranchFormat()
 
255
        self.assertEquals([], self.registry._get_all())
 
256
        self.registry.register(format)
 
257
        self.assertEquals([format], self.registry._get_all())
 
258
 
 
259
    def test_register_extra(self):
 
260
        format = SampleExtraBranchFormat()
 
261
        self.assertEquals([], self.registry._get_all())
 
262
        self.registry.register_extra(format)
 
263
        self.assertEquals([format], self.registry._get_all())
 
264
 
 
265
    def test_register_extra_lazy(self):
 
266
        self.assertEquals([], self.registry._get_all())
 
267
        self.registry.register_extra_lazy("bzrlib.tests.test_branch",
 
268
            "SampleExtraBranchFormat")
 
269
        formats = self.registry._get_all()
 
270
        self.assertEquals(1, len(formats))
 
271
        self.assertIsInstance(formats[0], SampleExtraBranchFormat)
 
272
 
 
273
 
211
274
#Used by TestMetaDirBranchFormatFactory 
212
275
FakeLazyFormat = None
213
276
 
260
323
 
261
324
    def test_layout(self):
262
325
        branch = self.make_branch('a', format=self.get_format_name())
263
 
        self.failUnlessExists('a/.bzr/branch/last-revision')
264
 
        self.failIfExists('a/.bzr/branch/revision-history')
265
 
        self.failIfExists('a/.bzr/branch/references')
 
326
        self.assertPathExists('a/.bzr/branch/last-revision')
 
327
        self.assertPathDoesNotExist('a/.bzr/branch/revision-history')
 
328
        self.assertPathDoesNotExist('a/.bzr/branch/references')
266
329
 
267
330
    def test_config(self):
268
331
        """Ensure that all configuration data is stored in the branch"""
269
332
        branch = self.make_branch('a', format=self.get_format_name())
270
 
        branch.set_parent('http://bazaar-vcs.org')
271
 
        self.failIfExists('a/.bzr/branch/parent')
272
 
        self.assertEqual('http://bazaar-vcs.org', branch.get_parent())
273
 
        branch.set_push_location('sftp://bazaar-vcs.org')
 
333
        branch.set_parent('http://example.com')
 
334
        self.assertPathDoesNotExist('a/.bzr/branch/parent')
 
335
        self.assertEqual('http://example.com', branch.get_parent())
 
336
        branch.set_push_location('sftp://example.com')
274
337
        config = branch.get_config()._get_branch_data_config()
275
 
        self.assertEqual('sftp://bazaar-vcs.org',
 
338
        self.assertEqual('sftp://example.com',
276
339
                         config.get_user_option('push_location'))
277
 
        branch.set_bound_location('ftp://bazaar-vcs.org')
278
 
        self.failIfExists('a/.bzr/branch/bound')
279
 
        self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
 
340
        branch.set_bound_location('ftp://example.com')
 
341
        self.assertPathDoesNotExist('a/.bzr/branch/bound')
 
342
        self.assertEqual('ftp://example.com', branch.get_bound_location())
280
343
 
281
344
    def test_set_revision_history(self):
282
345
        builder = self.make_branch_builder('.', format=self.get_format_name())
287
350
        branch = builder.get_branch()
288
351
        branch.lock_write()
289
352
        self.addCleanup(branch.unlock)
290
 
        branch.set_revision_history(['foo', 'bar'])
291
 
        branch.set_revision_history(['foo'])
 
353
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
354
            branch.set_revision_history, ['foo', 'bar'])
 
355
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
356
                branch.set_revision_history, ['foo'])
292
357
        self.assertRaises(errors.NotLefthandHistory,
293
 
                          branch.set_revision_history, ['bar'])
 
358
            self.applyDeprecated, symbol_versioning.deprecated_in((2, 4, 0)),
 
359
            branch.set_revision_history, ['bar'])
294
360
 
295
361
    def do_checkout_test(self, lightweight=False):
296
362
        tree = self.make_branch_and_tree('source',
309
375
        subtree.commit('a subtree file')
310
376
        subsubtree.commit('a subsubtree file')
311
377
        tree.branch.create_checkout('target', lightweight=lightweight)
312
 
        self.failUnlessExists('target')
313
 
        self.failUnlessExists('target/subtree')
314
 
        self.failUnlessExists('target/subtree/file')
315
 
        self.failUnlessExists('target/subtree/subsubtree/file')
 
378
        self.assertPathExists('target')
 
379
        self.assertPathExists('target/subtree')
 
380
        self.assertPathExists('target/subtree/file')
 
381
        self.assertPathExists('target/subtree/subsubtree/file')
316
382
        subbranch = _mod_branch.Branch.open('target/subtree/subsubtree')
317
383
        if lightweight:
318
384
            self.assertEndsWith(subbranch.base, 'source/subtree/subsubtree/')
480
546
 
481
547
    def test_create_open_reference(self):
482
548
        bzrdirformat = bzrdir.BzrDirMetaFormat1()
483
 
        t = transport.get_transport(self.get_url('.'))
 
549
        t = self.get_transport()
484
550
        t.mkdir('repo')
485
551
        dir = bzrdirformat.initialize(self.get_url('repo'))
486
552
        dir.create_repository()
587
653
        if value is not None:
588
654
            self.config.set_user_option('append_revisions_only', value)
589
655
        self.assertEqual(expected_value,
590
 
                         self.branch._get_append_revisions_only())
 
656
                         self.branch.get_append_revisions_only())
591
657
 
592
658
    def test_valid_append_revisions_only(self):
593
659
        self.assertEquals(None,
644
710
        r.new_revid = "same-revid"
645
711
        f = StringIO()
646
712
        r.report(f)
647
 
        self.assertEqual("No revisions to pull.\n", f.getvalue())
648
 
 
649
 
 
650
 
class _StubLockable(object):
651
 
    """Helper for TestRunWithWriteLockedTarget."""
652
 
 
653
 
    def __init__(self, calls, unlock_exc=None):
654
 
        self.calls = calls
655
 
        self.unlock_exc = unlock_exc
656
 
 
657
 
    def lock_write(self):
658
 
        self.calls.append('lock_write')
659
 
 
660
 
    def unlock(self):
661
 
        self.calls.append('unlock')
662
 
        if self.unlock_exc is not None:
663
 
            raise self.unlock_exc
664
 
 
665
 
 
666
 
class _ErrorFromCallable(Exception):
667
 
    """Helper for TestRunWithWriteLockedTarget."""
668
 
 
669
 
 
670
 
class _ErrorFromUnlock(Exception):
671
 
    """Helper for TestRunWithWriteLockedTarget."""
672
 
 
673
 
 
674
 
class TestRunWithWriteLockedTarget(tests.TestCase):
675
 
    """Tests for _run_with_write_locked_target."""
676
 
 
677
 
    def setUp(self):
678
 
        tests.TestCase.setUp(self)
679
 
        self._calls = []
680
 
 
681
 
    def func_that_returns_ok(self):
682
 
        self._calls.append('func called')
683
 
        return 'ok'
684
 
 
685
 
    def func_that_raises(self):
686
 
        self._calls.append('func called')
687
 
        raise _ErrorFromCallable()
688
 
 
689
 
    def test_success_unlocks(self):
690
 
        lockable = _StubLockable(self._calls)
691
 
        result = _mod_branch._run_with_write_locked_target(
692
 
            lockable, self.func_that_returns_ok)
693
 
        self.assertEqual('ok', result)
694
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
695
 
 
696
 
    def test_exception_unlocks_and_propagates(self):
697
 
        lockable = _StubLockable(self._calls)
698
 
        self.assertRaises(_ErrorFromCallable,
699
 
                          _mod_branch._run_with_write_locked_target,
700
 
                          lockable, self.func_that_raises)
701
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
702
 
 
703
 
    def test_callable_succeeds_but_error_during_unlock(self):
704
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
705
 
        self.assertRaises(_ErrorFromUnlock,
706
 
                          _mod_branch._run_with_write_locked_target,
707
 
                          lockable, self.func_that_returns_ok)
708
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
709
 
 
710
 
    def test_error_during_unlock_does_not_mask_original_error(self):
711
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
712
 
        self.assertRaises(_ErrorFromCallable,
713
 
                          _mod_branch._run_with_write_locked_target,
714
 
                          lockable, self.func_that_raises)
715
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
716
 
 
 
713
        self.assertEqual("No revisions or tags to pull.\n", f.getvalue())
717
714