~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Martin Pool
  • Date: 2006-01-13 09:57:13 UTC
  • mto: This revision was merged to the branch mainline in revision 1611.
  • Revision ID: mbp@sourcefrog.net-20060113095713-1fa5912229a3898e
Review updates of pycurl transport

Split them out into 

  bzrlib.transport.http             common base
  bzrlib.transport.http._urllib     pure python
  bzrlib.transport.http._pycurl     calls pycurl

Update to work with robert's nice transport test multiplexer.

Add PyCurlTransport.has() which does just a HEAD request; should be faster.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
2
 
#
 
1
# (C) 2005 Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Tests for branch implementations - tests a branch format."""
18
 
 
19
17
import os
20
18
import sys
21
19
 
22
 
from bzrlib import (
23
 
    branch,
24
 
    bzrdir,
25
 
    errors,
26
 
    gpg,
27
 
    urlutils,
28
 
    transactions,
29
 
    repository,
30
 
    )
31
20
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
32
 
from bzrlib.delta import TreeDelta
33
 
from bzrlib.errors import (FileExists,
34
 
                           NoSuchRevision,
35
 
                           NoSuchFile,
36
 
                           UninitializableFormat,
37
 
                           NotBranchError,
38
 
                           )
 
21
from bzrlib.clone import copy_branch
 
22
from bzrlib.commit import commit
 
23
import bzrlib.errors as errors
 
24
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
25
import bzrlib.gpg
39
26
from bzrlib.osutils import getcwd
40
 
import bzrlib.revision
41
 
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
42
 
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
 
27
from bzrlib.tests import TestCase, TestCaseInTempDir
 
28
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
43
29
from bzrlib.trace import mutter
44
 
from bzrlib.transport import get_transport
45
 
from bzrlib.transport.http import HttpServer
46
 
from bzrlib.transport.memory import MemoryServer
47
 
from bzrlib.upgrade import upgrade
48
 
from bzrlib.workingtree import WorkingTree
49
 
 
 
30
import bzrlib.transactions as transactions
 
31
from bzrlib.revision import NULL_REVISION
50
32
 
51
33
# TODO: Make a branch using basis branch, and check that it 
52
34
# doesn't request any files that could have been avoided, by 
53
35
# hooking into the Transport.
54
36
 
55
 
 
56
 
class TestCaseWithBranch(TestCaseWithBzrDir):
57
 
 
58
 
    def setUp(self):
59
 
        super(TestCaseWithBranch, self).setUp()
60
 
        self.branch = None
61
 
 
62
 
    def get_branch(self):
63
 
        if self.branch is None:
64
 
            self.branch = self.make_branch('')
65
 
        return self.branch
66
 
 
67
 
    def make_branch(self, relpath, format=None):
68
 
        repo = self.make_repository(relpath, format=format)
69
 
        # fixme RBC 20060210 this isnt necessarily a fixable thing,
70
 
        # Skipped is the wrong exception to raise.
71
 
        try:
72
 
            return self.branch_format.initialize(repo.bzrdir)
73
 
        except errors.UninitializableFormat:
74
 
            raise TestSkipped('Uninitializable branch format')
75
 
 
76
 
    def make_repository(self, relpath, shared=False, format=None):
77
 
        made_control = self.make_bzrdir(relpath, format=format)
78
 
        return made_control.create_repository(shared=shared)
79
 
 
80
 
 
81
 
class TestBranch(TestCaseWithBranch):
 
37
class TestBranch(TestCaseInTempDir):
82
38
 
83
39
    def test_append_revisions(self):
84
40
        """Test appending more than one revision"""
85
 
        br = self.get_branch()
 
41
        br = Branch.initialize(u".")
86
42
        br.append_revision("rev1")
87
43
        self.assertEquals(br.revision_history(), ["rev1",])
88
44
        br.append_revision("rev2", "rev3")
90
46
 
91
47
    def test_fetch_revisions(self):
92
48
        """Test fetch-revision operation."""
93
 
        get_transport(self.get_url()).mkdir('b1')
94
 
        get_transport(self.get_url()).mkdir('b2')
95
 
        wt = self.make_branch_and_tree('b1')
96
 
        b1 = wt.branch
97
 
        b2 = self.make_branch('b2')
 
49
        from bzrlib.fetch import Fetcher
 
50
        os.mkdir('b1')
 
51
        os.mkdir('b2')
 
52
        b1 = Branch.initialize('b1')
 
53
        b2 = Branch.initialize('b2')
98
54
        file('b1/foo', 'w').write('hello')
99
 
        wt.add(['foo'], ['foo-id'])
100
 
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
55
        b1.working_tree().add(['foo'], ['foo-id'])
 
56
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
101
57
 
102
58
        mutter('start fetch')
103
 
        self.assertEqual((1, []), b2.fetch(b1))
104
 
 
105
 
        rev = b2.repository.get_revision('revision-1')
106
 
        tree = b2.repository.revision_tree('revision-1')
107
 
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
108
 
 
109
 
    def test_get_revision_delta(self):
110
 
        tree_a = self.make_branch_and_tree('a')
111
 
        self.build_tree(['a/foo'])
112
 
        tree_a.add('foo', 'file1')
113
 
        tree_a.commit('rev1', rev_id='rev1')
114
 
        self.build_tree(['a/vla'])
115
 
        tree_a.add('vla', 'file2')
116
 
        tree_a.commit('rev2', rev_id='rev2')
117
 
 
118
 
        delta = tree_a.branch.get_revision_delta(1)
119
 
        self.assertIsInstance(delta, TreeDelta)
120
 
        self.assertEqual([('foo', 'file1', 'file')], delta.added)
121
 
        delta = tree_a.branch.get_revision_delta(2)
122
 
        self.assertIsInstance(delta, TreeDelta)
123
 
        self.assertEqual([('vla', 'file2', 'file')], delta.added)
124
 
 
125
 
    def get_unbalanced_tree_pair(self):
 
59
        f = Fetcher(from_branch=b1, to_branch=b2)
 
60
        eq = self.assertEquals
 
61
        eq(f.count_copied, 1)
 
62
        eq(f.last_revision, 'revision-1')
 
63
 
 
64
        rev = b2.get_revision('revision-1')
 
65
        tree = b2.revision_tree('revision-1')
 
66
        eq(tree.get_file_text('foo-id'), 'hello')
 
67
 
 
68
    def test_revision_tree(self):
 
69
        b1 = Branch.initialize(u'.')
 
70
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
 
71
        tree = b1.revision_tree('revision-1')
 
72
        tree = b1.revision_tree(None)
 
73
        self.assertEqual(len(tree.list_files()), 0)
 
74
        tree = b1.revision_tree(NULL_REVISION)
 
75
        self.assertEqual(len(tree.list_files()), 0)
 
76
 
 
77
    def get_unbalanced_branch_pair(self):
126
78
        """Return two branches, a and b, with one file in a."""
127
 
        get_transport(self.get_url()).mkdir('a')
128
 
        tree_a = self.make_branch_and_tree('a')
 
79
        os.mkdir('a')
 
80
        br_a = Branch.initialize("a")
129
81
        file('a/b', 'wb').write('b')
130
 
        tree_a.add('b')
131
 
        tree_a.commit("silly commit", rev_id='A')
132
 
 
133
 
        get_transport(self.get_url()).mkdir('b')
134
 
        tree_b = self.make_branch_and_tree('b')
135
 
        return tree_a, tree_b
 
82
        br_a.working_tree().add('b')
 
83
        commit(br_a, "silly commit", rev_id='A')
 
84
        os.mkdir('b')
 
85
        br_b = Branch.initialize("b")
 
86
        return br_a, br_b
136
87
 
137
88
    def get_balanced_branch_pair(self):
138
89
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
139
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
140
 
        tree_b.branch.repository.fetch(tree_a.branch.repository)
141
 
        return tree_a, tree_b
142
 
 
143
 
    def test_clone_branch(self):
144
 
        """Copy the stores from one branch to another"""
145
 
        tree_a, tree_b = self.get_balanced_branch_pair()
146
 
        tree_b.commit("silly commit")
 
90
        br_a, br_b = self.get_unbalanced_branch_pair()
 
91
        br_a.push_stores(br_b)
 
92
        return br_a, br_b
 
93
 
 
94
    def test_push_stores(self):
 
95
        """Copy the stores from one branch to another"""
 
96
        br_a, br_b = self.get_unbalanced_branch_pair()
 
97
        # ensure the revision is missing.
 
98
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
99
                          br_a.revision_history()[0])
 
100
        br_a.push_stores(br_b)
 
101
        # check that b now has all the data from a's first commit.
 
102
        rev = br_b.get_revision(br_a.revision_history()[0])
 
103
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
104
        for file_id in tree:
 
105
            if tree.inventory[file_id].kind == "file":
 
106
                tree.get_file(file_id).read()
 
107
        return br_a, br_b
 
108
 
 
109
    def test_copy_branch(self):
 
110
        """Copy the stores from one branch to another"""
 
111
        br_a, br_b = self.get_balanced_branch_pair()
 
112
        commit(br_b, "silly commit")
147
113
        os.mkdir('c')
148
 
        # this fails to test that the history from a was not used.
149
 
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
150
 
        self.assertEqual(tree_a.branch.revision_history(),
151
 
                         dir_c.open_branch().revision_history())
 
114
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
 
115
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
152
116
 
153
 
    def test_clone_partial(self):
 
117
    def test_copy_partial(self):
154
118
        """Copy only part of the history of a branch."""
155
 
        # TODO: RBC 20060208 test with a revision not on revision-history.
156
 
        #       what should that behaviour be ? Emailed the list.
157
 
        wt_a = self.make_branch_and_tree('a')
158
 
        self.build_tree(['a/one'])
159
 
        wt_a.add(['one'])
160
 
        wt_a.commit('commit one', rev_id='1')
161
 
        self.build_tree(['a/two'])
162
 
        wt_a.add(['two'])
163
 
        wt_a.commit('commit two', rev_id='2')
164
 
        repo_b = self.make_repository('b')
165
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
166
 
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
167
 
        self.assertEqual(br_b.last_revision(), '1')
168
 
 
169
 
    def test_sprout_partial(self):
170
 
        # test sprouting with a prefix of the revision-history.
171
 
        # also needs not-on-revision-history behaviour defined.
172
 
        wt_a = self.make_branch_and_tree('a')
173
 
        self.build_tree(['a/one'])
174
 
        wt_a.add(['one'])
175
 
        wt_a.commit('commit one', rev_id='1')
176
 
        self.build_tree(['a/two'])
177
 
        wt_a.add(['two'])
178
 
        wt_a.commit('commit two', rev_id='2')
179
 
        repo_b = self.make_repository('b')
180
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
181
 
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
182
 
        self.assertEqual(br_b.last_revision(), '1')
183
 
 
184
 
    def get_parented_branch(self):
185
 
        wt_a = self.make_branch_and_tree('a')
186
 
        self.build_tree(['a/one'])
187
 
        wt_a.add(['one'])
188
 
        wt_a.commit('commit one', rev_id='1')
189
 
 
190
 
        branch_b = wt_a.bzrdir.sprout('b', revision_id='1').open_branch()
191
 
        self.assertEqual(wt_a.branch.base, branch_b.get_parent())
192
 
        return branch_b
193
 
 
194
 
    def test_clone_branch_nickname(self):
195
 
        # test the nick name is preserved always
196
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
197
 
 
198
 
    def test_clone_branch_parent(self):
199
 
        # test the parent is preserved always
200
 
        branch_b = self.get_parented_branch()
201
 
        repo_c = self.make_repository('c')
202
 
        branch_b.repository.copy_content_into(repo_c)
203
 
        branch_c = branch_b.clone(repo_c.bzrdir)
204
 
        self.assertNotEqual(None, branch_c.get_parent())
205
 
        self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
206
 
 
207
 
        # We can also set a specific parent, and it should be honored
208
 
        random_parent = 'http://bazaar-vcs.org/path/to/branch'
209
 
        branch_b.set_parent(random_parent)
210
 
        repo_d = self.make_repository('d')
211
 
        branch_b.repository.copy_content_into(repo_d)
212
 
        branch_d = branch_b.clone(repo_d.bzrdir)
213
 
        self.assertEqual(random_parent, branch_d.get_parent())
214
 
 
215
 
    def test_sprout_branch_nickname(self):
216
 
        # test the nick name is reset always
217
 
        raise TestSkipped('XXX branch sprouting is not yet tested..')
218
 
 
219
 
    def test_sprout_branch_parent(self):
220
 
        source = self.make_branch('source')
221
 
        target = source.bzrdir.sprout(self.get_url('target')).open_branch()
222
 
        self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
223
 
 
224
 
    def test_submit_branch(self):
225
 
        """Submit location can be queried and set"""
226
 
        branch = self.make_branch('branch')
227
 
        self.assertEqual(branch.get_submit_branch(), None)
228
 
        branch.set_submit_branch('sftp://example.com')
229
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
230
 
        branch.set_submit_branch('sftp://example.net')
231
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
 
119
        self.build_tree(['a/', 'a/one'])
 
120
        br_a = Branch.initialize('a')
 
121
        br_a.working_tree().add(['one'])
 
122
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
 
123
        self.build_tree(['a/two'])
 
124
        br_a.working_tree().add(['two'])
 
125
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
 
126
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
127
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
128
        self.assertTrue(os.path.exists('b/one'))
 
129
        self.assertFalse(os.path.exists('b/two'))
232
130
        
233
131
    def test_record_initial_ghost_merge(self):
234
132
        """A pending merge with no revision present is still a merge."""
235
 
        wt = self.make_branch_and_tree('.')
236
 
        branch = wt.branch
237
 
        wt.add_pending_merge('non:existent@rev--ision--0--2')
238
 
        wt.commit('pretend to merge nonexistent-revision', rev_id='first')
239
 
        rev = branch.repository.get_revision(branch.last_revision())
 
133
        branch = Branch.initialize(u'.')
 
134
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
 
135
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
 
136
        rev = branch.get_revision(branch.last_revision())
240
137
        self.assertEqual(len(rev.parent_ids), 1)
241
138
        # parent_sha1s is not populated now, WTF. rbc 20051003
242
139
        self.assertEqual(len(rev.parent_sha1s), 0)
243
140
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
244
141
 
245
142
    def test_bad_revision(self):
246
 
        self.assertRaises(errors.InvalidRevisionId,
247
 
                          self.get_branch().repository.get_revision,
248
 
                          None)
 
143
        branch = Branch.initialize(u'.')
 
144
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
249
145
 
250
146
# TODO 20051003 RBC:
251
147
# compare the gpg-to-sign info for a commit with a ghost and 
254
150
        
255
151
    def test_pending_merges(self):
256
152
        """Tracking pending-merged revisions."""
257
 
        wt = self.make_branch_and_tree('.')
258
 
        b = wt.branch
 
153
        b = Branch.initialize(u'.')
 
154
        wt = b.working_tree()
259
155
        self.assertEquals(wt.pending_merges(), [])
260
156
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
261
157
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
265
161
        self.assertEquals(wt.pending_merges(),
266
162
                          ['foo@azkhazan-123123-abcabc',
267
163
                           'wibble@fofof--20050401--1928390812'])
268
 
        wt.commit("commit from base with two merges")
269
 
        rev = b.repository.get_revision(b.revision_history()[0])
 
164
        b.working_tree().commit("commit from base with two merges")
 
165
        rev = b.get_revision(b.revision_history()[0])
270
166
        self.assertEquals(len(rev.parent_ids), 2)
271
167
        self.assertEquals(rev.parent_ids[0],
272
168
                          'foo@azkhazan-123123-abcabc')
276
172
        self.assertEquals(wt.pending_merges(), [])
277
173
 
278
174
    def test_sign_existing_revision(self):
279
 
        wt = self.make_branch_and_tree('.')
280
 
        branch = wt.branch
281
 
        wt.commit("base", allow_pointless=True, rev_id='A')
 
175
        branch = Branch.initialize(u'.')
 
176
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
282
177
        from bzrlib.testament import Testament
283
 
        strategy = gpg.LoopbackGPGStrategy(None)
284
 
        branch.repository.sign_revision('A', strategy)
285
 
        self.assertEqual(Testament.from_revision(branch.repository, 
286
 
                         'A').as_short_text(),
287
 
                         branch.repository.get_signature_text('A'))
 
178
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
179
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
180
                         branch.revision_store.get('A', 'sig').read())
288
181
 
289
182
    def test_store_signature(self):
290
 
        wt = self.make_branch_and_tree('.')
291
 
        branch = wt.branch
292
 
        branch.repository.store_revision_signature(
293
 
            gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
294
 
        self.assertRaises(errors.NoSuchRevision,
295
 
                          branch.repository.has_signature_for_revision_id,
296
 
                          'A')
297
 
        wt.commit("base", allow_pointless=True, rev_id='A')
298
 
        self.assertEqual('FOO', 
299
 
                         branch.repository.get_signature_text('A'))
 
183
        branch = Branch.initialize(u'.')
 
184
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
 
185
                                        'FOO', 'A')
 
186
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
300
187
 
301
 
    def test_branch_keeps_signatures(self):
302
 
        wt = self.make_branch_and_tree('source')
303
 
        wt.commit('A', allow_pointless=True, rev_id='A')
304
 
        wt.branch.repository.sign_revision('A',
305
 
            gpg.LoopbackGPGStrategy(None))
306
 
        #FIXME: clone should work to urls,
307
 
        # wt.clone should work to disks.
308
 
        self.build_tree(['target/'])
309
 
        d2 = wt.bzrdir.clone('target')
310
 
        self.assertEqual(wt.branch.repository.get_signature_text('A'),
311
 
                         d2.open_repository().get_signature_text('A'))
 
188
    def test__relcontrolfilename(self):
 
189
        branch = Branch.initialize(u'.')
 
190
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
 
191
        
 
192
    def test__relcontrolfilename_empty(self):
 
193
        branch = Branch.initialize(u'.')
 
194
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
312
195
 
313
196
    def test_nicks(self):
314
197
        """Branch nicknames"""
315
 
        t = get_transport(self.get_url())
316
 
        t.mkdir('bzr.dev')
317
 
        branch = self.make_branch('bzr.dev')
 
198
        os.mkdir('bzr.dev')
 
199
        branch = Branch.initialize('bzr.dev')
318
200
        self.assertEqual(branch.nick, 'bzr.dev')
319
 
        t.move('bzr.dev', 'bzr.ab')
320
 
        branch = Branch.open(self.get_url('bzr.ab'))
 
201
        os.rename('bzr.dev', 'bzr.ab')
 
202
        branch = Branch.open('bzr.ab')
321
203
        self.assertEqual(branch.nick, 'bzr.ab')
322
204
        branch.nick = "Aaron's branch"
323
205
        branch.nick = "Aaron's branch"
324
 
        self.failUnless(
325
 
            t.has(
326
 
                t.relpath(
327
 
                    branch.control_files.controlfilename("branch.conf")
328
 
                    )
329
 
                )
330
 
            )
 
206
        self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
331
207
        self.assertEqual(branch.nick, "Aaron's branch")
332
 
        t.move('bzr.ab', 'integration')
333
 
        branch = Branch.open(self.get_url('integration'))
 
208
        os.rename('bzr.ab', 'integration')
 
209
        branch = Branch.open('integration')
334
210
        self.assertEqual(branch.nick, "Aaron's branch")
335
211
        branch.nick = u"\u1234"
336
212
        self.assertEqual(branch.nick, u"\u1234")
337
213
 
338
214
    def test_commit_nicks(self):
339
215
        """Nicknames are committed to the revision"""
340
 
        get_transport(self.get_url()).mkdir('bzr.dev')
341
 
        wt = self.make_branch_and_tree('bzr.dev')
342
 
        branch = wt.branch
 
216
        os.mkdir('bzr.dev')
 
217
        branch = Branch.initialize('bzr.dev')
343
218
        branch.nick = "My happy branch"
344
 
        wt.commit('My commit respect da nick.')
345
 
        committed = branch.repository.get_revision(branch.last_revision())
 
219
        branch.working_tree().commit('My commit respect da nick.')
 
220
        committed = branch.get_revision(branch.last_revision())
346
221
        self.assertEqual(committed.properties["branch-nick"], 
347
222
                         "My happy branch")
348
223
 
349
 
    def test_create_open_branch_uses_repository(self):
350
 
        try:
351
 
            repo = self.make_repository('.', shared=True)
352
 
        except errors.IncompatibleFormat:
353
 
            return
354
 
        repo.bzrdir.root_transport.mkdir('child')
355
 
        child_dir = self.bzrdir_format.initialize('child')
356
 
        try:
357
 
            child_branch = self.branch_format.initialize(child_dir)
358
 
        except errors.UninitializableFormat:
359
 
            # branch references are not default init'able.
360
 
            return
361
 
        self.assertEqual(repo.bzrdir.root_transport.base,
362
 
                         child_branch.repository.bzrdir.root_transport.base)
363
 
        child_branch = branch.Branch.open(self.get_url('child'))
364
 
        self.assertEqual(repo.bzrdir.root_transport.base,
365
 
                         child_branch.repository.bzrdir.root_transport.base)
366
 
 
367
 
    def test_format_description(self):
368
 
        tree = self.make_branch_and_tree('tree')
369
 
        text = tree.branch._format.get_format_description()
370
 
        self.failUnless(len(text))
371
 
 
372
 
    def test_check_branch_report_results(self):
373
 
        """Checking a branch produces results which can be printed"""
374
 
        branch = self.make_branch('.')
375
 
        result = branch.check()
376
 
        # reports results through logging
377
 
        result.report_results(verbose=True)
378
 
        result.report_results(verbose=False)
379
 
 
380
 
    def test_get_commit_builder(self):
381
 
        self.assertIsInstance(self.make_branch(".").get_commit_builder([]), 
382
 
            repository.CommitBuilder)
383
 
 
384
 
    def test_generate_revision_history(self):
385
 
        """Create a fake revision history easily."""
386
 
        tree = self.make_branch_and_tree('.')
387
 
        rev1 = tree.commit('foo')
388
 
        orig_history = tree.branch.revision_history()
389
 
        rev2 = tree.commit('bar', allow_pointless=True)
390
 
        tree.branch.generate_revision_history(rev1)
391
 
        self.assertEqual(orig_history, tree.branch.revision_history())
392
 
 
393
 
    def test_generate_revision_history_NULL_REVISION(self):
394
 
        tree = self.make_branch_and_tree('.')
395
 
        rev1 = tree.commit('foo')
396
 
        tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
397
 
        self.assertEqual([], tree.branch.revision_history())
398
 
 
399
 
    def test_create_checkout(self):
400
 
        tree_a = self.make_branch_and_tree('a')
401
 
        branch_a = tree_a.branch
402
 
        checkout_b = branch_a.create_checkout('b')
403
 
        checkout_b.commit('rev1', rev_id='rev1')
404
 
        self.assertEqual('rev1', branch_a.last_revision())
405
 
        self.assertNotEqual(checkout_b.branch.base, branch_a.base)
406
 
 
407
 
        checkout_c = branch_a.create_checkout('c', lightweight=True)
408
 
        checkout_c.commit('rev2', rev_id='rev2')
409
 
        self.assertEqual('rev2', branch_a.last_revision())
410
 
        self.assertEqual(checkout_c.branch.base, branch_a.base)
411
 
 
412
 
        os.mkdir('d')
413
 
        checkout_d = branch_a.create_checkout('d', lightweight=True)
414
 
        os.mkdir('e')
415
 
        checkout_e = branch_a.create_checkout('e')
416
 
 
417
 
 
418
 
class ChrootedTests(TestCaseWithBranch):
419
 
    """A support class that provides readonly urls outside the local namespace.
420
 
 
421
 
    This is done by checking if self.transport_server is a MemoryServer. if it
422
 
    is then we are chrooted already, if it is not then an HttpServer is used
423
 
    for readonly urls.
424
 
    """
425
 
 
426
 
    def setUp(self):
427
 
        super(ChrootedTests, self).setUp()
428
 
        if not self.transport_server == MemoryServer:
429
 
            self.transport_readonly_server = HttpServer
 
224
 
 
225
class TestRemote(TestCaseWithWebserver):
430
226
 
431
227
    def test_open_containing(self):
432
228
        self.assertRaises(NotBranchError, Branch.open_containing,
433
 
                          self.get_readonly_url(''))
 
229
                          self.get_remote_url(''))
434
230
        self.assertRaises(NotBranchError, Branch.open_containing,
435
 
                          self.get_readonly_url('g/p/q'))
436
 
        branch = self.make_branch('.')
437
 
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
 
231
                          self.get_remote_url('g/p/q'))
 
232
        b = Branch.initialize(u'.')
 
233
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
438
234
        self.assertEqual('', relpath)
439
 
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
 
235
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
440
236
        self.assertEqual('g/p/q', relpath)
441
237
        
 
238
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
239
#         >>> from bzrlib.commit import commit
 
240
#         >>> bzrlib.trace.silent = True
 
241
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
242
#         >>> br1.working_tree().add('foo')
 
243
#         >>> br1.working_tree().add('bar')
 
244
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
245
#         >>> br2 = ScratchBranch()
 
246
#         >>> br2.update_revisions(br1)
 
247
#         Added 2 texts.
 
248
#         Added 1 inventories.
 
249
#         Added 1 revisions.
 
250
#         >>> br2.revision_history()
 
251
#         [u'REVISION-ID-1']
 
252
#         >>> br2.update_revisions(br1)
 
253
#         Added 0 revisions.
 
254
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
255
#         True
442
256
 
443
257
class InstrumentedTransaction(object):
444
258
 
503
317
        self.assertEqual(['lw', 'ul'], branch._calls)
504
318
 
505
319
 
506
 
class TestBranchTransaction(TestCaseWithBranch):
 
320
class TestBranchTransaction(TestCaseInTempDir):
507
321
 
508
322
    def setUp(self):
509
323
        super(TestBranchTransaction, self).setUp()
510
 
        self.branch = None
 
324
        self.branch = Branch.initialize(u'.')
511
325
        
512
326
    def test_default_get_transaction(self):
513
327
        """branch.get_transaction on a new branch should give a PassThrough."""
514
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
 
328
        self.failUnless(isinstance(self.branch.get_transaction(),
515
329
                                   transactions.PassThroughTransaction))
516
330
 
517
331
    def test__set_new_transaction(self):
518
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
332
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
519
333
 
520
334
    def test__set_over_existing_transaction_raises(self):
521
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
335
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
522
336
        self.assertRaises(errors.LockError,
523
 
                          self.get_branch()._set_transaction,
 
337
                          self.branch._set_transaction,
524
338
                          transactions.ReadOnlyTransaction())
525
339
 
526
340
    def test_finish_no_transaction_raises(self):
527
 
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
 
341
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
528
342
 
529
343
    def test_finish_readonly_transaction_works(self):
530
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
531
 
        self.get_branch()._finish_transaction()
532
 
        self.assertEqual(None, self.get_branch().control_files._transaction)
 
344
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
345
        self.branch._finish_transaction()
 
346
        self.assertEqual(None, self.branch._transaction)
533
347
 
534
348
    def test_unlock_calls_finish(self):
535
 
        self.get_branch().lock_read()
 
349
        self.branch.lock_read()
536
350
        transaction = InstrumentedTransaction()
537
 
        self.get_branch().control_files._transaction = transaction
538
 
        self.get_branch().unlock()
 
351
        self.branch._transaction = transaction
 
352
        self.branch.unlock()
539
353
        self.assertEqual(['finish'], transaction.calls)
540
354
 
541
355
    def test_lock_read_acquires_ro_transaction(self):
542
 
        self.get_branch().lock_read()
543
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
 
356
        self.branch.lock_read()
 
357
        self.failUnless(isinstance(self.branch.get_transaction(),
544
358
                                   transactions.ReadOnlyTransaction))
545
 
        self.get_branch().unlock()
 
359
        self.branch.unlock()
546
360
        
547
 
    def test_lock_write_acquires_write_transaction(self):
548
 
        self.get_branch().lock_write()
 
361
    def test_lock_write_acquires_passthrough_transaction(self):
 
362
        self.branch.lock_write()
549
363
        # cannot use get_transaction as its magic
550
 
        self.failUnless(isinstance(self.get_branch().control_files._transaction,
551
 
                                   transactions.WriteTransaction))
552
 
        self.get_branch().unlock()
553
 
 
554
 
 
555
 
class TestBranchPushLocations(TestCaseWithBranch):
556
 
 
 
364
        self.failUnless(isinstance(self.branch._transaction,
 
365
                                   transactions.PassThroughTransaction))
 
366
        self.branch.unlock()
 
367
 
 
368
 
 
369
class TestBranchPushLocations(TestCaseInTempDir):
 
370
 
 
371
    def setUp(self):
 
372
        super(TestBranchPushLocations, self).setUp()
 
373
        self.branch = Branch.initialize(u'.')
 
374
        
557
375
    def test_get_push_location_unset(self):
558
 
        self.assertEqual(None, self.get_branch().get_push_location())
 
376
        self.assertEqual(None, self.branch.get_push_location())
559
377
 
560
378
    def test_get_push_location_exact(self):
561
 
        from bzrlib.config import (locations_config_filename,
 
379
        from bzrlib.config import (branches_config_filename,
562
380
                                   ensure_config_dir_exists)
563
381
        ensure_config_dir_exists()
564
 
        fn = locations_config_filename()
 
382
        fn = branches_config_filename()
565
383
        print >> open(fn, 'wt'), ("[%s]\n"
566
384
                                  "push_location=foo" %
567
 
                                  self.get_branch().base[:-1])
568
 
        self.assertEqual("foo", self.get_branch().get_push_location())
 
385
                                  getcwd())
 
386
        self.assertEqual("foo", self.branch.get_push_location())
569
387
 
570
388
    def test_set_push_location(self):
571
 
        from bzrlib.config import (locations_config_filename,
 
389
        from bzrlib.config import (branches_config_filename,
572
390
                                   ensure_config_dir_exists)
573
391
        ensure_config_dir_exists()
574
 
        fn = locations_config_filename()
575
 
        branch = self.get_branch()
576
 
        branch.set_push_location('foo')
577
 
        local_path = urlutils.local_path_from_url(branch.base[:-1])
 
392
        fn = branches_config_filename()
 
393
        self.branch.set_push_location('foo')
578
394
        self.assertFileEqual("[%s]\n"
579
 
                             "push_location = foo" % local_path,
 
395
                             "push_location = foo" % getcwd(),
580
396
                             fn)
581
397
 
582
398
    # TODO RBC 20051029 test getting a push location from a branch in a 
583
399
    # recursive section - that is, it appends the branch name.
584
 
 
585
 
 
586
 
class TestFormat(TestCaseWithBranch):
587
 
    """Tests for the format itself."""
588
 
 
589
 
    def test_format_initialize_find_open(self):
590
 
        # loopback test to check the current format initializes to itself.
591
 
        if not self.branch_format.is_supported():
592
 
            # unsupported formats are not loopback testable
593
 
            # because the default open will not open them and
594
 
            # they may not be initializable.
595
 
            return
596
 
        # supported formats must be able to init and open
597
 
        t = get_transport(self.get_url())
598
 
        readonly_t = get_transport(self.get_readonly_url())
599
 
        made_branch = self.make_branch('.')
600
 
        self.failUnless(isinstance(made_branch, branch.Branch))
601
 
 
602
 
        # find it via bzrdir opening:
603
 
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
604
 
        direct_opened_branch = opened_control.open_branch()
605
 
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
606
 
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
607
 
        self.failUnless(isinstance(direct_opened_branch._format,
608
 
                        self.branch_format.__class__))
609
 
 
610
 
        # find it via Branch.open
611
 
        opened_branch = branch.Branch.open(readonly_t.base)
612
 
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
613
 
        self.assertEqual(made_branch._format.__class__,
614
 
                         opened_branch._format.__class__)
615
 
        # if it has a unique id string, can we probe for it ?
616
 
        try:
617
 
            self.branch_format.get_format_string()
618
 
        except NotImplementedError:
619
 
            return
620
 
        self.assertEqual(self.branch_format,
621
 
                         branch.BranchFormat.find_format(opened_control))
622
 
 
623