~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Vincent Ladeuil
  • Date: 2007-03-14 17:01:25 UTC
  • mto: (2323.7.1 redirection)
  • mto: This revision was merged to the branch mainline in revision 2390.
  • Revision ID: v.ladeuil+lp@free.fr-20070314170125-ywny47g3rz2u4nba
Cleanup

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
27
27
from bzrlib import (
28
28
    branch as _mod_branch,
29
29
    bzrdir,
30
 
    config,
31
30
    errors,
32
 
    trace,
33
31
    urlutils,
34
32
    )
35
33
from bzrlib.branch import (
39
37
    BranchReferenceFormat,
40
38
    BzrBranch5,
41
39
    BzrBranchFormat5,
42
 
    BzrBranchFormat6,
43
40
    PullResult,
44
 
    _run_with_write_locked_target,
45
41
    )
46
 
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1,
 
42
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1, 
47
43
                           BzrDir, BzrDirFormat)
48
44
from bzrlib.errors import (NotBranchError,
49
45
                           UnknownFormatError,
54
50
from bzrlib.tests import TestCase, TestCaseWithTransport
55
51
from bzrlib.transport import get_transport
56
52
 
57
 
 
58
53
class TestDefaultFormat(TestCase):
59
54
 
60
 
    def test_default_format(self):
61
 
        # update this if you change the default branch format
62
 
        self.assertIsInstance(BranchFormat.get_default_format(),
63
 
                BzrBranchFormat6)
64
 
 
65
 
    def test_default_format_is_same_as_bzrdir_default(self):
66
 
        # XXX: it might be nice if there was only one place the default was
67
 
        # set, but at the moment that's not true -- mbp 20070814 --
68
 
        # https://bugs.launchpad.net/bzr/+bug/132376
69
 
        self.assertEqual(BranchFormat.get_default_format(),
70
 
                BzrDirFormat.get_default_format().get_branch_format())
71
 
 
72
55
    def test_get_set_default_format(self):
73
 
        # set the format and then set it back again
74
56
        old_format = BranchFormat.get_default_format()
 
57
        # default is 5
 
58
        self.assertTrue(isinstance(old_format, BzrBranchFormat5))
75
59
        BranchFormat.set_default_format(SampleBranchFormat())
76
60
        try:
77
61
            # the default branch format is used by the meta dir format
109
93
                                   ensure_config_dir_exists)
110
94
        ensure_config_dir_exists()
111
95
        fn = locations_config_filename()
112
 
        # write correct newlines to locations.conf
113
 
        # by default ConfigObj uses native line-endings for new files
114
 
        # but uses already existing line-endings if file is not empty
115
 
        f = open(fn, 'wb')
116
 
        try:
117
 
            f.write('# comment\n')
118
 
        finally:
119
 
            f.close()
120
 
 
121
96
        branch = self.make_branch('.', format='knit')
122
97
        branch.set_push_location('foo')
123
98
        local_path = urlutils.local_path_from_url(branch.base[:-1])
124
 
        self.assertFileEqual("# comment\n"
125
 
                             "[%s]\n"
 
99
        self.assertFileEqual("[%s]\n"
126
100
                             "push_location = foo\n"
127
 
                             "push_location:policy = norecurse\n" % local_path,
 
101
                             "push_location:policy = norecurse" % local_path,
128
102
                             fn)
129
103
 
130
104
    # TODO RBC 20051029 test getting a push location from a branch in a
134
108
class SampleBranchFormat(BranchFormat):
135
109
    """A sample format
136
110
 
137
 
    this format is initializable, unsupported to aid in testing the
 
111
    this format is initializable, unsupported to aid in testing the 
138
112
    open and open_downlevel routines.
139
113
    """
140
114
 
151
125
    def is_supported(self):
152
126
        return False
153
127
 
154
 
    def open(self, transport, _found=False, ignore_fallbacks=False):
 
128
    def open(self, transport, _found=False):
155
129
        return "opened branch."
156
130
 
157
131
 
161
135
    def test_find_format(self):
162
136
        # is the right format object found for a branch?
163
137
        # create a branch with a few known format objects.
164
 
        # this is not quite the same as
 
138
        # this is not quite the same as 
165
139
        self.build_tree(["foo/", "bar/"])
166
140
        def check_format(format, url):
167
141
            dir = format._matchingbzrdir.initialize(url)
170
144
            found_format = BranchFormat.find_format(dir)
171
145
            self.failUnless(isinstance(found_format, format.__class__))
172
146
        check_format(BzrBranchFormat5(), "bar")
173
 
 
 
147
        
174
148
    def test_find_format_not_branch(self):
175
149
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
176
150
        self.assertRaises(NotBranchError,
201
175
        BranchFormat.unregister_format(format)
202
176
        self.make_branch_and_tree('bar')
203
177
 
204
 
 
205
 
class TestBranch67(object):
206
 
    """Common tests for both branch 6 and 7 which are mostly the same."""
207
 
 
208
 
    def get_format_name(self):
209
 
        raise NotImplementedError(self.get_format_name)
210
 
 
211
 
    def get_format_name_subtree(self):
212
 
        raise NotImplementedError(self.get_format_name)
213
 
 
214
 
    def get_class(self):
215
 
        raise NotImplementedError(self.get_class)
 
178
    def test_checkout_format(self):
 
179
        branch = self.make_repository('repository', shared=True)
 
180
        branch = self.make_branch('repository/branch',
 
181
            format='metaweave')
 
182
        tree = branch.create_checkout('checkout')
 
183
        self.assertIs(tree.branch.__class__, _mod_branch.BzrBranch5)
 
184
 
 
185
 
 
186
class TestBranch6(TestCaseWithTransport):
216
187
 
217
188
    def test_creation(self):
218
189
        format = BzrDirMetaFormat1()
219
190
        format.set_branch_format(_mod_branch.BzrBranchFormat6())
220
191
        branch = self.make_branch('a', format=format)
221
 
        self.assertIsInstance(branch, self.get_class())
222
 
        branch = self.make_branch('b', format=self.get_format_name())
223
 
        self.assertIsInstance(branch, self.get_class())
 
192
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
 
193
        branch = self.make_branch('b', format='dirstate-tags')
 
194
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
224
195
        branch = _mod_branch.Branch.open('a')
225
 
        self.assertIsInstance(branch, self.get_class())
 
196
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
226
197
 
227
198
    def test_layout(self):
228
 
        branch = self.make_branch('a', format=self.get_format_name())
 
199
        branch = self.make_branch('a', format='dirstate-tags')
229
200
        self.failUnlessExists('a/.bzr/branch/last-revision')
230
201
        self.failIfExists('a/.bzr/branch/revision-history')
231
 
        self.failIfExists('a/.bzr/branch/references')
232
202
 
233
203
    def test_config(self):
234
204
        """Ensure that all configuration data is stored in the branch"""
235
 
        branch = self.make_branch('a', format=self.get_format_name())
 
205
        branch = self.make_branch('a', format='dirstate-tags')
236
206
        branch.set_parent('http://bazaar-vcs.org')
237
207
        self.failIfExists('a/.bzr/branch/parent')
238
208
        self.assertEqual('http://bazaar-vcs.org', branch.get_parent())
245
215
        self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
246
216
 
247
217
    def test_set_revision_history(self):
248
 
        builder = self.make_branch_builder('.', format=self.get_format_name())
249
 
        builder.build_snapshot('foo', None,
250
 
            [('add', ('', None, 'directory', None))],
251
 
            message='foo')
252
 
        builder.build_snapshot('bar', None, [], message='bar')
253
 
        branch = builder.get_branch()
254
 
        branch.lock_write()
255
 
        self.addCleanup(branch.unlock)
256
 
        branch.set_revision_history(['foo', 'bar'])
257
 
        branch.set_revision_history(['foo'])
258
 
        self.assertRaises(errors.NotLefthandHistory,
259
 
                          branch.set_revision_history, ['bar'])
 
218
        tree = self.make_branch_and_memory_tree('.',
 
219
            format='dirstate-tags')
 
220
        tree.lock_write()
 
221
        try:
 
222
            tree.add('.')
 
223
            tree.commit('foo', rev_id='foo')
 
224
            tree.commit('bar', rev_id='bar')
 
225
            tree.branch.set_revision_history(['foo', 'bar'])
 
226
            tree.branch.set_revision_history(['foo'])
 
227
            self.assertRaises(errors.NotLefthandHistory,
 
228
                              tree.branch.set_revision_history, ['bar'])
 
229
        finally:
 
230
            tree.unlock()
 
231
 
 
232
    def test_append_revision(self):
 
233
        tree = self.make_branch_and_tree('branch1',
 
234
            format='dirstate-tags')
 
235
        tree.lock_write()
 
236
        try:
 
237
            tree.commit('foo', rev_id='foo')
 
238
            tree.commit('bar', rev_id='bar')
 
239
            tree.commit('baz', rev_id='baz')
 
240
            tree.set_last_revision('bar')
 
241
            tree.branch.set_last_revision_info(2, 'bar')
 
242
            tree.commit('qux', rev_id='qux')
 
243
            tree.add_parent_tree_id('baz')
 
244
            tree.commit('qux', rev_id='quxx')
 
245
            tree.branch.set_last_revision_info(0, 'null:')
 
246
            self.assertRaises(errors.NotLeftParentDescendant,
 
247
                              tree.branch.append_revision, 'bar')
 
248
            tree.branch.append_revision('foo')
 
249
            self.assertRaises(errors.NotLeftParentDescendant,
 
250
                              tree.branch.append_revision, 'baz')
 
251
            tree.branch.append_revision('bar')
 
252
            tree.branch.append_revision('baz')
 
253
            self.assertRaises(errors.NotLeftParentDescendant,
 
254
                              tree.branch.append_revision, 'quxx')
 
255
        finally:
 
256
            tree.unlock()
260
257
 
261
258
    def do_checkout_test(self, lightweight=False):
262
 
        tree = self.make_branch_and_tree('source',
263
 
            format=self.get_format_name_subtree())
 
259
        tree = self.make_branch_and_tree('source', format='dirstate-with-subtree')
264
260
        subtree = self.make_branch_and_tree('source/subtree',
265
 
            format=self.get_format_name_subtree())
 
261
            format='dirstate-with-subtree')
266
262
        subsubtree = self.make_branch_and_tree('source/subtree/subsubtree',
267
 
            format=self.get_format_name_subtree())
 
263
            format='dirstate-with-subtree')
268
264
        self.build_tree(['source/subtree/file',
269
265
                         'source/subtree/subsubtree/file'])
270
266
        subsubtree.add('file')
285
281
        else:
286
282
            self.assertEndsWith(subbranch.base, 'target/subtree/subsubtree/')
287
283
 
 
284
 
288
285
    def test_checkout_with_references(self):
289
286
        self.do_checkout_test()
290
287
 
291
288
    def test_light_checkout_with_references(self):
292
289
        self.do_checkout_test(lightweight=True)
293
290
 
294
 
    def test_set_push(self):
295
 
        branch = self.make_branch('source', format=self.get_format_name())
296
 
        branch.get_config().set_user_option('push_location', 'old',
297
 
            store=config.STORE_LOCATION)
298
 
        warnings = []
299
 
        def warning(*args):
300
 
            warnings.append(args[0] % args[1:])
301
 
        _warning = trace.warning
302
 
        trace.warning = warning
303
 
        try:
304
 
            branch.set_push_location('new')
305
 
        finally:
306
 
            trace.warning = _warning
307
 
        self.assertEqual(warnings[0], 'Value "new" is masked by "old" from '
308
 
                         'locations.conf')
309
 
 
310
 
 
311
 
class TestBranch6(TestBranch67, TestCaseWithTransport):
312
 
 
313
 
    def get_class(self):
314
 
        return _mod_branch.BzrBranch6
315
 
 
316
 
    def get_format_name(self):
317
 
        return "dirstate-tags"
318
 
 
319
 
    def get_format_name_subtree(self):
320
 
        return "dirstate-with-subtree"
321
 
 
322
 
    def test_set_stacked_on_url_errors(self):
323
 
        branch = self.make_branch('a', format=self.get_format_name())
324
 
        self.assertRaises(errors.UnstackableBranchFormat,
325
 
            branch.set_stacked_on_url, None)
326
 
 
327
 
    def test_default_stacked_location(self):
328
 
        branch = self.make_branch('a', format=self.get_format_name())
329
 
        self.assertRaises(errors.UnstackableBranchFormat, branch.get_stacked_on_url)
330
 
 
331
 
 
332
 
class TestBranch7(TestBranch67, TestCaseWithTransport):
333
 
 
334
 
    def get_class(self):
335
 
        return _mod_branch.BzrBranch7
336
 
 
337
 
    def get_format_name(self):
338
 
        return "1.9"
339
 
 
340
 
    def get_format_name_subtree(self):
341
 
        return "development-subtree"
342
 
 
343
 
    def test_set_stacked_on_url_unstackable_repo(self):
344
 
        repo = self.make_repository('a', format='dirstate-tags')
345
 
        control = repo.bzrdir
346
 
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
347
 
        target = self.make_branch('b')
348
 
        self.assertRaises(errors.UnstackableRepositoryFormat,
349
 
            branch.set_stacked_on_url, target.base)
350
 
 
351
 
    def test_clone_stacked_on_unstackable_repo(self):
352
 
        repo = self.make_repository('a', format='dirstate-tags')
353
 
        control = repo.bzrdir
354
 
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
355
 
        # Calling clone should not raise UnstackableRepositoryFormat.
356
 
        cloned_bzrdir = control.clone('cloned')
357
 
 
358
 
    def _test_default_stacked_location(self):
359
 
        branch = self.make_branch('a', format=self.get_format_name())
360
 
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
361
 
 
362
 
    def test_stack_and_unstack(self):
363
 
        branch = self.make_branch('a', format=self.get_format_name())
364
 
        target = self.make_branch_and_tree('b', format=self.get_format_name())
365
 
        branch.set_stacked_on_url(target.branch.base)
366
 
        self.assertEqual(target.branch.base, branch.get_stacked_on_url())
367
 
        revid = target.commit('foo')
368
 
        self.assertTrue(branch.repository.has_revision(revid))
369
 
        branch.set_stacked_on_url(None)
370
 
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
371
 
        self.assertFalse(branch.repository.has_revision(revid))
372
 
 
373
 
    def test_open_opens_stacked_reference(self):
374
 
        branch = self.make_branch('a', format=self.get_format_name())
375
 
        target = self.make_branch_and_tree('b', format=self.get_format_name())
376
 
        branch.set_stacked_on_url(target.branch.base)
377
 
        branch = branch.bzrdir.open_branch()
378
 
        revid = target.commit('foo')
379
 
        self.assertTrue(branch.repository.has_revision(revid))
380
 
 
381
 
 
382
 
class BzrBranch8(TestCaseWithTransport):
383
 
 
384
 
    def make_branch(self, location, format=None):
385
 
        if format is None:
386
 
            format = bzrdir.format_registry.make_bzrdir('1.9')
387
 
            format.set_branch_format(_mod_branch.BzrBranchFormat8())
388
 
        return TestCaseWithTransport.make_branch(self, location, format=format)
389
 
 
390
 
    def create_branch_with_reference(self):
391
 
        branch = self.make_branch('branch')
392
 
        branch._set_all_reference_info({'file-id': ('path', 'location')})
393
 
        return branch
394
 
 
395
 
    @staticmethod
396
 
    def instrument_branch(branch, gets):
397
 
        old_get = branch._transport.get
398
 
        def get(*args, **kwargs):
399
 
            gets.append((args, kwargs))
400
 
            return old_get(*args, **kwargs)
401
 
        branch._transport.get = get
402
 
 
403
 
    def test_reference_info_caching_read_locked(self):
404
 
        gets = []
405
 
        branch = self.create_branch_with_reference()
406
 
        branch.lock_read()
407
 
        self.addCleanup(branch.unlock)
408
 
        self.instrument_branch(branch, gets)
409
 
        branch.get_reference_info('file-id')
410
 
        branch.get_reference_info('file-id')
411
 
        self.assertEqual(1, len(gets))
412
 
 
413
 
    def test_reference_info_caching_read_unlocked(self):
414
 
        gets = []
415
 
        branch = self.create_branch_with_reference()
416
 
        self.instrument_branch(branch, gets)
417
 
        branch.get_reference_info('file-id')
418
 
        branch.get_reference_info('file-id')
419
 
        self.assertEqual(2, len(gets))
420
 
 
421
 
    def test_reference_info_caching_write_locked(self):
422
 
        gets = []
423
 
        branch = self.make_branch('branch')
424
 
        branch.lock_write()
425
 
        self.instrument_branch(branch, gets)
426
 
        self.addCleanup(branch.unlock)
427
 
        branch._set_all_reference_info({'file-id': ('path2', 'location2')})
428
 
        path, location = branch.get_reference_info('file-id')
429
 
        self.assertEqual(0, len(gets))
430
 
        self.assertEqual('path2', path)
431
 
        self.assertEqual('location2', location)
432
 
 
433
 
    def test_reference_info_caches_cleared(self):
434
 
        branch = self.make_branch('branch')
435
 
        branch.lock_write()
436
 
        branch.set_reference_info('file-id', 'path2', 'location2')
437
 
        branch.unlock()
438
 
        doppelganger = Branch.open('branch')
439
 
        doppelganger.set_reference_info('file-id', 'path3', 'location3')
440
 
        self.assertEqual(('path3', 'location3'),
441
 
                         branch.get_reference_info('file-id'))
442
 
 
443
291
class TestBranchReference(TestCaseWithTransport):
444
292
    """Tests for the branch reference facility."""
445
293
 
457
305
        opened_branch = branch_dir.open_branch()
458
306
        self.assertEqual(opened_branch.base, target_branch.base)
459
307
 
460
 
    def test_get_reference(self):
461
 
        """For a BranchReference, get_reference should reutrn the location."""
462
 
        branch = self.make_branch('target')
463
 
        checkout = branch.create_checkout('checkout', lightweight=True)
464
 
        reference_url = branch.bzrdir.root_transport.abspath('') + '/'
465
 
        # if the api for create_checkout changes to return different checkout types
466
 
        # then this file read will fail.
467
 
        self.assertFileEqual(reference_url, 'checkout/.bzr/branch/location')
468
 
        self.assertEqual(reference_url,
469
 
            _mod_branch.BranchReferenceFormat().get_reference(checkout.bzrdir))
470
 
 
471
308
 
472
309
class TestHooks(TestCase):
473
310
 
477
314
        self.assertTrue("set_rh" in hooks, "set_rh not in %s" % hooks)
478
315
        self.assertTrue("post_push" in hooks, "post_push not in %s" % hooks)
479
316
        self.assertTrue("post_commit" in hooks, "post_commit not in %s" % hooks)
480
 
        self.assertTrue("pre_commit" in hooks, "pre_commit not in %s" % hooks)
481
317
        self.assertTrue("post_pull" in hooks, "post_pull not in %s" % hooks)
482
318
        self.assertTrue("post_uncommit" in hooks, "post_uncommit not in %s" % hooks)
483
 
        self.assertTrue("post_change_branch_tip" in hooks,
484
 
                        "post_change_branch_tip not in %s" % hooks)
485
319
 
486
320
    def test_installed_hooks_are_BranchHooks(self):
487
321
        """The installed hooks object should be a BranchHooks."""
488
322
        # the installed hooks are saved in self._preserved_hooks.
489
 
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch][1],
490
 
            BranchHooks)
 
323
        self.assertIsInstance(self._preserved_hooks, BranchHooks)
 
324
 
 
325
    def test_install_hook_raises_unknown_hook(self):
 
326
        """install_hook should raise UnknownHook if a hook is unknown."""
 
327
        hooks = BranchHooks()
 
328
        self.assertRaises(UnknownHook, hooks.install_hook, 'silly', None)
 
329
 
 
330
    def test_install_hook_appends_known_hook(self):
 
331
        """install_hook should append the callable for known hooks."""
 
332
        hooks = BranchHooks()
 
333
        hooks.install_hook('set_rh', None)
 
334
        self.assertEqual(hooks['set_rh'], [None])
491
335
 
492
336
 
493
337
class TestPullResult(TestCase):
502
346
        # it's still supported
503
347
        a = "%d revisions pulled" % r
504
348
        self.assertEqual(a, "10 revisions pulled")
505
 
 
506
 
 
507
 
 
508
 
class _StubLockable(object):
509
 
    """Helper for TestRunWithWriteLockedTarget."""
510
 
 
511
 
    def __init__(self, calls, unlock_exc=None):
512
 
        self.calls = calls
513
 
        self.unlock_exc = unlock_exc
514
 
 
515
 
    def lock_write(self):
516
 
        self.calls.append('lock_write')
517
 
 
518
 
    def unlock(self):
519
 
        self.calls.append('unlock')
520
 
        if self.unlock_exc is not None:
521
 
            raise self.unlock_exc
522
 
 
523
 
 
524
 
class _ErrorFromCallable(Exception):
525
 
    """Helper for TestRunWithWriteLockedTarget."""
526
 
 
527
 
 
528
 
class _ErrorFromUnlock(Exception):
529
 
    """Helper for TestRunWithWriteLockedTarget."""
530
 
 
531
 
 
532
 
class TestRunWithWriteLockedTarget(TestCase):
533
 
    """Tests for _run_with_write_locked_target."""
534
 
 
535
 
    def setUp(self):
536
 
        TestCase.setUp(self)
537
 
        self._calls = []
538
 
 
539
 
    def func_that_returns_ok(self):
540
 
        self._calls.append('func called')
541
 
        return 'ok'
542
 
 
543
 
    def func_that_raises(self):
544
 
        self._calls.append('func called')
545
 
        raise _ErrorFromCallable()
546
 
 
547
 
    def test_success_unlocks(self):
548
 
        lockable = _StubLockable(self._calls)
549
 
        result = _run_with_write_locked_target(
550
 
            lockable, self.func_that_returns_ok)
551
 
        self.assertEqual('ok', result)
552
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
553
 
 
554
 
    def test_exception_unlocks_and_propagates(self):
555
 
        lockable = _StubLockable(self._calls)
556
 
        self.assertRaises(_ErrorFromCallable,
557
 
            _run_with_write_locked_target, lockable, self.func_that_raises)
558
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
559
 
 
560
 
    def test_callable_succeeds_but_error_during_unlock(self):
561
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
562
 
        self.assertRaises(_ErrorFromUnlock,
563
 
            _run_with_write_locked_target, lockable, self.func_that_returns_ok)
564
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
565
 
 
566
 
    def test_error_during_unlock_does_not_mask_original_error(self):
567
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
568
 
        self.assertRaises(_ErrorFromCallable,
569
 
            _run_with_write_locked_target, lockable, self.func_that_raises)
570
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
571
 
 
572