~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Aaron Bentley
  • Date: 2007-08-16 00:54:00 UTC
  • mto: This revision was merged to the branch mainline in revision 2711.
  • Revision ID: aaron.bentley@utoronto.ca-20070816005400-oxxtqiy310wx10h9
Fix typo

Show diffs side-by-side

added

removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for the Branch facility that are not interface  tests.
18
18
 
39
39
    BranchReferenceFormat,
40
40
    BzrBranch5,
41
41
    BzrBranchFormat5,
42
 
    BzrBranchFormat6,
43
42
    PullResult,
44
 
    _run_with_write_locked_target,
45
43
    )
46
 
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1,
 
44
from bzrlib.bzrdir import (BzrDirMetaFormat1, BzrDirMeta1, 
47
45
                           BzrDir, BzrDirFormat)
48
46
from bzrlib.errors import (NotBranchError,
49
47
                           UnknownFormatError,
54
52
from bzrlib.tests import TestCase, TestCaseWithTransport
55
53
from bzrlib.transport import get_transport
56
54
 
57
 
 
58
55
class TestDefaultFormat(TestCase):
59
56
 
60
 
    def test_default_format(self):
61
 
        # update this if you change the default branch format
62
 
        self.assertIsInstance(BranchFormat.get_default_format(),
63
 
                BzrBranchFormat6)
64
 
 
65
 
    def test_default_format_is_same_as_bzrdir_default(self):
66
 
        # XXX: it might be nice if there was only one place the default was
67
 
        # set, but at the moment that's not true -- mbp 20070814 --
68
 
        # https://bugs.launchpad.net/bzr/+bug/132376
69
 
        self.assertEqual(BranchFormat.get_default_format(),
70
 
                BzrDirFormat.get_default_format().get_branch_format())
71
 
 
72
57
    def test_get_set_default_format(self):
73
 
        # set the format and then set it back again
74
58
        old_format = BranchFormat.get_default_format()
 
59
        # default is 5
 
60
        self.assertTrue(isinstance(old_format, BzrBranchFormat5))
75
61
        BranchFormat.set_default_format(SampleBranchFormat())
76
62
        try:
77
63
            # the default branch format is used by the meta dir format
109
95
                                   ensure_config_dir_exists)
110
96
        ensure_config_dir_exists()
111
97
        fn = locations_config_filename()
112
 
        # write correct newlines to locations.conf
113
 
        # by default ConfigObj uses native line-endings for new files
114
 
        # but uses already existing line-endings if file is not empty
115
 
        f = open(fn, 'wb')
116
 
        try:
117
 
            f.write('# comment\n')
118
 
        finally:
119
 
            f.close()
120
 
 
121
98
        branch = self.make_branch('.', format='knit')
122
99
        branch.set_push_location('foo')
123
100
        local_path = urlutils.local_path_from_url(branch.base[:-1])
124
 
        self.assertFileEqual("# comment\n"
125
 
                             "[%s]\n"
 
101
        self.assertFileEqual("[%s]\n"
126
102
                             "push_location = foo\n"
127
 
                             "push_location:policy = norecurse\n" % local_path,
 
103
                             "push_location:policy = norecurse" % local_path,
128
104
                             fn)
129
105
 
130
106
    # TODO RBC 20051029 test getting a push location from a branch in a
134
110
class SampleBranchFormat(BranchFormat):
135
111
    """A sample format
136
112
 
137
 
    this format is initializable, unsupported to aid in testing the
 
113
    this format is initializable, unsupported to aid in testing the 
138
114
    open and open_downlevel routines.
139
115
    """
140
116
 
151
127
    def is_supported(self):
152
128
        return False
153
129
 
154
 
    def open(self, transport, _found=False, ignore_fallbacks=False):
 
130
    def open(self, transport, _found=False):
155
131
        return "opened branch."
156
132
 
157
133
 
161
137
    def test_find_format(self):
162
138
        # is the right format object found for a branch?
163
139
        # create a branch with a few known format objects.
164
 
        # this is not quite the same as
 
140
        # this is not quite the same as 
165
141
        self.build_tree(["foo/", "bar/"])
166
142
        def check_format(format, url):
167
143
            dir = format._matchingbzrdir.initialize(url)
170
146
            found_format = BranchFormat.find_format(dir)
171
147
            self.failUnless(isinstance(found_format, format.__class__))
172
148
        check_format(BzrBranchFormat5(), "bar")
173
 
 
 
149
        
174
150
    def test_find_format_not_branch(self):
175
151
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
176
152
        self.assertRaises(NotBranchError,
202
178
        self.make_branch_and_tree('bar')
203
179
 
204
180
 
205
 
class TestBranch67(object):
206
 
    """Common tests for both branch 6 and 7 which are mostly the same."""
207
 
 
208
 
    def get_format_name(self):
209
 
        raise NotImplementedError(self.get_format_name)
210
 
 
211
 
    def get_format_name_subtree(self):
212
 
        raise NotImplementedError(self.get_format_name)
213
 
 
214
 
    def get_class(self):
215
 
        raise NotImplementedError(self.get_class)
 
181
class TestBranch6(TestCaseWithTransport):
216
182
 
217
183
    def test_creation(self):
218
184
        format = BzrDirMetaFormat1()
219
185
        format.set_branch_format(_mod_branch.BzrBranchFormat6())
220
186
        branch = self.make_branch('a', format=format)
221
 
        self.assertIsInstance(branch, self.get_class())
222
 
        branch = self.make_branch('b', format=self.get_format_name())
223
 
        self.assertIsInstance(branch, self.get_class())
 
187
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
 
188
        branch = self.make_branch('b', format='dirstate-tags')
 
189
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
224
190
        branch = _mod_branch.Branch.open('a')
225
 
        self.assertIsInstance(branch, self.get_class())
 
191
        self.assertIsInstance(branch, _mod_branch.BzrBranch6)
226
192
 
227
193
    def test_layout(self):
228
 
        branch = self.make_branch('a', format=self.get_format_name())
 
194
        branch = self.make_branch('a', format='dirstate-tags')
229
195
        self.failUnlessExists('a/.bzr/branch/last-revision')
230
196
        self.failIfExists('a/.bzr/branch/revision-history')
231
 
        self.failIfExists('a/.bzr/branch/references')
232
197
 
233
198
    def test_config(self):
234
199
        """Ensure that all configuration data is stored in the branch"""
235
 
        branch = self.make_branch('a', format=self.get_format_name())
 
200
        branch = self.make_branch('a', format='dirstate-tags')
236
201
        branch.set_parent('http://bazaar-vcs.org')
237
202
        self.failIfExists('a/.bzr/branch/parent')
238
203
        self.assertEqual('http://bazaar-vcs.org', branch.get_parent())
245
210
        self.assertEqual('ftp://bazaar-vcs.org', branch.get_bound_location())
246
211
 
247
212
    def test_set_revision_history(self):
248
 
        builder = self.make_branch_builder('.', format=self.get_format_name())
249
 
        builder.build_snapshot('foo', None,
250
 
            [('add', ('', None, 'directory', None))],
251
 
            message='foo')
252
 
        builder.build_snapshot('bar', None, [], message='bar')
253
 
        branch = builder.get_branch()
254
 
        branch.lock_write()
255
 
        self.addCleanup(branch.unlock)
256
 
        branch.set_revision_history(['foo', 'bar'])
257
 
        branch.set_revision_history(['foo'])
258
 
        self.assertRaises(errors.NotLefthandHistory,
259
 
                          branch.set_revision_history, ['bar'])
 
213
        tree = self.make_branch_and_memory_tree('.',
 
214
            format='dirstate-tags')
 
215
        tree.lock_write()
 
216
        try:
 
217
            tree.add('.')
 
218
            tree.commit('foo', rev_id='foo')
 
219
            tree.commit('bar', rev_id='bar')
 
220
            tree.branch.set_revision_history(['foo', 'bar'])
 
221
            tree.branch.set_revision_history(['foo'])
 
222
            self.assertRaises(errors.NotLefthandHistory,
 
223
                              tree.branch.set_revision_history, ['bar'])
 
224
        finally:
 
225
            tree.unlock()
 
226
 
 
227
    def test_append_revision(self):
 
228
        tree = self.make_branch_and_tree('branch1',
 
229
            format='dirstate-tags')
 
230
        tree.lock_write()
 
231
        try:
 
232
            tree.commit('foo', rev_id='foo')
 
233
            tree.commit('bar', rev_id='bar')
 
234
            tree.commit('baz', rev_id='baz')
 
235
            tree.set_last_revision('bar')
 
236
            tree.branch.set_last_revision_info(2, 'bar')
 
237
            tree.commit('qux', rev_id='qux')
 
238
            tree.add_parent_tree_id('baz')
 
239
            tree.commit('qux', rev_id='quxx')
 
240
            tree.branch.set_last_revision_info(0, 'null:')
 
241
            self.assertRaises(errors.NotLeftParentDescendant,
 
242
                              tree.branch.append_revision, 'bar')
 
243
            tree.branch.append_revision('foo')
 
244
            self.assertRaises(errors.NotLeftParentDescendant,
 
245
                              tree.branch.append_revision, 'baz')
 
246
            tree.branch.append_revision('bar')
 
247
            tree.branch.append_revision('baz')
 
248
            self.assertRaises(errors.NotLeftParentDescendant,
 
249
                              tree.branch.append_revision, 'quxx')
 
250
        finally:
 
251
            tree.unlock()
260
252
 
261
253
    def do_checkout_test(self, lightweight=False):
262
 
        tree = self.make_branch_and_tree('source',
263
 
            format=self.get_format_name_subtree())
 
254
        tree = self.make_branch_and_tree('source', format='dirstate-with-subtree')
264
255
        subtree = self.make_branch_and_tree('source/subtree',
265
 
            format=self.get_format_name_subtree())
 
256
            format='dirstate-with-subtree')
266
257
        subsubtree = self.make_branch_and_tree('source/subtree/subsubtree',
267
 
            format=self.get_format_name_subtree())
 
258
            format='dirstate-with-subtree')
268
259
        self.build_tree(['source/subtree/file',
269
260
                         'source/subtree/subsubtree/file'])
270
261
        subsubtree.add('file')
285
276
        else:
286
277
            self.assertEndsWith(subbranch.base, 'target/subtree/subsubtree/')
287
278
 
 
279
 
288
280
    def test_checkout_with_references(self):
289
281
        self.do_checkout_test()
290
282
 
292
284
        self.do_checkout_test(lightweight=True)
293
285
 
294
286
    def test_set_push(self):
295
 
        branch = self.make_branch('source', format=self.get_format_name())
 
287
        branch = self.make_branch('source', format='dirstate-tags')
296
288
        branch.get_config().set_user_option('push_location', 'old',
297
289
            store=config.STORE_LOCATION)
298
290
        warnings = []
307
299
        self.assertEqual(warnings[0], 'Value "new" is masked by "old" from '
308
300
                         'locations.conf')
309
301
 
310
 
 
311
 
class TestBranch6(TestBranch67, TestCaseWithTransport):
312
 
 
313
 
    def get_class(self):
314
 
        return _mod_branch.BzrBranch6
315
 
 
316
 
    def get_format_name(self):
317
 
        return "dirstate-tags"
318
 
 
319
 
    def get_format_name_subtree(self):
320
 
        return "dirstate-with-subtree"
321
 
 
322
 
    def test_set_stacked_on_url_errors(self):
323
 
        branch = self.make_branch('a', format=self.get_format_name())
324
 
        self.assertRaises(errors.UnstackableBranchFormat,
325
 
            branch.set_stacked_on_url, None)
326
 
 
327
 
    def test_default_stacked_location(self):
328
 
        branch = self.make_branch('a', format=self.get_format_name())
329
 
        self.assertRaises(errors.UnstackableBranchFormat, branch.get_stacked_on_url)
330
 
 
331
 
 
332
 
class TestBranch7(TestBranch67, TestCaseWithTransport):
333
 
 
334
 
    def get_class(self):
335
 
        return _mod_branch.BzrBranch7
336
 
 
337
 
    def get_format_name(self):
338
 
        return "1.9"
339
 
 
340
 
    def get_format_name_subtree(self):
341
 
        return "development-subtree"
342
 
 
343
 
    def test_set_stacked_on_url_unstackable_repo(self):
344
 
        repo = self.make_repository('a', format='dirstate-tags')
345
 
        control = repo.bzrdir
346
 
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
347
 
        target = self.make_branch('b')
348
 
        self.assertRaises(errors.UnstackableRepositoryFormat,
349
 
            branch.set_stacked_on_url, target.base)
350
 
 
351
 
    def test_clone_stacked_on_unstackable_repo(self):
352
 
        repo = self.make_repository('a', format='dirstate-tags')
353
 
        control = repo.bzrdir
354
 
        branch = _mod_branch.BzrBranchFormat7().initialize(control)
355
 
        # Calling clone should not raise UnstackableRepositoryFormat.
356
 
        cloned_bzrdir = control.clone('cloned')
357
 
 
358
 
    def _test_default_stacked_location(self):
359
 
        branch = self.make_branch('a', format=self.get_format_name())
360
 
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
361
 
 
362
 
    def test_stack_and_unstack(self):
363
 
        branch = self.make_branch('a', format=self.get_format_name())
364
 
        target = self.make_branch_and_tree('b', format=self.get_format_name())
365
 
        branch.set_stacked_on_url(target.branch.base)
366
 
        self.assertEqual(target.branch.base, branch.get_stacked_on_url())
367
 
        revid = target.commit('foo')
368
 
        self.assertTrue(branch.repository.has_revision(revid))
369
 
        branch.set_stacked_on_url(None)
370
 
        self.assertRaises(errors.NotStacked, branch.get_stacked_on_url)
371
 
        self.assertFalse(branch.repository.has_revision(revid))
372
 
 
373
 
    def test_open_opens_stacked_reference(self):
374
 
        branch = self.make_branch('a', format=self.get_format_name())
375
 
        target = self.make_branch_and_tree('b', format=self.get_format_name())
376
 
        branch.set_stacked_on_url(target.branch.base)
377
 
        branch = branch.bzrdir.open_branch()
378
 
        revid = target.commit('foo')
379
 
        self.assertTrue(branch.repository.has_revision(revid))
380
 
 
381
 
 
382
 
class BzrBranch8(TestCaseWithTransport):
383
 
 
384
 
    def make_branch(self, location, format=None):
385
 
        if format is None:
386
 
            format = bzrdir.format_registry.make_bzrdir('1.9')
387
 
            format.set_branch_format(_mod_branch.BzrBranchFormat8())
388
 
        return TestCaseWithTransport.make_branch(self, location, format=format)
389
 
 
390
 
    def create_branch_with_reference(self):
391
 
        branch = self.make_branch('branch')
392
 
        branch._set_all_reference_info({'file-id': ('path', 'location')})
393
 
        return branch
394
 
 
395
 
    @staticmethod
396
 
    def instrument_branch(branch, gets):
397
 
        old_get = branch._transport.get
398
 
        def get(*args, **kwargs):
399
 
            gets.append((args, kwargs))
400
 
            return old_get(*args, **kwargs)
401
 
        branch._transport.get = get
402
 
 
403
 
    def test_reference_info_caching_read_locked(self):
404
 
        gets = []
405
 
        branch = self.create_branch_with_reference()
406
 
        branch.lock_read()
407
 
        self.addCleanup(branch.unlock)
408
 
        self.instrument_branch(branch, gets)
409
 
        branch.get_reference_info('file-id')
410
 
        branch.get_reference_info('file-id')
411
 
        self.assertEqual(1, len(gets))
412
 
 
413
 
    def test_reference_info_caching_read_unlocked(self):
414
 
        gets = []
415
 
        branch = self.create_branch_with_reference()
416
 
        self.instrument_branch(branch, gets)
417
 
        branch.get_reference_info('file-id')
418
 
        branch.get_reference_info('file-id')
419
 
        self.assertEqual(2, len(gets))
420
 
 
421
 
    def test_reference_info_caching_write_locked(self):
422
 
        gets = []
423
 
        branch = self.make_branch('branch')
424
 
        branch.lock_write()
425
 
        self.instrument_branch(branch, gets)
426
 
        self.addCleanup(branch.unlock)
427
 
        branch._set_all_reference_info({'file-id': ('path2', 'location2')})
428
 
        path, location = branch.get_reference_info('file-id')
429
 
        self.assertEqual(0, len(gets))
430
 
        self.assertEqual('path2', path)
431
 
        self.assertEqual('location2', location)
432
 
 
433
 
    def test_reference_info_caches_cleared(self):
434
 
        branch = self.make_branch('branch')
435
 
        branch.lock_write()
436
 
        branch.set_reference_info('file-id', 'path2', 'location2')
437
 
        branch.unlock()
438
 
        doppelganger = Branch.open('branch')
439
 
        doppelganger.set_reference_info('file-id', 'path3', 'location3')
440
 
        self.assertEqual(('path3', 'location3'),
441
 
                         branch.get_reference_info('file-id'))
442
 
 
443
302
class TestBranchReference(TestCaseWithTransport):
444
303
    """Tests for the branch reference facility."""
445
304
 
477
336
        self.assertTrue("set_rh" in hooks, "set_rh not in %s" % hooks)
478
337
        self.assertTrue("post_push" in hooks, "post_push not in %s" % hooks)
479
338
        self.assertTrue("post_commit" in hooks, "post_commit not in %s" % hooks)
480
 
        self.assertTrue("pre_commit" in hooks, "pre_commit not in %s" % hooks)
481
339
        self.assertTrue("post_pull" in hooks, "post_pull not in %s" % hooks)
482
340
        self.assertTrue("post_uncommit" in hooks, "post_uncommit not in %s" % hooks)
483
 
        self.assertTrue("post_change_branch_tip" in hooks,
484
 
                        "post_change_branch_tip not in %s" % hooks)
485
341
 
486
342
    def test_installed_hooks_are_BranchHooks(self):
487
343
        """The installed hooks object should be a BranchHooks."""
488
344
        # the installed hooks are saved in self._preserved_hooks.
489
 
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch][1],
490
 
            BranchHooks)
 
345
        self.assertIsInstance(self._preserved_hooks[_mod_branch.Branch], BranchHooks)
491
346
 
492
347
 
493
348
class TestPullResult(TestCase):
502
357
        # it's still supported
503
358
        a = "%d revisions pulled" % r
504
359
        self.assertEqual(a, "10 revisions pulled")
505
 
 
506
 
 
507
 
 
508
 
class _StubLockable(object):
509
 
    """Helper for TestRunWithWriteLockedTarget."""
510
 
 
511
 
    def __init__(self, calls, unlock_exc=None):
512
 
        self.calls = calls
513
 
        self.unlock_exc = unlock_exc
514
 
 
515
 
    def lock_write(self):
516
 
        self.calls.append('lock_write')
517
 
 
518
 
    def unlock(self):
519
 
        self.calls.append('unlock')
520
 
        if self.unlock_exc is not None:
521
 
            raise self.unlock_exc
522
 
 
523
 
 
524
 
class _ErrorFromCallable(Exception):
525
 
    """Helper for TestRunWithWriteLockedTarget."""
526
 
 
527
 
 
528
 
class _ErrorFromUnlock(Exception):
529
 
    """Helper for TestRunWithWriteLockedTarget."""
530
 
 
531
 
 
532
 
class TestRunWithWriteLockedTarget(TestCase):
533
 
    """Tests for _run_with_write_locked_target."""
534
 
 
535
 
    def setUp(self):
536
 
        TestCase.setUp(self)
537
 
        self._calls = []
538
 
 
539
 
    def func_that_returns_ok(self):
540
 
        self._calls.append('func called')
541
 
        return 'ok'
542
 
 
543
 
    def func_that_raises(self):
544
 
        self._calls.append('func called')
545
 
        raise _ErrorFromCallable()
546
 
 
547
 
    def test_success_unlocks(self):
548
 
        lockable = _StubLockable(self._calls)
549
 
        result = _run_with_write_locked_target(
550
 
            lockable, self.func_that_returns_ok)
551
 
        self.assertEqual('ok', result)
552
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
553
 
 
554
 
    def test_exception_unlocks_and_propagates(self):
555
 
        lockable = _StubLockable(self._calls)
556
 
        self.assertRaises(_ErrorFromCallable,
557
 
            _run_with_write_locked_target, lockable, self.func_that_raises)
558
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
559
 
 
560
 
    def test_callable_succeeds_but_error_during_unlock(self):
561
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
562
 
        self.assertRaises(_ErrorFromUnlock,
563
 
            _run_with_write_locked_target, lockable, self.func_that_returns_ok)
564
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
565
 
 
566
 
    def test_error_during_unlock_does_not_mask_original_error(self):
567
 
        lockable = _StubLockable(self._calls, unlock_exc=_ErrorFromUnlock())
568
 
        self.assertRaises(_ErrorFromCallable,
569
 
            _run_with_write_locked_target, lockable, self.func_that_raises)
570
 
        self.assertEqual(['lock_write', 'func called', 'unlock'], self._calls)
571
 
 
572