~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-02-25 02:01:51 UTC
  • mfrom: (5676.1.10 per_interrepo-extra)
  • Revision ID: pqm@pqm.ubuntu.com-20110225020151-tlqdjbxfv5byh7l7
(jelmer) Allow repositories to provide extra combinations to run
 bzrlib.tests.per_interrepo with. (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
19
 
For interface tests see tests/branch_implementations/*.py.
 
19
For interface tests see tests/per_branch/*.py.
20
20
 
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
23
23
"""
24
24
 
25
 
from StringIO import StringIO
26
 
 
27
 
import bzrlib.branch
28
 
from bzrlib.branch import (BzrBranch5, 
29
 
                           BzrBranchFormat5)
30
 
import bzrlib.bzrdir as bzrdir
31
 
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1, 
32
 
                           BzrDir, BzrDirFormat)
33
 
from bzrlib.errors import (NotBranchError,
34
 
                           UnknownFormatError,
35
 
                           UnsupportedFormatError,
36
 
                           )
37
 
 
38
 
from bzrlib.tests import TestCase, TestCaseWithTransport
39
 
from bzrlib.transport import get_transport
40
 
 
41
 
class TestDefaultFormat(TestCase):
 
25
from cStringIO import StringIO
 
26
 
 
27
from bzrlib import (
 
28
    branch as _mod_branch,
 
29
    bzrdir,
 
30
    config,
 
31
    errors,
 
32
    symbol_versioning,
 
33
    tests,
 
34
    trace,
 
35
    urlutils,
 
36
    )
 
37
 
 
38
 
 
39
class TestDefaultFormat(tests.TestCase):
 
40
 
 
41
    def test_default_format(self):
 
42
        # update this if you change the default branch format
 
43
        self.assertIsInstance(_mod_branch.format_registry.get_default(),
 
44
                _mod_branch.BzrBranchFormat7)
 
45
 
 
46
    def test_default_format_is_same_as_bzrdir_default(self):
 
47
        # XXX: it might be nice if there was only one place the default was
 
48
        # set, but at the moment that's not true -- mbp 20070814 --
 
49
        # https://bugs.launchpad.net/bzr/+bug/132376
 
50
        self.assertEqual(
 
51
            _mod_branch.format_registry.get_default(),
 
52
            bzrdir.BzrDirFormat.get_default_format().get_branch_format())
42
53
 
43
54
    def test_get_set_default_format(self):
44
 
        old_format = bzrlib.branch.BranchFormat.get_default_format()
45
 
        # default is 5
46
 
        self.assertTrue(isinstance(old_format, bzrlib.branch.BzrBranchFormat5))
47
 
        bzrlib.branch.BranchFormat.set_default_format(SampleBranchFormat())
 
55
        # set the format and then set it back again
 
56
        old_format = _mod_branch.format_registry.get_default()
 
57
        _mod_branch.format_registry.set_default(SampleBranchFormat())
48
58
        try:
49
59
            # the default branch format is used by the meta dir format
50
60
            # which is not the default bzrdir format at this point
51
 
            dir = BzrDirMetaFormat1().initialize('memory:///')
 
61
            dir = bzrdir.BzrDirMetaFormat1().initialize('memory:///')
52
62
            result = dir.create_branch()
53
63
            self.assertEqual(result, 'A branch')
54
64
        finally:
55
 
            bzrlib.branch.BranchFormat.set_default_format(old_format)
56
 
        self.assertEqual(old_format, bzrlib.branch.BranchFormat.get_default_format())
57
 
 
58
 
 
59
 
class TestBranchFormat5(TestCaseWithTransport):
 
65
            _mod_branch.format_registry.set_default(old_format)
 
66
        self.assertEqual(old_format,
 
67
                         _mod_branch.format_registry.get_default())
 
68
 
 
69
 
 
70
class TestBranchFormat5(tests.TestCaseWithTransport):
60
71
    """Tests specific to branch format 5"""
61
72
 
62
73
    def test_branch_format_5_uses_lockdir(self):
63
74
        url = self.get_url()
64
 
        bzrdir = BzrDirMetaFormat1().initialize(url)
65
 
        bzrdir.create_repository()
66
 
        branch = bzrdir.create_branch()
 
75
        bdir = bzrdir.BzrDirMetaFormat1().initialize(url)
 
76
        bdir.create_repository()
 
77
        branch = bdir.create_branch()
67
78
        t = self.get_transport()
68
79
        self.log("branch instance is %r" % branch)
69
 
        self.assert_(isinstance(branch, BzrBranch5))
 
80
        self.assert_(isinstance(branch, _mod_branch.BzrBranch5))
70
81
        self.assertIsDirectory('.', t)
71
82
        self.assertIsDirectory('.bzr/branch', t)
72
83
        self.assertIsDirectory('.bzr/branch/lock', t)
73
84
        branch.lock_write()
74
 
        try:
75
 
            self.assertIsDirectory('.bzr/branch/lock/held', t)
76
 
        finally:
77
 
            branch.unlock()
78
 
 
79
 
 
80
 
class TestBranchEscaping(TestCaseWithTransport):
81
 
    """Test a branch can be correctly stored and used on a vfat-like transport
82
 
    
83
 
    Makes sure we have proper escaping of invalid characters, etc.
84
 
 
85
 
    It'd be better to test all operations on the FakeVFATTransportDecorator,
86
 
    but working trees go straight to the os not through the Transport layer.
87
 
    Therefore we build some history first in the regular way and then 
88
 
    check it's safe to access for vfat.
89
 
    """
90
 
 
91
 
    FOO_ID = 'foo<:>ID'
92
 
    REV_ID = 'revid-1'
93
 
 
94
 
    def setUp(self):
95
 
        super(TestBranchEscaping, self).setUp()
96
 
        from bzrlib.repository import RepositoryFormatKnit1
97
 
        bzrdir = BzrDirMetaFormat1().initialize(self.get_url())
98
 
        repo = RepositoryFormatKnit1().initialize(bzrdir)
99
 
        branch = bzrdir.create_branch()
100
 
        wt = bzrdir.create_workingtree()
101
 
        self.build_tree_contents([("foo", "contents of foo")])
102
 
        # add file with id containing wierd characters
103
 
        wt.add(['foo'], [self.FOO_ID])
104
 
        wt.commit('this is my new commit', rev_id=self.REV_ID)
105
 
 
106
 
    def test_branch_on_vfat(self):
107
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
108
 
        # now access over vfat; should be safe
109
 
        transport = FakeVFATTransportDecorator('vfat+' + self.get_url())
110
 
        bzrdir, junk = BzrDir.open_containing_from_transport(transport)
111
 
        branch = bzrdir.open_branch()
112
 
        revtree = branch.repository.revision_tree(self.REV_ID)
113
 
        contents = revtree.get_file_text(self.FOO_ID)
114
 
        self.assertEqual(contents, 'contents of foo')
115
 
 
116
 
 
117
 
class SampleBranchFormat(bzrlib.branch.BranchFormat):
 
85
        self.addCleanup(branch.unlock)
 
86
        self.assertIsDirectory('.bzr/branch/lock/held', t)
 
87
 
 
88
    def test_set_push_location(self):
 
89
        conf = config.LocationConfig.from_string('# comment\n', '.', save=True)
 
90
 
 
91
        branch = self.make_branch('.', format='knit')
 
92
        branch.set_push_location('foo')
 
93
        local_path = urlutils.local_path_from_url(branch.base[:-1])
 
94
        self.assertFileEqual("# comment\n"
 
95
                             "[%s]\n"
 
96
                             "push_location = foo\n"
 
97
                             "push_location:policy = norecurse\n" % local_path,
 
98
                             config.locations_config_filename())
 
99
 
 
100
    # TODO RBC 20051029 test getting a push location from a branch in a
 
101
    # recursive section - that is, it appends the branch name.
 
102
 
 
103
 
 
104
class SampleBranchFormat(_mod_branch.BranchFormat):
118
105
    """A sample format
119
106
 
120
 
    this format is initializable, unsupported to aid in testing the 
 
107
    this format is initializable, unsupported to aid in testing the
121
108
    open and open_downlevel routines.
122
109
    """
123
110
 
125
112
        """See BzrBranchFormat.get_format_string()."""
126
113
        return "Sample branch format."
127
114
 
128
 
    def initialize(self, a_bzrdir):
 
115
    def initialize(self, a_bzrdir, name=None, repository=None):
129
116
        """Format 4 branches cannot be created."""
130
 
        t = a_bzrdir.get_branch_transport(self)
131
 
        t.put('format', StringIO(self.get_format_string()))
 
117
        t = a_bzrdir.get_branch_transport(self, name=name)
 
118
        t.put_bytes('format', self.get_format_string())
132
119
        return 'A branch'
133
120
 
134
121
    def is_supported(self):
135
122
        return False
136
123
 
137
 
    def open(self, transport, _found=False):
 
124
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
138
125
        return "opened branch."
139
126
 
140
127
 
141
 
class TestBzrBranchFormat(TestCaseWithTransport):
 
128
# Demonstrating how lazy loading is often implemented:
 
129
# A constant string is created.
 
130
SampleSupportedBranchFormatString = "Sample supported branch format."
 
131
 
 
132
# And the format class can then reference the constant to avoid skew.
 
133
class SampleSupportedBranchFormat(_mod_branch.BranchFormat):
 
134
    """A sample supported format."""
 
135
 
 
136
    def get_format_string(self):
 
137
        """See BzrBranchFormat.get_format_string()."""
 
138
        return SampleSupportedBranchFormatString
 
139
 
 
140
    def initialize(self, a_bzrdir, name=None):
 
141
        t = a_bzrdir.get_branch_transport(self, name=name)
 
142
        t.put_bytes('format', self.get_format_string())
 
143
        return 'A branch'
 
144
 
 
145
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
 
146
        return "opened supported branch."
 
147
 
 
148
 
 
149
class SampleExtraBranchFormat(_mod_branch.BranchFormat):
 
150
    """A sample format that is not usable in a metadir."""
 
151
 
 
152
    def get_format_string(self):
 
153
        # This format is not usable in a metadir.
 
154
        return None
 
155
 
 
156
    def network_name(self):
 
157
        # Network name always has to be provided.
 
158
        return "extra"
 
159
 
 
160
    def initialize(self, a_bzrdir, name=None):
 
161
        raise NotImplementedError(self.initialize)
 
162
 
 
163
    def open(self, transport, name=None, _found=False, ignore_fallbacks=False):
 
164
        raise NotImplementedError(self.open)
 
165
 
 
166
 
 
167
class TestBzrBranchFormat(tests.TestCaseWithTransport):
142
168
    """Tests for the BzrBranchFormat facility."""
143
169
 
144
170
    def test_find_format(self):
145
171
        # is the right format object found for a branch?
146
172
        # create a branch with a few known format objects.
147
 
        # this is not quite the same as 
 
173
        # this is not quite the same as
148
174
        self.build_tree(["foo/", "bar/"])
149
175
        def check_format(format, url):
150
176
            dir = format._matchingbzrdir.initialize(url)
151
177
            dir.create_repository()
152
178
            format.initialize(dir)
153
 
            found_format = bzrlib.branch.BranchFormat.find_format(dir)
 
179
            found_format = _mod_branch.BranchFormat.find_format(dir)
154
180
            self.failUnless(isinstance(found_format, format.__class__))
155
 
        check_format(bzrlib.branch.BzrBranchFormat5(), "bar")
156
 
        
 
181
        check_format(_mod_branch.BzrBranchFormat5(), "bar")
 
182
 
 
183
    def test_find_format_factory(self):
 
184
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
185
        SampleSupportedBranchFormat().initialize(dir)
 
186
        factory = _mod_branch.MetaDirBranchFormatFactory(
 
187
            SampleSupportedBranchFormatString,
 
188
            "bzrlib.tests.test_branch", "SampleSupportedBranchFormat")
 
189
        _mod_branch.format_registry.register(factory)
 
190
        self.addCleanup(_mod_branch.format_registry.remove, factory)
 
191
        b = _mod_branch.Branch.open(self.get_url())
 
192
        self.assertEqual(b, "opened supported branch.")
 
193
 
157
194
    def test_find_format_not_branch(self):
158
195
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
159
 
        self.assertRaises(NotBranchError,
160
 
                          bzrlib.branch.BranchFormat.find_format,
 
196
        self.assertRaises(errors.NotBranchError,
 
197
                          _mod_branch.BranchFormat.find_format,
161
198
                          dir)
162
199
 
163
200
    def test_find_format_unknown_format(self):
164
201
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
165
202
        SampleBranchFormat().initialize(dir)
166
 
        self.assertRaises(UnknownFormatError,
167
 
                          bzrlib.branch.BranchFormat.find_format,
 
203
        self.assertRaises(errors.UnknownFormatError,
 
204
                          _mod_branch.BranchFormat.find_format,
168
205
                          dir)
169
206
 
170
207
    def test_register_unregister_format(self):
 
208
        # Test the deprecated format registration functions
171
209
        format = SampleBranchFormat()
172
210
        # make a control dir
173
211
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
174
212
        # make a branch
175
213
        format.initialize(dir)
176
214
        # register a format for it.
177
 
        bzrlib.branch.BranchFormat.register_format(format)
 
215
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
216
            _mod_branch.BranchFormat.register_format, format)
178
217
        # which branch.Open will refuse (not supported)
179
 
        self.assertRaises(UnsupportedFormatError, bzrlib.branch.Branch.open, self.get_url())
 
218
        self.assertRaises(errors.UnsupportedFormatError,
 
219
                          _mod_branch.Branch.open, self.get_url())
 
220
        self.make_branch_and_tree('foo')
180
221
        # but open_downlevel will work
181
 
        self.assertEqual(format.open(dir), bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
 
222
        self.assertEqual(
 
223
            format.open(dir),
 
224
            bzrdir.BzrDir.open(self.get_url()).open_branch(unsupported=True))
182
225
        # unregister the format
183
 
        bzrlib.branch.BranchFormat.unregister_format(format)
184
 
 
185
 
 
186
 
class TestBranchReference(TestCaseWithTransport):
 
226
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
 
227
            _mod_branch.BranchFormat.unregister_format, format)
 
228
        self.make_branch_and_tree('bar')
 
229
 
 
230
 
 
231
class TestBranchFormatRegistry(tests.TestCase):
 
232
 
 
233
    def setUp(self):
 
234
        super(TestBranchFormatRegistry, self).setUp()
 
235
        self.registry = _mod_branch.BranchFormatRegistry()
 
236
 
 
237
    def test_default(self):
 
238
        self.assertIs(None, self.registry.get_default())
 
239
        format = SampleBranchFormat()
 
240
        self.registry.set_default(format)
 
241
        self.assertEquals(format, self.registry.get_default())
 
242
 
 
243
    def test_register_unregister_format(self):
 
244
        format = SampleBranchFormat()
 
245
        self.registry.register(format)
 
246
        self.assertEquals(format,
 
247
            self.registry.get("Sample branch format."))
 
248
        self.registry.remove(format)
 
249
        self.assertRaises(KeyError, self.registry.get,
 
250
            "Sample branch format.")
 
251
 
 
252
    def test_get_all(self):
 
253
        format = SampleBranchFormat()
 
254
        self.assertEquals([], self.registry._get_all())
 
255
        self.registry.register(format)
 
256
        self.assertEquals([format], self.registry._get_all())
 
257
 
 
258
    def test_register_extra(self):
 
259
        format = SampleExtraBranchFormat()
 
260
        self.assertEquals([], self.registry._get_all())
 
261
        self.registry.register_extra(format)
 
262
        self.assertEquals([format], self.registry._get_all())
 
263
 
 
264
    def test_register_extra_lazy(self):
 
265
        self.assertEquals([], self.registry._get_all())
 
266
        self.registry.register_extra_lazy("bzrlib.tests.test_branch",
 
267
            "SampleExtraBranchFormat")
 
268
        formats = self.registry._get_all()
 
269
        self.assertEquals(1, len(formats))
 
270
        self.assertIsInstance(formats[0], SampleExtraBranchFormat)
 
271
 
 
272
 
 
273
#Used by TestMetaDirBranchFormatFactory 
 
274
FakeLazyFormat = None
 
275
 
 
276
 
 
277
class TestMetaDirBranchFormatFactory(tests.TestCase):
 
278
 
 
279
    def test_get_format_string_does_not_load(self):
 
280
        """Formats have a static format string."""
 
281
        factory = _mod_branch.MetaDirBranchFormatFactory("yo", None, None)
 
282
        self.assertEqual("yo", factory.get_format_string())
 
283
 
 
284
    def test_call_loads(self):
 
285
        # __call__ is used by the network_format_registry interface to get a
 
286
        # Format.
 
287
        global FakeLazyFormat
 
288
        del FakeLazyFormat
 
289
        factory = _mod_branch.MetaDirBranchFormatFactory(None,
 
290
            "bzrlib.tests.test_branch", "FakeLazyFormat")
 
291
        self.assertRaises(AttributeError, factory)
 
292
 
 
293
    def test_call_returns_call_of_referenced_object(self):
 
294
        global FakeLazyFormat
 
295
        FakeLazyFormat = lambda:'called'
 
296
        factory = _mod_branch.MetaDirBranchFormatFactory(None,
 
297
            "bzrlib.tests.test_branch", "FakeLazyFormat")
 
298
        self.assertEqual('called', factory())
 
299
 
 
300
 
 
301
class TestBranch67(object):
 
302
    """Common tests for both branch 6 and 7 which are mostly the same."""
 
303
 
 
304
    def get_format_name(self):
 
305
        raise NotImplementedError(self.get_format_name)
 
306
 
 
307
    def get_format_name_subtree(self):
 
308
        raise NotImplementedError(self.get_format_name)
 
309
 
 
310
    def get_class(self):
 
311
        raise NotImplementedError(self.get_class)
 
312
 
 
313
    def test_creation(self):
 
314
        format = bzrdir.BzrDirMetaFormat1()
 
315
        format.set_branch_format(_mod_branch.BzrBranchFormat6())
 
316
        branch = self.make_branch('a', format=format)
 
317
        self.assertIsInstance(branch, self.get_class())
 
318
        branch = self.make_branch('b', format=self.get_format_name())
 
319
        self.assertIsInstance(branch, self.get_class())
 
320
        branch = _mod_branch.Branch.open('a')
 
321
        self.assertIsInstance(branch, self.get_class())
 
322
 
 
323
    def test_layout(self):
 
324
        branch = self.make_branch('a', format=self.get_format_name())
 
325
        self.failUnlessExists('a/.bzr/branch/last-revision')
 
326
        self.failIfExists('a/.bzr/branch/revision-history')
 
327
        self.failIfExists('a/.bzr/branch/references')
 
328
 
 
329
    def test_config(self):
 
330
        """Ensure that all configuration data is stored in the branch"""
 
331
        branch = self.make_branch('a', format=self.get_format_name())
 
332
        branch.set_parent('http://example.com')
 
333
        self.failIfExists('a/.bzr/branch/parent')
 
334
        self.assertEqual('http://example.com', branch.get_parent())
 
335
        branch.set_push_location('sftp://example.com')
 
336
        config = branch.get_config()._get_branch_data_config()
 
337
        self.assertEqual('sftp://example.com',
 
338
                         config.get_user_option('push_location'))
 
339
        branch.set_bound_location('ftp://example.com')
 
340
        self.failIfExists('a/.bzr/branch/bound')
 
341
        self.assertEqual('ftp://example.com', branch.get_bound_location())
 
342
 
 
343
    def test_set_revision_history(self):
 
344
        builder = self.make_branch_builder('.', format=self.get_format_name())
 
345
        builder.build_snapshot('foo', None,
 
346
            [('add', ('', None, 'directory', None))],
 
347
            message='foo')
 
348
        builder.build_snapshot('bar', None, [], message='bar')
 
349
        branch = builder.get_branch()
 
350
        branch.lock_write()
 
351
        self.addCleanup(branch.unlock)
 
352
        branch.set_revision_history(['foo', 'bar'])
 
353
        branch.set_revision_history(['foo'])
 
354
        self.assertRaises(errors.NotLefthandHistory,
 
355
                          branch.set_revision_history, ['bar'])
 
356
 
 
357
    def do_checkout_test(self, lightweight=False):
 
358
        tree = self.make_branch_and_tree('source',
 
359
            format=self.get_format_name_subtree())
 
360
        subtree = self.make_branch_and_tree('source/subtree',
 
361
            format=self.get_format_name_subtree())
 
362
        subsubtree = self.make_branch_and_tree('source/subtree/subsubtree',
 
363
            format=self.get_format_name_subtree())
 
364
        self.build_tree(['source/subtree/file',
 
365
                         'source/subtree/subsubtree/file'])
 
366
        subsubtree.add('file')
 
367
        subtree.add('file')
 
368
        subtree.add_reference(subsubtree)
 
369
        tree.add_reference(subtree)
 
370
        tree.commit('a revision')
 
371
        subtree.commit('a subtree file')
 
372
        subsubtree.commit('a subsubtree file')
 
373
        tree.branch.create_checkout('target', lightweight=lightweight)
 
374
        self.failUnlessExists('target')
 
375
        self.failUnlessExists('target/subtree')
 
376
        self.failUnlessExists('target/subtree/file')
 
377
        self.failUnlessExists('target/subtree/subsubtree/file')
 
378
        subbranch = _mod_branch.Branch.open('target/subtree/subsubtree')
 
379
        if lightweight:
 
380
            self.assertEndsWith(subbranch.base, 'source/subtree/subsubtree/')
 
381
        else:
 
382
            self.assertEndsWith(subbranch.base, 'target/subtree/subsubtree/')
 
383
 
 
384
    def test_checkout_with_references(self):
 
385
        self.do_checkout_test()
 
386
 
 
387
    def test_light_checkout_with_references(self):
 
388
        self.do_checkout_test(lightweight=True)
 
389
 
 
390
    def test_set_push(self):
 
391
        branch = self.make_branch('source', format=self.get_format_name())
 
392
        branch.get_config().set_user_option('push_location', 'old',
 
393
            store=config.STORE_LOCATION)
 
394
        warnings = []
 
395
        def warning(*args):
 
396
            warnings.append(args[0] % args[1:])
 
397
        _warning = trace.warning
 
398
        trace.warning = warning
 
399
        try:
 
400
            branch.set_push_location('new')
 
401
        finally:
 
402
            trace.warning = _warning
 
403
        self.assertEqual(warnings[0], 'Value "new" is masked by "old" from '
 
404
                         'locations.conf')
 
405
 
 
406
 
 
407
class TestBranch6(TestBranch67, tests.TestCaseWithTransport):
 
408
 
 
409
    def get_class(self):
 
410
        return _mod_branch.BzrBranch6
 
411
 
 
412
    def get_format_name(self):
 
413
        return "dirstate-tags"
 
414
 
 
415
    def get_format_name_subtree(self):
 
416
        return "dirstate-with-subtree"
 
417
 
 
418
    def test_set_stacked_on_url_errors(self):
 
419
        branch = self.make_branch('a', format=self.get_format_name())
 
420
        self.assertRaises(errors.UnstackableBranchFormat,
 
421
            branch.set_stacked_on_url, None)
 
422
 
 
423
    def test_default_stacked_location(self):
 
424
        branch = self.make_branch('a', format=self.get_format_name())
 
425
        self.assertRaises(errors.UnstackableBranchFormat, branch.get_stacked_on_url)
 
426
 
 
427
 
 
428
class TestBranch7(TestBranch67, tests.TestCaseWithTransport):
 
429
 
 
430
    def get_class(self):
 
431
        return _mod_branch.BzrBranch7
 
432
 
 
433
    def get_format_name(self):
 
434
        return "1.9"
 
435
 
 
436
    def get_format_name_subtree(self):
 
437
        return "development-subtree"
 
438
 
 
439
    def test_set_stacked_on_url_unstackable_repo(self):
 
440
        repo = self.make_repository('a', format='dirstate-tags')
 
441
        control = repo.bzrdir
 
442
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
 
443
        target = self.make_branch('b')
 
444
        self.assertRaises(errors.UnstackableRepositoryFormat,
 
445
            branch.set_stacked_on_url, target.base)
 
446
 
 
447
    def test_clone_stacked_on_unstackable_repo(self):
 
448
        repo = self.make_repository('a', format='dirstate-tags')
 
449
        control = repo.bzrdir
 
450
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
 
451
        # Calling clone should not raise UnstackableRepositoryFormat.
 
452
        cloned_bzrdir = control.clone('cloned')
 
453
 
 
454
    def _test_default_stacked_location(self):
 
455
        branch = self.make_branch('a', format=self.get_format_name())
 
456
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
 
457
 
 
458
    def test_stack_and_unstack(self):
 
459
        branch = self.make_branch('a', format=self.get_format_name())
 
460
        target = self.make_branch_and_tree('b', format=self.get_format_name())
 
461
        branch.set_stacked_on_url(target.branch.base)
 
462
        self.assertEqual(target.branch.base, branch.get_stacked_on_url())
 
463
        revid = target.commit('foo')
 
464
        self.assertTrue(branch.repository.has_revision(revid))
 
465
        branch.set_stacked_on_url(None)
 
466
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
 
467
        self.assertFalse(branch.repository.has_revision(revid))
 
468
 
 
469
    def test_open_opens_stacked_reference(self):
 
470
        branch = self.make_branch('a', format=self.get_format_name())
 
471
        target = self.make_branch_and_tree('b', format=self.get_format_name())
 
472
        branch.set_stacked_on_url(target.branch.base)
 
473
        branch = branch.bzrdir.open_branch()
 
474
        revid = target.commit('foo')
 
475
        self.assertTrue(branch.repository.has_revision(revid))
 
476
 
 
477
 
 
478
class BzrBranch8(tests.TestCaseWithTransport):
 
479
 
 
480
    def make_branch(self, location, format=None):
 
481
        if format is None:
 
482
            format = bzrdir.format_registry.make_bzrdir('1.9')
 
483
            format.set_branch_format(_mod_branch.BzrBranchFormat8())
 
484
        return tests.TestCaseWithTransport.make_branch(
 
485
            self, location, format=format)
 
486
 
 
487
    def create_branch_with_reference(self):
 
488
        branch = self.make_branch('branch')
 
489
        branch._set_all_reference_info({'file-id': ('path', 'location')})
 
490
        return branch
 
491
 
 
492
    @staticmethod
 
493
    def instrument_branch(branch, gets):
 
494
        old_get = branch._transport.get
 
495
        def get(*args, **kwargs):
 
496
            gets.append((args, kwargs))
 
497
            return old_get(*args, **kwargs)
 
498
        branch._transport.get = get
 
499
 
 
500
    def test_reference_info_caching_read_locked(self):
 
501
        gets = []
 
502
        branch = self.create_branch_with_reference()
 
503
        branch.lock_read()
 
504
        self.addCleanup(branch.unlock)
 
505
        self.instrument_branch(branch, gets)
 
506
        branch.get_reference_info('file-id')
 
507
        branch.get_reference_info('file-id')
 
508
        self.assertEqual(1, len(gets))
 
509
 
 
510
    def test_reference_info_caching_read_unlocked(self):
 
511
        gets = []
 
512
        branch = self.create_branch_with_reference()
 
513
        self.instrument_branch(branch, gets)
 
514
        branch.get_reference_info('file-id')
 
515
        branch.get_reference_info('file-id')
 
516
        self.assertEqual(2, len(gets))
 
517
 
 
518
    def test_reference_info_caching_write_locked(self):
 
519
        gets = []
 
520
        branch = self.make_branch('branch')
 
521
        branch.lock_write()
 
522
        self.instrument_branch(branch, gets)
 
523
        self.addCleanup(branch.unlock)
 
524
        branch._set_all_reference_info({'file-id': ('path2', 'location2')})
 
525
        path, location = branch.get_reference_info('file-id')
 
526
        self.assertEqual(0, len(gets))
 
527
        self.assertEqual('path2', path)
 
528
        self.assertEqual('location2', location)
 
529
 
 
530
    def test_reference_info_caches_cleared(self):
 
531
        branch = self.make_branch('branch')
 
532
        branch.lock_write()
 
533
        branch.set_reference_info('file-id', 'path2', 'location2')
 
534
        branch.unlock()
 
535
        doppelganger = _mod_branch.Branch.open('branch')
 
536
        doppelganger.set_reference_info('file-id', 'path3', 'location3')
 
537
        self.assertEqual(('path3', 'location3'),
 
538
                         branch.get_reference_info('file-id'))
 
539
 
 
540
class TestBranchReference(tests.TestCaseWithTransport):
187
541
    """Tests for the branch reference facility."""
188
542
 
189
543
    def test_create_open_reference(self):
190
544
        bzrdirformat = bzrdir.BzrDirMetaFormat1()
191
 
        t = get_transport(self.get_url('.'))
 
545
        t = self.get_transport()
192
546
        t.mkdir('repo')
193
547
        dir = bzrdirformat.initialize(self.get_url('repo'))
194
548
        dir.create_repository()
195
549
        target_branch = dir.create_branch()
196
550
        t.mkdir('branch')
197
551
        branch_dir = bzrdirformat.initialize(self.get_url('branch'))
198
 
        made_branch = bzrlib.branch.BranchReferenceFormat().initialize(branch_dir, target_branch)
 
552
        made_branch = _mod_branch.BranchReferenceFormat().initialize(
 
553
            branch_dir, target_branch=target_branch)
199
554
        self.assertEqual(made_branch.base, target_branch.base)
200
555
        opened_branch = branch_dir.open_branch()
201
556
        self.assertEqual(opened_branch.base, target_branch.base)
 
557
 
 
558
    def test_get_reference(self):
 
559
        """For a BranchReference, get_reference should reutrn the location."""
 
560
        branch = self.make_branch('target')
 
561
        checkout = branch.create_checkout('checkout', lightweight=True)
 
562
        reference_url = branch.bzrdir.root_transport.abspath('') + '/'
 
563
        # if the api for create_checkout changes to return different checkout types
 
564
        # then this file read will fail.
 
565
        self.assertFileEqual(reference_url, 'checkout/.bzr/branch/location')
 
566
        self.assertEqual(reference_url,
 
567
            _mod_branch.BranchReferenceFormat().get_reference(checkout.bzrdir))
 
568
 
 
569
 
 
570
class TestHooks(tests.TestCaseWithTransport):
 
571
 
 
572
    def test_constructor(self):
 
573
        """Check that creating a BranchHooks instance has the right defaults."""
 
574
        hooks = _mod_branch.BranchHooks()
 
575
        self.assertTrue("set_rh" in hooks, "set_rh not in %s" % hooks)
 
576
        self.assertTrue("post_push" in hooks, "post_push not in %s" % hooks)
 
577
        self.assertTrue("post_commit" in hooks, "post_commit not in %s" % hooks)
 
578
        self.assertTrue("pre_commit" in hooks, "pre_commit not in %s" % hooks)
 
579
        self.assertTrue("post_pull" in hooks, "post_pull not in %s" % hooks)
 
580
        self.assertTrue("post_uncommit" in hooks,
 
581
                        "post_uncommit not in %s" % hooks)
 
582
        self.assertTrue("post_change_branch_tip" in hooks,
 
583
                        "post_change_branch_tip not in %s" % hooks)
 
584
        self.assertTrue("post_branch_init" in hooks,
 
585
                        "post_branch_init not in %s" % hooks)
 
586
        self.assertTrue("post_switch" in hooks,
 
587
                        "post_switch not in %s" % hooks)
 
588
 
 
589
    def test_installed_hooks_are_BranchHooks(self):
 
590
        """The installed hooks object should be a BranchHooks."""
 
591
        # the installed hooks are saved in self._preserved_hooks.
 
592
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch][1],
 
593
                              _mod_branch.BranchHooks)
 
594
 
 
595
    def test_post_branch_init_hook(self):
 
596
        calls = []
 
597
        _mod_branch.Branch.hooks.install_named_hook('post_branch_init',
 
598
            calls.append, None)
 
599
        self.assertLength(0, calls)
 
600
        branch = self.make_branch('a')
 
601
        self.assertLength(1, calls)
 
602
        params = calls[0]
 
603
        self.assertIsInstance(params, _mod_branch.BranchInitHookParams)
 
604
        self.assertTrue(hasattr(params, 'bzrdir'))
 
605
        self.assertTrue(hasattr(params, 'branch'))
 
606
 
 
607
    def test_post_branch_init_hook_repr(self):
 
608
        param_reprs = []
 
609
        _mod_branch.Branch.hooks.install_named_hook('post_branch_init',
 
610
            lambda params: param_reprs.append(repr(params)), None)
 
611
        branch = self.make_branch('a')
 
612
        self.assertLength(1, param_reprs)
 
613
        param_repr = param_reprs[0]
 
614
        self.assertStartsWith(param_repr, '<BranchInitHookParams of ')
 
615
 
 
616
    def test_post_switch_hook(self):
 
617
        from bzrlib import switch
 
618
        calls = []
 
619
        _mod_branch.Branch.hooks.install_named_hook('post_switch',
 
620
            calls.append, None)
 
621
        tree = self.make_branch_and_tree('branch-1')
 
622
        self.build_tree(['branch-1/file-1'])
 
623
        tree.add('file-1')
 
624
        tree.commit('rev1')
 
625
        to_branch = tree.bzrdir.sprout('branch-2').open_branch()
 
626
        self.build_tree(['branch-1/file-2'])
 
627
        tree.add('file-2')
 
628
        tree.remove('file-1')
 
629
        tree.commit('rev2')
 
630
        checkout = tree.branch.create_checkout('checkout')
 
631
        self.assertLength(0, calls)
 
632
        switch.switch(checkout.bzrdir, to_branch)
 
633
        self.assertLength(1, calls)
 
634
        params = calls[0]
 
635
        self.assertIsInstance(params, _mod_branch.SwitchHookParams)
 
636
        self.assertTrue(hasattr(params, 'to_branch'))
 
637
        self.assertTrue(hasattr(params, 'revision_id'))
 
638
 
 
639
 
 
640
class TestBranchOptions(tests.TestCaseWithTransport):
 
641
 
 
642
    def setUp(self):
 
643
        super(TestBranchOptions, self).setUp()
 
644
        self.branch = self.make_branch('.')
 
645
        self.config = self.branch.get_config()
 
646
 
 
647
    def check_append_revisions_only(self, expected_value, value=None):
 
648
        """Set append_revisions_only in config and check its interpretation."""
 
649
        if value is not None:
 
650
            self.config.set_user_option('append_revisions_only', value)
 
651
        self.assertEqual(expected_value,
 
652
                         self.branch._get_append_revisions_only())
 
653
 
 
654
    def test_valid_append_revisions_only(self):
 
655
        self.assertEquals(None,
 
656
                          self.config.get_user_option('append_revisions_only'))
 
657
        self.check_append_revisions_only(None)
 
658
        self.check_append_revisions_only(False, 'False')
 
659
        self.check_append_revisions_only(True, 'True')
 
660
        # The following values will cause compatibility problems on projects
 
661
        # using older bzr versions (<2.2) but are accepted
 
662
        self.check_append_revisions_only(False, 'false')
 
663
        self.check_append_revisions_only(True, 'true')
 
664
 
 
665
    def test_invalid_append_revisions_only(self):
 
666
        """Ensure warning is noted on invalid settings"""
 
667
        self.warnings = []
 
668
        def warning(*args):
 
669
            self.warnings.append(args[0] % args[1:])
 
670
        self.overrideAttr(trace, 'warning', warning)
 
671
        self.check_append_revisions_only(None, 'not-a-bool')
 
672
        self.assertLength(1, self.warnings)
 
673
        self.assertEqual(
 
674
            'Value "not-a-bool" is not a boolean for "append_revisions_only"',
 
675
            self.warnings[0])
 
676
 
 
677
 
 
678
class TestPullResult(tests.TestCase):
 
679
 
 
680
    def test_pull_result_to_int(self):
 
681
        # to support old code, the pull result can be used as an int
 
682
        r = _mod_branch.PullResult()
 
683
        r.old_revno = 10
 
684
        r.new_revno = 20
 
685
        # this usage of results is not recommended for new code (because it
 
686
        # doesn't describe very well what happened), but for api stability
 
687
        # it's still supported
 
688
        self.assertEqual(self.applyDeprecated(
 
689
            symbol_versioning.deprecated_in((2, 3, 0)),
 
690
            r.__int__),
 
691
            10)
 
692
 
 
693
    def test_report_changed(self):
 
694
        r = _mod_branch.PullResult()
 
695
        r.old_revid = "old-revid"
 
696
        r.old_revno = 10
 
697
        r.new_revid = "new-revid"
 
698
        r.new_revno = 20
 
699
        f = StringIO()
 
700
        r.report(f)
 
701
        self.assertEqual("Now on revision 20.\n", f.getvalue())
 
702
 
 
703
    def test_report_unchanged(self):
 
704
        r = _mod_branch.PullResult()
 
705
        r.old_revid = "same-revid"
 
706
        r.new_revid = "same-revid"
 
707
        f = StringIO()
 
708
        r.report(f)
 
709
        self.assertEqual("No revisions to pull.\n", f.getvalue())
 
710
 
 
711
 
 
712
class _StubLockable(object):
 
713
    """Helper for TestRunWithWriteLockedTarget."""
 
714
 
 
715
    def __init__(self, calls, unlock_exc=None):
 
716
        self.calls = calls
 
717
        self.unlock_exc = unlock_exc
 
718
 
 
719
    def lock_write(self):
 
720
        self.calls.append('lock_write')
 
721
 
 
722
    def unlock(self):
 
723
        self.calls.append('unlock')
 
724
        if self.unlock_exc is not None:
 
725
            raise self.unlock_exc
 
726
 
 
727
 
 
728
class _ErrorFromCallable(Exception):
 
729
    """Helper for TestRunWithWriteLockedTarget."""
 
730
 
 
731
 
 
732
class _ErrorFromUnlock(Exception):
 
733
    """Helper for TestRunWithWriteLockedTarget."""
 
734
 
 
735
 
 
736
class TestRunWithWriteLockedTarget(tests.TestCase):
 
737
    """Tests for _run_with_write_locked_target."""
 
738
 
 
739
    def setUp(self):
 
740
        tests.TestCase.setUp(self)
 
741
        self._calls = []
 
742
 
 
743
    def func_that_returns_ok(self):
 
744
        self._calls.append('func called')
 
745
        return 'ok'
 
746
 
 
747
    def func_that_raises(self):
 
748
        self._calls.append('func called')
 
749
        raise _ErrorFromCallable()
 
750
 
 
751
    def test_success_unlocks(self):
 
752
        lockable = _StubLockable(self._calls)
 
753
        result = _mod_branch._run_with_write_locked_target(
 
754
            lockable, self.func_that_returns_ok)
 
755
        self.assertEqual('ok', result)
 
756
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
757
 
 
758
    def test_exception_unlocks_and_propagates(self):
 
759
        lockable = _StubLockable(self._calls)
 
760
        self.assertRaises(_ErrorFromCallable,
 
761
                          _mod_branch._run_with_write_locked_target,
 
762
                          lockable, self.func_that_raises)
 
763
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
764
 
 
765
    def test_callable_succeeds_but_error_during_unlock(self):
 
766
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
767
        self.assertRaises(_ErrorFromUnlock,
 
768
                          _mod_branch._run_with_write_locked_target,
 
769
                          lockable, self.func_that_returns_ok)
 
770
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
771
 
 
772
    def test_error_during_unlock_does_not_mask_original_error(self):
 
773
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
774
        self.assertRaises(_ErrorFromCallable,
 
775
                          _mod_branch._run_with_write_locked_target,
 
776
                          lockable, self.func_that_raises)
 
777
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)