~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Martin
  • Date: 2011-08-04 00:17:53 UTC
  • mto: This revision was merged to the branch mainline in revision 6055.
  • Revision ID: gzlist@googlemail.com-20110804001753-plgpwcpsxcum16yb
Make tests raising KnownFailure use the knownFailure method instead

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
19
 
For interface tests see tests/per_branch/*.py.
 
19
For interface tests see `tests/per_branch/*.py`.
20
20
 
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
29
29
    bzrdir,
30
30
    config,
31
31
    errors,
 
32
    symbol_versioning,
32
33
    tests,
33
34
    trace,
34
 
    transport,
35
35
    urlutils,
36
36
    )
37
37
 
40
40
 
41
41
    def test_default_format(self):
42
42
        # update this if you change the default branch format
43
 
        self.assertIsInstance(_mod_branch.BranchFormat.get_default_format(),
 
43
        self.assertIsInstance(_mod_branch.format_registry.get_default(),
44
44
                _mod_branch.BzrBranchFormat7)
45
45
 
46
46
    def test_default_format_is_same_as_bzrdir_default(self):
48
48
        # set, but at the moment that's not true -- mbp 20070814 --
49
49
        # https://bugs.launchpad.net/bzr/+bug/132376
50
50
        self.assertEqual(
51
 
            _mod_branch.BranchFormat.get_default_format(),
 
51
            _mod_branch.format_registry.get_default(),
52
52
            bzrdir.BzrDirFormat.get_default_format().get_branch_format())
53
53
 
54
54
    def test_get_set_default_format(self):
55
55
        # set the format and then set it back again
56
 
        old_format = _mod_branch.BranchFormat.get_default_format()
57
 
        _mod_branch.BranchFormat.set_default_format(SampleBranchFormat())
 
56
        old_format = _mod_branch.format_registry.get_default()
 
57
        _mod_branch.format_registry.set_default(SampleBranchFormat())
58
58
        try:
59
59
            # the default branch format is used by the meta dir format
60
60
            # which is not the default bzrdir format at this point
62
62
            result = dir.create_branch()
63
63
            self.assertEqual(result, 'A branch')
64
64
        finally:
65
 
            _mod_branch.BranchFormat.set_default_format(old_format)
 
65
            _mod_branch.format_registry.set_default(old_format)
66
66
        self.assertEqual(old_format,
67
 
                         _mod_branch.BranchFormat.get_default_format())
 
67
                         _mod_branch.format_registry.get_default())
68
68
 
69
69
 
70
70
class TestBranchFormat5(tests.TestCaseWithTransport):
74
74
        url = self.get_url()
75
75
        bdir = bzrdir.BzrDirMetaFormat1().initialize(url)
76
76
        bdir.create_repository()
77
 
        branch = bdir.create_branch()
 
77
        branch = _mod_branch.BzrBranchFormat5().initialize(bdir)
78
78
        t = self.get_transport()
79
79
        self.log("branch instance is %r" % branch)
80
80
        self.assert_(isinstance(branch, _mod_branch.BzrBranch5))
86
86
        self.assertIsDirectory('.bzr/branch/lock/held', t)
87
87
 
88
88
    def test_set_push_location(self):
89
 
        from bzrlib.config import (locations_config_filename,
90
 
                                   ensure_config_dir_exists)
91
 
        ensure_config_dir_exists()
92
 
        fn = locations_config_filename()
93
 
        # write correct newlines to locations.conf
94
 
        # by default ConfigObj uses native line-endings for new files
95
 
        # but uses already existing line-endings if file is not empty
96
 
        f = open(fn, 'wb')
97
 
        try:
98
 
            f.write('# comment\n')
99
 
        finally:
100
 
            f.close()
 
89
        conf = config.LocationConfig.from_string('# comment\n', '.', save=True)
101
90
 
102
91
        branch = self.make_branch('.', format='knit')
103
92
        branch.set_push_location('foo')
106
95
                             "[%s]\n"
107
96
                             "push_location = foo\n"
108
97
                             "push_location:policy = norecurse\n" % local_path,
109
 
                             fn)
 
98
                             config.locations_config_filename())
110
99
 
111
100
    # TODO RBC 20051029 test getting a push location from a branch in a
112
101
    # recursive section - that is, it appends the branch name.
123
112
        """See BzrBranchFormat.get_format_string()."""
124
113
        return "Sample branch format."
125
114
 
126
 
    def initialize(self, a_bzrdir, name=None):
 
115
    def initialize(self, a_bzrdir, name=None, repository=None):
127
116
        """Format 4 branches cannot be created."""
128
117
        t = a_bzrdir.get_branch_transport(self, name=name)
129
118
        t.put_bytes('format', self.get_format_string())
136
125
        return "opened branch."
137
126
 
138
127
 
 
128
# Demonstrating how lazy loading is often implemented:
 
129
# A constant string is created.
 
130
SampleSupportedBranchFormatString = "Sample supported branch format."
 
131
 
 
132
# And the format class can then reference the constant to avoid skew.
 
133
class SampleSupportedBranchFormat(_mod_branch.BranchFormat):
 
134
    """A sample supported format."""
 
135
 
 
136
    def get_format_string(self):
 
137
        """See BzrBranchFormat.get_format_string()."""
 
138
        return SampleSupportedBranchFormatString
 
139
 
 
140
    def initialize(self, a_bzrdir, name=None):
 
141
        t = a_bzrdir.get_branch_transport(self, name=name)
 
142
        t.put_bytes('format', self.get_format_string())
 
143
        return 'A branch'
 
144
 
 
145
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
 
146
        return "opened supported branch."
 
147
 
 
148
 
 
149
class SampleExtraBranchFormat(_mod_branch.BranchFormat):
 
150
    """A sample format that is not usable in a metadir."""
 
151
 
 
152
    def get_format_string(self):
 
153
        # This format is not usable in a metadir.
 
154
        return None
 
155
 
 
156
    def network_name(self):
 
157
        # Network name always has to be provided.
 
158
        return "extra"
 
159
 
 
160
    def initialize(self, a_bzrdir, name=None):
 
161
        raise NotImplementedError(self.initialize)
 
162
 
 
163
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
 
164
        raise NotImplementedError(self.open)
 
165
 
 
166
 
139
167
class TestBzrBranchFormat(tests.TestCaseWithTransport):
140
168
    """Tests for the BzrBranchFormat facility."""
141
169
 
149
177
            dir.create_repository()
150
178
            format.initialize(dir)
151
179
            found_format = _mod_branch.BranchFormat.find_format(dir)
152
 
            self.failUnless(isinstance(found_format, format.__class__))
 
180
            self.assertIsInstance(found_format, format.__class__)
153
181
        check_format(_mod_branch.BzrBranchFormat5(), "bar")
154
182
 
 
183
    def test_find_format_factory(self):
 
184
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
185
        SampleSupportedBranchFormat().initialize(dir)
 
186
        factory = _mod_branch.MetaDirBranchFormatFactory(
 
187
            SampleSupportedBranchFormatString,
 
188
            "bzrlib.tests.test_branch", "SampleSupportedBranchFormat")
 
189
        _mod_branch.format_registry.register(factory)
 
190
        self.addCleanup(_mod_branch.format_registry.remove, factory)
 
191
        b = _mod_branch.Branch.open(self.get_url())
 
192
        self.assertEqual(b, "opened supported branch.")
 
193
 
155
194
    def test_find_format_not_branch(self):
156
195
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
157
196
        self.assertRaises(errors.NotBranchError,
166
205
                          dir)
167
206
 
168
207
    def test_register_unregister_format(self):
 
208
        # Test the deprecated format registration functions
169
209
        format = SampleBranchFormat()
170
210
        # make a control dir
171
211
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
172
212
        # make a branch
173
213
        format.initialize(dir)
174
214
        # register a format for it.
175
 
        _mod_branch.BranchFormat.register_format(format)
 
215
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
216
            _mod_branch.BranchFormat.register_format, format)
176
217
        # which Branch.open will refuse (not supported)
177
218
        self.assertRaises(errors.UnsupportedFormatError,
178
219
                          _mod_branch.Branch.open, self.get_url())
182
223
            format.open(dir),
183
224
            bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
184
225
        # unregister the format
185
 
        _mod_branch.BranchFormat.unregister_format(format)
 
226
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
227
            _mod_branch.BranchFormat.unregister_format, format)
186
228
        self.make_branch_and_tree('bar')
187
229
 
188
230
 
 
231
class TestBranchFormatRegistry(tests.TestCase):
 
232
 
 
233
    def setUp(self):
 
234
        super(TestBranchFormatRegistry, self).setUp()
 
235
        self.registry = _mod_branch.BranchFormatRegistry()
 
236
 
 
237
    def test_default(self):
 
238
        self.assertIs(None, self.registry.get_default())
 
239
        format = SampleBranchFormat()
 
240
        self.registry.set_default(format)
 
241
        self.assertEquals(format, self.registry.get_default())
 
242
 
 
243
    def test_register_unregister_format(self):
 
244
        format = SampleBranchFormat()
 
245
        self.registry.register(format)
 
246
        self.assertEquals(format,
 
247
            self.registry.get("Sample branch format."))
 
248
        self.registry.remove(format)
 
249
        self.assertRaises(KeyError, self.registry.get,
 
250
            "Sample branch format.")
 
251
 
 
252
    def test_get_all(self):
 
253
        format = SampleBranchFormat()
 
254
        self.assertEquals([], self.registry._get_all())
 
255
        self.registry.register(format)
 
256
        self.assertEquals([format], self.registry._get_all())
 
257
 
 
258
    def test_register_extra(self):
 
259
        format = SampleExtraBranchFormat()
 
260
        self.assertEquals([], self.registry._get_all())
 
261
        self.registry.register_extra(format)
 
262
        self.assertEquals([format], self.registry._get_all())
 
263
 
 
264
    def test_register_extra_lazy(self):
 
265
        self.assertEquals([], self.registry._get_all())
 
266
        self.registry.register_extra_lazy("bzrlib.tests.test_branch",
 
267
            "SampleExtraBranchFormat")
 
268
        formats = self.registry._get_all()
 
269
        self.assertEquals(1, len(formats))
 
270
        self.assertIsInstance(formats[0], SampleExtraBranchFormat)
 
271
 
 
272
 
 
273
# Used by TestMetaDirBranchFormatFactory
 
274
FakeLazyFormat = None
 
275
 
 
276
 
 
277
class TestMetaDirBranchFormatFactory(tests.TestCase):
 
278
 
 
279
    def test_get_format_string_does_not_load(self):
 
280
        """Formats have a static format string."""
 
281
        factory = _mod_branch.MetaDirBranchFormatFactory("yo", None, None)
 
282
        self.assertEqual("yo", factory.get_format_string())
 
283
 
 
284
    def test_call_loads(self):
 
285
        # __call__ is used by the network_format_registry interface to get a
 
286
        # Format.
 
287
        global FakeLazyFormat
 
288
        del FakeLazyFormat
 
289
        factory = _mod_branch.MetaDirBranchFormatFactory(None,
 
290
            "bzrlib.tests.test_branch", "FakeLazyFormat")
 
291
        self.assertRaises(AttributeError, factory)
 
292
 
 
293
    def test_call_returns_call_of_referenced_object(self):
 
294
        global FakeLazyFormat
 
295
        FakeLazyFormat = lambda:'called'
 
296
        factory = _mod_branch.MetaDirBranchFormatFactory(None,
 
297
            "bzrlib.tests.test_branch", "FakeLazyFormat")
 
298
        self.assertEqual('called', factory())
 
299
 
 
300
 
189
301
class TestBranch67(object):
190
302
    """Common tests for both branch 6 and 7 which are mostly the same."""
191
303
 
210
322
 
211
323
    def test_layout(self):
212
324
        branch = self.make_branch('a', format=self.get_format_name())
213
 
        self.failUnlessExists('a/.bzr/branch/last-revision')
214
 
        self.failIfExists('a/.bzr/branch/revision-history')
215
 
        self.failIfExists('a/.bzr/branch/references')
 
325
        self.assertPathExists('a/.bzr/branch/last-revision')
 
326
        self.assertPathDoesNotExist('a/.bzr/branch/revision-history')
 
327
        self.assertPathDoesNotExist('a/.bzr/branch/references')
216
328
 
217
329
    def test_config(self):
218
330
        """Ensure that all configuration data is stored in the branch"""
219
331
        branch = self.make_branch('a', format=self.get_format_name())
220
 
        branch.set_parent('http://bazaar-vcs.org')
221
 
        self.failIfExists('a/.bzr/branch/parent')
222
 
        self.assertEqual('http://bazaar-vcs.org', branch.get_parent())
223
 
        branch.set_push_location('sftp://bazaar-vcs.org')
 
332
        branch.set_parent('http://example.com')
 
333
        self.assertPathDoesNotExist('a/.bzr/branch/parent')
 
334
        self.assertEqual('http://example.com', branch.get_parent())
 
335
        branch.set_push_location('sftp://example.com')
224
336
        config = branch.get_config()._get_branch_data_config()
225
 
        self.assertEqual('sftp://bazaar-vcs.org',
 
337
        self.assertEqual('sftp://example.com',
226
338
                         config.get_user_option('push_location'))
227
 
        branch.set_bound_location('ftp://bazaar-vcs.org')
228
 
        self.failIfExists('a/.bzr/branch/bound')
229
 
        self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
 
339
        branch.set_bound_location('ftp://example.com')
 
340
        self.assertPathDoesNotExist('a/.bzr/branch/bound')
 
341
        self.assertEqual('ftp://example.com', branch.get_bound_location())
230
342
 
231
343
    def test_set_revision_history(self):
232
344
        builder = self.make_branch_builder('.', format=self.get_format_name())
237
349
        branch = builder.get_branch()
238
350
        branch.lock_write()
239
351
        self.addCleanup(branch.unlock)
240
 
        branch.set_revision_history(['foo', 'bar'])
241
 
        branch.set_revision_history(['foo'])
 
352
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
353
            branch.set_revision_history, ['foo', 'bar'])
 
354
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
355
                branch.set_revision_history, ['foo'])
242
356
        self.assertRaises(errors.NotLefthandHistory,
243
 
                          branch.set_revision_history, ['bar'])
 
357
            self.applyDeprecated, symbol_versioning.deprecated_in((2, 4, 0)),
 
358
            branch.set_revision_history, ['bar'])
244
359
 
245
360
    def do_checkout_test(self, lightweight=False):
246
361
        tree = self.make_branch_and_tree('source',
259
374
        subtree.commit('a subtree file')
260
375
        subsubtree.commit('a subsubtree file')
261
376
        tree.branch.create_checkout('target', lightweight=lightweight)
262
 
        self.failUnlessExists('target')
263
 
        self.failUnlessExists('target/subtree')
264
 
        self.failUnlessExists('target/subtree/file')
265
 
        self.failUnlessExists('target/subtree/subsubtree/file')
 
377
        self.assertPathExists('target')
 
378
        self.assertPathExists('target/subtree')
 
379
        self.assertPathExists('target/subtree/file')
 
380
        self.assertPathExists('target/subtree/subsubtree/file')
266
381
        subbranch = _mod_branch.Branch.open('target/subtree/subsubtree')
267
382
        if lightweight:
268
383
            self.assertEndsWith(subbranch.base, 'source/subtree/subsubtree/')
430
545
 
431
546
    def test_create_open_reference(self):
432
547
        bzrdirformat = bzrdir.BzrDirMetaFormat1()
433
 
        t = transport.get_transport(self.get_url('.'))
 
548
        t = self.get_transport()
434
549
        t.mkdir('repo')
435
550
        dir = bzrdirformat.initialize(self.get_url('repo'))
436
551
        dir.create_repository()
492
607
        self.assertTrue(hasattr(params, 'bzrdir'))
493
608
        self.assertTrue(hasattr(params, 'branch'))
494
609
 
 
610
    def test_post_branch_init_hook_repr(self):
 
611
        param_reprs = []
 
612
        _mod_branch.Branch.hooks.install_named_hook('post_branch_init',
 
613
            lambda params: param_reprs.append(repr(params)), None)
 
614
        branch = self.make_branch('a')
 
615
        self.assertLength(1, param_reprs)
 
616
        param_repr = param_reprs[0]
 
617
        self.assertStartsWith(param_repr, '<BranchInitHookParams of ')
 
618
 
495
619
    def test_post_switch_hook(self):
496
620
        from bzrlib import switch
497
621
        calls = []
564
688
        # this usage of results is not recommended for new code (because it
565
689
        # doesn't describe very well what happened), but for api stability
566
690
        # it's still supported
567
 
        a = "%d revisions pulled" % r
568
 
        self.assertEqual(a, "10 revisions pulled")
 
691
        self.assertEqual(self.applyDeprecated(
 
692
            symbol_versioning.deprecated_in((2, 3, 0)),
 
693
            r.__int__),
 
694
            10)
569
695
 
570
696
    def test_report_changed(self):
571
697
        r = _mod_branch.PullResult()
585
711
        r.report(f)
586
712
        self.assertEqual("No revisions to pull.\n", f.getvalue())
587
713
 
588
 
 
589
 
class _StubLockable(object):
590
 
    """Helper for TestRunWithWriteLockedTarget."""
591
 
 
592
 
    def __init__(self, calls, unlock_exc=None):
593
 
        self.calls = calls
594
 
        self.unlock_exc = unlock_exc
595
 
 
596
 
    def lock_write(self):
597
 
        self.calls.append('lock_write')
598
 
 
599
 
    def unlock(self):
600
 
        self.calls.append('unlock')
601
 
        if self.unlock_exc is not None:
602
 
            raise self.unlock_exc
603
 
 
604
 
 
605
 
class _ErrorFromCallable(Exception):
606
 
    """Helper for TestRunWithWriteLockedTarget."""
607
 
 
608
 
 
609
 
class _ErrorFromUnlock(Exception):
610
 
    """Helper for TestRunWithWriteLockedTarget."""
611
 
 
612
 
 
613
 
class TestRunWithWriteLockedTarget(tests.TestCase):
614
 
    """Tests for _run_with_write_locked_target."""
615
 
 
616
 
    def setUp(self):
617
 
        tests.TestCase.setUp(self)
618
 
        self._calls = []
619
 
 
620
 
    def func_that_returns_ok(self):
621
 
        self._calls.append('func called')
622
 
        return 'ok'
623
 
 
624
 
    def func_that_raises(self):
625
 
        self._calls.append('func called')
626
 
        raise _ErrorFromCallable()
627
 
 
628
 
    def test_success_unlocks(self):
629
 
        lockable = _StubLockable(self._calls)
630
 
        result = _mod_branch._run_with_write_locked_target(
631
 
            lockable, self.func_that_returns_ok)
632
 
        self.assertEqual('ok', result)
633
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
634
 
 
635
 
    def test_exception_unlocks_and_propagates(self):
636
 
        lockable = _StubLockable(self._calls)
637
 
        self.assertRaises(_ErrorFromCallable,
638
 
                          _mod_branch._run_with_write_locked_target,
639
 
                          lockable, self.func_that_raises)
640
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
641
 
 
642
 
    def test_callable_succeeds_but_error_during_unlock(self):
643
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
644
 
        self.assertRaises(_ErrorFromUnlock,
645
 
                          _mod_branch._run_with_write_locked_target,
646
 
                          lockable, self.func_that_returns_ok)
647
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
648
 
 
649
 
    def test_error_during_unlock_does_not_mask_original_error(self):
650
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
651
 
        self.assertRaises(_ErrorFromCallable,
652
 
                          _mod_branch._run_with_write_locked_target,
653
 
                          lockable, self.func_that_raises)
654
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
655
 
 
656