~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

Don't ignore bzrlib/plugins -- really bad idea because they can have bad 
old plugins lying around!

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
2
 
#
 
1
# (C) 2005 Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Tests for branch implementations - tests a branch format."""
18
 
 
19
17
import os
20
 
import sys
21
18
 
22
 
from bzrlib import (
23
 
    branch,
24
 
    bzrdir,
25
 
    errors,
26
 
    gpg,
27
 
    urlutils,
28
 
    transactions,
29
 
    repository,
30
 
    )
31
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
32
 
from bzrlib.delta import TreeDelta
33
 
from bzrlib.errors import (FileExists,
34
 
                           NoSuchRevision,
35
 
                           NoSuchFile,
36
 
                           UninitializableFormat,
37
 
                           NotBranchError,
38
 
                           )
39
 
from bzrlib.osutils import getcwd
40
 
import bzrlib.revision
41
 
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
42
 
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
 
20
from bzrlib.clone import copy_branch
 
21
from bzrlib.commit import commit
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
24
import bzrlib.gpg
 
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
43
27
from bzrlib.trace import mutter
44
 
from bzrlib.transport import get_transport
45
 
from bzrlib.transport.http import HttpServer
46
 
from bzrlib.transport.memory import MemoryServer
47
 
from bzrlib.upgrade import upgrade
48
 
from bzrlib.workingtree import WorkingTree
49
 
 
 
28
import bzrlib.transactions as transactions
 
29
from bzrlib.revision import NULL_REVISION
50
30
 
51
31
# TODO: Make a branch using basis branch, and check that it 
52
32
# doesn't request any files that could have been avoided, by 
53
33
# hooking into the Transport.
54
34
 
55
 
 
56
 
class TestCaseWithBranch(TestCaseWithBzrDir):
57
 
 
58
 
    def setUp(self):
59
 
        super(TestCaseWithBranch, self).setUp()
60
 
        self.branch = None
61
 
 
62
 
    def get_branch(self):
63
 
        if self.branch is None:
64
 
            self.branch = self.make_branch('')
65
 
        return self.branch
66
 
 
67
 
    def make_branch(self, relpath, format=None):
68
 
        repo = self.make_repository(relpath, format=format)
69
 
        # fixme RBC 20060210 this isnt necessarily a fixable thing,
70
 
        # Skipped is the wrong exception to raise.
71
 
        try:
72
 
            return self.branch_format.initialize(repo.bzrdir)
73
 
        except errors.UninitializableFormat:
74
 
            raise TestSkipped('Uninitializable branch format')
75
 
 
76
 
    def make_repository(self, relpath, shared=False, format=None):
77
 
        made_control = self.make_bzrdir(relpath, format=format)
78
 
        return made_control.create_repository(shared=shared)
79
 
 
80
 
 
81
 
class TestBranch(TestCaseWithBranch):
 
35
class TestBranch(TestCaseInTempDir):
82
36
 
83
37
    def test_append_revisions(self):
84
38
        """Test appending more than one revision"""
85
 
        br = self.get_branch()
 
39
        br = Branch.initialize(".")
86
40
        br.append_revision("rev1")
87
41
        self.assertEquals(br.revision_history(), ["rev1",])
88
42
        br.append_revision("rev2", "rev3")
90
44
 
91
45
    def test_fetch_revisions(self):
92
46
        """Test fetch-revision operation."""
93
 
        get_transport(self.get_url()).mkdir('b1')
94
 
        get_transport(self.get_url()).mkdir('b2')
95
 
        wt = self.make_branch_and_tree('b1')
96
 
        b1 = wt.branch
97
 
        b2 = self.make_branch('b2')
98
 
        file('b1/foo', 'w').write('hello')
99
 
        wt.add(['foo'], ['foo-id'])
100
 
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
47
        from bzrlib.fetch import Fetcher
 
48
        os.mkdir('b1')
 
49
        os.mkdir('b2')
 
50
        b1 = Branch.initialize('b1')
 
51
        b2 = Branch.initialize('b2')
 
52
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
 
53
        b1.working_tree().add(['foo'], ['foo-id'])
 
54
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
101
55
 
102
56
        mutter('start fetch')
103
 
        self.assertEqual((1, []), b2.fetch(b1))
104
 
 
105
 
        rev = b2.repository.get_revision('revision-1')
106
 
        tree = b2.repository.revision_tree('revision-1')
107
 
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
108
 
 
109
 
    def test_get_revision_delta(self):
110
 
        tree_a = self.make_branch_and_tree('a')
111
 
        self.build_tree(['a/foo'])
112
 
        tree_a.add('foo', 'file1')
113
 
        tree_a.commit('rev1', rev_id='rev1')
114
 
        self.build_tree(['a/vla'])
115
 
        tree_a.add('vla', 'file2')
116
 
        tree_a.commit('rev2', rev_id='rev2')
117
 
 
118
 
        delta = tree_a.branch.get_revision_delta(1)
119
 
        self.assertIsInstance(delta, TreeDelta)
120
 
        self.assertEqual([('foo', 'file1', 'file')], delta.added)
121
 
        delta = tree_a.branch.get_revision_delta(2)
122
 
        self.assertIsInstance(delta, TreeDelta)
123
 
        self.assertEqual([('vla', 'file2', 'file')], delta.added)
124
 
 
125
 
    def get_unbalanced_tree_pair(self):
 
57
        f = Fetcher(from_branch=b1, to_branch=b2)
 
58
        eq = self.assertEquals
 
59
        eq(f.count_copied, 1)
 
60
        eq(f.last_revision, 'revision-1')
 
61
 
 
62
        rev = b2.get_revision('revision-1')
 
63
        tree = b2.revision_tree('revision-1')
 
64
        eq(tree.get_file_text('foo-id'), 'hello')
 
65
 
 
66
    def test_revision_tree(self):
 
67
        b1 = Branch.initialize('.')
 
68
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
 
69
        tree = b1.revision_tree('revision-1')
 
70
        tree = b1.revision_tree(None)
 
71
        self.assertEqual(len(tree.list_files()), 0)
 
72
        tree = b1.revision_tree(NULL_REVISION)
 
73
        self.assertEqual(len(tree.list_files()), 0)
 
74
 
 
75
    def get_unbalanced_branch_pair(self):
126
76
        """Return two branches, a and b, with one file in a."""
127
 
        get_transport(self.get_url()).mkdir('a')
128
 
        tree_a = self.make_branch_and_tree('a')
 
77
        os.mkdir('a')
 
78
        br_a = Branch.initialize("a")
129
79
        file('a/b', 'wb').write('b')
130
 
        tree_a.add('b')
131
 
        tree_a.commit("silly commit", rev_id='A')
132
 
 
133
 
        get_transport(self.get_url()).mkdir('b')
134
 
        tree_b = self.make_branch_and_tree('b')
135
 
        return tree_a, tree_b
 
80
        br_a.working_tree().add('b')
 
81
        commit(br_a, "silly commit", rev_id='A')
 
82
        os.mkdir('b')
 
83
        br_b = Branch.initialize("b")
 
84
        return br_a, br_b
136
85
 
137
86
    def get_balanced_branch_pair(self):
138
87
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
139
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
140
 
        tree_b.branch.repository.fetch(tree_a.branch.repository)
141
 
        return tree_a, tree_b
142
 
 
143
 
    def test_clone_branch(self):
144
 
        """Copy the stores from one branch to another"""
145
 
        tree_a, tree_b = self.get_balanced_branch_pair()
146
 
        tree_b.commit("silly commit")
 
88
        br_a, br_b = self.get_unbalanced_branch_pair()
 
89
        br_a.push_stores(br_b)
 
90
        return br_a, br_b
 
91
 
 
92
    def test_push_stores(self):
 
93
        """Copy the stores from one branch to another"""
 
94
        br_a, br_b = self.get_unbalanced_branch_pair()
 
95
        # ensure the revision is missing.
 
96
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
97
                          br_a.revision_history()[0])
 
98
        br_a.push_stores(br_b)
 
99
        # check that b now has all the data from a's first commit.
 
100
        rev = br_b.get_revision(br_a.revision_history()[0])
 
101
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
102
        for file_id in tree:
 
103
            if tree.inventory[file_id].kind == "file":
 
104
                tree.get_file(file_id).read()
 
105
        return br_a, br_b
 
106
 
 
107
    def test_copy_branch(self):
 
108
        """Copy the stores from one branch to another"""
 
109
        br_a, br_b = self.get_balanced_branch_pair()
 
110
        commit(br_b, "silly commit")
147
111
        os.mkdir('c')
148
 
        # this fails to test that the history from a was not used.
149
 
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
150
 
        self.assertEqual(tree_a.branch.revision_history(),
151
 
                         dir_c.open_branch().revision_history())
 
112
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
 
113
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
152
114
 
153
 
    def test_clone_partial(self):
 
115
    def test_copy_partial(self):
154
116
        """Copy only part of the history of a branch."""
155
 
        # TODO: RBC 20060208 test with a revision not on revision-history.
156
 
        #       what should that behaviour be ? Emailed the list.
157
 
        wt_a = self.make_branch_and_tree('a')
158
 
        self.build_tree(['a/one'])
159
 
        wt_a.add(['one'])
160
 
        wt_a.commit('commit one', rev_id='1')
161
 
        self.build_tree(['a/two'])
162
 
        wt_a.add(['two'])
163
 
        wt_a.commit('commit two', rev_id='2')
164
 
        repo_b = self.make_repository('b')
165
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
166
 
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
167
 
        self.assertEqual('1', br_b.last_revision())
168
 
 
169
 
    def test_sprout_partial(self):
170
 
        # test sprouting with a prefix of the revision-history.
171
 
        # also needs not-on-revision-history behaviour defined.
172
 
        wt_a = self.make_branch_and_tree('a')
173
 
        self.build_tree(['a/one'])
174
 
        wt_a.add(['one'])
175
 
        wt_a.commit('commit one', rev_id='1')
176
 
        self.build_tree(['a/two'])
177
 
        wt_a.add(['two'])
178
 
        wt_a.commit('commit two', rev_id='2')
179
 
        repo_b = self.make_repository('b')
180
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
181
 
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
182
 
        self.assertEqual('1', br_b.last_revision())
183
 
 
184
 
    def get_parented_branch(self):
185
 
        wt_a = self.make_branch_and_tree('a')
186
 
        self.build_tree(['a/one'])
187
 
        wt_a.add(['one'])
188
 
        wt_a.commit('commit one', rev_id='1')
189
 
 
190
 
        branch_b = wt_a.bzrdir.sprout('b', revision_id='1').open_branch()
191
 
        self.assertEqual(wt_a.branch.base, branch_b.get_parent())
192
 
        return branch_b
193
 
 
194
 
    def test_clone_branch_nickname(self):
195
 
        # test the nick name is preserved always
196
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
197
 
 
198
 
    def test_clone_branch_parent(self):
199
 
        # test the parent is preserved always
200
 
        branch_b = self.get_parented_branch()
201
 
        repo_c = self.make_repository('c')
202
 
        branch_b.repository.copy_content_into(repo_c)
203
 
        branch_c = branch_b.clone(repo_c.bzrdir)
204
 
        self.assertNotEqual(None, branch_c.get_parent())
205
 
        self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
206
 
 
207
 
        # We can also set a specific parent, and it should be honored
208
 
        random_parent = 'http://bazaar-vcs.org/path/to/branch'
209
 
        branch_b.set_parent(random_parent)
210
 
        repo_d = self.make_repository('d')
211
 
        branch_b.repository.copy_content_into(repo_d)
212
 
        branch_d = branch_b.clone(repo_d.bzrdir)
213
 
        self.assertEqual(random_parent, branch_d.get_parent())
214
 
 
215
 
    def test_sprout_branch_nickname(self):
216
 
        # test the nick name is reset always
217
 
        raise TestSkipped('XXX branch sprouting is not yet tested..')
218
 
 
219
 
    def test_sprout_branch_parent(self):
220
 
        source = self.make_branch('source')
221
 
        target = source.bzrdir.sprout(self.get_url('target')).open_branch()
222
 
        self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
223
 
 
224
 
    def test_submit_branch(self):
225
 
        """Submit location can be queried and set"""
226
 
        branch = self.make_branch('branch')
227
 
        self.assertEqual(branch.get_submit_branch(), None)
228
 
        branch.set_submit_branch('sftp://example.com')
229
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
230
 
        branch.set_submit_branch('sftp://example.net')
231
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
 
117
        self.build_tree(['a/', 'a/one'])
 
118
        br_a = Branch.initialize('a')
 
119
        br_a.working_tree().add(['one'])
 
120
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
 
121
        self.build_tree(['a/two'])
 
122
        br_a.working_tree().add(['two'])
 
123
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
 
124
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
125
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
126
        self.assertTrue(os.path.exists('b/one'))
 
127
        self.assertFalse(os.path.exists('b/two'))
232
128
        
233
 
    def test_record_initial_ghost(self):
234
 
        """Branches should support having ghosts."""
235
 
        wt = self.make_branch_and_tree('.')
236
 
        wt.set_parent_ids(['non:existent@rev--ision--0--2'],
237
 
            allow_leftmost_as_ghost=True)
238
 
        rev_id = wt.commit('commit against a ghost first parent.')
239
 
        rev = wt.branch.repository.get_revision(rev_id)
240
 
        self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
 
129
    def test_record_initial_ghost_merge(self):
 
130
        """A pending merge with no revision present is still a merge."""
 
131
        branch = Branch.initialize('.')
 
132
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
 
133
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
 
134
        rev = branch.get_revision(branch.last_revision())
 
135
        self.assertEqual(len(rev.parent_ids), 1)
241
136
        # parent_sha1s is not populated now, WTF. rbc 20051003
242
137
        self.assertEqual(len(rev.parent_sha1s), 0)
243
 
 
244
 
    def test_record_two_ghosts(self):
245
 
        """Recording with all ghosts works."""
246
 
        wt = self.make_branch_and_tree('.')
247
 
        wt.set_parent_ids([
248
 
                'foo@azkhazan-123123-abcabc',
249
 
                'wibble@fofof--20050401--1928390812',
250
 
            ],
251
 
            allow_leftmost_as_ghost=True)
252
 
        rev_id = wt.commit("commit from ghost base with one merge")
253
 
        # the revision should have been committed with two parents
254
 
        rev = wt.branch.repository.get_revision(rev_id)
255
 
        self.assertEqual(['foo@azkhazan-123123-abcabc',
256
 
            'wibble@fofof--20050401--1928390812'],
257
 
            rev.parent_ids)
 
138
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
258
139
 
259
140
    def test_bad_revision(self):
260
 
        self.assertRaises(errors.InvalidRevisionId,
261
 
                          self.get_branch().repository.get_revision,
262
 
                          None)
 
141
        branch = Branch.initialize('.')
 
142
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
263
143
 
264
144
# TODO 20051003 RBC:
265
145
# compare the gpg-to-sign info for a commit with a ghost and 
266
146
#     an identical tree without a ghost
267
147
# fetch missing should rewrite the TOC of weaves to list newly available parents.
268
148
        
 
149
    def test_pending_merges(self):
 
150
        """Tracking pending-merged revisions."""
 
151
        b = Branch.initialize('.')
 
152
        wt = b.working_tree()
 
153
        self.assertEquals(wt.pending_merges(), [])
 
154
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
155
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
156
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
157
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
158
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
 
159
        self.assertEquals(wt.pending_merges(),
 
160
                          ['foo@azkhazan-123123-abcabc',
 
161
                           'wibble@fofof--20050401--1928390812'])
 
162
        b.working_tree().commit("commit from base with two merges")
 
163
        rev = b.get_revision(b.revision_history()[0])
 
164
        self.assertEquals(len(rev.parent_ids), 2)
 
165
        self.assertEquals(rev.parent_ids[0],
 
166
                          'foo@azkhazan-123123-abcabc')
 
167
        self.assertEquals(rev.parent_ids[1],
 
168
                           'wibble@fofof--20050401--1928390812')
 
169
        # list should be cleared when we do a commit
 
170
        self.assertEquals(wt.pending_merges(), [])
 
171
 
269
172
    def test_sign_existing_revision(self):
270
 
        wt = self.make_branch_and_tree('.')
271
 
        branch = wt.branch
272
 
        wt.commit("base", allow_pointless=True, rev_id='A')
 
173
        branch = Branch.initialize('.')
 
174
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
273
175
        from bzrlib.testament import Testament
274
 
        strategy = gpg.LoopbackGPGStrategy(None)
275
 
        branch.repository.sign_revision('A', strategy)
276
 
        self.assertEqual(Testament.from_revision(branch.repository, 
277
 
                         'A').as_short_text(),
278
 
                         branch.repository.get_signature_text('A'))
 
176
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
177
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
178
                         branch.revision_store.get('A', 'sig').read())
279
179
 
280
180
    def test_store_signature(self):
281
 
        wt = self.make_branch_and_tree('.')
282
 
        branch = wt.branch
283
 
        branch.repository.store_revision_signature(
284
 
            gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
285
 
        self.assertRaises(errors.NoSuchRevision,
286
 
                          branch.repository.has_signature_for_revision_id,
287
 
                          'A')
288
 
        wt.commit("base", allow_pointless=True, rev_id='A')
289
 
        self.assertEqual('FOO', 
290
 
                         branch.repository.get_signature_text('A'))
 
181
        branch = Branch.initialize('.')
 
182
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
 
183
                                        'FOO', 'A')
 
184
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
291
185
 
292
 
    def test_branch_keeps_signatures(self):
293
 
        wt = self.make_branch_and_tree('source')
294
 
        wt.commit('A', allow_pointless=True, rev_id='A')
295
 
        wt.branch.repository.sign_revision('A',
296
 
            gpg.LoopbackGPGStrategy(None))
297
 
        #FIXME: clone should work to urls,
298
 
        # wt.clone should work to disks.
299
 
        self.build_tree(['target/'])
300
 
        d2 = wt.bzrdir.clone('target')
301
 
        self.assertEqual(wt.branch.repository.get_signature_text('A'),
302
 
                         d2.open_repository().get_signature_text('A'))
 
186
    def test__relcontrolfilename(self):
 
187
        branch = Branch.initialize('.')
 
188
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
 
189
        
 
190
    def test__relcontrolfilename_empty(self):
 
191
        branch = Branch.initialize('.')
 
192
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
303
193
 
304
194
    def test_nicks(self):
305
195
        """Branch nicknames"""
306
 
        t = get_transport(self.get_url())
307
 
        t.mkdir('bzr.dev')
308
 
        branch = self.make_branch('bzr.dev')
 
196
        os.mkdir('bzr.dev')
 
197
        branch = Branch.initialize('bzr.dev')
309
198
        self.assertEqual(branch.nick, 'bzr.dev')
310
 
        t.move('bzr.dev', 'bzr.ab')
311
 
        branch = Branch.open(self.get_url('bzr.ab'))
 
199
        os.rename('bzr.dev', 'bzr.ab')
 
200
        branch = Branch.open('bzr.ab')
312
201
        self.assertEqual(branch.nick, 'bzr.ab')
313
202
        branch.nick = "Aaron's branch"
314
203
        branch.nick = "Aaron's branch"
315
 
        self.failUnless(
316
 
            t.has(
317
 
                t.relpath(
318
 
                    branch.control_files.controlfilename("branch.conf")
319
 
                    )
320
 
                )
321
 
            )
 
204
        self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
322
205
        self.assertEqual(branch.nick, "Aaron's branch")
323
 
        t.move('bzr.ab', 'integration')
324
 
        branch = Branch.open(self.get_url('integration'))
 
206
        os.rename('bzr.ab', 'integration')
 
207
        branch = Branch.open('integration')
325
208
        self.assertEqual(branch.nick, "Aaron's branch")
326
209
        branch.nick = u"\u1234"
327
210
        self.assertEqual(branch.nick, u"\u1234")
328
211
 
329
212
    def test_commit_nicks(self):
330
213
        """Nicknames are committed to the revision"""
331
 
        get_transport(self.get_url()).mkdir('bzr.dev')
332
 
        wt = self.make_branch_and_tree('bzr.dev')
333
 
        branch = wt.branch
 
214
        os.mkdir('bzr.dev')
 
215
        branch = Branch.initialize('bzr.dev')
334
216
        branch.nick = "My happy branch"
335
 
        wt.commit('My commit respect da nick.')
336
 
        committed = branch.repository.get_revision(branch.last_revision())
 
217
        branch.working_tree().commit('My commit respect da nick.')
 
218
        committed = branch.get_revision(branch.last_revision())
337
219
        self.assertEqual(committed.properties["branch-nick"], 
338
220
                         "My happy branch")
339
221
 
340
 
    def test_create_open_branch_uses_repository(self):
341
 
        try:
342
 
            repo = self.make_repository('.', shared=True)
343
 
        except errors.IncompatibleFormat:
344
 
            return
345
 
        repo.bzrdir.root_transport.mkdir('child')
346
 
        child_dir = self.bzrdir_format.initialize('child')
347
 
        try:
348
 
            child_branch = self.branch_format.initialize(child_dir)
349
 
        except errors.UninitializableFormat:
350
 
            # branch references are not default init'able.
351
 
            return
352
 
        self.assertEqual(repo.bzrdir.root_transport.base,
353
 
                         child_branch.repository.bzrdir.root_transport.base)
354
 
        child_branch = branch.Branch.open(self.get_url('child'))
355
 
        self.assertEqual(repo.bzrdir.root_transport.base,
356
 
                         child_branch.repository.bzrdir.root_transport.base)
357
 
 
358
 
    def test_format_description(self):
359
 
        tree = self.make_branch_and_tree('tree')
360
 
        text = tree.branch._format.get_format_description()
361
 
        self.failUnless(len(text))
362
 
 
363
 
    def test_check_branch_report_results(self):
364
 
        """Checking a branch produces results which can be printed"""
365
 
        branch = self.make_branch('.')
366
 
        result = branch.check()
367
 
        # reports results through logging
368
 
        result.report_results(verbose=True)
369
 
        result.report_results(verbose=False)
370
 
 
371
 
    def test_get_commit_builder(self):
372
 
        self.assertIsInstance(self.make_branch(".").get_commit_builder([]), 
373
 
            repository.CommitBuilder)
374
 
 
375
 
    def test_generate_revision_history(self):
376
 
        """Create a fake revision history easily."""
377
 
        tree = self.make_branch_and_tree('.')
378
 
        rev1 = tree.commit('foo')
379
 
        orig_history = tree.branch.revision_history()
380
 
        rev2 = tree.commit('bar', allow_pointless=True)
381
 
        tree.branch.generate_revision_history(rev1)
382
 
        self.assertEqual(orig_history, tree.branch.revision_history())
383
 
 
384
 
    def test_generate_revision_history_NULL_REVISION(self):
385
 
        tree = self.make_branch_and_tree('.')
386
 
        rev1 = tree.commit('foo')
387
 
        tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
388
 
        self.assertEqual([], tree.branch.revision_history())
389
 
 
390
 
    def test_create_checkout(self):
391
 
        tree_a = self.make_branch_and_tree('a')
392
 
        branch_a = tree_a.branch
393
 
        checkout_b = branch_a.create_checkout('b')
394
 
        self.assertEqual(None, checkout_b.last_revision())
395
 
        checkout_b.commit('rev1', rev_id='rev1')
396
 
        self.assertEqual('rev1', branch_a.last_revision())
397
 
        self.assertNotEqual(checkout_b.branch.base, branch_a.base)
398
 
 
399
 
        checkout_c = branch_a.create_checkout('c', lightweight=True)
400
 
        self.assertEqual('rev1', checkout_c.last_revision())
401
 
        checkout_c.commit('rev2', rev_id='rev2')
402
 
        self.assertEqual('rev2', branch_a.last_revision())
403
 
        self.assertEqual(checkout_c.branch.base, branch_a.base)
404
 
 
405
 
        os.mkdir('d')
406
 
        checkout_d = branch_a.create_checkout('d', lightweight=True)
407
 
        self.assertEqual('rev2', checkout_d.last_revision())
408
 
        os.mkdir('e')
409
 
        checkout_e = branch_a.create_checkout('e')
410
 
        self.assertEqual('rev2', checkout_e.last_revision())
411
 
 
412
 
    def test_create_anonymous_lightweight_checkout(self):
413
 
        """A lightweight checkout from a readonly branch should succeed."""
414
 
        tree_a = self.make_branch_and_tree('a')
415
 
        rev_id = tree_a.commit('put some content in the branch')
416
 
        source_branch = bzrlib.branch.Branch.open(
417
 
            'readonly+' + tree_a.bzrdir.root_transport.base)
418
 
        # sanity check that the test will be valid
419
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
420
 
            source_branch.lock_write)
421
 
        checkout = source_branch.create_checkout('c', lightweight=True)
422
 
        self.assertEqual(rev_id, checkout.last_revision())
423
 
 
424
 
    def test_create_anonymous_heavyweight_checkout(self):
425
 
        """A regular checkout from a readonly branch should succeed."""
426
 
        tree_a = self.make_branch_and_tree('a')
427
 
        rev_id = tree_a.commit('put some content in the branch')
428
 
        source_branch = bzrlib.branch.Branch.open(
429
 
            'readonly+' + tree_a.bzrdir.root_transport.base)
430
 
        # sanity check that the test will be valid
431
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
432
 
            source_branch.lock_write)
433
 
        checkout = source_branch.create_checkout('c')
434
 
        self.assertEqual(rev_id, checkout.last_revision())
435
 
 
436
 
 
437
 
class ChrootedTests(TestCaseWithBranch):
438
 
    """A support class that provides readonly urls outside the local namespace.
439
 
 
440
 
    This is done by checking if self.transport_server is a MemoryServer. if it
441
 
    is then we are chrooted already, if it is not then an HttpServer is used
442
 
    for readonly urls.
443
 
    """
444
 
 
445
 
    def setUp(self):
446
 
        super(ChrootedTests, self).setUp()
447
 
        if not self.transport_server == MemoryServer:
448
 
            self.transport_readonly_server = HttpServer
 
222
 
 
223
class TestRemote(TestCaseWithWebserver):
449
224
 
450
225
    def test_open_containing(self):
451
226
        self.assertRaises(NotBranchError, Branch.open_containing,
452
 
                          self.get_readonly_url(''))
 
227
                          self.get_remote_url(''))
453
228
        self.assertRaises(NotBranchError, Branch.open_containing,
454
 
                          self.get_readonly_url('g/p/q'))
455
 
        branch = self.make_branch('.')
456
 
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
 
229
                          self.get_remote_url('g/p/q'))
 
230
        b = Branch.initialize('.')
 
231
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
457
232
        self.assertEqual('', relpath)
458
 
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
 
233
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
459
234
        self.assertEqual('g/p/q', relpath)
460
235
        
 
236
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
237
#         >>> from bzrlib.commit import commit
 
238
#         >>> bzrlib.trace.silent = True
 
239
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
240
#         >>> br1.working_tree().add('foo')
 
241
#         >>> br1.working_tree().add('bar')
 
242
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
243
#         >>> br2 = ScratchBranch()
 
244
#         >>> br2.update_revisions(br1)
 
245
#         Added 2 texts.
 
246
#         Added 1 inventories.
 
247
#         Added 1 revisions.
 
248
#         >>> br2.revision_history()
 
249
#         [u'REVISION-ID-1']
 
250
#         >>> br2.update_revisions(br1)
 
251
#         Added 0 revisions.
 
252
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
253
#         True
461
254
 
462
255
class InstrumentedTransaction(object):
463
256
 
522
315
        self.assertEqual(['lw', 'ul'], branch._calls)
523
316
 
524
317
 
525
 
class TestBranchTransaction(TestCaseWithBranch):
 
318
class TestBranchTransaction(TestCaseInTempDir):
526
319
 
527
320
    def setUp(self):
528
321
        super(TestBranchTransaction, self).setUp()
529
 
        self.branch = None
 
322
        self.branch = Branch.initialize('.')
530
323
        
531
324
    def test_default_get_transaction(self):
532
325
        """branch.get_transaction on a new branch should give a PassThrough."""
533
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
 
326
        self.failUnless(isinstance(self.branch.get_transaction(),
534
327
                                   transactions.PassThroughTransaction))
535
328
 
536
329
    def test__set_new_transaction(self):
537
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
330
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
538
331
 
539
332
    def test__set_over_existing_transaction_raises(self):
540
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
333
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
541
334
        self.assertRaises(errors.LockError,
542
 
                          self.get_branch()._set_transaction,
 
335
                          self.branch._set_transaction,
543
336
                          transactions.ReadOnlyTransaction())
544
337
 
545
338
    def test_finish_no_transaction_raises(self):
546
 
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
 
339
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
547
340
 
548
341
    def test_finish_readonly_transaction_works(self):
549
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
550
 
        self.get_branch()._finish_transaction()
551
 
        self.assertEqual(None, self.get_branch().control_files._transaction)
 
342
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
343
        self.branch._finish_transaction()
 
344
        self.assertEqual(None, self.branch._transaction)
552
345
 
553
346
    def test_unlock_calls_finish(self):
554
 
        self.get_branch().lock_read()
 
347
        self.branch.lock_read()
555
348
        transaction = InstrumentedTransaction()
556
 
        self.get_branch().control_files._transaction = transaction
557
 
        self.get_branch().unlock()
 
349
        self.branch._transaction = transaction
 
350
        self.branch.unlock()
558
351
        self.assertEqual(['finish'], transaction.calls)
559
352
 
560
353
    def test_lock_read_acquires_ro_transaction(self):
561
 
        self.get_branch().lock_read()
562
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
 
354
        self.branch.lock_read()
 
355
        self.failUnless(isinstance(self.branch.get_transaction(),
563
356
                                   transactions.ReadOnlyTransaction))
564
 
        self.get_branch().unlock()
 
357
        self.branch.unlock()
565
358
        
566
 
    def test_lock_write_acquires_write_transaction(self):
567
 
        self.get_branch().lock_write()
 
359
    def test_lock_write_acquires_passthrough_transaction(self):
 
360
        self.branch.lock_write()
568
361
        # cannot use get_transaction as its magic
569
 
        self.failUnless(isinstance(self.get_branch().control_files._transaction,
570
 
                                   transactions.WriteTransaction))
571
 
        self.get_branch().unlock()
572
 
 
573
 
 
574
 
class TestBranchPushLocations(TestCaseWithBranch):
575
 
 
 
362
        self.failUnless(isinstance(self.branch._transaction,
 
363
                                   transactions.PassThroughTransaction))
 
364
        self.branch.unlock()
 
365
 
 
366
 
 
367
class TestBranchPushLocations(TestCaseInTempDir):
 
368
 
 
369
    def setUp(self):
 
370
        super(TestBranchPushLocations, self).setUp()
 
371
        self.branch = Branch.initialize('.')
 
372
        
576
373
    def test_get_push_location_unset(self):
577
 
        self.assertEqual(None, self.get_branch().get_push_location())
 
374
        self.assertEqual(None, self.branch.get_push_location())
578
375
 
579
376
    def test_get_push_location_exact(self):
580
 
        from bzrlib.config import (locations_config_filename,
581
 
                                   ensure_config_dir_exists)
582
 
        ensure_config_dir_exists()
583
 
        fn = locations_config_filename()
584
 
        print >> open(fn, 'wt'), ("[%s]\n"
585
 
                                  "push_location=foo" %
586
 
                                  self.get_branch().base[:-1])
587
 
        self.assertEqual("foo", self.get_branch().get_push_location())
 
377
        self.build_tree(['.bazaar/'])
 
378
        print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
 
379
                                                       "push_location=foo" %
 
380
                                                       os.getcwdu())
 
381
        self.assertEqual("foo", self.branch.get_push_location())
588
382
 
589
383
    def test_set_push_location(self):
590
 
        from bzrlib.config import (locations_config_filename,
591
 
                                   ensure_config_dir_exists)
592
 
        ensure_config_dir_exists()
593
 
        fn = locations_config_filename()
594
 
        branch = self.get_branch()
595
 
        branch.set_push_location('foo')
596
 
        local_path = urlutils.local_path_from_url(branch.base[:-1])
 
384
        self.branch.set_push_location('foo')
597
385
        self.assertFileEqual("[%s]\n"
598
 
                             "push_location = foo" % local_path,
599
 
                             fn)
 
386
                             "push_location = foo" % os.getcwdu(),
 
387
                             '.bazaar/branches.conf')
600
388
 
601
389
    # TODO RBC 20051029 test getting a push location from a branch in a 
602
390
    # recursive section - that is, it appends the branch name.
603
 
 
604
 
 
605
 
class TestFormat(TestCaseWithBranch):
606
 
    """Tests for the format itself."""
607
 
 
608
 
    def test_format_initialize_find_open(self):
609
 
        # loopback test to check the current format initializes to itself.
610
 
        if not self.branch_format.is_supported():
611
 
            # unsupported formats are not loopback testable
612
 
            # because the default open will not open them and
613
 
            # they may not be initializable.
614
 
            return
615
 
        # supported formats must be able to init and open
616
 
        t = get_transport(self.get_url())
617
 
        readonly_t = get_transport(self.get_readonly_url())
618
 
        made_branch = self.make_branch('.')
619
 
        self.failUnless(isinstance(made_branch, branch.Branch))
620
 
 
621
 
        # find it via bzrdir opening:
622
 
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
623
 
        direct_opened_branch = opened_control.open_branch()
624
 
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
625
 
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
626
 
        self.failUnless(isinstance(direct_opened_branch._format,
627
 
                        self.branch_format.__class__))
628
 
 
629
 
        # find it via Branch.open
630
 
        opened_branch = branch.Branch.open(readonly_t.base)
631
 
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
632
 
        self.assertEqual(made_branch._format.__class__,
633
 
                         opened_branch._format.__class__)
634
 
        # if it has a unique id string, can we probe for it ?
635
 
        try:
636
 
            self.branch_format.get_format_string()
637
 
        except NotImplementedError:
638
 
            return
639
 
        self.assertEqual(self.branch_format,
640
 
                         branch.BranchFormat.find_format(opened_control))
641
 
 
642