~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Naoki INADA
  • Date: 2009-10-29 10:01:19 UTC
  • mto: (4634.97.3 2.0)
  • mto: This revision was merged to the branch mainline in revision 4798.
  • Revision ID: inada-n@klab.jp-20091029100119-uckv9t7ej2qrghw3
import doc-ja rev90

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
19
 
For interface tests see tests/branch_implementations/*.py.
 
19
For interface tests see tests/per_branch/*.py.
20
20
 
21
21
For concrete class tests see this file, and for meta-branch tests
22
22
also see this file.
40
40
    BzrBranch5,
41
41
    BzrBranchFormat5,
42
42
    BzrBranchFormat6,
 
43
    BzrBranchFormat7,
43
44
    PullResult,
 
45
    _run_with_write_locked_target,
44
46
    )
45
 
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1, 
 
47
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1,
46
48
                           BzrDir, BzrDirFormat)
47
49
from bzrlib.errors import (NotBranchError,
48
50
                           UnknownFormatError,
53
55
from bzrlib.tests import TestCase, TestCaseWithTransport
54
56
from bzrlib.transport import get_transport
55
57
 
 
58
 
56
59
class TestDefaultFormat(TestCase):
57
60
 
58
61
    def test_default_format(self):
59
62
        # update this if you change the default branch format
60
63
        self.assertIsInstance(BranchFormat.get_default_format(),
61
 
                BzrBranchFormat6)
 
64
                BzrBranchFormat7)
62
65
 
63
66
    def test_default_format_is_same_as_bzrdir_default(self):
64
67
        # XXX: it might be nice if there was only one place the default was
65
 
        # set, but at the moment that's not true -- mbp 20070814 -- 
 
68
        # set, but at the moment that's not true -- mbp 20070814 --
66
69
        # https://bugs.launchpad.net/bzr/+bug/132376
67
70
        self.assertEqual(BranchFormat.get_default_format(),
68
71
                BzrDirFormat.get_default_format().get_branch_format())
122
125
        self.assertFileEqual("# comment\n"
123
126
                             "[%s]\n"
124
127
                             "push_location = foo\n"
125
 
                             "push_location:policy = norecurse" % local_path,
 
128
                             "push_location:policy = norecurse\n" % local_path,
126
129
                             fn)
127
130
 
128
131
    # TODO RBC 20051029 test getting a push location from a branch in a
132
135
class SampleBranchFormat(BranchFormat):
133
136
    """A sample format
134
137
 
135
 
    this format is initializable, unsupported to aid in testing the 
 
138
    this format is initializable, unsupported to aid in testing the
136
139
    open and open_downlevel routines.
137
140
    """
138
141
 
149
152
    def is_supported(self):
150
153
        return False
151
154
 
152
 
    def open(self, transport, _found=False):
 
155
    def open(self, transport, _found=False, ignore_fallbacks=False):
153
156
        return "opened branch."
154
157
 
155
158
 
159
162
    def test_find_format(self):
160
163
        # is the right format object found for a branch?
161
164
        # create a branch with a few known format objects.
162
 
        # this is not quite the same as 
 
165
        # this is not quite the same as
163
166
        self.build_tree(["foo/", "bar/"])
164
167
        def check_format(format, url):
165
168
            dir = format._matchingbzrdir.initialize(url)
168
171
            found_format = BranchFormat.find_format(dir)
169
172
            self.failUnless(isinstance(found_format, format.__class__))
170
173
        check_format(BzrBranchFormat5(), "bar")
171
 
        
 
174
 
172
175
    def test_find_format_not_branch(self):
173
176
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
174
177
        self.assertRaises(NotBranchError,
200
203
        self.make_branch_and_tree('bar')
201
204
 
202
205
 
203
 
class TestBranch6(TestCaseWithTransport):
 
206
class TestBranch67(object):
 
207
    """Common tests for both branch 6 and 7 which are mostly the same."""
 
208
 
 
209
    def get_format_name(self):
 
210
        raise NotImplementedError(self.get_format_name)
 
211
 
 
212
    def get_format_name_subtree(self):
 
213
        raise NotImplementedError(self.get_format_name)
 
214
 
 
215
    def get_class(self):
 
216
        raise NotImplementedError(self.get_class)
204
217
 
205
218
    def test_creation(self):
206
219
        format = BzrDirMetaFormat1()
207
220
        format.set_branch_format(_mod_branch.BzrBranchFormat6())
208
221
        branch = self.make_branch('a', format=format)
209
 
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
210
 
        branch = self.make_branch('b', format='dirstate-tags')
211
 
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
 
222
        self.assertIsInstance(branch, self.get_class())
 
223
        branch = self.make_branch('b', format=self.get_format_name())
 
224
        self.assertIsInstance(branch, self.get_class())
212
225
        branch = _mod_branch.Branch.open('a')
213
 
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
 
226
        self.assertIsInstance(branch, self.get_class())
214
227
 
215
228
    def test_layout(self):
216
 
        branch = self.make_branch('a', format='dirstate-tags')
 
229
        branch = self.make_branch('a', format=self.get_format_name())
217
230
        self.failUnlessExists('a/.bzr/branch/last-revision')
218
231
        self.failIfExists('a/.bzr/branch/revision-history')
 
232
        self.failIfExists('a/.bzr/branch/references')
219
233
 
220
234
    def test_config(self):
221
235
        """Ensure that all configuration data is stored in the branch"""
222
 
        branch = self.make_branch('a', format='dirstate-tags')
 
236
        branch = self.make_branch('a', format=self.get_format_name())
223
237
        branch.set_parent('http://bazaar-vcs.org')
224
238
        self.failIfExists('a/.bzr/branch/parent')
225
239
        self.assertEqual('http://bazaar-vcs.org', branch.get_parent())
232
246
        self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
233
247
 
234
248
    def test_set_revision_history(self):
235
 
        tree = self.make_branch_and_memory_tree('.',
236
 
            format='dirstate-tags')
237
 
        tree.lock_write()
238
 
        try:
239
 
            tree.add('.')
240
 
            tree.commit('foo', rev_id='foo')
241
 
            tree.commit('bar', rev_id='bar')
242
 
            tree.branch.set_revision_history(['foo', 'bar'])
243
 
            tree.branch.set_revision_history(['foo'])
244
 
            self.assertRaises(errors.NotLefthandHistory,
245
 
                              tree.branch.set_revision_history, ['bar'])
246
 
        finally:
247
 
            tree.unlock()
 
249
        builder = self.make_branch_builder('.', format=self.get_format_name())
 
250
        builder.build_snapshot('foo', None,
 
251
            [('add', ('', None, 'directory', None))],
 
252
            message='foo')
 
253
        builder.build_snapshot('bar', None, [], message='bar')
 
254
        branch = builder.get_branch()
 
255
        branch.lock_write()
 
256
        self.addCleanup(branch.unlock)
 
257
        branch.set_revision_history(['foo', 'bar'])
 
258
        branch.set_revision_history(['foo'])
 
259
        self.assertRaises(errors.NotLefthandHistory,
 
260
                          branch.set_revision_history, ['bar'])
248
261
 
249
262
    def do_checkout_test(self, lightweight=False):
250
 
        tree = self.make_branch_and_tree('source', format='dirstate-with-subtree')
 
263
        tree = self.make_branch_and_tree('source',
 
264
            format=self.get_format_name_subtree())
251
265
        subtree = self.make_branch_and_tree('source/subtree',
252
 
            format='dirstate-with-subtree')
 
266
            format=self.get_format_name_subtree())
253
267
        subsubtree = self.make_branch_and_tree('source/subtree/subsubtree',
254
 
            format='dirstate-with-subtree')
 
268
            format=self.get_format_name_subtree())
255
269
        self.build_tree(['source/subtree/file',
256
270
                         'source/subtree/subsubtree/file'])
257
271
        subsubtree.add('file')
279
293
        self.do_checkout_test(lightweight=True)
280
294
 
281
295
    def test_set_push(self):
282
 
        branch = self.make_branch('source', format='dirstate-tags')
 
296
        branch = self.make_branch('source', format=self.get_format_name())
283
297
        branch.get_config().set_user_option('push_location', 'old',
284
298
            store=config.STORE_LOCATION)
285
299
        warnings = []
294
308
        self.assertEqual(warnings[0], 'Value "new" is masked by "old" from '
295
309
                         'locations.conf')
296
310
 
 
311
 
 
312
class TestBranch6(TestBranch67, TestCaseWithTransport):
 
313
 
 
314
    def get_class(self):
 
315
        return _mod_branch.BzrBranch6
 
316
 
 
317
    def get_format_name(self):
 
318
        return "dirstate-tags"
 
319
 
 
320
    def get_format_name_subtree(self):
 
321
        return "dirstate-with-subtree"
 
322
 
 
323
    def test_set_stacked_on_url_errors(self):
 
324
        branch = self.make_branch('a', format=self.get_format_name())
 
325
        self.assertRaises(errors.UnstackableBranchFormat,
 
326
            branch.set_stacked_on_url, None)
 
327
 
 
328
    def test_default_stacked_location(self):
 
329
        branch = self.make_branch('a', format=self.get_format_name())
 
330
        self.assertRaises(errors.UnstackableBranchFormat, branch.get_stacked_on_url)
 
331
 
 
332
 
 
333
class TestBranch7(TestBranch67, TestCaseWithTransport):
 
334
 
 
335
    def get_class(self):
 
336
        return _mod_branch.BzrBranch7
 
337
 
 
338
    def get_format_name(self):
 
339
        return "1.9"
 
340
 
 
341
    def get_format_name_subtree(self):
 
342
        return "development-subtree"
 
343
 
 
344
    def test_set_stacked_on_url_unstackable_repo(self):
 
345
        repo = self.make_repository('a', format='dirstate-tags')
 
346
        control = repo.bzrdir
 
347
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
 
348
        target = self.make_branch('b')
 
349
        self.assertRaises(errors.UnstackableRepositoryFormat,
 
350
            branch.set_stacked_on_url, target.base)
 
351
 
 
352
    def test_clone_stacked_on_unstackable_repo(self):
 
353
        repo = self.make_repository('a', format='dirstate-tags')
 
354
        control = repo.bzrdir
 
355
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
 
356
        # Calling clone should not raise UnstackableRepositoryFormat.
 
357
        cloned_bzrdir = control.clone('cloned')
 
358
 
 
359
    def _test_default_stacked_location(self):
 
360
        branch = self.make_branch('a', format=self.get_format_name())
 
361
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
 
362
 
 
363
    def test_stack_and_unstack(self):
 
364
        branch = self.make_branch('a', format=self.get_format_name())
 
365
        target = self.make_branch_and_tree('b', format=self.get_format_name())
 
366
        branch.set_stacked_on_url(target.branch.base)
 
367
        self.assertEqual(target.branch.base, branch.get_stacked_on_url())
 
368
        revid = target.commit('foo')
 
369
        self.assertTrue(branch.repository.has_revision(revid))
 
370
        branch.set_stacked_on_url(None)
 
371
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
 
372
        self.assertFalse(branch.repository.has_revision(revid))
 
373
 
 
374
    def test_open_opens_stacked_reference(self):
 
375
        branch = self.make_branch('a', format=self.get_format_name())
 
376
        target = self.make_branch_and_tree('b', format=self.get_format_name())
 
377
        branch.set_stacked_on_url(target.branch.base)
 
378
        branch = branch.bzrdir.open_branch()
 
379
        revid = target.commit('foo')
 
380
        self.assertTrue(branch.repository.has_revision(revid))
 
381
 
 
382
 
 
383
class BzrBranch8(TestCaseWithTransport):
 
384
 
 
385
    def make_branch(self, location, format=None):
 
386
        if format is None:
 
387
            format = bzrdir.format_registry.make_bzrdir('1.9')
 
388
            format.set_branch_format(_mod_branch.BzrBranchFormat8())
 
389
        return TestCaseWithTransport.make_branch(self, location, format=format)
 
390
 
 
391
    def create_branch_with_reference(self):
 
392
        branch = self.make_branch('branch')
 
393
        branch._set_all_reference_info({'file-id': ('path', 'location')})
 
394
        return branch
 
395
 
 
396
    @staticmethod
 
397
    def instrument_branch(branch, gets):
 
398
        old_get = branch._transport.get
 
399
        def get(*args, **kwargs):
 
400
            gets.append((args, kwargs))
 
401
            return old_get(*args, **kwargs)
 
402
        branch._transport.get = get
 
403
 
 
404
    def test_reference_info_caching_read_locked(self):
 
405
        gets = []
 
406
        branch = self.create_branch_with_reference()
 
407
        branch.lock_read()
 
408
        self.addCleanup(branch.unlock)
 
409
        self.instrument_branch(branch, gets)
 
410
        branch.get_reference_info('file-id')
 
411
        branch.get_reference_info('file-id')
 
412
        self.assertEqual(1, len(gets))
 
413
 
 
414
    def test_reference_info_caching_read_unlocked(self):
 
415
        gets = []
 
416
        branch = self.create_branch_with_reference()
 
417
        self.instrument_branch(branch, gets)
 
418
        branch.get_reference_info('file-id')
 
419
        branch.get_reference_info('file-id')
 
420
        self.assertEqual(2, len(gets))
 
421
 
 
422
    def test_reference_info_caching_write_locked(self):
 
423
        gets = []
 
424
        branch = self.make_branch('branch')
 
425
        branch.lock_write()
 
426
        self.instrument_branch(branch, gets)
 
427
        self.addCleanup(branch.unlock)
 
428
        branch._set_all_reference_info({'file-id': ('path2', 'location2')})
 
429
        path, location = branch.get_reference_info('file-id')
 
430
        self.assertEqual(0, len(gets))
 
431
        self.assertEqual('path2', path)
 
432
        self.assertEqual('location2', location)
 
433
 
 
434
    def test_reference_info_caches_cleared(self):
 
435
        branch = self.make_branch('branch')
 
436
        branch.lock_write()
 
437
        branch.set_reference_info('file-id', 'path2', 'location2')
 
438
        branch.unlock()
 
439
        doppelganger = Branch.open('branch')
 
440
        doppelganger.set_reference_info('file-id', 'path3', 'location3')
 
441
        self.assertEqual(('path3', 'location3'),
 
442
                         branch.get_reference_info('file-id'))
 
443
 
297
444
class TestBranchReference(TestCaseWithTransport):
298
445
    """Tests for the branch reference facility."""
299
446
 
334
481
        self.assertTrue("pre_commit" in hooks, "pre_commit not in %s" % hooks)
335
482
        self.assertTrue("post_pull" in hooks, "post_pull not in %s" % hooks)
336
483
        self.assertTrue("post_uncommit" in hooks, "post_uncommit not in %s" % hooks)
 
484
        self.assertTrue("post_change_branch_tip" in hooks,
 
485
                        "post_change_branch_tip not in %s" % hooks)
337
486
 
338
487
    def test_installed_hooks_are_BranchHooks(self):
339
488
        """The installed hooks object should be a BranchHooks."""
340
489
        # the installed hooks are saved in self._preserved_hooks.
341
 
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch], BranchHooks)
 
490
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch][1],
 
491
            BranchHooks)
342
492
 
343
493
 
344
494
class TestPullResult(TestCase):
353
503
        # it's still supported
354
504
        a = "%d revisions pulled" % r
355
505
        self.assertEqual(a, "10 revisions pulled")
 
506
 
 
507
 
 
508
 
 
509
class _StubLockable(object):
 
510
    """Helper for TestRunWithWriteLockedTarget."""
 
511
 
 
512
    def __init__(self, calls, unlock_exc=None):
 
513
        self.calls = calls
 
514
        self.unlock_exc = unlock_exc
 
515
 
 
516
    def lock_write(self):
 
517
        self.calls.append('lock_write')
 
518
 
 
519
    def unlock(self):
 
520
        self.calls.append('unlock')
 
521
        if self.unlock_exc is not None:
 
522
            raise self.unlock_exc
 
523
 
 
524
 
 
525
class _ErrorFromCallable(Exception):
 
526
    """Helper for TestRunWithWriteLockedTarget."""
 
527
 
 
528
 
 
529
class _ErrorFromUnlock(Exception):
 
530
    """Helper for TestRunWithWriteLockedTarget."""
 
531
 
 
532
 
 
533
class TestRunWithWriteLockedTarget(TestCase):
 
534
    """Tests for _run_with_write_locked_target."""
 
535
 
 
536
    def setUp(self):
 
537
        TestCase.setUp(self)
 
538
        self._calls = []
 
539
 
 
540
    def func_that_returns_ok(self):
 
541
        self._calls.append('func called')
 
542
        return 'ok'
 
543
 
 
544
    def func_that_raises(self):
 
545
        self._calls.append('func called')
 
546
        raise _ErrorFromCallable()
 
547
 
 
548
    def test_success_unlocks(self):
 
549
        lockable = _StubLockable(self._calls)
 
550
        result = _run_with_write_locked_target(
 
551
            lockable, self.func_that_returns_ok)
 
552
        self.assertEqual('ok', result)
 
553
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
554
 
 
555
    def test_exception_unlocks_and_propagates(self):
 
556
        lockable = _StubLockable(self._calls)
 
557
        self.assertRaises(_ErrorFromCallable,
 
558
            _run_with_write_locked_target, lockable, self.func_that_raises)
 
559
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
560
 
 
561
    def test_callable_succeeds_but_error_during_unlock(self):
 
562
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
563
        self.assertRaises(_ErrorFromUnlock,
 
564
            _run_with_write_locked_target, lockable, self.func_that_returns_ok)
 
565
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
566
 
 
567
    def test_error_during_unlock_does_not_mask_original_error(self):
 
568
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
 
569
        self.assertRaises(_ErrorFromCallable,
 
570
            _run_with_write_locked_target, lockable, self.func_that_raises)
 
571
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
 
572
 
 
573