~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

(jameinel) Allow 'bzr serve' to interpret SIGHUP as a graceful shutdown.
 (bug #795025) (John A Meinel)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the Branch facility that are not interface tests.
18
18
 
19
 
For interface tests see tests/per_branch/*.py.
 
19
For interface tests see `tests/per_branch/*.py`.
20
20
 
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
29
29
    bzrdir,
30
30
    config,
31
31
    errors,
 
32
    symbol_versioning,
32
33
    tests,
33
34
    trace,
34
 
    transport,
35
35
    urlutils,
36
36
    )
37
37
 
40
40
 
41
41
    def test_default_format(self):
42
42
        # update this if you change the default branch format
43
 
        self.assertIsInstance(_mod_branch.BranchFormat.get_default_format(),
 
43
        self.assertIsInstance(_mod_branch.format_registry.get_default(),
44
44
                _mod_branch.BzrBranchFormat7)
45
45
 
46
46
    def test_default_format_is_same_as_bzrdir_default(self):
48
48
        # set, but at the moment that's not true -- mbp 20070814 --
49
49
        # https://bugs.launchpad.net/bzr/+bug/132376
50
50
        self.assertEqual(
51
 
            _mod_branch.BranchFormat.get_default_format(),
 
51
            _mod_branch.format_registry.get_default(),
52
52
            bzrdir.BzrDirFormat.get_default_format().get_branch_format())
53
53
 
54
54
    def test_get_set_default_format(self):
55
55
        # set the format and then set it back again
56
 
        old_format = _mod_branch.BranchFormat.get_default_format()
57
 
        _mod_branch.BranchFormat.set_default_format(SampleBranchFormat())
 
56
        old_format = _mod_branch.format_registry.get_default()
 
57
        _mod_branch.format_registry.set_default(SampleBranchFormat())
58
58
        try:
59
59
            # the default branch format is used by the meta dir format
60
60
            # which is not the default bzrdir format at this point
62
62
            result = dir.create_branch()
63
63
            self.assertEqual(result, 'A branch')
64
64
        finally:
65
 
            _mod_branch.BranchFormat.set_default_format(old_format)
 
65
            _mod_branch.format_registry.set_default(old_format)
66
66
        self.assertEqual(old_format,
67
 
                         _mod_branch.BranchFormat.get_default_format())
 
67
                         _mod_branch.format_registry.get_default())
68
68
 
69
69
 
70
70
class TestBranchFormat5(tests.TestCaseWithTransport):
74
74
        url = self.get_url()
75
75
        bdir = bzrdir.BzrDirMetaFormat1().initialize(url)
76
76
        bdir.create_repository()
77
 
        branch = bdir.create_branch()
 
77
        branch = _mod_branch.BzrBranchFormat5().initialize(bdir)
78
78
        t = self.get_transport()
79
79
        self.log("branch instance is %r" % branch)
80
80
        self.assert_(isinstance(branch, _mod_branch.BzrBranch5))
86
86
        self.assertIsDirectory('.bzr/branch/lock/held', t)
87
87
 
88
88
    def test_set_push_location(self):
89
 
        from bzrlib.config import (locations_config_filename,
90
 
                                   ensure_config_dir_exists)
91
 
        ensure_config_dir_exists()
92
 
        fn = locations_config_filename()
93
 
        # write correct newlines to locations.conf
94
 
        # by default ConfigObj uses native line-endings for new files
95
 
        # but uses already existing line-endings if file is not empty
96
 
        f = open(fn, 'wb')
97
 
        try:
98
 
            f.write('# comment\n')
99
 
        finally:
100
 
            f.close()
 
89
        conf = config.LocationConfig.from_string('# comment\n', '.', save=True)
101
90
 
102
91
        branch = self.make_branch('.', format='knit')
103
92
        branch.set_push_location('foo')
106
95
                             "[%s]\n"
107
96
                             "push_location = foo\n"
108
97
                             "push_location:policy = norecurse\n" % local_path,
109
 
                             fn)
 
98
                             config.locations_config_filename())
110
99
 
111
100
    # TODO RBC 20051029 test getting a push location from a branch in a
112
101
    # recursive section - that is, it appends the branch name.
123
112
        """See BzrBranchFormat.get_format_string()."""
124
113
        return "Sample branch format."
125
114
 
126
 
    def initialize(self, a_bzrdir, name=None):
 
115
    def initialize(self, a_bzrdir, name=None, repository=None,
 
116
                   append_revisions_only=None):
127
117
        """Format 4 branches cannot be created."""
128
118
        t = a_bzrdir.get_branch_transport(self, name=name)
129
119
        t.put_bytes('format', self.get_format_string())
136
126
        return "opened branch."
137
127
 
138
128
 
 
129
# Demonstrating how lazy loading is often implemented:
 
130
# A constant string is created.
 
131
SampleSupportedBranchFormatString = "Sample supported branch format."
 
132
 
 
133
# And the format class can then reference the constant to avoid skew.
 
134
class SampleSupportedBranchFormat(_mod_branch.BranchFormat):
 
135
    """A sample supported format."""
 
136
 
 
137
    def get_format_string(self):
 
138
        """See BzrBranchFormat.get_format_string()."""
 
139
        return SampleSupportedBranchFormatString
 
140
 
 
141
    def initialize(self, a_bzrdir, name=None, append_revisions_only=None):
 
142
        t = a_bzrdir.get_branch_transport(self, name=name)
 
143
        t.put_bytes('format', self.get_format_string())
 
144
        return 'A branch'
 
145
 
 
146
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
 
147
        return "opened supported branch."
 
148
 
 
149
 
 
150
class SampleExtraBranchFormat(_mod_branch.BranchFormat):
 
151
    """A sample format that is not usable in a metadir."""
 
152
 
 
153
    def get_format_string(self):
 
154
        # This format is not usable in a metadir.
 
155
        return None
 
156
 
 
157
    def network_name(self):
 
158
        # Network name always has to be provided.
 
159
        return "extra"
 
160
 
 
161
    def initialize(self, a_bzrdir, name=None):
 
162
        raise NotImplementedError(self.initialize)
 
163
 
 
164
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
 
165
        raise NotImplementedError(self.open)
 
166
 
 
167
 
139
168
class TestBzrBranchFormat(tests.TestCaseWithTransport):
140
169
    """Tests for the BzrBranchFormat facility."""
141
170
 
149
178
            dir.create_repository()
150
179
            format.initialize(dir)
151
180
            found_format = _mod_branch.BranchFormat.find_format(dir)
152
 
            self.failUnless(isinstance(found_format, format.__class__))
 
181
            self.assertIsInstance(found_format, format.__class__)
153
182
        check_format(_mod_branch.BzrBranchFormat5(), "bar")
154
183
 
 
184
    def test_find_format_factory(self):
 
185
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
186
        SampleSupportedBranchFormat().initialize(dir)
 
187
        factory = _mod_branch.MetaDirBranchFormatFactory(
 
188
            SampleSupportedBranchFormatString,
 
189
            "bzrlib.tests.test_branch", "SampleSupportedBranchFormat")
 
190
        _mod_branch.format_registry.register(factory)
 
191
        self.addCleanup(_mod_branch.format_registry.remove, factory)
 
192
        b = _mod_branch.Branch.open(self.get_url())
 
193
        self.assertEqual(b, "opened supported branch.")
 
194
 
155
195
    def test_find_format_not_branch(self):
156
196
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
157
197
        self.assertRaises(errors.NotBranchError,
166
206
                          dir)
167
207
 
168
208
    def test_register_unregister_format(self):
 
209
        # Test the deprecated format registration functions
169
210
        format = SampleBranchFormat()
170
211
        # make a control dir
171
212
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
172
213
        # make a branch
173
214
        format.initialize(dir)
174
215
        # register a format for it.
175
 
        _mod_branch.BranchFormat.register_format(format)
 
216
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
217
            _mod_branch.BranchFormat.register_format, format)
176
218
        # which branch.Open will refuse (not supported)
177
219
        self.assertRaises(errors.UnsupportedFormatError,
178
220
                          _mod_branch.Branch.open, self.get_url())
182
224
            format.open(dir),
183
225
            bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
184
226
        # unregister the format
185
 
        _mod_branch.BranchFormat.unregister_format(format)
 
227
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
228
            _mod_branch.BranchFormat.unregister_format, format)
186
229
        self.make_branch_and_tree('bar')
187
230
 
188
231
 
 
232
class TestBranchFormatRegistry(tests.TestCase):
 
233
 
 
234
    def setUp(self):
 
235
        super(TestBranchFormatRegistry, self).setUp()
 
236
        self.registry = _mod_branch.BranchFormatRegistry()
 
237
 
 
238
    def test_default(self):
 
239
        self.assertIs(None, self.registry.get_default())
 
240
        format = SampleBranchFormat()
 
241
        self.registry.set_default(format)
 
242
        self.assertEquals(format, self.registry.get_default())
 
243
 
 
244
    def test_register_unregister_format(self):
 
245
        format = SampleBranchFormat()
 
246
        self.registry.register(format)
 
247
        self.assertEquals(format,
 
248
            self.registry.get("Sample branch format."))
 
249
        self.registry.remove(format)
 
250
        self.assertRaises(KeyError, self.registry.get,
 
251
            "Sample branch format.")
 
252
 
 
253
    def test_get_all(self):
 
254
        format = SampleBranchFormat()
 
255
        self.assertEquals([], self.registry._get_all())
 
256
        self.registry.register(format)
 
257
        self.assertEquals([format], self.registry._get_all())
 
258
 
 
259
    def test_register_extra(self):
 
260
        format = SampleExtraBranchFormat()
 
261
        self.assertEquals([], self.registry._get_all())
 
262
        self.registry.register_extra(format)
 
263
        self.assertEquals([format], self.registry._get_all())
 
264
 
 
265
    def test_register_extra_lazy(self):
 
266
        self.assertEquals([], self.registry._get_all())
 
267
        self.registry.register_extra_lazy("bzrlib.tests.test_branch",
 
268
            "SampleExtraBranchFormat")
 
269
        formats = self.registry._get_all()
 
270
        self.assertEquals(1, len(formats))
 
271
        self.assertIsInstance(formats[0], SampleExtraBranchFormat)
 
272
 
 
273
 
 
274
# Used by TestMetaDirBranchFormatFactory
 
275
FakeLazyFormat = None
 
276
 
 
277
 
 
278
class TestMetaDirBranchFormatFactory(tests.TestCase):
 
279
 
 
280
    def test_get_format_string_does_not_load(self):
 
281
        """Formats have a static format string."""
 
282
        factory = _mod_branch.MetaDirBranchFormatFactory("yo", None, None)
 
283
        self.assertEqual("yo", factory.get_format_string())
 
284
 
 
285
    def test_call_loads(self):
 
286
        # __call__ is used by the network_format_registry interface to get a
 
287
        # Format.
 
288
        global FakeLazyFormat
 
289
        del FakeLazyFormat
 
290
        factory = _mod_branch.MetaDirBranchFormatFactory(None,
 
291
            "bzrlib.tests.test_branch", "FakeLazyFormat")
 
292
        self.assertRaises(AttributeError, factory)
 
293
 
 
294
    def test_call_returns_call_of_referenced_object(self):
 
295
        global FakeLazyFormat
 
296
        FakeLazyFormat = lambda:'called'
 
297
        factory = _mod_branch.MetaDirBranchFormatFactory(None,
 
298
            "bzrlib.tests.test_branch", "FakeLazyFormat")
 
299
        self.assertEqual('called', factory())
 
300
 
 
301
 
189
302
class TestBranch67(object):
190
303
    """Common tests for both branch 6 and 7 which are mostly the same."""
191
304
 
210
323
 
211
324
    def test_layout(self):
212
325
        branch = self.make_branch('a', format=self.get_format_name())
213
 
        self.failUnlessExists('a/.bzr/branch/last-revision')
214
 
        self.failIfExists('a/.bzr/branch/revision-history')
215
 
        self.failIfExists('a/.bzr/branch/references')
 
326
        self.assertPathExists('a/.bzr/branch/last-revision')
 
327
        self.assertPathDoesNotExist('a/.bzr/branch/revision-history')
 
328
        self.assertPathDoesNotExist('a/.bzr/branch/references')
216
329
 
217
330
    def test_config(self):
218
331
        """Ensure that all configuration data is stored in the branch"""
219
332
        branch = self.make_branch('a', format=self.get_format_name())
220
 
        branch.set_parent('http://bazaar-vcs.org')
221
 
        self.failIfExists('a/.bzr/branch/parent')
222
 
        self.assertEqual('http://bazaar-vcs.org', branch.get_parent())
223
 
        branch.set_push_location('sftp://bazaar-vcs.org')
 
333
        branch.set_parent('http://example.com')
 
334
        self.assertPathDoesNotExist('a/.bzr/branch/parent')
 
335
        self.assertEqual('http://example.com', branch.get_parent())
 
336
        branch.set_push_location('sftp://example.com')
224
337
        config = branch.get_config()._get_branch_data_config()
225
 
        self.assertEqual('sftp://bazaar-vcs.org',
 
338
        self.assertEqual('sftp://example.com',
226
339
                         config.get_user_option('push_location'))
227
 
        branch.set_bound_location('ftp://bazaar-vcs.org')
228
 
        self.failIfExists('a/.bzr/branch/bound')
229
 
        self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
 
340
        branch.set_bound_location('ftp://example.com')
 
341
        self.assertPathDoesNotExist('a/.bzr/branch/bound')
 
342
        self.assertEqual('ftp://example.com', branch.get_bound_location())
230
343
 
231
344
    def test_set_revision_history(self):
232
345
        builder = self.make_branch_builder('.', format=self.get_format_name())
237
350
        branch = builder.get_branch()
238
351
        branch.lock_write()
239
352
        self.addCleanup(branch.unlock)
240
 
        branch.set_revision_history(['foo', 'bar'])
241
 
        branch.set_revision_history(['foo'])
 
353
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
354
            branch.set_revision_history, ['foo', 'bar'])
 
355
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
356
                branch.set_revision_history, ['foo'])
242
357
        self.assertRaises(errors.NotLefthandHistory,
243
 
                          branch.set_revision_history, ['bar'])
 
358
            self.applyDeprecated, symbol_versioning.deprecated_in((2, 4, 0)),
 
359
            branch.set_revision_history, ['bar'])
244
360
 
245
361
    def do_checkout_test(self, lightweight=False):
246
362
        tree = self.make_branch_and_tree('source',
259
375
        subtree.commit('a subtree file')
260
376
        subsubtree.commit('a subsubtree file')
261
377
        tree.branch.create_checkout('target', lightweight=lightweight)
262
 
        self.failUnlessExists('target')
263
 
        self.failUnlessExists('target/subtree')
264
 
        self.failUnlessExists('target/subtree/file')
265
 
        self.failUnlessExists('target/subtree/subsubtree/file')
 
378
        self.assertPathExists('target')
 
379
        self.assertPathExists('target/subtree')
 
380
        self.assertPathExists('target/subtree/file')
 
381
        self.assertPathExists('target/subtree/subsubtree/file')
266
382
        subbranch = _mod_branch.Branch.open('target/subtree/subsubtree')
267
383
        if lightweight:
268
384
            self.assertEndsWith(subbranch.base, 'source/subtree/subsubtree/')
430
546
 
431
547
    def test_create_open_reference(self):
432
548
        bzrdirformat = bzrdir.BzrDirMetaFormat1()
433
 
        t = transport.get_transport(self.get_url('.'))
 
549
        t = self.get_transport()
434
550
        t.mkdir('repo')
435
551
        dir = bzrdirformat.initialize(self.get_url('repo'))
436
552
        dir.create_repository()
492
608
        self.assertTrue(hasattr(params, 'bzrdir'))
493
609
        self.assertTrue(hasattr(params, 'branch'))
494
610
 
 
611
    def test_post_branch_init_hook_repr(self):
 
612
        param_reprs = []
 
613
        _mod_branch.Branch.hooks.install_named_hook('post_branch_init',
 
614
            lambda params: param_reprs.append(repr(params)), None)
 
615
        branch = self.make_branch('a')
 
616
        self.assertLength(1, param_reprs)
 
617
        param_repr = param_reprs[0]
 
618
        self.assertStartsWith(param_repr, '<BranchInitHookParams of ')
 
619
 
495
620
    def test_post_switch_hook(self):
496
621
        from bzrlib import switch
497
622
        calls = []
528
653
        if value is not None:
529
654
            self.config.set_user_option('append_revisions_only', value)
530
655
        self.assertEqual(expected_value,
531
 
                         self.branch._get_append_revisions_only())
 
656
                         self.branch.get_append_revisions_only())
532
657
 
533
658
    def test_valid_append_revisions_only(self):
534
659
        self.assertEquals(None,
564
689
        # this usage of results is not recommended for new code (because it
565
690
        # doesn't describe very well what happened), but for api stability
566
691
        # it's still supported
567
 
        a = "%d revisions pulled" % r
568
 
        self.assertEqual(a, "10 revisions pulled")
 
692
        self.assertEqual(self.applyDeprecated(
 
693
            symbol_versioning.deprecated_in((2, 3, 0)),
 
694
            r.__int__),
 
695
            10)
569
696
 
570
697
    def test_report_changed(self):
571
698
        r = _mod_branch.PullResult()
583
710
        r.new_revid = "same-revid"
584
711
        f = StringIO()
585
712
        r.report(f)
586
 
        self.assertEqual("No revisions to pull.\n", f.getvalue())
587
 
 
588
 
 
589
 
class _StubLockable(object):
590
 
    """Helper for TestRunWithWriteLockedTarget."""
591
 
 
592
 
    def __init__(self, calls, unlock_exc=None):
593
 
        self.calls = calls
594
 
        self.unlock_exc = unlock_exc
595
 
 
596
 
    def lock_write(self):
597
 
        self.calls.append('lock_write')
598
 
 
599
 
    def unlock(self):
600
 
        self.calls.append('unlock')
601
 
        if self.unlock_exc is not None:
602
 
            raise self.unlock_exc
603
 
 
604
 
 
605
 
class _ErrorFromCallable(Exception):
606
 
    """Helper for TestRunWithWriteLockedTarget."""
607
 
 
608
 
 
609
 
class _ErrorFromUnlock(Exception):
610
 
    """Helper for TestRunWithWriteLockedTarget."""
611
 
 
612
 
 
613
 
class TestRunWithWriteLockedTarget(tests.TestCase):
614
 
    """Tests for _run_with_write_locked_target."""
615
 
 
616
 
    def setUp(self):
617
 
        tests.TestCase.setUp(self)
618
 
        self._calls = []
619
 
 
620
 
    def func_that_returns_ok(self):
621
 
        self._calls.append('func called')
622
 
        return 'ok'
623
 
 
624
 
    def func_that_raises(self):
625
 
        self._calls.append('func called')
626
 
        raise _ErrorFromCallable()
627
 
 
628
 
    def test_success_unlocks(self):
629
 
        lockable = _StubLockable(self._calls)
630
 
        result = _mod_branch._run_with_write_locked_target(
631
 
            lockable, self.func_that_returns_ok)
632
 
        self.assertEqual('ok', result)
633
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
634
 
 
635
 
    def test_exception_unlocks_and_propagates(self):
636
 
        lockable = _StubLockable(self._calls)
637
 
        self.assertRaises(_ErrorFromCallable,
638
 
                          _mod_branch._run_with_write_locked_target,
639
 
                          lockable, self.func_that_raises)
640
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
641
 
 
642
 
    def test_callable_succeeds_but_error_during_unlock(self):
643
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
644
 
        self.assertRaises(_ErrorFromUnlock,
645
 
                          _mod_branch._run_with_write_locked_target,
646
 
                          lockable, self.func_that_returns_ok)
647
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
648
 
 
649
 
    def test_error_during_unlock_does_not_mask_original_error(self):
650
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
651
 
        self.assertRaises(_ErrorFromCallable,
652
 
                          _mod_branch._run_with_write_locked_target,
653
 
                          lockable, self.func_that_raises)
654
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
655
 
 
 
713
        self.assertEqual("No revisions or tags to pull.\n", f.getvalue())
656
714