~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

  • Committer: Robert Collins
  • Date: 2005-10-18 13:11:57 UTC
  • mfrom: (1185.16.72) (0.2.1)
  • Revision ID: robertc@robertcollins.net-20051018131157-76a9970aa78e927e
Merged Martin.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
2
 
#
 
1
# (C) 2005 Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Tests for branch implementations - tests a branch format."""
18
 
 
19
17
import os
20
 
import sys
21
18
 
22
 
from bzrlib import (
23
 
    branch,
24
 
    bzrdir,
25
 
    errors,
26
 
    gpg,
27
 
    urlutils,
28
 
    transactions,
29
 
    repository,
30
 
    )
31
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
32
 
from bzrlib.delta import TreeDelta
33
 
from bzrlib.errors import (FileExists,
34
 
                           NoSuchRevision,
35
 
                           NoSuchFile,
36
 
                           UninitializableFormat,
37
 
                           NotBranchError,
38
 
                           )
39
 
from bzrlib.osutils import getcwd
40
 
import bzrlib.revision
41
 
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
42
 
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
43
 
from bzrlib.tests.HttpServer import HttpServer
 
20
from bzrlib.clone import copy_branch
 
21
from bzrlib.commit import commit
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
24
import bzrlib.gpg
 
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
44
27
from bzrlib.trace import mutter
45
 
from bzrlib.transport import get_transport
46
 
from bzrlib.transport.memory import MemoryServer
47
 
from bzrlib.upgrade import upgrade
48
 
from bzrlib.workingtree import WorkingTree
49
 
 
 
28
import bzrlib.transactions as transactions
50
29
 
51
30
# TODO: Make a branch using basis branch, and check that it 
52
31
# doesn't request any files that could have been avoided, by 
53
32
# hooking into the Transport.
54
33
 
55
 
 
56
 
class TestCaseWithBranch(TestCaseWithBzrDir):
57
 
 
58
 
    def setUp(self):
59
 
        super(TestCaseWithBranch, self).setUp()
60
 
        self.branch = None
61
 
 
62
 
    def get_branch(self):
63
 
        if self.branch is None:
64
 
            self.branch = self.make_branch('')
65
 
        return self.branch
66
 
 
67
 
    def make_branch(self, relpath, format=None):
68
 
        repo = self.make_repository(relpath, format=format)
69
 
        # fixme RBC 20060210 this isn't necessarily a fixable thing,
70
 
        # Skipped is the wrong exception to raise.
71
 
        try:
72
 
            return self.branch_format.initialize(repo.bzrdir)
73
 
        except errors.UninitializableFormat:
74
 
            raise TestSkipped('Uninitializable branch format')
75
 
 
76
 
    def make_repository(self, relpath, shared=False, format=None):
77
 
        made_control = self.make_bzrdir(relpath, format=format)
78
 
        return made_control.create_repository(shared=shared)
79
 
 
80
 
 
81
 
class TestBranch(TestCaseWithBranch):
 
34
class TestBranch(TestCaseInTempDir):
82
35
 
83
36
    def test_append_revisions(self):
84
37
        """Test appending more than one revision"""
85
 
        br = self.get_branch()
 
38
        br = Branch.initialize(".")
86
39
        br.append_revision("rev1")
87
40
        self.assertEquals(br.revision_history(), ["rev1",])
88
41
        br.append_revision("rev2", "rev3")
89
42
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
90
 
        self.assertRaises(errors.ReservedId, br.append_revision, 'current:')
91
 
 
92
 
    def test_revision_ids_are_utf8(self):
93
 
        br = self.get_branch()
94
 
        br.set_revision_history(['rev1', 'rev2', 'rev3'])
95
 
        rh = br.revision_history()
96
 
        self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
97
 
        for revision_id in rh:
98
 
            self.assertIsInstance(revision_id, str)
99
 
        last = br.last_revision()
100
 
        self.assertEqual('rev3', last)
101
 
        self.assertIsInstance(last, str)
102
43
 
103
44
    def test_fetch_revisions(self):
104
45
        """Test fetch-revision operation."""
105
 
        get_transport(self.get_url()).mkdir('b1')
106
 
        get_transport(self.get_url()).mkdir('b2')
107
 
        wt = self.make_branch_and_tree('b1')
108
 
        b1 = wt.branch
109
 
        b2 = self.make_branch('b2')
110
 
        file('b1/foo', 'w').write('hello')
111
 
        wt.add(['foo'], ['foo-id'])
112
 
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
46
        from bzrlib.fetch import Fetcher
 
47
        os.mkdir('b1')
 
48
        os.mkdir('b2')
 
49
        b1 = Branch.initialize('b1')
 
50
        b2 = Branch.initialize('b2')
 
51
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
 
52
        b1.add(['foo'], ['foo-id'])
 
53
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
113
54
 
114
55
        mutter('start fetch')
115
 
        self.assertEqual((1, []), b2.fetch(b1))
116
 
 
117
 
        rev = b2.repository.get_revision('revision-1')
118
 
        tree = b2.repository.revision_tree('revision-1')
119
 
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
120
 
 
121
 
    def test_get_revision_delta(self):
122
 
        tree_a = self.make_branch_and_tree('a')
123
 
        self.build_tree(['a/foo'])
124
 
        tree_a.add('foo', 'file1')
125
 
        tree_a.commit('rev1', rev_id='rev1')
126
 
        self.build_tree(['a/vla'])
127
 
        tree_a.add('vla', 'file2')
128
 
        tree_a.commit('rev2', rev_id='rev2')
129
 
 
130
 
        delta = tree_a.branch.get_revision_delta(1)
131
 
        self.assertIsInstance(delta, TreeDelta)
132
 
        self.assertEqual([('foo', 'file1', 'file')], delta.added)
133
 
        delta = tree_a.branch.get_revision_delta(2)
134
 
        self.assertIsInstance(delta, TreeDelta)
135
 
        self.assertEqual([('vla', 'file2', 'file')], delta.added)
136
 
 
137
 
    def get_unbalanced_tree_pair(self):
 
56
        f = Fetcher(from_branch=b1, to_branch=b2)
 
57
        eq = self.assertEquals
 
58
        eq(f.count_copied, 1)
 
59
        eq(f.last_revision, 'revision-1')
 
60
 
 
61
        rev = b2.get_revision('revision-1')
 
62
        tree = b2.revision_tree('revision-1')
 
63
        eq(tree.get_file_text('foo-id'), 'hello')
 
64
 
 
65
    def get_unbalanced_branch_pair(self):
138
66
        """Return two branches, a and b, with one file in a."""
139
 
        get_transport(self.get_url()).mkdir('a')
140
 
        tree_a = self.make_branch_and_tree('a')
 
67
        os.mkdir('a')
 
68
        br_a = Branch.initialize("a")
141
69
        file('a/b', 'wb').write('b')
142
 
        tree_a.add('b')
143
 
        tree_a.commit("silly commit", rev_id='A')
144
 
 
145
 
        get_transport(self.get_url()).mkdir('b')
146
 
        tree_b = self.make_branch_and_tree('b')
147
 
        return tree_a, tree_b
 
70
        br_a.add('b')
 
71
        commit(br_a, "silly commit", rev_id='A')
 
72
        os.mkdir('b')
 
73
        br_b = Branch.initialize("b")
 
74
        return br_a, br_b
148
75
 
149
76
    def get_balanced_branch_pair(self):
150
77
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
151
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
152
 
        tree_b.branch.repository.fetch(tree_a.branch.repository)
153
 
        return tree_a, tree_b
154
 
 
155
 
    def test_clone_branch(self):
156
 
        """Copy the stores from one branch to another"""
157
 
        tree_a, tree_b = self.get_balanced_branch_pair()
158
 
        tree_b.commit("silly commit")
 
78
        br_a, br_b = self.get_unbalanced_branch_pair()
 
79
        br_a.push_stores(br_b)
 
80
        return br_a, br_b
 
81
 
 
82
    def test_push_stores(self):
 
83
        """Copy the stores from one branch to another"""
 
84
        br_a, br_b = self.get_unbalanced_branch_pair()
 
85
        # ensure the revision is missing.
 
86
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
87
                          br_a.revision_history()[0])
 
88
        br_a.push_stores(br_b)
 
89
        # check that b now has all the data from a's first commit.
 
90
        rev = br_b.get_revision(br_a.revision_history()[0])
 
91
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
92
        for file_id in tree:
 
93
            if tree.inventory[file_id].kind == "file":
 
94
                tree.get_file(file_id).read()
 
95
        return br_a, br_b
 
96
 
 
97
    def test_copy_branch(self):
 
98
        """Copy the stores from one branch to another"""
 
99
        br_a, br_b = self.get_balanced_branch_pair()
 
100
        commit(br_b, "silly commit")
159
101
        os.mkdir('c')
160
 
        # this fails to test that the history from a was not used.
161
 
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
162
 
        self.assertEqual(tree_a.branch.revision_history(),
163
 
                         dir_c.open_branch().revision_history())
 
102
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
 
103
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
164
104
 
165
 
    def test_clone_partial(self):
 
105
    def test_copy_partial(self):
166
106
        """Copy only part of the history of a branch."""
167
 
        # TODO: RBC 20060208 test with a revision not on revision-history.
168
 
        #       what should that behaviour be ? Emailed the list.
169
 
        wt_a = self.make_branch_and_tree('a')
170
 
        self.build_tree(['a/one'])
171
 
        wt_a.add(['one'])
172
 
        wt_a.commit('commit one', rev_id='1')
173
 
        self.build_tree(['a/two'])
174
 
        wt_a.add(['two'])
175
 
        wt_a.commit('commit two', rev_id='2')
176
 
        repo_b = self.make_repository('b')
177
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
178
 
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
179
 
        self.assertEqual('1', br_b.last_revision())
180
 
 
181
 
    def test_sprout_partial(self):
182
 
        # test sprouting with a prefix of the revision-history.
183
 
        # also needs not-on-revision-history behaviour defined.
184
 
        wt_a = self.make_branch_and_tree('a')
185
 
        self.build_tree(['a/one'])
186
 
        wt_a.add(['one'])
187
 
        wt_a.commit('commit one', rev_id='1')
188
 
        self.build_tree(['a/two'])
189
 
        wt_a.add(['two'])
190
 
        wt_a.commit('commit two', rev_id='2')
191
 
        repo_b = self.make_repository('b')
192
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
193
 
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
194
 
        self.assertEqual('1', br_b.last_revision())
195
 
 
196
 
    def get_parented_branch(self):
197
 
        wt_a = self.make_branch_and_tree('a')
198
 
        self.build_tree(['a/one'])
199
 
        wt_a.add(['one'])
200
 
        wt_a.commit('commit one', rev_id='1')
201
 
 
202
 
        branch_b = wt_a.bzrdir.sprout('b', revision_id='1').open_branch()
203
 
        self.assertEqual(wt_a.branch.base, branch_b.get_parent())
204
 
        return branch_b
205
 
 
206
 
    def test_clone_branch_nickname(self):
207
 
        # test the nick name is preserved always
208
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
209
 
 
210
 
    def test_clone_branch_parent(self):
211
 
        # test the parent is preserved always
212
 
        branch_b = self.get_parented_branch()
213
 
        repo_c = self.make_repository('c')
214
 
        branch_b.repository.copy_content_into(repo_c)
215
 
        branch_c = branch_b.clone(repo_c.bzrdir)
216
 
        self.assertNotEqual(None, branch_c.get_parent())
217
 
        self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
218
 
 
219
 
        # We can also set a specific parent, and it should be honored
220
 
        random_parent = 'http://bazaar-vcs.org/path/to/branch'
221
 
        branch_b.set_parent(random_parent)
222
 
        repo_d = self.make_repository('d')
223
 
        branch_b.repository.copy_content_into(repo_d)
224
 
        branch_d = branch_b.clone(repo_d.bzrdir)
225
 
        self.assertEqual(random_parent, branch_d.get_parent())
226
 
 
227
 
    def test_sprout_branch_nickname(self):
228
 
        # test the nick name is reset always
229
 
        raise TestSkipped('XXX branch sprouting is not yet tested..')
230
 
 
231
 
    def test_sprout_branch_parent(self):
232
 
        source = self.make_branch('source')
233
 
        target = source.bzrdir.sprout(self.get_url('target')).open_branch()
234
 
        self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
235
 
 
236
 
    def test_submit_branch(self):
237
 
        """Submit location can be queried and set"""
238
 
        branch = self.make_branch('branch')
239
 
        self.assertEqual(branch.get_submit_branch(), None)
240
 
        branch.set_submit_branch('sftp://example.com')
241
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
242
 
        branch.set_submit_branch('sftp://example.net')
243
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
 
107
        self.build_tree(['a/', 'a/one'])
 
108
        br_a = Branch.initialize('a')
 
109
        br_a.add(['one'])
 
110
        br_a.commit('commit one', rev_id='u@d-1')
 
111
        self.build_tree(['a/two'])
 
112
        br_a.add(['two'])
 
113
        br_a.commit('commit two', rev_id='u@d-2')
 
114
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
115
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
116
        self.assertTrue(os.path.exists('b/one'))
 
117
        self.assertFalse(os.path.exists('b/two'))
244
118
        
245
 
    def test_record_initial_ghost(self):
246
 
        """Branches should support having ghosts."""
247
 
        wt = self.make_branch_and_tree('.')
248
 
        wt.set_parent_ids(['non:existent@rev--ision--0--2'],
249
 
            allow_leftmost_as_ghost=True)
250
 
        rev_id = wt.commit('commit against a ghost first parent.')
251
 
        rev = wt.branch.repository.get_revision(rev_id)
252
 
        self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
 
119
 
 
120
    def test_record_initial_ghost_merge(self):
 
121
        """A pending merge with no revision present is still a merge."""
 
122
        branch = Branch.initialize('.')
 
123
        branch.add_pending_merge('non:existent@rev--ision--0--2')
 
124
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
 
125
        rev = branch.get_revision(branch.last_revision())
 
126
        self.assertEqual(len(rev.parent_ids), 1)
253
127
        # parent_sha1s is not populated now, WTF. rbc 20051003
254
128
        self.assertEqual(len(rev.parent_sha1s), 0)
255
 
 
256
 
    def test_record_two_ghosts(self):
257
 
        """Recording with all ghosts works."""
258
 
        wt = self.make_branch_and_tree('.')
259
 
        wt.set_parent_ids([
260
 
                'foo@azkhazan-123123-abcabc',
261
 
                'wibble@fofof--20050401--1928390812',
262
 
            ],
263
 
            allow_leftmost_as_ghost=True)
264
 
        rev_id = wt.commit("commit from ghost base with one merge")
265
 
        # the revision should have been committed with two parents
266
 
        rev = wt.branch.repository.get_revision(rev_id)
267
 
        self.assertEqual(['foo@azkhazan-123123-abcabc',
268
 
            'wibble@fofof--20050401--1928390812'],
269
 
            rev.parent_ids)
270
 
 
271
 
    def test_bad_revision(self):
272
 
        self.assertRaises(errors.InvalidRevisionId,
273
 
                          self.get_branch().repository.get_revision,
274
 
                          None)
 
129
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
275
130
 
276
131
# TODO 20051003 RBC:
277
132
# compare the gpg-to-sign info for a commit with a ghost and 
278
133
#     an identical tree without a ghost
279
134
# fetch missing should rewrite the TOC of weaves to list newly available parents.
280
135
        
 
136
    def test_pending_merges(self):
 
137
        """Tracking pending-merged revisions."""
 
138
        b = Branch.initialize('.')
 
139
 
 
140
        self.assertEquals(b.pending_merges(), [])
 
141
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
 
142
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
143
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
 
144
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
145
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
 
146
        self.assertEquals(b.pending_merges(),
 
147
                          ['foo@azkhazan-123123-abcabc',
 
148
                           'wibble@fofof--20050401--1928390812'])
 
149
        b.commit("commit from base with two merges")
 
150
        rev = b.get_revision(b.revision_history()[0])
 
151
        self.assertEquals(len(rev.parent_ids), 2)
 
152
        self.assertEquals(rev.parent_ids[0],
 
153
                          'foo@azkhazan-123123-abcabc')
 
154
        self.assertEquals(rev.parent_ids[1],
 
155
                           'wibble@fofof--20050401--1928390812')
 
156
        # list should be cleared when we do a commit
 
157
        self.assertEquals(b.pending_merges(), [])
 
158
 
281
159
    def test_sign_existing_revision(self):
282
 
        wt = self.make_branch_and_tree('.')
283
 
        branch = wt.branch
284
 
        wt.commit("base", allow_pointless=True, rev_id='A')
 
160
        branch = Branch.initialize('.')
 
161
        branch.commit("base", allow_pointless=True, rev_id='A')
285
162
        from bzrlib.testament import Testament
286
 
        strategy = gpg.LoopbackGPGStrategy(None)
287
 
        branch.repository.sign_revision('A', strategy)
288
 
        self.assertEqual(Testament.from_revision(branch.repository, 
289
 
                         'A').as_short_text(),
290
 
                         branch.repository.get_signature_text('A'))
 
163
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
164
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
165
                         branch.revision_store.get('A', 'sig').read())
291
166
 
292
167
    def test_store_signature(self):
293
 
        wt = self.make_branch_and_tree('.')
294
 
        branch = wt.branch
295
 
        branch.repository.store_revision_signature(
296
 
            gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
297
 
        self.assertRaises(errors.NoSuchRevision,
298
 
                          branch.repository.has_signature_for_revision_id,
299
 
                          'A')
300
 
        wt.commit("base", allow_pointless=True, rev_id='A')
301
 
        self.assertEqual('FOO', 
302
 
                         branch.repository.get_signature_text('A'))
303
 
 
304
 
    def test_branch_keeps_signatures(self):
305
 
        wt = self.make_branch_and_tree('source')
306
 
        wt.commit('A', allow_pointless=True, rev_id='A')
307
 
        wt.branch.repository.sign_revision('A',
308
 
            gpg.LoopbackGPGStrategy(None))
309
 
        #FIXME: clone should work to urls,
310
 
        # wt.clone should work to disks.
311
 
        self.build_tree(['target/'])
312
 
        d2 = wt.bzrdir.clone('target')
313
 
        self.assertEqual(wt.branch.repository.get_signature_text('A'),
314
 
                         d2.open_repository().get_signature_text('A'))
315
 
 
316
 
    def test_nicks(self):
317
 
        """Branch nicknames"""
318
 
        t = get_transport(self.get_url())
319
 
        t.mkdir('bzr.dev')
320
 
        branch = self.make_branch('bzr.dev')
321
 
        self.assertEqual(branch.nick, 'bzr.dev')
322
 
        t.move('bzr.dev', 'bzr.ab')
323
 
        branch = Branch.open(self.get_url('bzr.ab'))
324
 
        self.assertEqual(branch.nick, 'bzr.ab')
325
 
        branch.nick = "Aaron's branch"
326
 
        branch.nick = "Aaron's branch"
327
 
        self.failUnless(
328
 
            t.has(
329
 
                t.relpath(
330
 
                    branch.control_files.controlfilename("branch.conf")
331
 
                    )
332
 
                )
333
 
            )
334
 
        self.assertEqual(branch.nick, "Aaron's branch")
335
 
        t.move('bzr.ab', 'integration')
336
 
        branch = Branch.open(self.get_url('integration'))
337
 
        self.assertEqual(branch.nick, "Aaron's branch")
338
 
        branch.nick = u"\u1234"
339
 
        self.assertEqual(branch.nick, u"\u1234")
340
 
 
341
 
    def test_commit_nicks(self):
342
 
        """Nicknames are committed to the revision"""
343
 
        get_transport(self.get_url()).mkdir('bzr.dev')
344
 
        wt = self.make_branch_and_tree('bzr.dev')
345
 
        branch = wt.branch
346
 
        branch.nick = "My happy branch"
347
 
        wt.commit('My commit respect da nick.')
348
 
        committed = branch.repository.get_revision(branch.last_revision())
349
 
        self.assertEqual(committed.properties["branch-nick"], 
350
 
                         "My happy branch")
351
 
 
352
 
    def test_create_open_branch_uses_repository(self):
353
 
        try:
354
 
            repo = self.make_repository('.', shared=True)
355
 
        except errors.IncompatibleFormat:
356
 
            return
357
 
        repo.bzrdir.root_transport.mkdir('child')
358
 
        child_dir = self.bzrdir_format.initialize('child')
359
 
        try:
360
 
            child_branch = self.branch_format.initialize(child_dir)
361
 
        except errors.UninitializableFormat:
362
 
            # branch references are not default init'able.
363
 
            return
364
 
        self.assertEqual(repo.bzrdir.root_transport.base,
365
 
                         child_branch.repository.bzrdir.root_transport.base)
366
 
        child_branch = branch.Branch.open(self.get_url('child'))
367
 
        self.assertEqual(repo.bzrdir.root_transport.base,
368
 
                         child_branch.repository.bzrdir.root_transport.base)
369
 
 
370
 
    def test_format_description(self):
371
 
        tree = self.make_branch_and_tree('tree')
372
 
        text = tree.branch._format.get_format_description()
373
 
        self.failUnless(len(text))
374
 
 
375
 
    def test_check_branch_report_results(self):
376
 
        """Checking a branch produces results which can be printed"""
377
 
        branch = self.make_branch('.')
378
 
        result = branch.check()
379
 
        # reports results through logging
380
 
        result.report_results(verbose=True)
381
 
        result.report_results(verbose=False)
382
 
 
383
 
    def test_get_commit_builder(self):
384
 
        self.assertIsInstance(self.make_branch(".").get_commit_builder([]), 
385
 
            repository.CommitBuilder)
386
 
 
387
 
    def test_generate_revision_history(self):
388
 
        """Create a fake revision history easily."""
389
 
        tree = self.make_branch_and_tree('.')
390
 
        rev1 = tree.commit('foo')
391
 
        orig_history = tree.branch.revision_history()
392
 
        rev2 = tree.commit('bar', allow_pointless=True)
393
 
        tree.branch.generate_revision_history(rev1)
394
 
        self.assertEqual(orig_history, tree.branch.revision_history())
395
 
 
396
 
    def test_generate_revision_history_NULL_REVISION(self):
397
 
        tree = self.make_branch_and_tree('.')
398
 
        rev1 = tree.commit('foo')
399
 
        tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
400
 
        self.assertEqual([], tree.branch.revision_history())
401
 
 
402
 
    def test_create_checkout(self):
403
 
        tree_a = self.make_branch_and_tree('a')
404
 
        branch_a = tree_a.branch
405
 
        checkout_b = branch_a.create_checkout('b')
406
 
        self.assertEqual(None, checkout_b.last_revision())
407
 
        checkout_b.commit('rev1', rev_id='rev1')
408
 
        self.assertEqual('rev1', branch_a.last_revision())
409
 
        self.assertNotEqual(checkout_b.branch.base, branch_a.base)
410
 
 
411
 
        checkout_c = branch_a.create_checkout('c', lightweight=True)
412
 
        self.assertEqual('rev1', checkout_c.last_revision())
413
 
        checkout_c.commit('rev2', rev_id='rev2')
414
 
        self.assertEqual('rev2', branch_a.last_revision())
415
 
        self.assertEqual(checkout_c.branch.base, branch_a.base)
416
 
 
417
 
        os.mkdir('d')
418
 
        checkout_d = branch_a.create_checkout('d', lightweight=True)
419
 
        self.assertEqual('rev2', checkout_d.last_revision())
420
 
        os.mkdir('e')
421
 
        checkout_e = branch_a.create_checkout('e')
422
 
        self.assertEqual('rev2', checkout_e.last_revision())
423
 
 
424
 
    def test_create_anonymous_lightweight_checkout(self):
425
 
        """A lightweight checkout from a readonly branch should succeed."""
426
 
        tree_a = self.make_branch_and_tree('a')
427
 
        rev_id = tree_a.commit('put some content in the branch')
428
 
        source_branch = bzrlib.branch.Branch.open(
429
 
            'readonly+' + tree_a.bzrdir.root_transport.base)
430
 
        # sanity check that the test will be valid
431
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
432
 
            source_branch.lock_write)
433
 
        checkout = source_branch.create_checkout('c', lightweight=True)
434
 
        self.assertEqual(rev_id, checkout.last_revision())
435
 
 
436
 
    def test_create_anonymous_heavyweight_checkout(self):
437
 
        """A regular checkout from a readonly branch should succeed."""
438
 
        tree_a = self.make_branch_and_tree('a')
439
 
        rev_id = tree_a.commit('put some content in the branch')
440
 
        source_branch = bzrlib.branch.Branch.open(
441
 
            'readonly+' + tree_a.bzrdir.root_transport.base)
442
 
        # sanity check that the test will be valid
443
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
444
 
            source_branch.lock_write)
445
 
        checkout = source_branch.create_checkout('c')
446
 
        self.assertEqual(rev_id, checkout.last_revision())
447
 
 
448
 
 
449
 
class ChrootedTests(TestCaseWithBranch):
450
 
    """A support class that provides readonly urls outside the local namespace.
451
 
 
452
 
    This is done by checking if self.transport_server is a MemoryServer. If it
453
 
    is then we are chrooted already, if it is not then an HttpServer is used
454
 
    for readonly urls.
455
 
    """
456
 
 
457
 
    def setUp(self):
458
 
        super(ChrootedTests, self).setUp()
459
 
        if not self.transport_server == MemoryServer:
460
 
            self.transport_readonly_server = HttpServer
 
168
        branch = Branch.initialize('.')
 
169
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
 
170
                                        'FOO', 'A')
 
171
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
 
172
 
 
173
 
 
174
class TestRemote(TestCaseWithWebserver):
461
175
 
462
176
    def test_open_containing(self):
463
177
        self.assertRaises(NotBranchError, Branch.open_containing,
464
 
                          self.get_readonly_url(''))
 
178
                          self.get_remote_url(''))
465
179
        self.assertRaises(NotBranchError, Branch.open_containing,
466
 
                          self.get_readonly_url('g/p/q'))
467
 
        branch = self.make_branch('.')
468
 
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
 
180
                          self.get_remote_url('g/p/q'))
 
181
        b = Branch.initialize('.')
 
182
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
469
183
        self.assertEqual('', relpath)
470
 
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
 
184
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
471
185
        self.assertEqual('g/p/q', relpath)
472
186
        
 
187
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
188
#         >>> from bzrlib.commit import commit
 
189
#         >>> bzrlib.trace.silent = True
 
190
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
191
#         >>> br1.add('foo')
 
192
#         >>> br1.add('bar')
 
193
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
194
#         >>> br2 = ScratchBranch()
 
195
#         >>> br2.update_revisions(br1)
 
196
#         Added 2 texts.
 
197
#         Added 1 inventories.
 
198
#         Added 1 revisions.
 
199
#         >>> br2.revision_history()
 
200
#         [u'REVISION-ID-1']
 
201
#         >>> br2.update_revisions(br1)
 
202
#         Added 0 revisions.
 
203
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
204
#         True
473
205
 
474
206
class InstrumentedTransaction(object):
475
207
 
534
266
        self.assertEqual(['lw', 'ul'], branch._calls)
535
267
 
536
268
 
537
 
class TestBranchTransaction(TestCaseWithBranch):
 
269
class TestBranchTransaction(TestCaseInTempDir):
538
270
 
539
271
    def setUp(self):
540
272
        super(TestBranchTransaction, self).setUp()
541
 
        self.branch = None
 
273
        self.branch = Branch.initialize('.')
542
274
        
543
275
    def test_default_get_transaction(self):
544
276
        """branch.get_transaction on a new branch should give a PassThrough."""
545
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
 
277
        self.failUnless(isinstance(self.branch.get_transaction(),
546
278
                                   transactions.PassThroughTransaction))
547
279
 
548
280
    def test__set_new_transaction(self):
549
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
281
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
550
282
 
551
283
    def test__set_over_existing_transaction_raises(self):
552
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
284
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
553
285
        self.assertRaises(errors.LockError,
554
 
                          self.get_branch()._set_transaction,
 
286
                          self.branch._set_transaction,
555
287
                          transactions.ReadOnlyTransaction())
556
288
 
557
289
    def test_finish_no_transaction_raises(self):
558
 
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
 
290
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
559
291
 
560
292
    def test_finish_readonly_transaction_works(self):
561
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
562
 
        self.get_branch()._finish_transaction()
563
 
        self.assertEqual(None, self.get_branch().control_files._transaction)
 
293
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
294
        self.branch._finish_transaction()
 
295
        self.assertEqual(None, self.branch._transaction)
564
296
 
565
297
    def test_unlock_calls_finish(self):
566
 
        self.get_branch().lock_read()
 
298
        self.branch.lock_read()
567
299
        transaction = InstrumentedTransaction()
568
 
        self.get_branch().control_files._transaction = transaction
569
 
        self.get_branch().unlock()
 
300
        self.branch._transaction = transaction
 
301
        self.branch.unlock()
570
302
        self.assertEqual(['finish'], transaction.calls)
571
303
 
572
304
    def test_lock_read_acquires_ro_transaction(self):
573
 
        self.get_branch().lock_read()
574
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
 
305
        self.branch.lock_read()
 
306
        self.failUnless(isinstance(self.branch.get_transaction(),
575
307
                                   transactions.ReadOnlyTransaction))
576
 
        self.get_branch().unlock()
 
308
        self.branch.unlock()
577
309
        
578
 
    def test_lock_write_acquires_write_transaction(self):
579
 
        self.get_branch().lock_write()
 
310
    def test_lock_write_acquires_passthrough_transaction(self):
 
311
        self.branch.lock_write()
580
312
        # cannot use get_transaction as its magic
581
 
        self.failUnless(isinstance(self.get_branch().control_files._transaction,
582
 
                                   transactions.WriteTransaction))
583
 
        self.get_branch().unlock()
584
 
 
585
 
 
586
 
class TestBranchPushLocations(TestCaseWithBranch):
    """Tests for reading and writing a branch's push location.

    The push location is looked up in / stored to the locations
    configuration file named by bzrlib.config.locations_config_filename().
    """

    def test_get_push_location_unset(self):
        """With nothing written to the config, the push location is None."""
        self.assertEqual(None, self.get_branch().get_push_location())

    def test_get_push_location_exact(self):
        """A push_location in a section named after the branch is returned."""
        from bzrlib.config import (locations_config_filename,
                                   ensure_config_dir_exists)
        ensure_config_dir_exists()
        fn = locations_config_filename()
        # Close the file explicitly so the section is flushed to disk before
        # get_push_location() reads it back, instead of relying on the
        # garbage collector to close an anonymous file object.
        config_file = open(fn, 'wt')
        try:
            # base[:-1] drops the last character of the branch base
            # (presumably a trailing slash - TODO confirm).
            # The trailing newline matches what the print statement emitted.
            config_file.write("[%s]\n"
                              "push_location=foo\n" %
                              self.get_branch().base[:-1])
        finally:
            config_file.close()
        self.assertEqual("foo", self.get_branch().get_push_location())

    def test_set_push_location(self):
        """set_push_location() writes the location and a norecurse policy."""
        from bzrlib.config import (locations_config_filename,
                                   ensure_config_dir_exists)
        ensure_config_dir_exists()
        fn = locations_config_filename()
        # Named test_branch rather than branch so the local does not shadow
        # the bzrlib.branch module.
        test_branch = self.get_branch()
        test_branch.set_push_location('foo')
        local_path = urlutils.local_path_from_url(test_branch.base[:-1])
        self.assertFileEqual("[%s]\n"
                             "push_location = foo\n"
                             "push_location:policy = norecurse" % local_path,
                             fn)

    # TODO RBC 20051029 test getting a push location from a branch in a
    # recursive section - that is, it appends the branch name.
616
 
 
617
 
 
618
 
class TestFormat(TestCaseWithBranch):
    """Checks on the branch format object under test."""

    def test_format_initialize_find_open(self):
        """Loopback check: a branch made in this format opens as this format.

        Makes a branch, reopens it through BzrDir.open and through
        Branch.open on the read-only URL, and compares classes and formats.
        Returns early for formats that report themselves as unsupported.
        """
        if not self.branch_format.is_supported():
            # Unsupported formats cannot be loopback tested: the default
            # open will not open them, and they may not be initializable.
            return

        # A supported format has to both initialize and open.
        # NOTE(review): the writable transport is never used below; it is
        # still constructed in case doing so has side effects - confirm.
        writable_transport = get_transport(self.get_url())
        readonly_transport = get_transport(self.get_readonly_url())
        created = self.make_branch('.')
        created_is_branch = isinstance(created, branch.Branch)
        self.failUnless(created_is_branch)

        # First route: open the control directory, then its branch.
        control = bzrdir.BzrDir.open(readonly_transport.base)
        via_control = control.open_branch()
        self.assertEqual(via_control.__class__, created.__class__)
        self.assertEqual(control, via_control.bzrdir)
        format_matches = isinstance(via_control._format,
                                    self.branch_format.__class__)
        self.failUnless(format_matches)

        # Second route: Branch.open directly on the same location.
        via_branch_open = branch.Branch.open(readonly_transport.base)
        same_class = isinstance(via_branch_open, created.__class__)
        self.failUnless(same_class)
        self.assertEqual(created._format.__class__,
                         via_branch_open._format.__class__)

        # Formats with a unique id string must also be found by probing;
        # those raising NotImplementedError have nothing to probe for.
        try:
            self.branch_format.get_format_string()
        except NotImplementedError:
            pass
        else:
            self.assertEqual(self.branch_format,
                             branch.BranchFormat.find_format(control))
654
 
 
655
 
 
 
313
        self.failUnless(isinstance(self.branch._transaction,
 
314
                                   transactions.PassThroughTransaction))
 
315
        self.branch.unlock()