~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_branch.py

  • Committer: Martin Pool
  • Date: 2006-03-06 11:20:10 UTC
  • mfrom: (1593 +trunk)
  • mto: This revision was merged to the branch mainline in revision 1611.
  • Revision ID: mbp@sourcefrog.net-20060306112010-17c0170dde5d1eea
[merge] large merge to sync with bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
import os
20
20
import sys
21
21
 
22
 
import bzrlib.branch as branch
 
22
import bzrlib.branch
 
23
import bzrlib.bzrdir as bzrdir
23
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
24
25
from bzrlib.commit import commit
25
26
import bzrlib.errors as errors
31
32
                           )
32
33
import bzrlib.gpg
33
34
from bzrlib.osutils import getcwd
34
 
from bzrlib.revision import NULL_REVISION
35
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
36
36
from bzrlib.trace import mutter
37
37
import bzrlib.transactions as transactions
41
41
from bzrlib.upgrade import upgrade
42
42
from bzrlib.workingtree import WorkingTree
43
43
 
 
44
 
44
45
# TODO: Make a branch using basis branch, and check that it 
45
46
# doesn't request any files that could have been avoided, by 
46
47
# hooking into the Transport.
58
59
        return self.branch
59
60
 
60
61
    def make_branch(self, relpath):
 
62
        repo = self.make_repository(relpath)
 
63
        # fixme RBC 20060210 this isnt necessarily a fixable thing,
 
64
        # Skipped is the wrong exception to raise.
 
65
        try:
 
66
            return self.branch_format.initialize(repo.bzrdir)
 
67
        except errors.UninitializableFormat:
 
68
            raise TestSkipped('Uninitializable branch format')
 
69
 
 
70
    def make_repository(self, relpath, shared=False):
61
71
        try:
62
72
            url = self.get_url(relpath)
63
73
            segments = url.split('/')
68
78
                    t.mkdir(segments[-1])
69
79
                except FileExists:
70
80
                    pass
71
 
            return self.branch_format.initialize(url)
 
81
            made_control = self.bzrdir_format.initialize(url)
 
82
            return made_control.create_repository(shared=shared)
72
83
        except UninitializableFormat:
73
84
            raise TestSkipped("Format %s is not initializable.")
74
85
 
85
96
 
86
97
    def test_fetch_revisions(self):
87
98
        """Test fetch-revision operation."""
88
 
        from bzrlib.fetch import Fetcher
89
99
        get_transport(self.get_url()).mkdir('b1')
90
100
        get_transport(self.get_url()).mkdir('b2')
91
 
        b1 = self.make_branch('b1')
 
101
        wt = self.make_branch_and_tree('b1')
 
102
        b1 = wt.branch
92
103
        b2 = self.make_branch('b2')
93
 
        wt = WorkingTree.create(b1, 'b1')
94
104
        file('b1/foo', 'w').write('hello')
95
105
        wt.add(['foo'], ['foo-id'])
96
106
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
97
107
 
98
108
        mutter('start fetch')
99
 
        f = Fetcher(from_branch=b1, to_branch=b2)
100
 
        eq = self.assertEquals
101
 
        eq(f.count_copied, 1)
102
 
        eq(f._last_revision, 'revision-1')
 
109
        self.assertEqual((1, []), b2.fetch(b1))
103
110
 
104
111
        rev = b2.repository.get_revision('revision-1')
105
112
        tree = b2.repository.revision_tree('revision-1')
106
 
        eq(tree.get_file_text('foo-id'), 'hello')
107
 
 
108
 
    def test_revision_tree(self):
109
 
        b1 = self.get_branch()
110
 
        wt = WorkingTree.create(b1, '.')
111
 
        wt.commit('lala!', rev_id='revision-1', allow_pointless=True)
112
 
        tree = b1.repository.revision_tree('revision-1')
113
 
        tree = b1.repository.revision_tree(None)
114
 
        self.assertEqual(len(tree.list_files()), 0)
115
 
        tree = b1.repository.revision_tree(NULL_REVISION)
116
 
        self.assertEqual(len(tree.list_files()), 0)
 
113
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
117
114
 
118
115
    def get_unbalanced_tree_pair(self):
119
116
        """Return two branches, a and b, with one file in a."""
120
117
        get_transport(self.get_url()).mkdir('a')
121
 
        br_a = self.make_branch('a')
122
 
        tree_a = WorkingTree.create(br_a, 'a')
 
118
        tree_a = self.make_branch_and_tree('a')
123
119
        file('a/b', 'wb').write('b')
124
120
        tree_a.add('b')
125
121
        tree_a.commit("silly commit", rev_id='A')
126
122
 
127
123
        get_transport(self.get_url()).mkdir('b')
128
 
        br_b = self.make_branch('b')
129
 
        tree_b = WorkingTree.create(br_b, 'b')
 
124
        tree_b = self.make_branch_and_tree('b')
130
125
        return tree_a, tree_b
131
126
 
132
127
    def get_balanced_branch_pair(self):
133
128
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
134
129
        tree_a, tree_b = self.get_unbalanced_tree_pair()
135
 
        tree_a.branch.push_stores(tree_b.branch)
 
130
        tree_b.branch.repository.fetch(tree_a.branch.repository)
136
131
        return tree_a, tree_b
137
132
 
138
 
    def test_push_stores(self):
139
 
        """Copy the stores from one branch to another"""
140
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
141
 
        br_a = tree_a.branch
142
 
        br_b = tree_b.branch
143
 
        # ensure the revision is missing.
144
 
        self.assertRaises(NoSuchRevision, br_b.repository.get_revision, 
145
 
                          br_a.revision_history()[0])
146
 
        br_a.push_stores(br_b)
147
 
        # check that b now has all the data from a's first commit.
148
 
        rev = br_b.repository.get_revision(br_a.revision_history()[0])
149
 
        tree = br_b.repository.revision_tree(br_a.revision_history()[0])
150
 
        for file_id in tree:
151
 
            if tree.inventory[file_id].kind == "file":
152
 
                tree.get_file(file_id).read()
153
 
 
154
133
    def test_clone_branch(self):
155
134
        """Copy the stores from one branch to another"""
156
135
        tree_a, tree_b = self.get_balanced_branch_pair()
157
136
        tree_b.commit("silly commit")
158
137
        os.mkdir('c')
159
 
        br_c = tree_a.branch.clone('c', basis_branch=tree_b.branch)
 
138
        # this fails to test that the history from a was not used.
 
139
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
160
140
        self.assertEqual(tree_a.branch.revision_history(),
161
 
                         br_c.revision_history())
 
141
                         dir_c.open_branch().revision_history())
162
142
 
163
143
    def test_clone_partial(self):
164
144
        """Copy only part of the history of a branch."""
165
 
        get_transport(self.get_url()).mkdir('a')
166
 
        br_a = self.make_branch('a')
167
 
        wt = WorkingTree.create(br_a, "a")
168
 
        self.build_tree(['a/one'])
169
 
        wt.add(['one'])
170
 
        wt.commit('commit one', rev_id='u@d-1')
171
 
        self.build_tree(['a/two'])
172
 
        wt.add(['two'])
173
 
        wt.commit('commit two', rev_id='u@d-2')
174
 
        br_b = br_a.clone('b', revision='u@d-1')
175
 
        self.assertEqual(br_b.last_revision(), 'u@d-1')
176
 
        self.assertTrue(os.path.exists('b/one'))
177
 
        self.assertFalse(os.path.exists('b/two'))
 
145
        # TODO: RBC 20060208 test with a revision not on revision-history.
 
146
        #       what should that behaviour be ? Emailed the list.
 
147
        wt_a = self.make_branch_and_tree('a')
 
148
        self.build_tree(['a/one'])
 
149
        wt_a.add(['one'])
 
150
        wt_a.commit('commit one', rev_id='1')
 
151
        self.build_tree(['a/two'])
 
152
        wt_a.add(['two'])
 
153
        wt_a.commit('commit two', rev_id='2')
 
154
        repo_b = self.make_repository('b')
 
155
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
 
156
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
 
157
        self.assertEqual(br_b.last_revision(), '1')
 
158
 
 
159
    def test_sprout_partial(self):
 
160
        # test sprouting with a prefix of the revision-history.
 
161
        # also needs not-on-revision-history behaviour defined.
 
162
        wt_a = self.make_branch_and_tree('a')
 
163
        self.build_tree(['a/one'])
 
164
        wt_a.add(['one'])
 
165
        wt_a.commit('commit one', rev_id='1')
 
166
        self.build_tree(['a/two'])
 
167
        wt_a.add(['two'])
 
168
        wt_a.commit('commit two', rev_id='2')
 
169
        repo_b = self.make_repository('b')
 
170
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
 
171
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
 
172
        self.assertEqual(br_b.last_revision(), '1')
 
173
 
 
174
    def test_clone_branch_nickname(self):
 
175
        # test the nick name is preserved always
 
176
        raise TestSkipped('XXX branch cloning is not yet tested..')
 
177
 
 
178
    def test_clone_branch_parent(self):
 
179
        # test the parent is preserved always
 
180
        raise TestSkipped('XXX branch cloning is not yet tested..')
 
181
        
 
182
    def test_sprout_branch_nickname(self):
 
183
        # test the nick name is reset always
 
184
        raise TestSkipped('XXX branch sprouting is not yet tested..')
 
185
 
 
186
    def test_sprout_branch_parent(self):
 
187
        source = self.make_branch('source')
 
188
        target = source.bzrdir.sprout(self.get_url('target')).open_branch()
 
189
        self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
178
190
        
179
191
    def test_record_initial_ghost_merge(self):
180
192
        """A pending merge with no revision present is still a merge."""
181
 
        branch = self.get_branch()
182
 
        wt = WorkingTree.create(branch, ".")
 
193
        wt = self.make_branch_and_tree('.')
 
194
        branch = wt.branch
183
195
        wt.add_pending_merge('non:existent@rev--ision--0--2')
184
196
        wt.commit('pretend to merge nonexistent-revision', rev_id='first')
185
197
        rev = branch.repository.get_revision(branch.last_revision())
200
212
        
201
213
    def test_pending_merges(self):
202
214
        """Tracking pending-merged revisions."""
203
 
        b = self.get_branch()
204
 
        wt = WorkingTree.create(b, '.')
 
215
        wt = self.make_branch_and_tree('.')
 
216
        b = wt.branch
205
217
        self.assertEquals(wt.pending_merges(), [])
206
218
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
207
219
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
222
234
        self.assertEquals(wt.pending_merges(), [])
223
235
 
224
236
    def test_sign_existing_revision(self):
225
 
        branch = self.get_branch()
226
 
        wt = WorkingTree.create(branch, ".")
 
237
        wt = self.make_branch_and_tree('.')
 
238
        branch = wt.branch
227
239
        wt.commit("base", allow_pointless=True, rev_id='A')
228
240
        from bzrlib.testament import Testament
229
241
        strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
249
261
        #FIXME: clone should work to urls,
250
262
        # wt.clone should work to disks.
251
263
        self.build_tree(['target/'])
252
 
        b2 = wt.branch.clone('target')
 
264
        d2 = wt.bzrdir.clone('target')
253
265
        self.assertEqual(wt.branch.repository.revision_store.get('A', 
254
266
                            'sig').read(),
255
 
                         b2.repository.revision_store.get('A', 
 
267
                         d2.open_repository().revision_store.get('A', 
256
268
                            'sig').read())
257
269
 
258
 
    def test_upgrade_preserves_signatures(self):
259
 
        # this is in the current test format
260
 
        wt = self.make_branch_and_tree('source')
261
 
        wt.commit('A', allow_pointless=True, rev_id='A')
262
 
        wt.branch.repository.sign_revision('A',
263
 
            bzrlib.gpg.LoopbackGPGStrategy(None))
264
 
        old_signature = wt.branch.repository.revision_store.get('A',
265
 
            'sig').read()
266
 
        upgrade(wt.basedir)
267
 
        wt = WorkingTree(wt.basedir)
268
 
        new_signature = wt.branch.repository.revision_store.get('A',
269
 
            'sig').read()
270
 
        self.assertEqual(old_signature, new_signature)
271
 
 
272
270
    def test_nicks(self):
273
271
        """Branch nicknames"""
274
272
        t = get_transport(self.get_url())
297
295
    def test_commit_nicks(self):
298
296
        """Nicknames are committed to the revision"""
299
297
        get_transport(self.get_url()).mkdir('bzr.dev')
300
 
        branch = self.make_branch('bzr.dev')
 
298
        wt = self.make_branch_and_tree('bzr.dev')
 
299
        branch = wt.branch
301
300
        branch.nick = "My happy branch"
302
 
        WorkingTree.create(branch, 'bzr.dev').commit('My commit respect da nick.')
 
301
        wt.commit('My commit respect da nick.')
303
302
        committed = branch.repository.get_revision(branch.last_revision())
304
303
        self.assertEqual(committed.properties["branch-nick"], 
305
304
                         "My happy branch")
306
305
 
307
 
    def test_no_ancestry_weave(self):
308
 
        # We no longer need to create the ancestry.weave file
309
 
        # since it is *never* used.
310
 
        branch = Branch.create('.')
311
 
        self.failIfExists('.bzr/ancestry.weave')
 
306
    def test_create_open_branch_uses_repository(self):
 
307
        try:
 
308
            repo = self.make_repository('.', shared=True)
 
309
        except errors.IncompatibleFormat:
 
310
            return
 
311
        repo.bzrdir.root_transport.mkdir('child')
 
312
        child_dir = self.bzrdir_format.initialize('child')
 
313
        try:
 
314
            child_branch = self.branch_format.initialize(child_dir)
 
315
        except errors.UninitializableFormat:
 
316
            # branch references are not default init'able.
 
317
            return
 
318
        self.assertEqual(repo.bzrdir.root_transport.base,
 
319
                         child_branch.repository.bzrdir.root_transport.base)
 
320
        child_branch = bzrlib.branch.Branch.open(self.get_url('child'))
 
321
        self.assertEqual(repo.bzrdir.root_transport.base,
 
322
                         child_branch.repository.bzrdir.root_transport.base)
312
323
 
313
324
 
314
325
class ChrootedTests(TestCaseWithBranch):
329
340
                          self.get_readonly_url(''))
330
341
        self.assertRaises(NotBranchError, Branch.open_containing,
331
342
                          self.get_readonly_url('g/p/q'))
332
 
        try:
333
 
            branch = self.branch_format.initialize(self.get_url())
334
 
        except UninitializableFormat:
335
 
            raise TestSkipped("Format %s is not initializable.")
 
343
        branch = self.make_branch('.')
336
344
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
337
345
        self.assertEqual('', relpath)
338
346
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
511
519
        # supported formats must be able to init and open
512
520
        t = get_transport(self.get_url())
513
521
        readonly_t = get_transport(self.get_readonly_url())
514
 
        made_branch = self.branch_format.initialize(t.base)
515
 
        self.failUnless(isinstance(made_branch, branch.Branch))
 
522
        made_branch = self.make_branch('.')
 
523
        self.failUnless(isinstance(made_branch, bzrlib.branch.Branch))
 
524
 
 
525
        # find it via bzrdir opening:
 
526
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
 
527
        direct_opened_branch = opened_control.open_branch()
 
528
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
 
529
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
 
530
        self.failUnless(isinstance(direct_opened_branch._format,
 
531
                        self.branch_format.__class__))
 
532
 
 
533
        # find it via Branch.open
 
534
        opened_branch = bzrlib.branch.Branch.open(readonly_t.base)
 
535
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
 
536
        self.assertEqual(made_branch._format.__class__,
 
537
                         opened_branch._format.__class__)
 
538
        # if it has a unique id string, can we probe for it ?
 
539
        try:
 
540
            self.branch_format.get_format_string()
 
541
        except NotImplementedError:
 
542
            return
516
543
        self.assertEqual(self.branch_format,
517
 
                         branch.BzrBranchFormat.find_format(readonly_t))
518
 
        direct_opened_branch = self.branch_format.open(readonly_t)
519
 
        opened_branch = branch.Branch.open(t.base)
520
 
        self.assertEqual(made_branch._branch_format,
521
 
                         opened_branch._branch_format)
522
 
        self.assertEqual(direct_opened_branch._branch_format,
523
 
                         opened_branch._branch_format)
524
 
        self.failUnless(isinstance(opened_branch, branch.Branch))
525
 
 
526
 
    def test_open_not_branch(self):
527
 
        self.assertRaises(NoSuchFile,
528
 
                          self.branch_format.open,
529
 
                          get_transport(self.get_readonly_url()))
 
544
                         bzrlib.branch.BranchFormat.find_format(opened_control))