~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-05-11 11:47:36 UTC
  • mfrom: (5200.3.8 lock_return)
  • Revision ID: pqm@pqm.ubuntu.com-20100511114736-mc1sq9zyo3vufec7
(lifeless) Provide a consistent interface to Tree, Branch,
 Repository where lock methods return an object with an unlock method to
 unlock the lock. This breaks the API for Branch,
 Repository on their lock_write methods. (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2011 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
19
 
For interface tests see `tests/per_branch/*.py`.
 
19
For interface tests see tests/per_branch/*.py.
20
20
 
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
29
29
    bzrdir,
30
30
    config,
31
31
    errors,
32
 
    symbol_versioning,
33
32
    tests,
34
33
    trace,
 
34
    transport,
35
35
    urlutils,
36
36
    )
37
37
 
40
40
 
41
41
    def test_default_format(self):
42
42
        # update this if you change the default branch format
43
 
        self.assertIsInstance(_mod_branch.format_registry.get_default(),
 
43
        self.assertIsInstance(_mod_branch.BranchFormat.get_default_format(),
44
44
                _mod_branch.BzrBranchFormat7)
45
45
 
46
46
    def test_default_format_is_same_as_bzrdir_default(self):
48
48
        # set, but at the moment that's not true -- mbp 20070814 --
49
49
        # https://bugs.launchpad.net/bzr/+bug/132376
50
50
        self.assertEqual(
51
 
            _mod_branch.format_registry.get_default(),
 
51
            _mod_branch.BranchFormat.get_default_format(),
52
52
            bzrdir.BzrDirFormat.get_default_format().get_branch_format())
53
53
 
54
54
    def test_get_set_default_format(self):
55
55
        # set the format and then set it back again
56
 
        old_format = _mod_branch.format_registry.get_default()
57
 
        _mod_branch.format_registry.set_default(SampleBranchFormat())
 
56
        old_format = _mod_branch.BranchFormat.get_default_format()
 
57
        _mod_branch.BranchFormat.set_default_format(SampleBranchFormat())
58
58
        try:
59
59
            # the default branch format is used by the meta dir format
60
60
            # which is not the default bzrdir format at this point
62
62
            result = dir.create_branch()
63
63
            self.assertEqual(result, 'A branch')
64
64
        finally:
65
 
            _mod_branch.format_registry.set_default(old_format)
 
65
            _mod_branch.BranchFormat.set_default_format(old_format)
66
66
        self.assertEqual(old_format,
67
 
                         _mod_branch.format_registry.get_default())
 
67
                         _mod_branch.BranchFormat.get_default_format())
68
68
 
69
69
 
70
70
class TestBranchFormat5(tests.TestCaseWithTransport):
74
74
        url = self.get_url()
75
75
        bdir = bzrdir.BzrDirMetaFormat1().initialize(url)
76
76
        bdir.create_repository()
77
 
        branch = _mod_branch.BzrBranchFormat5().initialize(bdir)
 
77
        branch = bdir.create_branch()
78
78
        t = self.get_transport()
79
79
        self.log("branch instance is %r" % branch)
80
80
        self.assert_(isinstance(branch, _mod_branch.BzrBranch5))
86
86
        self.assertIsDirectory('.bzr/branch/lock/held', t)
87
87
 
88
88
    def test_set_push_location(self):
89
 
        conf = config.LocationConfig.from_string('# comment\n', '.', save=True)
 
89
        from bzrlib.config import (locations_config_filename,
 
90
                                   ensure_config_dir_exists)
 
91
        ensure_config_dir_exists()
 
92
        fn = locations_config_filename()
 
93
        # write correct newlines to locations.conf
 
94
        # by default ConfigObj uses native line-endings for new files
 
95
        # but uses already existing line-endings if file is not empty
 
96
        f = open(fn, 'wb')
 
97
        try:
 
98
            f.write('# comment\n')
 
99
        finally:
 
100
            f.close()
90
101
 
91
102
        branch = self.make_branch('.', format='knit')
92
103
        branch.set_push_location('foo')
95
106
                             "[%s]\n"
96
107
                             "push_location = foo\n"
97
108
                             "push_location:policy = norecurse\n" % local_path,
98
 
                             config.locations_config_filename())
 
109
                             fn)
99
110
 
100
111
    # TODO RBC 20051029 test getting a push location from a branch in a
101
112
    # recursive section - that is, it appends the branch name.
112
123
        """See BzrBranchFormat.get_format_string()."""
113
124
        return "Sample branch format."
114
125
 
115
 
    def initialize(self, a_bzrdir, name=None, repository=None,
116
 
                   append_revisions_only=None):
 
126
    def initialize(self, a_bzrdir, name=None):
117
127
        """Format 4 branches cannot be created."""
118
128
        t = a_bzrdir.get_branch_transport(self, name=name)
119
129
        t.put_bytes('format', self.get_format_string())
126
136
        return "opened branch."
127
137
 
128
138
 
129
 
# Demonstrating how lazy loading is often implemented:
130
 
# A constant string is created.
131
 
SampleSupportedBranchFormatString = "Sample supported branch format."
132
 
 
133
 
# And the format class can then reference the constant to avoid skew.
134
 
class SampleSupportedBranchFormat(_mod_branch.BranchFormat):
135
 
    """A sample supported format."""
136
 
 
137
 
    def get_format_string(self):
138
 
        """See BzrBranchFormat.get_format_string()."""
139
 
        return SampleSupportedBranchFormatString
140
 
 
141
 
    def initialize(self, a_bzrdir, name=None, append_revisions_only=None):
142
 
        t = a_bzrdir.get_branch_transport(self, name=name)
143
 
        t.put_bytes('format', self.get_format_string())
144
 
        return 'A branch'
145
 
 
146
 
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
147
 
        return "opened supported branch."
148
 
 
149
 
 
150
 
class SampleExtraBranchFormat(_mod_branch.BranchFormat):
151
 
    """A sample format that is not usable in a metadir."""
152
 
 
153
 
    def get_format_string(self):
154
 
        # This format is not usable in a metadir.
155
 
        return None
156
 
 
157
 
    def network_name(self):
158
 
        # Network name always has to be provided.
159
 
        return "extra"
160
 
 
161
 
    def initialize(self, a_bzrdir, name=None):
162
 
        raise NotImplementedError(self.initialize)
163
 
 
164
 
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
165
 
        raise NotImplementedError(self.open)
166
 
 
167
 
 
168
139
class TestBzrBranchFormat(tests.TestCaseWithTransport):
169
140
    """Tests for the BzrBranchFormat facility."""
170
141
 
178
149
            dir.create_repository()
179
150
            format.initialize(dir)
180
151
            found_format = _mod_branch.BranchFormat.find_format(dir)
181
 
            self.assertIsInstance(found_format, format.__class__)
 
152
            self.failUnless(isinstance(found_format, format.__class__))
182
153
        check_format(_mod_branch.BzrBranchFormat5(), "bar")
183
154
 
184
 
    def test_find_format_factory(self):
185
 
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
186
 
        SampleSupportedBranchFormat().initialize(dir)
187
 
        factory = _mod_branch.MetaDirBranchFormatFactory(
188
 
            SampleSupportedBranchFormatString,
189
 
            "bzrlib.tests.test_branch", "SampleSupportedBranchFormat")
190
 
        _mod_branch.format_registry.register(factory)
191
 
        self.addCleanup(_mod_branch.format_registry.remove, factory)
192
 
        b = _mod_branch.Branch.open(self.get_url())
193
 
        self.assertEqual(b, "opened supported branch.")
194
 
 
195
155
    def test_find_format_not_branch(self):
196
156
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
197
157
        self.assertRaises(errors.NotBranchError,
206
166
                          dir)
207
167
 
208
168
    def test_register_unregister_format(self):
209
 
        # Test the deprecated format registration functions
210
169
        format = SampleBranchFormat()
211
170
        # make a control dir
212
171
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
213
172
        # make a branch
214
173
        format.initialize(dir)
215
174
        # register a format for it.
216
 
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
217
 
            _mod_branch.BranchFormat.register_format, format)
 
175
        _mod_branch.BranchFormat.register_format(format)
218
176
        # which branch.Open will refuse (not supported)
219
177
        self.assertRaises(errors.UnsupportedFormatError,
220
178
                          _mod_branch.Branch.open, self.get_url())
224
182
            format.open(dir),
225
183
            bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
226
184
        # unregister the format
227
 
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
228
 
            _mod_branch.BranchFormat.unregister_format, format)
 
185
        _mod_branch.BranchFormat.unregister_format(format)
229
186
        self.make_branch_and_tree('bar')
230
187
 
231
188
 
232
 
class TestBranchFormatRegistry(tests.TestCase):
233
 
 
234
 
    def setUp(self):
235
 
        super(TestBranchFormatRegistry, self).setUp()
236
 
        self.registry = _mod_branch.BranchFormatRegistry()
237
 
 
238
 
    def test_default(self):
239
 
        self.assertIs(None, self.registry.get_default())
240
 
        format = SampleBranchFormat()
241
 
        self.registry.set_default(format)
242
 
        self.assertEquals(format, self.registry.get_default())
243
 
 
244
 
    def test_register_unregister_format(self):
245
 
        format = SampleBranchFormat()
246
 
        self.registry.register(format)
247
 
        self.assertEquals(format,
248
 
            self.registry.get("Sample branch format."))
249
 
        self.registry.remove(format)
250
 
        self.assertRaises(KeyError, self.registry.get,
251
 
            "Sample branch format.")
252
 
 
253
 
    def test_get_all(self):
254
 
        format = SampleBranchFormat()
255
 
        self.assertEquals([], self.registry._get_all())
256
 
        self.registry.register(format)
257
 
        self.assertEquals([format], self.registry._get_all())
258
 
 
259
 
    def test_register_extra(self):
260
 
        format = SampleExtraBranchFormat()
261
 
        self.assertEquals([], self.registry._get_all())
262
 
        self.registry.register_extra(format)
263
 
        self.assertEquals([format], self.registry._get_all())
264
 
 
265
 
    def test_register_extra_lazy(self):
266
 
        self.assertEquals([], self.registry._get_all())
267
 
        self.registry.register_extra_lazy("bzrlib.tests.test_branch",
268
 
            "SampleExtraBranchFormat")
269
 
        formats = self.registry._get_all()
270
 
        self.assertEquals(1, len(formats))
271
 
        self.assertIsInstance(formats[0], SampleExtraBranchFormat)
272
 
 
273
 
 
274
 
#Used by TestMetaDirBranchFormatFactory 
275
 
FakeLazyFormat = None
276
 
 
277
 
 
278
 
class TestMetaDirBranchFormatFactory(tests.TestCase):
279
 
 
280
 
    def test_get_format_string_does_not_load(self):
281
 
        """Formats have a static format string."""
282
 
        factory = _mod_branch.MetaDirBranchFormatFactory("yo", None, None)
283
 
        self.assertEqual("yo", factory.get_format_string())
284
 
 
285
 
    def test_call_loads(self):
286
 
        # __call__ is used by the network_format_registry interface to get a
287
 
        # Format.
288
 
        global FakeLazyFormat
289
 
        del FakeLazyFormat
290
 
        factory = _mod_branch.MetaDirBranchFormatFactory(None,
291
 
            "bzrlib.tests.test_branch", "FakeLazyFormat")
292
 
        self.assertRaises(AttributeError, factory)
293
 
 
294
 
    def test_call_returns_call_of_referenced_object(self):
295
 
        global FakeLazyFormat
296
 
        FakeLazyFormat = lambda:'called'
297
 
        factory = _mod_branch.MetaDirBranchFormatFactory(None,
298
 
            "bzrlib.tests.test_branch", "FakeLazyFormat")
299
 
        self.assertEqual('called', factory())
300
 
 
301
 
 
302
189
class TestBranch67(object):
303
190
    """Common tests for both branch 6 and 7 which are mostly the same."""
304
191
 
323
210
 
324
211
    def test_layout(self):
325
212
        branch = self.make_branch('a', format=self.get_format_name())
326
 
        self.assertPathExists('a/.bzr/branch/last-revision')
327
 
        self.assertPathDoesNotExist('a/.bzr/branch/revision-history')
328
 
        self.assertPathDoesNotExist('a/.bzr/branch/references')
 
213
        self.failUnlessExists('a/.bzr/branch/last-revision')
 
214
        self.failIfExists('a/.bzr/branch/revision-history')
 
215
        self.failIfExists('a/.bzr/branch/references')
329
216
 
330
217
    def test_config(self):
331
218
        """Ensure that all configuration data is stored in the branch"""
332
219
        branch = self.make_branch('a', format=self.get_format_name())
333
 
        branch.set_parent('http://example.com')
334
 
        self.assertPathDoesNotExist('a/.bzr/branch/parent')
335
 
        self.assertEqual('http://example.com', branch.get_parent())
336
 
        branch.set_push_location('sftp://example.com')
 
220
        branch.set_parent('http://bazaar-vcs.org')
 
221
        self.failIfExists('a/.bzr/branch/parent')
 
222
        self.assertEqual('http://bazaar-vcs.org', branch.get_parent())
 
223
        branch.set_push_location('sftp://bazaar-vcs.org')
337
224
        config = branch.get_config()._get_branch_data_config()
338
 
        self.assertEqual('sftp://example.com',
 
225
        self.assertEqual('sftp://bazaar-vcs.org',
339
226
                         config.get_user_option('push_location'))
340
 
        branch.set_bound_location('ftp://example.com')
341
 
        self.assertPathDoesNotExist('a/.bzr/branch/bound')
342
 
        self.assertEqual('ftp://example.com', branch.get_bound_location())
 
227
        branch.set_bound_location('ftp://bazaar-vcs.org')
 
228
        self.failIfExists('a/.bzr/branch/bound')
 
229
        self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
343
230
 
344
231
    def test_set_revision_history(self):
345
232
        builder = self.make_branch_builder('.', format=self.get_format_name())
350
237
        branch = builder.get_branch()
351
238
        branch.lock_write()
352
239
        self.addCleanup(branch.unlock)
353
 
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
354
 
            branch.set_revision_history, ['foo', 'bar'])
355
 
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
356
 
                branch.set_revision_history, ['foo'])
 
240
        branch.set_revision_history(['foo', 'bar'])
 
241
        branch.set_revision_history(['foo'])
357
242
        self.assertRaises(errors.NotLefthandHistory,
358
 
            self.applyDeprecated, symbol_versioning.deprecated_in((2, 4, 0)),
359
 
            branch.set_revision_history, ['bar'])
 
243
                          branch.set_revision_history, ['bar'])
360
244
 
361
245
    def do_checkout_test(self, lightweight=False):
362
246
        tree = self.make_branch_and_tree('source',
375
259
        subtree.commit('a subtree file')
376
260
        subsubtree.commit('a subsubtree file')
377
261
        tree.branch.create_checkout('target', lightweight=lightweight)
378
 
        self.assertPathExists('target')
379
 
        self.assertPathExists('target/subtree')
380
 
        self.assertPathExists('target/subtree/file')
381
 
        self.assertPathExists('target/subtree/subsubtree/file')
 
262
        self.failUnlessExists('target')
 
263
        self.failUnlessExists('target/subtree')
 
264
        self.failUnlessExists('target/subtree/file')
 
265
        self.failUnlessExists('target/subtree/subsubtree/file')
382
266
        subbranch = _mod_branch.Branch.open('target/subtree/subsubtree')
383
267
        if lightweight:
384
268
            self.assertEndsWith(subbranch.base, 'source/subtree/subsubtree/')
546
430
 
547
431
    def test_create_open_reference(self):
548
432
        bzrdirformat = bzrdir.BzrDirMetaFormat1()
549
 
        t = self.get_transport()
 
433
        t = transport.get_transport(self.get_url('.'))
550
434
        t.mkdir('repo')
551
435
        dir = bzrdirformat.initialize(self.get_url('repo'))
552
436
        dir.create_repository()
608
492
        self.assertTrue(hasattr(params, 'bzrdir'))
609
493
        self.assertTrue(hasattr(params, 'branch'))
610
494
 
611
 
    def test_post_branch_init_hook_repr(self):
612
 
        param_reprs = []
613
 
        _mod_branch.Branch.hooks.install_named_hook('post_branch_init',
614
 
            lambda params: param_reprs.append(repr(params)), None)
615
 
        branch = self.make_branch('a')
616
 
        self.assertLength(1, param_reprs)
617
 
        param_repr = param_reprs[0]
618
 
        self.assertStartsWith(param_repr, '<BranchInitHookParams of ')
619
 
 
620
495
    def test_post_switch_hook(self):
621
496
        from bzrlib import switch
622
497
        calls = []
653
528
        if value is not None:
654
529
            self.config.set_user_option('append_revisions_only', value)
655
530
        self.assertEqual(expected_value,
656
 
                         self.branch.get_append_revisions_only())
 
531
                         self.branch._get_append_revisions_only())
657
532
 
658
533
    def test_valid_append_revisions_only(self):
659
534
        self.assertEquals(None,
689
564
        # this usage of results is not recommended for new code (because it
690
565
        # doesn't describe very well what happened), but for api stability
691
566
        # it's still supported
692
 
        self.assertEqual(self.applyDeprecated(
693
 
            symbol_versioning.deprecated_in((2, 3, 0)),
694
 
            r.__int__),
695
 
            10)
 
567
        a = "%d revisions pulled" % r
 
568
        self.assertEqual(a, "10 revisions pulled")
696
569
 
697
570
    def test_report_changed(self):
698
571
        r = _mod_branch.PullResult()
710
583
        r.new_revid = "same-revid"
711
584
        f = StringIO()
712
585
        r.report(f)
713
 
        self.assertEqual("No revisions or tags to pull.\n", f.getvalue())
 
586
        self.assertEqual("No revisions to pull.\n", f.getvalue())
 
587
 
 
588
 
 
589
class _StubLockable(object):
 
590
    """Helper for TestRunWithWriteLockedTarget."""
 
591
 
 
592
    def __init__(self, calls, unlock_exc=None):
 
593
        self.calls = calls
 
594
        self.unlock_exc = unlock_exc
 
595
 
 
596
    def lock_write(self):
 
597
        self.calls.append('lock_write')
 
598
 
 
599
    def unlock(self):
 
600
        self.calls.append('unlock')
 
601
        if self.unlock_exc is not None:
 
602
            raise self.unlock_exc
 
603
 
 
604
 
 
605
class _ErrorFromCallable(Exception):
 
606
    """Helper for TestRunWithWriteLockedTarget."""
 
607
 
 
608
 
 
609
class _ErrorFromUnlock(Exception):
 
610
    """Helper for TestRunWithWriteLockedTarget."""
 
611
 
 
612
 
 
613
class TestRunWithWriteLockedTarget(tests.TestCase):
 
614
    """Tests for _run_with_write_locked_target."""
 
615
 
 
616
    def setUp(self):
 
617
        tests.TestCase.setUp(self)
 
618
        self._calls = []
 
619
 
 
620
    def func_that_returns_ok(self):
 
621
        self._calls.append('func called')
 
622
        return 'ok'
 
623
 
 
624
    def func_that_raises(self):
 
625
        self._calls.append('func called')
 
626
        raise _ErrorFromCallable()
 
627
 
 
628
    def test_success_unlocks(self):
 
629
        lockable = _StubLockable(self._calls)
 
630
        result = _mod_branch._run_with_write_locked_target(
 
631
            lockable, self.func_that_returns_ok)
 
632
        self.assertEqual('ok', result)
 
633
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
634
 
 
635
    def test_exception_unlocks_and_propagates(self):
 
636
        lockable = _StubLockable(self._calls)
 
637
        self.assertRaises(_ErrorFromCallable,
 
638
                          _mod_branch._run_with_write_locked_target,
 
639
                          lockable, self.func_that_raises)
 
640
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
641
 
 
642
    def test_callable_succeeds_but_error_during_unlock(self):
 
643
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
644
        self.assertRaises(_ErrorFromUnlock,
 
645
                          _mod_branch._run_with_write_locked_target,
 
646
                          lockable, self.func_that_returns_ok)
 
647
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
648
 
 
649
    def test_error_during_unlock_does_not_mask_original_error(self):
 
650
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
651
        self.assertRaises(_ErrorFromCallable,
 
652
                          _mod_branch._run_with_write_locked_target,
 
653
                          lockable, self.func_that_raises)
 
654
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
655
 
714
656