~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_branch.py

Merge integration.

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
import os
20
20
import sys
21
21
 
22
 
import bzrlib.branch as branch
 
22
import bzrlib.branch
 
23
import bzrlib.bzrdir as bzrdir
23
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
24
25
from bzrlib.commit import commit
25
26
import bzrlib.errors as errors
31
32
                           )
32
33
import bzrlib.gpg
33
34
from bzrlib.osutils import getcwd
34
 
from bzrlib.revision import NULL_REVISION
35
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
36
36
from bzrlib.trace import mutter
37
37
import bzrlib.transactions as transactions
58
58
        return self.branch
59
59
 
60
60
    def make_branch(self, relpath):
 
61
        repo = self.make_repository(relpath)
 
62
        # fixme RBC 20060210 this isnt necessarily a fixable thing,
 
63
        # Skipped is the wrong exception to raise.
 
64
        try:
 
65
            return self.branch_format.initialize(repo.bzrdir)
 
66
        except errors.UninitializableFormat:
 
67
            raise TestSkipped('Uninitializable branch format')
 
68
 
 
69
    def make_repository(self, relpath):
61
70
        try:
62
71
            url = self.get_url(relpath)
63
72
            segments = url.split('/')
68
77
                    t.mkdir(segments[-1])
69
78
                except FileExists:
70
79
                    pass
71
 
            return self.branch_format.initialize(url)
 
80
            made_control = self.bzrdir_format.initialize(url)
 
81
            return made_control.create_repository()
72
82
        except UninitializableFormat:
73
83
            raise TestSkipped("Format %s is not initializable.")
74
84
 
88
98
        from bzrlib.fetch import Fetcher
89
99
        get_transport(self.get_url()).mkdir('b1')
90
100
        get_transport(self.get_url()).mkdir('b2')
91
 
        b1 = self.make_branch('b1')
 
101
        wt = self.make_branch_and_tree('b1')
 
102
        b1 = wt.branch
92
103
        b2 = self.make_branch('b2')
93
 
        wt = WorkingTree.create(b1, 'b1')
94
104
        file('b1/foo', 'w').write('hello')
95
105
        wt.add(['foo'], ['foo-id'])
96
106
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
105
115
        tree = b2.repository.revision_tree('revision-1')
106
116
        eq(tree.get_file_text('foo-id'), 'hello')
107
117
 
108
 
    def test_revision_tree(self):
109
 
        b1 = self.get_branch()
110
 
        wt = WorkingTree.create(b1, '.')
111
 
        wt.commit('lala!', rev_id='revision-1', allow_pointless=True)
112
 
        tree = b1.repository.revision_tree('revision-1')
113
 
        tree = b1.repository.revision_tree(None)
114
 
        self.assertEqual(len(tree.list_files()), 0)
115
 
        tree = b1.repository.revision_tree(NULL_REVISION)
116
 
        self.assertEqual(len(tree.list_files()), 0)
117
 
 
118
118
    def get_unbalanced_tree_pair(self):
119
119
        """Return two branches, a and b, with one file in a."""
120
120
        get_transport(self.get_url()).mkdir('a')
121
 
        br_a = self.make_branch('a')
122
 
        tree_a = WorkingTree.create(br_a, 'a')
 
121
        tree_a = self.make_branch_and_tree('a')
123
122
        file('a/b', 'wb').write('b')
124
123
        tree_a.add('b')
125
124
        tree_a.commit("silly commit", rev_id='A')
126
125
 
127
126
        get_transport(self.get_url()).mkdir('b')
128
 
        br_b = self.make_branch('b')
129
 
        tree_b = WorkingTree.create(br_b, 'b')
 
127
        tree_b = self.make_branch_and_tree('b')
130
128
        return tree_a, tree_b
131
129
 
132
130
    def get_balanced_branch_pair(self):
133
131
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
134
132
        tree_a, tree_b = self.get_unbalanced_tree_pair()
135
 
        tree_a.branch.push_stores(tree_b.branch)
 
133
        tree_b.branch.repository.fetch(tree_a.branch.repository)
136
134
        return tree_a, tree_b
137
135
 
138
 
    def test_push_stores(self):
139
 
        """Copy the stores from one branch to another"""
140
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
141
 
        br_a = tree_a.branch
142
 
        br_b = tree_b.branch
143
 
        # ensure the revision is missing.
144
 
        self.assertRaises(NoSuchRevision, br_b.repository.get_revision, 
145
 
                          br_a.revision_history()[0])
146
 
        br_a.push_stores(br_b)
147
 
        # check that b now has all the data from a's first commit.
148
 
        rev = br_b.repository.get_revision(br_a.revision_history()[0])
149
 
        tree = br_b.repository.revision_tree(br_a.revision_history()[0])
150
 
        for file_id in tree:
151
 
            if tree.inventory[file_id].kind == "file":
152
 
                tree.get_file(file_id).read()
153
 
 
154
136
    def test_clone_branch(self):
155
137
        """Copy the stores from one branch to another"""
156
138
        tree_a, tree_b = self.get_balanced_branch_pair()
157
139
        tree_b.commit("silly commit")
158
140
        os.mkdir('c')
159
 
        br_c = tree_a.branch.clone('c', basis_branch=tree_b.branch)
 
141
        # this fails to test that the history from a was not used.
 
142
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
160
143
        self.assertEqual(tree_a.branch.revision_history(),
161
 
                         br_c.revision_history())
 
144
                         dir_c.open_branch().revision_history())
162
145
 
163
146
    def test_clone_partial(self):
164
147
        """Copy only part of the history of a branch."""
165
 
        get_transport(self.get_url()).mkdir('a')
166
 
        br_a = self.make_branch('a')
167
 
        wt = WorkingTree.create(br_a, "a")
168
 
        self.build_tree(['a/one'])
169
 
        wt.add(['one'])
170
 
        wt.commit('commit one', rev_id='u@d-1')
171
 
        self.build_tree(['a/two'])
172
 
        wt.add(['two'])
173
 
        wt.commit('commit two', rev_id='u@d-2')
174
 
        br_b = br_a.clone('b', revision='u@d-1')
175
 
        self.assertEqual(br_b.last_revision(), 'u@d-1')
176
 
        self.assertTrue(os.path.exists('b/one'))
177
 
        self.assertFalse(os.path.exists('b/two'))
 
148
        # TODO: RBC 20060208 test with a revision not on revision-history.
 
149
        #       what should that behaviour be ? Emailed the list.
 
150
        wt_a = self.make_branch_and_tree('a')
 
151
        self.build_tree(['a/one'])
 
152
        wt_a.add(['one'])
 
153
        wt_a.commit('commit one', rev_id='1')
 
154
        self.build_tree(['a/two'])
 
155
        wt_a.add(['two'])
 
156
        wt_a.commit('commit two', rev_id='2')
 
157
        repo_b = self.make_repository('b')
 
158
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
 
159
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
 
160
        self.assertEqual(br_b.last_revision(), '1')
 
161
 
 
162
    def test_sprout_partial(self):
 
163
        # test sprouting with a prefix of the revision-history.
 
164
        # also needs not-on-revision-history behaviour defined.
 
165
        wt_a = self.make_branch_and_tree('a')
 
166
        self.build_tree(['a/one'])
 
167
        wt_a.add(['one'])
 
168
        wt_a.commit('commit one', rev_id='1')
 
169
        self.build_tree(['a/two'])
 
170
        wt_a.add(['two'])
 
171
        wt_a.commit('commit two', rev_id='2')
 
172
        repo_b = self.make_repository('b')
 
173
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
 
174
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
 
175
        self.assertEqual(br_b.last_revision(), '1')
 
176
 
 
177
    def test_clone_branch_nickname(self):
 
178
        # test the nick name is preserved always
 
179
        raise TestSkipped('XXX branch cloning is not yet tested..')
 
180
 
 
181
    def test_clone_branch_parent(self):
 
182
        # test the parent is preserved always
 
183
        raise TestSkipped('XXX branch cloning is not yet tested..')
 
184
        
 
185
    def test_sprout_branch_nickname(self):
 
186
        # test the nick name is reset always
 
187
        raise TestSkipped('XXX branch sprouting is not yet tested..')
 
188
 
 
189
    def test_sprout_branch_parent(self):
 
190
        source = self.make_branch('source')
 
191
        target = source.bzrdir.sprout(self.get_url('target')).open_branch()
 
192
        self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
178
193
        
179
194
    def test_record_initial_ghost_merge(self):
180
195
        """A pending merge with no revision present is still a merge."""
181
 
        branch = self.get_branch()
182
 
        wt = WorkingTree.create(branch, ".")
 
196
        wt = self.make_branch_and_tree('.')
 
197
        branch = wt.branch
183
198
        wt.add_pending_merge('non:existent@rev--ision--0--2')
184
199
        wt.commit('pretend to merge nonexistent-revision', rev_id='first')
185
200
        rev = branch.repository.get_revision(branch.last_revision())
200
215
        
201
216
    def test_pending_merges(self):
202
217
        """Tracking pending-merged revisions."""
203
 
        b = self.get_branch()
204
 
        wt = WorkingTree.create(b, '.')
 
218
        wt = self.make_branch_and_tree('.')
 
219
        b = wt.branch
205
220
        self.assertEquals(wt.pending_merges(), [])
206
221
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
207
222
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
222
237
        self.assertEquals(wt.pending_merges(), [])
223
238
 
224
239
    def test_sign_existing_revision(self):
225
 
        branch = self.get_branch()
226
 
        wt = WorkingTree.create(branch, ".")
 
240
        wt = self.make_branch_and_tree('.')
 
241
        branch = wt.branch
227
242
        wt.commit("base", allow_pointless=True, rev_id='A')
228
243
        from bzrlib.testament import Testament
229
244
        strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
249
264
        #FIXME: clone should work to urls,
250
265
        # wt.clone should work to disks.
251
266
        self.build_tree(['target/'])
252
 
        b2 = wt.branch.clone('target')
 
267
        d2 = wt.bzrdir.clone('target')
253
268
        self.assertEqual(wt.branch.repository.revision_store.get('A', 
254
269
                            'sig').read(),
255
 
                         b2.repository.revision_store.get('A', 
 
270
                         d2.open_repository().revision_store.get('A', 
256
271
                            'sig').read())
257
272
 
258
273
    def test_upgrade_preserves_signatures(self):
264
279
        old_signature = wt.branch.repository.revision_store.get('A',
265
280
            'sig').read()
266
281
        upgrade(wt.basedir)
267
 
        wt = WorkingTree(wt.basedir)
 
282
        wt = WorkingTree.open(wt.basedir)
268
283
        new_signature = wt.branch.repository.revision_store.get('A',
269
284
            'sig').read()
270
285
        self.assertEqual(old_signature, new_signature)
297
312
    def test_commit_nicks(self):
298
313
        """Nicknames are committed to the revision"""
299
314
        get_transport(self.get_url()).mkdir('bzr.dev')
300
 
        branch = self.make_branch('bzr.dev')
 
315
        wt = self.make_branch_and_tree('bzr.dev')
 
316
        branch = wt.branch
301
317
        branch.nick = "My happy branch"
302
 
        WorkingTree.create(branch, 'bzr.dev').commit('My commit respect da nick.')
 
318
        wt.commit('My commit respect da nick.')
303
319
        committed = branch.repository.get_revision(branch.last_revision())
304
320
        self.assertEqual(committed.properties["branch-nick"], 
305
321
                         "My happy branch")
306
322
 
307
 
    def test_no_ancestry_weave(self):
308
 
        # We no longer need to create the ancestry.weave file
309
 
        # since it is *never* used.
310
 
        branch = Branch.create('.')
311
 
        self.failIfExists('.bzr/ancestry.weave')
312
 
 
313
323
 
314
324
class ChrootedTests(TestCaseWithBranch):
315
325
    """A support class that provides readonly urls outside the local namespace.
329
339
                          self.get_readonly_url(''))
330
340
        self.assertRaises(NotBranchError, Branch.open_containing,
331
341
                          self.get_readonly_url('g/p/q'))
332
 
        try:
333
 
            branch = self.branch_format.initialize(self.get_url())
334
 
        except UninitializableFormat:
335
 
            raise TestSkipped("Format %s is not initializable.")
 
342
        branch = self.make_branch('.')
336
343
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
337
344
        self.assertEqual('', relpath)
338
345
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
511
518
        # supported formats must be able to init and open
512
519
        t = get_transport(self.get_url())
513
520
        readonly_t = get_transport(self.get_readonly_url())
514
 
        made_branch = self.branch_format.initialize(t.base)
515
 
        self.failUnless(isinstance(made_branch, branch.Branch))
 
521
        made_branch = self.make_branch('.')
 
522
        self.failUnless(isinstance(made_branch, bzrlib.branch.Branch))
 
523
 
 
524
        # find it via bzrdir opening:
 
525
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
 
526
        direct_opened_branch = opened_control.open_branch()
 
527
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
 
528
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
 
529
        self.failUnless(isinstance(direct_opened_branch._format,
 
530
                        self.branch_format.__class__))
 
531
 
 
532
        # find it via Branch.open
 
533
        opened_branch = bzrlib.branch.Branch.open(readonly_t.base)
 
534
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
 
535
        self.assertEqual(made_branch._format.__class__,
 
536
                         opened_branch._format.__class__)
 
537
        # if it has a unique id string, can we probe for it ?
 
538
        try:
 
539
            self.branch_format.get_format_string()
 
540
        except NotImplementedError:
 
541
            return
516
542
        self.assertEqual(self.branch_format,
517
 
                         branch.BzrBranchFormat.find_format(readonly_t))
518
 
        direct_opened_branch = self.branch_format.open(readonly_t)
519
 
        opened_branch = branch.Branch.open(t.base)
520
 
        self.assertEqual(made_branch._branch_format,
521
 
                         opened_branch._branch_format)
522
 
        self.assertEqual(direct_opened_branch._branch_format,
523
 
                         opened_branch._branch_format)
524
 
        self.failUnless(isinstance(opened_branch, branch.Branch))
525
 
 
526
 
    def test_open_not_branch(self):
527
 
        self.assertRaises(NoSuchFile,
528
 
                          self.branch_format.open,
529
 
                          get_transport(self.get_readonly_url()))
 
543
                         bzrlib.branch.BranchFormat.find_format(opened_control))