~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_branch/test_branch.py

  • Committer: Tarmac
  • Author(s): Vincent Ladeuil
  • Date: 2017-01-30 14:42:05 UTC
  • mfrom: (6620.1.1 trunk)
  • Revision ID: tarmac-20170130144205-r8fh2xpmiuxyozpv
Merge  2.7 into trunk including fix for bug #1657238 [r=vila]

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for branch implementations - tests a branch format."""
18
18
 
 
19
import contextlib
 
20
 
19
21
from bzrlib import (
20
22
    branch as _mod_branch,
21
 
    bzrdir,
 
23
    controldir,
22
24
    config,
23
25
    delta as _mod_delta,
24
26
    errors,
25
 
    gpg,
26
27
    merge,
 
28
    osutils,
27
29
    urlutils,
 
30
    transform,
28
31
    transport,
29
32
    remote,
30
33
    repository,
31
34
    revision,
 
35
    shelf,
32
36
    symbol_versioning,
33
37
    tests,
34
38
    )
76
80
        br = self.get_branch()
77
81
        br.fetch(wt.branch)
78
82
        br.generate_revision_history('rev3')
79
 
        rh = br.revision_history()
80
 
        self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
81
 
        for revision_id in rh:
 
83
        for revision_id in ['rev3', 'rev2', 'rev1']:
82
84
            self.assertIsInstance(revision_id, str)
83
85
        last = br.last_revision()
84
86
        self.assertEqual('rev3', last)
114
116
        tree_a.add('vla', 'file2')
115
117
        tree_a.commit('rev2', rev_id='rev2')
116
118
 
117
 
        delta = tree_a.branch.get_revision_delta(1)
 
119
        delta = self.applyDeprecated(symbol_versioning.deprecated_in(
 
120
            (2, 5, 0)), tree_a.branch.get_revision_delta, 1)
118
121
        self.assertIsInstance(delta, _mod_delta.TreeDelta)
119
122
        self.assertEqual([('foo', 'file1', 'file')], delta.added)
120
 
        delta = tree_a.branch.get_revision_delta(2)
 
123
        delta = self.applyDeprecated(symbol_versioning.deprecated_in(
 
124
            (2, 5, 0)), tree_a.branch.get_revision_delta, 2)
121
125
        self.assertIsInstance(delta, _mod_delta.TreeDelta)
122
126
        self.assertEqual([('vla', 'file2', 'file')], delta.added)
123
127
 
215
219
    def test_record_initial_ghost(self):
216
220
        """Branches should support having ghosts."""
217
221
        wt = self.make_branch_and_tree('.')
 
222
        if not wt.branch.repository._format.supports_ghosts:
 
223
            raise tests.TestNotApplicable("repository format does not "
 
224
                "support ghosts")
218
225
        wt.set_parent_ids(['non:existent@rev--ision--0--2'],
219
226
            allow_leftmost_as_ghost=True)
220
227
        self.assertEqual(['non:existent@rev--ision--0--2'],
228
235
    def test_record_two_ghosts(self):
229
236
        """Recording with all ghosts works."""
230
237
        wt = self.make_branch_and_tree('.')
 
238
        if not wt.branch.repository._format.supports_ghosts:
 
239
            raise tests.TestNotApplicable("repository format does not "
 
240
                "support ghosts")
231
241
        wt.set_parent_ids([
232
242
                'foo@azkhazan-123123-abcabc',
233
243
                'wibble@fofof--20050401--1928390812',
245
255
                          self.get_branch().repository.get_revision,
246
256
                          None)
247
257
 
248
 
# TODO 20051003 RBC:
249
 
# compare the gpg-to-sign info for a commit with a ghost and
250
 
#     an identical tree without a ghost
251
 
# fetch missing should rewrite the TOC of weaves to list newly available parents.
252
 
 
253
 
    def test_sign_existing_revision(self):
254
 
        wt = self.make_branch_and_tree('.')
255
 
        branch = wt.branch
256
 
        wt.commit("base", allow_pointless=True, rev_id='A')
257
 
        from bzrlib.testament import Testament
258
 
        strategy = gpg.LoopbackGPGStrategy(None)
259
 
        branch.repository.lock_write()
260
 
        branch.repository.start_write_group()
261
 
        branch.repository.sign_revision('A', strategy)
262
 
        branch.repository.commit_write_group()
263
 
        branch.repository.unlock()
264
 
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
265
 
                         Testament.from_revision(branch.repository,
266
 
                         'A').as_short_text() +
267
 
                         '-----END PSEUDO-SIGNED CONTENT-----\n',
268
 
                         branch.repository.get_signature_text('A'))
269
 
 
270
 
    def test_store_signature(self):
271
 
        wt = self.make_branch_and_tree('.')
272
 
        branch = wt.branch
273
 
        branch.lock_write()
274
 
        try:
275
 
            branch.repository.start_write_group()
276
 
            try:
277
 
                branch.repository.store_revision_signature(
278
 
                    gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
279
 
            except:
280
 
                branch.repository.abort_write_group()
281
 
                raise
282
 
            else:
283
 
                branch.repository.commit_write_group()
284
 
        finally:
285
 
            branch.unlock()
286
 
        # A signature without a revision should not be accessible.
287
 
        self.assertRaises(errors.NoSuchRevision,
288
 
                          branch.repository.has_signature_for_revision_id,
289
 
                          'A')
290
 
        wt.commit("base", allow_pointless=True, rev_id='A')
291
 
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
292
 
                         'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
293
 
                         branch.repository.get_signature_text('A'))
294
 
 
295
 
    def test_branch_keeps_signatures(self):
296
 
        wt = self.make_branch_and_tree('source')
297
 
        wt.commit('A', allow_pointless=True, rev_id='A')
298
 
        repo = wt.branch.repository
299
 
        repo.lock_write()
300
 
        repo.start_write_group()
301
 
        repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
302
 
        repo.commit_write_group()
303
 
        repo.unlock()
304
 
        #FIXME: clone should work to urls,
305
 
        # wt.clone should work to disks.
306
 
        self.build_tree(['target/'])
307
 
        d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
308
 
        self.assertEqual(repo.get_signature_text('A'),
309
 
                         d2.open_repository().get_signature_text('A'))
310
 
 
311
258
    def test_nicks_bzr(self):
312
259
        """Test the behaviour of branch nicks specific to bzr branches.
313
260
 
374
321
            repo = self.make_repository('.', shared=True)
375
322
        except errors.IncompatibleFormat:
376
323
            return
377
 
        self.assertEquals(0, len(repo.bzrdir.list_branches()))
 
324
        if repo.bzrdir._format.colocated_branches:
 
325
            raise tests.TestNotApplicable(
 
326
                "control dir does not support colocated branches")
 
327
        self.assertEqual(0, len(repo.bzrdir.list_branches()))
 
328
        if not self.bzrdir_format.colocated_branches:
 
329
            raise tests.TestNotApplicable("control dir format does not support "
 
330
                "colocated branches")
378
331
        try:
379
332
            child_branch1 = self.branch_format.initialize(repo.bzrdir, 
380
333
                name='branch1')
381
 
        except (errors.UninitializableFormat, errors.NoColocatedBranchSupport):
 
334
        except errors.UninitializableFormat:
382
335
            # branch references are not default init'able and
383
336
            # not all bzrdirs support colocated branches.
384
337
            return
385
 
        self.assertEquals(1, len(repo.bzrdir.list_branches()))
 
338
        self.assertEqual(1, len(repo.bzrdir.list_branches()))
386
339
        self.branch_format.initialize(repo.bzrdir, name='branch2')
387
 
        self.assertEquals(2, len(repo.bzrdir.list_branches()))
 
340
        self.assertEqual(2, len(repo.bzrdir.list_branches()))
388
341
 
389
342
    def test_create_append_revisions_only(self):
390
343
        try:
399
352
                # branch references are not default init'able and
400
353
                # not all branches support append_revisions_only
401
354
                return
402
 
            self.assertEquals(True, branch.get_append_revisions_only())
 
355
            self.assertEqual(True, branch.get_append_revisions_only())
403
356
            repo.bzrdir.destroy_branch()
404
357
 
405
358
    def test_get_set_append_revisions_only(self):
418
371
        try:
419
372
            repo = self.make_repository('.', shared=True)
420
373
        except errors.IncompatibleFormat:
421
 
            return
 
374
            raise tests.TestNotApplicable("requires shared repository support")
422
375
        child_transport = repo.bzrdir.root_transport.clone('child')
423
376
        child_transport.mkdir('.')
424
377
        try:
425
378
            child_dir = self.bzrdir_format.initialize_on_transport(child_transport)
426
379
        except errors.UninitializableFormat:
427
 
            return
 
380
            raise tests.TestNotApplicable("control dir format not initializable")
428
381
        try:
429
382
            child_branch = self.branch_format.initialize(child_dir)
430
383
        except errors.UninitializableFormat:
453
406
        """Create a fake revision history easily."""
454
407
        tree = self.make_branch_and_tree('.')
455
408
        rev1 = tree.commit('foo')
456
 
        orig_history = tree.branch.revision_history()
 
409
        tree.lock_write()
 
410
        self.addCleanup(tree.unlock)
 
411
        graph = tree.branch.repository.get_graph()
 
412
        orig_history = list(
 
413
            graph.iter_lefthand_ancestry(
 
414
                tree.branch.last_revision(), [revision.NULL_REVISION]))
457
415
        rev2 = tree.commit('bar', allow_pointless=True)
458
416
        tree.branch.generate_revision_history(rev1)
459
 
        self.assertEqual(orig_history, tree.branch.revision_history())
 
417
        self.assertEqual(orig_history, list(
 
418
            graph.iter_lefthand_ancestry(
 
419
                tree.branch.last_revision(), [revision.NULL_REVISION])))
460
420
 
461
421
    def test_generate_revision_history_NULL_REVISION(self):
462
422
        tree = self.make_branch_and_tree('.')
463
423
        rev1 = tree.commit('foo')
 
424
        tree.lock_write()
 
425
        self.addCleanup(tree.unlock)
464
426
        tree.branch.generate_revision_history(revision.NULL_REVISION)
465
 
        self.assertEqual([], tree.branch.revision_history())
 
427
        self.assertEqual(revision.NULL_REVISION, tree.branch.last_revision())
466
428
 
467
429
    def test_create_checkout(self):
468
430
        tree_a = self.make_branch_and_tree('a')
489
451
        tree_a = self.make_branch_and_tree('a')
490
452
        rev_id = tree_a.commit('put some content in the branch')
491
453
        # open the branch via a readonly transport
492
 
        source_branch = _mod_branch.Branch.open(self.get_readonly_url('a'))
 
454
        url = self.get_readonly_url(urlutils.basename(tree_a.branch.base))
 
455
        t = transport.get_transport_from_url(url)
 
456
        if not tree_a.branch.bzrdir._format.supports_transport(t):
 
457
            raise tests.TestNotApplicable("format does not support transport")
 
458
        source_branch = _mod_branch.Branch.open(url)
493
459
        # sanity check that the test will be valid
494
460
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
495
461
            source_branch.lock_write)
501
467
        tree_a = self.make_branch_and_tree('a')
502
468
        rev_id = tree_a.commit('put some content in the branch')
503
469
        # open the branch via a readonly transport
504
 
        source_branch = _mod_branch.Branch.open(self.get_readonly_url('a'))
 
470
        url = self.get_readonly_url(
 
471
            osutils.basename(tree_a.branch.base.rstrip('/')))
 
472
        t = transport.get_transport_from_url(url)
 
473
        if not tree_a.branch.bzrdir._format.supports_transport(t):
 
474
            raise tests.TestNotApplicable("format does not support transport")
 
475
        source_branch = _mod_branch.Branch.open(url)
505
476
        # sanity check that the test will be valid
506
477
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
507
478
            source_branch.lock_write)
508
479
        checkout = source_branch.create_checkout('c')
509
480
        self.assertEqual(rev_id, checkout.last_revision())
510
481
 
511
 
    def test_set_revision_history(self):
512
 
        tree = self.make_branch_and_tree('a')
513
 
        tree.commit('a commit', rev_id='rev1')
514
 
        br = tree.branch
515
 
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
516
 
            br.set_revision_history, ["rev1"])
517
 
        self.assertEquals(br.revision_history(), ["rev1"])
518
 
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
519
 
            br.set_revision_history, [])
520
 
        self.assertEquals(br.revision_history(), [])
521
 
 
522
482
    def test_heads_to_fetch(self):
523
483
        # heads_to_fetch is a method that returns a collection of revids that
524
484
        # need to be fetched to copy this branch into another repo.  At a
561
521
            looked_up_format = registry.get(network_name)
562
522
            self.assertEqual(format.__class__, looked_up_format.__class__)
563
523
 
564
 
    def get_get_config_calls(self):
 
524
    def test_get_config_calls(self):
565
525
        # Smoke test that all branch succeed getting a config
566
526
        br = self.make_branch('.')
567
527
        br.get_config()
589
549
                          _mod_branch.Branch.open_containing,
590
550
                          self.get_readonly_url('g/p/q'))
591
551
        branch = self.make_branch('.')
 
552
        if not branch.bzrdir._format.supports_transport(
 
553
            transport.get_transport_from_url(self.get_readonly_url('.'))):
 
554
            raise tests.TestNotApplicable("format does not support transport")
592
555
        branch, relpath = _mod_branch.Branch.open_containing(
593
556
            self.get_readonly_url(''))
594
557
        self.assertEqual('', relpath)
686
649
 
687
650
    def test_get_child_submit_format(self):
688
651
        branch = self.get_branch()
689
 
        branch.get_config().set_user_option('child_submit_format', '10')
 
652
        branch.get_config_stack().set('child_submit_format', '10')
690
653
        branch = self.get_branch()
691
654
        self.assertEqual('10', branch.get_child_submit_format())
692
655
 
738
701
        self.assertIsInstance(made_branch, _mod_branch.Branch)
739
702
 
740
703
        # find it via bzrdir opening:
741
 
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
 
704
        opened_control = controldir.ControlDir.open(readonly_t.base)
742
705
        direct_opened_branch = opened_control.open_branch()
743
706
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
744
707
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
809
772
        branch.unbind()
810
773
        self.assertEqual(None, branch.get_master_branch())
811
774
 
812
 
    def test_unlocked_does_not_cache_master_branch(self):
813
 
        """Unlocked branches do not cache the result of get_master_branch."""
814
 
        master = self.make_branch('master')
815
 
        branch1 = self.make_branch('branch')
816
 
        try:
817
 
            branch1.bind(master)
818
 
        except errors.UpgradeRequired:
819
 
            raise tests.TestNotApplicable('Format does not support binding')
820
 
        # Open branch1 again
821
 
        branch2 = branch1.bzrdir.open_branch()
822
 
        self.assertNotEqual(None, branch1.get_master_branch())
823
 
        # Unbind the branch via branch2.  branch1 isn't locked so will
824
 
        # immediately return the new value for get_master_branch.
825
 
        branch2.unbind()
826
 
        self.assertEqual(None, branch1.get_master_branch())
827
 
 
828
775
    def test_bind_clears_cached_master_branch(self):
829
776
        """b.bind clears any cached value of b.get_master_branch."""
830
777
        master1 = self.make_branch('master1')
895
842
    def test_fallbacks_not_opened(self):
896
843
        stacked = self.make_branch_with_fallback()
897
844
        self.get_transport('').rename('fallback', 'moved')
898
 
        reopened = stacked.bzrdir.open_branch(ignore_fallbacks=True)
 
845
        reopened_dir = controldir.ControlDir.open(stacked.base)
 
846
        reopened = reopened_dir.open_branch(ignore_fallbacks=True)
899
847
        self.assertEqual([], reopened.repository._fallback_repositories)
900
848
 
901
849
    def test_fallbacks_are_opened(self):
902
850
        stacked = self.make_branch_with_fallback()
903
 
        reopened = stacked.bzrdir.open_branch(ignore_fallbacks=False)
 
851
        reopened_dir = controldir.ControlDir.open(stacked.base)
 
852
        reopened = reopened_dir.open_branch(ignore_fallbacks=False)
904
853
        self.assertLength(1, reopened.repository._fallback_repositories)
905
854
 
906
855
 
914
863
            tree.add_reference(subtree)
915
864
        except errors.UnsupportedOperation:
916
865
            raise tests.TestNotApplicable('Tree cannot hold references.')
917
 
        reference_parent = tree.branch.reference_parent('subtree-id',
918
 
                                                        'subtree')
 
866
        reference_parent = tree.branch.reference_parent(
 
867
            'subtree-id',
 
868
            urlutils.relative_url(tree.branch.user_url, subtree.branch.user_url))
919
869
        self.assertEqual(subtree.branch.base, reference_parent.base)
920
870
 
921
871
    def test_reference_parent_accepts_possible_transports(self):
927
877
        except errors.UnsupportedOperation:
928
878
            raise tests.TestNotApplicable('Tree cannot hold references.')
929
879
        reference_parent = tree.branch.reference_parent('subtree-id',
930
 
            'subtree', possible_transports=[subtree.bzrdir.root_transport])
 
880
            urlutils.relative_url(
 
881
                tree.branch.user_url, subtree.branch.user_url),
 
882
            possible_transports=[subtree.bzrdir.root_transport])
931
883
 
932
884
    def test_get_reference_info(self):
933
885
        branch = self.make_branch('branch')
1109
1061
        # above the control dir but we might need to relax that?
1110
1062
        self.assertEqual(br.control_url.find(br.user_url), 0)
1111
1063
        self.assertEqual(br.control_url, br.control_transport.base)
 
1064
 
 
1065
 
 
1066
class FakeShelfCreator(object):
 
1067
 
 
1068
    def __init__(self, branch):
 
1069
        self.branch = branch
 
1070
 
 
1071
    def write_shelf(self, shelf_file, message=None):
 
1072
        tree = self.branch.repository.revision_tree(revision.NULL_REVISION)
 
1073
        with transform.TransformPreview(tree) as tt:
 
1074
            shelf.ShelfCreator._write_shelf(
 
1075
                shelf_file, tt, revision.NULL_REVISION)
 
1076
 
 
1077
 
 
1078
@contextlib.contextmanager
 
1079
def skip_if_storing_uncommitted_unsupported():
 
1080
    try:
 
1081
        yield
 
1082
    except errors.StoringUncommittedNotSupported:
 
1083
        raise tests.TestNotApplicable('Cannot store uncommitted changes.')
 
1084
 
 
1085
 
 
1086
class TestUncommittedChanges(per_branch.TestCaseWithBranch):
 
1087
 
 
1088
    def bind(self, branch, master):
 
1089
        try:
 
1090
            branch.bind(master)
 
1091
        except errors.UpgradeRequired:
 
1092
            raise tests.TestNotApplicable('Branch cannot be bound.')
 
1093
 
 
1094
    def test_store_uncommitted(self):
 
1095
        tree = self.make_branch_and_tree('b')
 
1096
        branch = tree.branch
 
1097
        creator = FakeShelfCreator(branch)
 
1098
        with skip_if_storing_uncommitted_unsupported():
 
1099
            self.assertIs(None, branch.get_unshelver(tree))
 
1100
        branch.store_uncommitted(creator)
 
1101
        self.assertIsNot(None, branch.get_unshelver(tree))
 
1102
 
 
1103
    def test_store_uncommitted_bound(self):
 
1104
        tree = self.make_branch_and_tree('b')
 
1105
        branch = tree.branch
 
1106
        master = self.make_branch('master')
 
1107
        self.bind(branch, master)
 
1108
        creator = FakeShelfCreator(tree.branch)
 
1109
        self.assertIs(None, tree.branch.get_unshelver(tree))
 
1110
        self.assertIs(None, master.get_unshelver(tree))
 
1111
        tree.branch.store_uncommitted(creator)
 
1112
        self.assertIsNot(None, master.get_unshelver(tree))
 
1113
 
 
1114
    def test_store_uncommitted_already_stored(self):
 
1115
        branch = self.make_branch('b')
 
1116
        with skip_if_storing_uncommitted_unsupported():
 
1117
            branch.store_uncommitted(FakeShelfCreator(branch))
 
1118
        self.assertRaises(errors.ChangesAlreadyStored,
 
1119
                          branch.store_uncommitted, FakeShelfCreator(branch))
 
1120
 
 
1121
    def test_store_uncommitted_none(self):
 
1122
        branch = self.make_branch('b')
 
1123
        with skip_if_storing_uncommitted_unsupported():
 
1124
            branch.store_uncommitted(FakeShelfCreator(branch))
 
1125
        branch.store_uncommitted(None)
 
1126
        self.assertIs(None, branch.get_unshelver(None))
 
1127
 
 
1128
    def test_get_unshelver(self):
 
1129
        tree = self.make_branch_and_tree('tree')
 
1130
        tree.commit('')
 
1131
        self.build_tree_contents([('tree/file', 'contents1')])
 
1132
        tree.add('file')
 
1133
        with skip_if_storing_uncommitted_unsupported():
 
1134
            tree.store_uncommitted()
 
1135
        unshelver = tree.branch.get_unshelver(tree)
 
1136
        self.assertIsNot(None, unshelver)
 
1137
 
 
1138
    def test_get_unshelver_bound(self):
 
1139
        tree = self.make_branch_and_tree('tree')
 
1140
        tree.commit('')
 
1141
        self.build_tree_contents([('tree/file', 'contents1')])
 
1142
        tree.add('file')
 
1143
        with skip_if_storing_uncommitted_unsupported():
 
1144
            tree.store_uncommitted()
 
1145
        branch = self.make_branch('branch')
 
1146
        self.bind(branch, tree.branch)
 
1147
        unshelver = branch.get_unshelver(tree)
 
1148
        self.assertIsNot(None, unshelver)