~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Robert Collins
  • Date: 2009-07-07 04:32:13 UTC
  • mto: This revision was merged to the branch mainline in revision 4524.
  • Revision ID: robertc@robertcollins.net-20090707043213-4hjjhgr40iq7gk2d
More informative assertions in xml serialisation.

Show diffs side-by-side

added

removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
39
39
    BranchReferenceFormat,
40
40
    BzrBranch5,
41
41
    BzrBranchFormat5,
 
42
    BzrBranchFormat6,
42
43
    PullResult,
 
44
    _run_with_write_locked_target,
43
45
    )
44
 
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1, 
 
46
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1,
45
47
                           BzrDir, BzrDirFormat)
46
48
from bzrlib.errors import (NotBranchError,
47
49
                           UnknownFormatError,
52
54
from bzrlib.tests import TestCase, TestCaseWithTransport
53
55
from bzrlib.transport import get_transport
54
56
 
 
57
 
55
58
class TestDefaultFormat(TestCase):
56
59
 
 
60
    def test_default_format(self):
 
61
        # update this if you change the default branch format
 
62
        self.assertIsInstance(BranchFormat.get_default_format(),
 
63
                BzrBranchFormat6)
 
64
 
 
65
    def test_default_format_is_same_as_bzrdir_default(self):
 
66
        # XXX: it might be nice if there was only one place the default was
 
67
        # set, but at the moment that's not true -- mbp 20070814 --
 
68
        # https://bugs.launchpad.net/bzr/+bug/132376
 
69
        self.assertEqual(BranchFormat.get_default_format(),
 
70
                BzrDirFormat.get_default_format().get_branch_format())
 
71
 
57
72
    def test_get_set_default_format(self):
 
73
        # set the format and then set it back again
58
74
        old_format = BranchFormat.get_default_format()
59
 
        # default is 5
60
 
        self.assertTrue(isinstance(old_format, BzrBranchFormat5))
61
75
        BranchFormat.set_default_format(SampleBranchFormat())
62
76
        try:
63
77
            # the default branch format is used by the meta dir format
95
109
                                   ensure_config_dir_exists)
96
110
        ensure_config_dir_exists()
97
111
        fn = locations_config_filename()
 
112
        # write correct newlines to locations.conf
 
113
        # by default ConfigObj uses native line-endings for new files
 
114
        # but uses already existing line-endings if file is not empty
 
115
        f = open(fn, 'wb')
 
116
        try:
 
117
            f.write('# comment\n')
 
118
        finally:
 
119
            f.close()
 
120
 
98
121
        branch = self.make_branch('.', format='knit')
99
122
        branch.set_push_location('foo')
100
123
        local_path = urlutils.local_path_from_url(branch.base[:-1])
101
 
        self.assertFileEqual("[%s]\n"
 
124
        self.assertFileEqual("# comment\n"
 
125
                             "[%s]\n"
102
126
                             "push_location = foo\n"
103
 
                             "push_location:policy = norecurse" % local_path,
 
127
                             "push_location:policy = norecurse\n" % local_path,
104
128
                             fn)
105
129
 
106
130
    # TODO RBC 20051029 test getting a push location from a branch in a
110
134
class SampleBranchFormat(BranchFormat):
111
135
    """A sample format
112
136
 
113
 
    this format is initializable, unsupported to aid in testing the 
 
137
    this format is initializable, unsupported to aid in testing the
114
138
    open and open_downlevel routines.
115
139
    """
116
140
 
127
151
    def is_supported(self):
128
152
        return False
129
153
 
130
 
    def open(self, transport, _found=False):
 
154
    def open(self, transport, _found=False, ignore_fallbacks=False):
131
155
        return "opened branch."
132
156
 
133
157
 
137
161
    def test_find_format(self):
138
162
        # is the right format object found for a branch?
139
163
        # create a branch with a few known format objects.
140
 
        # this is not quite the same as 
 
164
        # this is not quite the same as
141
165
        self.build_tree(["foo/", "bar/"])
142
166
        def check_format(format, url):
143
167
            dir = format._matchingbzrdir.initialize(url)
146
170
            found_format = BranchFormat.find_format(dir)
147
171
            self.failUnless(isinstance(found_format, format.__class__))
148
172
        check_format(BzrBranchFormat5(), "bar")
149
 
        
 
173
 
150
174
    def test_find_format_not_branch(self):
151
175
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
152
176
        self.assertRaises(NotBranchError,
178
202
        self.make_branch_and_tree('bar')
179
203
 
180
204
 
181
 
class TestBranch6(TestCaseWithTransport):
 
205
class TestBranch67(object):
 
206
    """Common tests for both branch 6 and 7 which are mostly the same."""
 
207
 
 
208
    def get_format_name(self):
 
209
        raise NotImplementedError(self.get_format_name)
 
210
 
 
211
    def get_format_name_subtree(self):
 
212
        raise NotImplementedError(self.get_format_name)
 
213
 
 
214
    def get_class(self):
 
215
        raise NotImplementedError(self.get_class)
182
216
 
183
217
    def test_creation(self):
184
218
        format = BzrDirMetaFormat1()
185
219
        format.set_branch_format(_mod_branch.BzrBranchFormat6())
186
220
        branch = self.make_branch('a', format=format)
187
 
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
188
 
        branch = self.make_branch('b', format='dirstate-tags')
189
 
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
 
221
        self.assertIsInstance(branch, self.get_class())
 
222
        branch = self.make_branch('b', format=self.get_format_name())
 
223
        self.assertIsInstance(branch, self.get_class())
190
224
        branch = _mod_branch.Branch.open('a')
191
 
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
 
225
        self.assertIsInstance(branch, self.get_class())
192
226
 
193
227
    def test_layout(self):
194
 
        branch = self.make_branch('a', format='dirstate-tags')
 
228
        branch = self.make_branch('a', format=self.get_format_name())
195
229
        self.failUnlessExists('a/.bzr/branch/last-revision')
196
230
        self.failIfExists('a/.bzr/branch/revision-history')
 
231
        self.failIfExists('a/.bzr/branch/references')
197
232
 
198
233
    def test_config(self):
199
234
        """Ensure that all configuration data is stored in the branch"""
200
 
        branch = self.make_branch('a', format='dirstate-tags')
 
235
        branch = self.make_branch('a', format=self.get_format_name())
201
236
        branch.set_parent('http://bazaar-vcs.org')
202
237
        self.failIfExists('a/.bzr/branch/parent')
203
238
        self.assertEqual('http://bazaar-vcs.org', branch.get_parent())
210
245
        self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
211
246
 
212
247
    def test_set_revision_history(self):
213
 
        tree = self.make_branch_and_memory_tree('.',
214
 
            format='dirstate-tags')
215
 
        tree.lock_write()
216
 
        try:
217
 
            tree.add('.')
218
 
            tree.commit('foo', rev_id='foo')
219
 
            tree.commit('bar', rev_id='bar')
220
 
            tree.branch.set_revision_history(['foo', 'bar'])
221
 
            tree.branch.set_revision_history(['foo'])
222
 
            self.assertRaises(errors.NotLefthandHistory,
223
 
                              tree.branch.set_revision_history, ['bar'])
224
 
        finally:
225
 
            tree.unlock()
226
 
 
227
 
    def test_append_revision(self):
228
 
        tree = self.make_branch_and_tree('branch1',
229
 
            format='dirstate-tags')
230
 
        tree.lock_write()
231
 
        try:
232
 
            tree.commit('foo', rev_id='foo')
233
 
            tree.commit('bar', rev_id='bar')
234
 
            tree.commit('baz', rev_id='baz')
235
 
            tree.set_last_revision('bar')
236
 
            tree.branch.set_last_revision_info(2, 'bar')
237
 
            tree.commit('qux', rev_id='qux')
238
 
            tree.add_parent_tree_id('baz')
239
 
            tree.commit('qux', rev_id='quxx')
240
 
            tree.branch.set_last_revision_info(0, 'null:')
241
 
            self.assertRaises(errors.NotLeftParentDescendant,
242
 
                              tree.branch.append_revision, 'bar')
243
 
            tree.branch.append_revision('foo')
244
 
            self.assertRaises(errors.NotLeftParentDescendant,
245
 
                              tree.branch.append_revision, 'baz')
246
 
            tree.branch.append_revision('bar')
247
 
            tree.branch.append_revision('baz')
248
 
            self.assertRaises(errors.NotLeftParentDescendant,
249
 
                              tree.branch.append_revision, 'quxx')
250
 
        finally:
251
 
            tree.unlock()
 
248
        builder = self.make_branch_builder('.', format=self.get_format_name())
 
249
        builder.build_snapshot('foo', None,
 
250
            [('add', ('', None, 'directory', None))],
 
251
            message='foo')
 
252
        builder.build_snapshot('bar', None, [], message='bar')
 
253
        branch = builder.get_branch()
 
254
        branch.lock_write()
 
255
        self.addCleanup(branch.unlock)
 
256
        branch.set_revision_history(['foo', 'bar'])
 
257
        branch.set_revision_history(['foo'])
 
258
        self.assertRaises(errors.NotLefthandHistory,
 
259
                          branch.set_revision_history, ['bar'])
252
260
 
253
261
    def do_checkout_test(self, lightweight=False):
254
 
        tree = self.make_branch_and_tree('source', format='dirstate-with-subtree')
 
262
        tree = self.make_branch_and_tree('source',
 
263
            format=self.get_format_name_subtree())
255
264
        subtree = self.make_branch_and_tree('source/subtree',
256
 
            format='dirstate-with-subtree')
 
265
            format=self.get_format_name_subtree())
257
266
        subsubtree = self.make_branch_and_tree('source/subtree/subsubtree',
258
 
            format='dirstate-with-subtree')
 
267
            format=self.get_format_name_subtree())
259
268
        self.build_tree(['source/subtree/file',
260
269
                         'source/subtree/subsubtree/file'])
261
270
        subsubtree.add('file')
276
285
        else:
277
286
            self.assertEndsWith(subbranch.base, 'target/subtree/subsubtree/')
278
287
 
279
 
 
280
288
    def test_checkout_with_references(self):
281
289
        self.do_checkout_test()
282
290
 
284
292
        self.do_checkout_test(lightweight=True)
285
293
 
286
294
    def test_set_push(self):
287
 
        branch = self.make_branch('source', format='dirstate-tags')
 
295
        branch = self.make_branch('source', format=self.get_format_name())
288
296
        branch.get_config().set_user_option('push_location', 'old',
289
297
            store=config.STORE_LOCATION)
290
298
        warnings = []
299
307
        self.assertEqual(warnings[0], 'Value "new" is masked by "old" from '
300
308
                         'locations.conf')
301
309
 
 
310
 
 
311
class TestBranch6(TestBranch67, TestCaseWithTransport):
 
312
 
 
313
    def get_class(self):
 
314
        return _mod_branch.BzrBranch6
 
315
 
 
316
    def get_format_name(self):
 
317
        return "dirstate-tags"
 
318
 
 
319
    def get_format_name_subtree(self):
 
320
        return "dirstate-with-subtree"
 
321
 
 
322
    def test_set_stacked_on_url_errors(self):
 
323
        branch = self.make_branch('a', format=self.get_format_name())
 
324
        self.assertRaises(errors.UnstackableBranchFormat,
 
325
            branch.set_stacked_on_url, None)
 
326
 
 
327
    def test_default_stacked_location(self):
 
328
        branch = self.make_branch('a', format=self.get_format_name())
 
329
        self.assertRaises(errors.UnstackableBranchFormat, branch.get_stacked_on_url)
 
330
 
 
331
 
 
332
class TestBranch7(TestBranch67, TestCaseWithTransport):
 
333
 
 
334
    def get_class(self):
 
335
        return _mod_branch.BzrBranch7
 
336
 
 
337
    def get_format_name(self):
 
338
        return "1.9"
 
339
 
 
340
    def get_format_name_subtree(self):
 
341
        return "development-subtree"
 
342
 
 
343
    def test_set_stacked_on_url_unstackable_repo(self):
 
344
        repo = self.make_repository('a', format='dirstate-tags')
 
345
        control = repo.bzrdir
 
346
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
 
347
        target = self.make_branch('b')
 
348
        self.assertRaises(errors.UnstackableRepositoryFormat,
 
349
            branch.set_stacked_on_url, target.base)
 
350
 
 
351
    def test_clone_stacked_on_unstackable_repo(self):
 
352
        repo = self.make_repository('a', format='dirstate-tags')
 
353
        control = repo.bzrdir
 
354
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
 
355
        # Calling clone should not raise UnstackableRepositoryFormat.
 
356
        cloned_bzrdir = control.clone('cloned')
 
357
 
 
358
    def _test_default_stacked_location(self):
 
359
        branch = self.make_branch('a', format=self.get_format_name())
 
360
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
 
361
 
 
362
    def test_stack_and_unstack(self):
 
363
        branch = self.make_branch('a', format=self.get_format_name())
 
364
        target = self.make_branch_and_tree('b', format=self.get_format_name())
 
365
        branch.set_stacked_on_url(target.branch.base)
 
366
        self.assertEqual(target.branch.base, branch.get_stacked_on_url())
 
367
        revid = target.commit('foo')
 
368
        self.assertTrue(branch.repository.has_revision(revid))
 
369
        branch.set_stacked_on_url(None)
 
370
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
 
371
        self.assertFalse(branch.repository.has_revision(revid))
 
372
 
 
373
    def test_open_opens_stacked_reference(self):
 
374
        branch = self.make_branch('a', format=self.get_format_name())
 
375
        target = self.make_branch_and_tree('b', format=self.get_format_name())
 
376
        branch.set_stacked_on_url(target.branch.base)
 
377
        branch = branch.bzrdir.open_branch()
 
378
        revid = target.commit('foo')
 
379
        self.assertTrue(branch.repository.has_revision(revid))
 
380
 
 
381
 
 
382
class BzrBranch8(TestCaseWithTransport):
 
383
 
 
384
    def make_branch(self, location, format=None):
 
385
        if format is None:
 
386
            format = bzrdir.format_registry.make_bzrdir('1.9')
 
387
            format.set_branch_format(_mod_branch.BzrBranchFormat8())
 
388
        return TestCaseWithTransport.make_branch(self, location, format=format)
 
389
 
 
390
    def create_branch_with_reference(self):
 
391
        branch = self.make_branch('branch')
 
392
        branch._set_all_reference_info({'file-id': ('path', 'location')})
 
393
        return branch
 
394
 
 
395
    @staticmethod
 
396
    def instrument_branch(branch, gets):
 
397
        old_get = branch._transport.get
 
398
        def get(*args, **kwargs):
 
399
            gets.append((args, kwargs))
 
400
            return old_get(*args, **kwargs)
 
401
        branch._transport.get = get
 
402
 
 
403
    def test_reference_info_caching_read_locked(self):
 
404
        gets = []
 
405
        branch = self.create_branch_with_reference()
 
406
        branch.lock_read()
 
407
        self.addCleanup(branch.unlock)
 
408
        self.instrument_branch(branch, gets)
 
409
        branch.get_reference_info('file-id')
 
410
        branch.get_reference_info('file-id')
 
411
        self.assertEqual(1, len(gets))
 
412
 
 
413
    def test_reference_info_caching_read_unlocked(self):
 
414
        gets = []
 
415
        branch = self.create_branch_with_reference()
 
416
        self.instrument_branch(branch, gets)
 
417
        branch.get_reference_info('file-id')
 
418
        branch.get_reference_info('file-id')
 
419
        self.assertEqual(2, len(gets))
 
420
 
 
421
    def test_reference_info_caching_write_locked(self):
 
422
        gets = []
 
423
        branch = self.make_branch('branch')
 
424
        branch.lock_write()
 
425
        self.instrument_branch(branch, gets)
 
426
        self.addCleanup(branch.unlock)
 
427
        branch._set_all_reference_info({'file-id': ('path2', 'location2')})
 
428
        path, location = branch.get_reference_info('file-id')
 
429
        self.assertEqual(0, len(gets))
 
430
        self.assertEqual('path2', path)
 
431
        self.assertEqual('location2', location)
 
432
 
 
433
    def test_reference_info_caches_cleared(self):
 
434
        branch = self.make_branch('branch')
 
435
        branch.lock_write()
 
436
        branch.set_reference_info('file-id', 'path2', 'location2')
 
437
        branch.unlock()
 
438
        doppelganger = Branch.open('branch')
 
439
        doppelganger.set_reference_info('file-id', 'path3', 'location3')
 
440
        self.assertEqual(('path3', 'location3'),
 
441
                         branch.get_reference_info('file-id'))
 
442
 
302
443
class TestBranchReference(TestCaseWithTransport):
303
444
    """Tests for the branch reference facility."""
304
445
 
336
477
        self.assertTrue("set_rh" in hooks, "set_rh not in %s" % hooks)
337
478
        self.assertTrue("post_push" in hooks, "post_push not in %s" % hooks)
338
479
        self.assertTrue("post_commit" in hooks, "post_commit not in %s" % hooks)
 
480
        self.assertTrue("pre_commit" in hooks, "pre_commit not in %s" % hooks)
339
481
        self.assertTrue("post_pull" in hooks, "post_pull not in %s" % hooks)
340
482
        self.assertTrue("post_uncommit" in hooks, "post_uncommit not in %s" % hooks)
 
483
        self.assertTrue("post_change_branch_tip" in hooks,
 
484
                        "post_change_branch_tip not in %s" % hooks)
341
485
 
342
486
    def test_installed_hooks_are_BranchHooks(self):
343
487
        """The installed hooks object should be a BranchHooks."""
344
488
        # the installed hooks are saved in self._preserved_hooks.
345
 
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch], BranchHooks)
 
489
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch][1],
 
490
            BranchHooks)
346
491
 
347
492
 
348
493
class TestPullResult(TestCase):
357
502
        # it's still supported
358
503
        a = "%d revisions pulled" % r
359
504
        self.assertEqual(a, "10 revisions pulled")
 
505
 
 
506
 
 
507
 
 
508
class _StubLockable(object):
 
509
    """Helper for TestRunWithWriteLockedTarget."""
 
510
 
 
511
    def __init__(self, calls, unlock_exc=None):
 
512
        self.calls = calls
 
513
        self.unlock_exc = unlock_exc
 
514
 
 
515
    def lock_write(self):
 
516
        self.calls.append('lock_write')
 
517
 
 
518
    def unlock(self):
 
519
        self.calls.append('unlock')
 
520
        if self.unlock_exc is not None:
 
521
            raise self.unlock_exc
 
522
 
 
523
 
 
524
class _ErrorFromCallable(Exception):
 
525
    """Helper for TestRunWithWriteLockedTarget."""
 
526
 
 
527
 
 
528
class _ErrorFromUnlock(Exception):
 
529
    """Helper for TestRunWithWriteLockedTarget."""
 
530
 
 
531
 
 
532
class TestRunWithWriteLockedTarget(TestCase):
 
533
    """Tests for _run_with_write_locked_target."""
 
534
 
 
535
    def setUp(self):
 
536
        TestCase.setUp(self)
 
537
        self._calls = []
 
538
 
 
539
    def func_that_returns_ok(self):
 
540
        self._calls.append('func called')
 
541
        return 'ok'
 
542
 
 
543
    def func_that_raises(self):
 
544
        self._calls.append('func called')
 
545
        raise _ErrorFromCallable()
 
546
 
 
547
    def test_success_unlocks(self):
 
548
        lockable = _StubLockable(self._calls)
 
549
        result = _run_with_write_locked_target(
 
550
            lockable, self.func_that_returns_ok)
 
551
        self.assertEqual('ok', result)
 
552
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
553
 
 
554
    def test_exception_unlocks_and_propagates(self):
 
555
        lockable = _StubLockable(self._calls)
 
556
        self.assertRaises(_ErrorFromCallable,
 
557
            _run_with_write_locked_target, lockable, self.func_that_raises)
 
558
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
559
 
 
560
    def test_callable_succeeds_but_error_during_unlock(self):
 
561
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
562
        self.assertRaises(_ErrorFromUnlock,
 
563
            _run_with_write_locked_target, lockable, self.func_that_returns_ok)
 
564
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
565
 
 
566
    def test_error_during_unlock_does_not_mask_original_error(self):
 
567
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
568
        self.assertRaises(_ErrorFromCallable,
 
569
            _run_with_write_locked_target, lockable, self.func_that_raises)
 
570
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
571
 
 
572