~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: Martin Pool
  • Date: 2005-11-28 08:03:42 UTC
  • mto: (1185.33.61 bzr.dev)
  • mto: This revision was merged to the branch mainline in revision 1518.
  • Revision ID: mbp@sourcefrog.net-20051128080342-b7db3190dca90484
[broken] start converting basic_io to more rfc822-like format

Suggestions from mailing list:
 
  no double quotes
  no cute right-alignment
  no escaping
  just indent continuation lines

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
2
 
#
 
1
# (C) 2005 Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Tests for branch implementations - tests a branch format."""
18
 
 
19
17
import os
20
 
import sys
21
18
 
22
 
from bzrlib import (
23
 
    branch,
24
 
    bzrdir,
25
 
    errors,
26
 
    gpg,
27
 
    urlutils,
28
 
    transactions,
29
 
    remote,
30
 
    repository,
31
 
    )
32
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
33
 
from bzrlib.delta import TreeDelta
34
 
from bzrlib.errors import (FileExists,
35
 
                           NoSuchRevision,
36
 
                           NoSuchFile,
37
 
                           UninitializableFormat,
38
 
                           NotBranchError,
39
 
                           )
40
 
from bzrlib.osutils import getcwd
41
 
import bzrlib.revision
42
 
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
43
 
from bzrlib.tests.branch_implementations import TestCaseWithBranch
44
 
from bzrlib.tests.HttpServer import HttpServer
 
20
from bzrlib.clone import copy_branch
 
21
from bzrlib.commit import commit
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
24
import bzrlib.gpg
 
25
from bzrlib.tests import TestCase, TestCaseInTempDir
 
26
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
45
27
from bzrlib.trace import mutter
46
 
from bzrlib.transport import get_transport
47
 
from bzrlib.transport.memory import MemoryServer
48
 
from bzrlib.upgrade import upgrade
49
 
from bzrlib.workingtree import WorkingTree
50
 
 
51
 
 
52
 
class TestBranch(TestCaseWithBranch):
 
28
import bzrlib.transactions as transactions
 
29
from bzrlib.revision import NULL_REVISION
 
30
 
 
31
# TODO: Make a branch using basis branch, and check that it 
 
32
# doesn't request any files that could have been avoided, by 
 
33
# hooking into the Transport.
 
34
 
 
35
class TestBranch(TestCaseInTempDir):
53
36
 
54
37
    def test_append_revisions(self):
55
38
        """Test appending more than one revision"""
56
 
        wt = self.make_branch_and_tree('tree')
57
 
        wt.commit('f', rev_id='rev1')
58
 
        wt.commit('f', rev_id='rev2')
59
 
        wt.commit('f', rev_id='rev3')
60
 
 
61
 
        br = self.get_branch()
62
 
        br.fetch(wt.branch)
 
39
        br = Branch.initialize(".")
63
40
        br.append_revision("rev1")
64
41
        self.assertEquals(br.revision_history(), ["rev1",])
65
42
        br.append_revision("rev2", "rev3")
66
43
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
67
 
        self.assertRaises(errors.ReservedId, br.append_revision, 'current:')
68
 
 
69
 
    def test_create_tree_with_merge(self):
70
 
        tree = self.create_tree_with_merge()
71
 
        ancestry_graph = tree.branch.repository.get_revision_graph('rev-3')
72
 
        self.assertEqual({'rev-1':[],
73
 
                          'rev-2':['rev-1'],
74
 
                          'rev-1.1.1':['rev-1'],
75
 
                          'rev-3':['rev-2', 'rev-1.1.1'],
76
 
                         }, ancestry_graph)
77
 
 
78
 
    def test_revision_ids_are_utf8(self):
79
 
        wt = self.make_branch_and_tree('tree')
80
 
        wt.commit('f', rev_id='rev1')
81
 
        wt.commit('f', rev_id='rev2')
82
 
        wt.commit('f', rev_id='rev3')
83
 
 
84
 
        br = self.get_branch()
85
 
        br.fetch(wt.branch)
86
 
        br.set_revision_history(['rev1', 'rev2', 'rev3'])
87
 
        rh = br.revision_history()
88
 
        self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
89
 
        for revision_id in rh:
90
 
            self.assertIsInstance(revision_id, str)
91
 
        last = br.last_revision()
92
 
        self.assertEqual('rev3', last)
93
 
        self.assertIsInstance(last, str)
94
 
        revno, last = br.last_revision_info()
95
 
        self.assertEqual(3, revno)
96
 
        self.assertEqual('rev3', last)
97
 
        self.assertIsInstance(last, str)
98
44
 
99
45
    def test_fetch_revisions(self):
100
46
        """Test fetch-revision operation."""
101
 
        wt = self.make_branch_and_tree('b1')
102
 
        b1 = wt.branch
103
 
        self.build_tree_contents([('b1/foo', 'hello')])
104
 
        wt.add(['foo'], ['foo-id'])
105
 
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
106
 
 
107
 
        b2 = self.make_branch('b2')
108
 
        self.assertEqual((1, []), b2.fetch(b1))
109
 
 
110
 
        rev = b2.repository.get_revision('revision-1')
111
 
        tree = b2.repository.revision_tree('revision-1')
112
 
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
113
 
 
114
 
    def test_get_revision_delta(self):
115
 
        tree_a = self.make_branch_and_tree('a')
116
 
        self.build_tree(['a/foo'])
117
 
        tree_a.add('foo', 'file1')
118
 
        tree_a.commit('rev1', rev_id='rev1')
119
 
        self.build_tree(['a/vla'])
120
 
        tree_a.add('vla', 'file2')
121
 
        tree_a.commit('rev2', rev_id='rev2')
122
 
 
123
 
        delta = tree_a.branch.get_revision_delta(1)
124
 
        self.assertIsInstance(delta, TreeDelta)
125
 
        self.assertEqual([('foo', 'file1', 'file')], delta.added)
126
 
        delta = tree_a.branch.get_revision_delta(2)
127
 
        self.assertIsInstance(delta, TreeDelta)
128
 
        self.assertEqual([('vla', 'file2', 'file')], delta.added)
129
 
 
130
 
    def get_unbalanced_tree_pair(self):
 
47
        from bzrlib.fetch import Fetcher
 
48
        os.mkdir('b1')
 
49
        os.mkdir('b2')
 
50
        b1 = Branch.initialize('b1')
 
51
        b2 = Branch.initialize('b2')
 
52
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
 
53
        b1.working_tree().add(['foo'], ['foo-id'])
 
54
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
 
55
 
 
56
        mutter('start fetch')
 
57
        f = Fetcher(from_branch=b1, to_branch=b2)
 
58
        eq = self.assertEquals
 
59
        eq(f.count_copied, 1)
 
60
        eq(f.last_revision, 'revision-1')
 
61
 
 
62
        rev = b2.get_revision('revision-1')
 
63
        tree = b2.revision_tree('revision-1')
 
64
        eq(tree.get_file_text('foo-id'), 'hello')
 
65
 
 
66
    def test_revision_tree(self):
 
67
        b1 = Branch.initialize('.')
 
68
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
 
69
        tree = b1.revision_tree('revision-1')
 
70
        tree = b1.revision_tree(None)
 
71
        self.assertEqual(len(tree.list_files()), 0)
 
72
        tree = b1.revision_tree(NULL_REVISION)
 
73
        self.assertEqual(len(tree.list_files()), 0)
 
74
 
 
75
    def get_unbalanced_branch_pair(self):
131
76
        """Return two branches, a and b, with one file in a."""
132
 
        tree_a = self.make_branch_and_tree('a')
133
 
        self.build_tree_contents([('a/b', 'b')])
134
 
        tree_a.add('b')
135
 
        tree_a.commit("silly commit", rev_id='A')
136
 
 
137
 
        tree_b = self.make_branch_and_tree('b')
138
 
        return tree_a, tree_b
 
77
        os.mkdir('a')
 
78
        br_a = Branch.initialize("a")
 
79
        file('a/b', 'wb').write('b')
 
80
        br_a.working_tree().add('b')
 
81
        commit(br_a, "silly commit", rev_id='A')
 
82
        os.mkdir('b')
 
83
        br_b = Branch.initialize("b")
 
84
        return br_a, br_b
139
85
 
140
86
    def get_balanced_branch_pair(self):
141
87
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
142
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
143
 
        tree_b.branch.repository.fetch(tree_a.branch.repository)
144
 
        return tree_a, tree_b
145
 
 
146
 
    def test_clone_partial(self):
 
88
        br_a, br_b = self.get_unbalanced_branch_pair()
 
89
        br_a.push_stores(br_b)
 
90
        return br_a, br_b
 
91
 
 
92
    def test_push_stores(self):
 
93
        """Copy the stores from one branch to another"""
 
94
        br_a, br_b = self.get_unbalanced_branch_pair()
 
95
        # ensure the revision is missing.
 
96
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
97
                          br_a.revision_history()[0])
 
98
        br_a.push_stores(br_b)
 
99
        # check that b now has all the data from a's first commit.
 
100
        rev = br_b.get_revision(br_a.revision_history()[0])
 
101
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
102
        for file_id in tree:
 
103
            if tree.inventory[file_id].kind == "file":
 
104
                tree.get_file(file_id).read()
 
105
        return br_a, br_b
 
106
 
 
107
    def test_copy_branch(self):
 
108
        """Copy the stores from one branch to another"""
 
109
        br_a, br_b = self.get_balanced_branch_pair()
 
110
        commit(br_b, "silly commit")
 
111
        os.mkdir('c')
 
112
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
 
113
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
 
114
 
 
115
    def test_copy_partial(self):
147
116
        """Copy only part of the history of a branch."""
148
 
        # TODO: RBC 20060208 test with a revision not on revision-history.
149
 
        #       what should that behaviour be ? Emailed the list.
150
 
        # First, make a branch with two commits.
151
 
        wt_a = self.make_branch_and_tree('a')
152
 
        self.build_tree(['a/one'])
153
 
        wt_a.add(['one'])
154
 
        wt_a.commit('commit one', rev_id='1')
 
117
        self.build_tree(['a/', 'a/one'])
 
118
        br_a = Branch.initialize('a')
 
119
        br_a.working_tree().add(['one'])
 
120
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
155
121
        self.build_tree(['a/two'])
156
 
        wt_a.add(['two'])
157
 
        wt_a.commit('commit two', rev_id='2')
158
 
        # Now make a copy of the repository.
159
 
        repo_b = self.make_repository('b')
160
 
        wt_a.branch.repository.copy_content_into(repo_b)
161
 
        # wt_a might be a lightweight checkout, so get a hold of the actual
162
 
        # branch (because you can't do a partial clone of a lightweight
163
 
        # checkout).
164
 
        branch = wt_a.branch.bzrdir.open_branch()
165
 
        # Then make a branch where the new repository is, but specify a revision
166
 
        # ID.  The new branch's history will stop at the specified revision.
167
 
        br_b = branch.clone(repo_b.bzrdir, revision_id='1')
168
 
        self.assertEqual('1', br_b.last_revision())
169
 
 
170
 
    def get_parented_branch(self):
171
 
        wt_a = self.make_branch_and_tree('a')
172
 
        self.build_tree(['a/one'])
173
 
        wt_a.add(['one'])
174
 
        wt_a.commit('commit one', rev_id='1')
175
 
 
176
 
        branch_b = wt_a.bzrdir.sprout('b', revision_id='1').open_branch()
177
 
        self.assertEqual(wt_a.branch.base, branch_b.get_parent())
178
 
        return branch_b
179
 
 
180
 
    def test_clone_branch_nickname(self):
181
 
        # test the nick name is preserved always
182
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
183
 
 
184
 
    def test_clone_branch_parent(self):
185
 
        # test the parent is preserved always
186
 
        branch_b = self.get_parented_branch()
187
 
        repo_c = self.make_repository('c')
188
 
        branch_b.repository.copy_content_into(repo_c)
189
 
        branch_c = branch_b.clone(repo_c.bzrdir)
190
 
        self.assertNotEqual(None, branch_c.get_parent())
191
 
        self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
192
 
 
193
 
        # We can also set a specific parent, and it should be honored
194
 
        random_parent = 'http://bazaar-vcs.org/path/to/branch'
195
 
        branch_b.set_parent(random_parent)
196
 
        repo_d = self.make_repository('d')
197
 
        branch_b.repository.copy_content_into(repo_d)
198
 
        branch_d = branch_b.clone(repo_d.bzrdir)
199
 
        self.assertEqual(random_parent, branch_d.get_parent())
200
 
 
201
 
    def test_submit_branch(self):
202
 
        """Submit location can be queried and set"""
203
 
        branch = self.make_branch('branch')
204
 
        self.assertEqual(branch.get_submit_branch(), None)
205
 
        branch.set_submit_branch('sftp://example.com')
206
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
207
 
        branch.set_submit_branch('sftp://example.net')
208
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
 
122
        br_a.working_tree().add(['two'])
 
123
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
 
124
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
125
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
126
        self.assertTrue(os.path.exists('b/one'))
 
127
        self.assertFalse(os.path.exists('b/two'))
209
128
        
210
 
    def test_public_branch(self):
211
 
        """public location can be queried and set"""
212
 
        branch = self.make_branch('branch')
213
 
        self.assertEqual(branch.get_public_branch(), None)
214
 
        branch.set_public_branch('sftp://example.com')
215
 
        self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
216
 
        branch.set_public_branch('sftp://example.net')
217
 
        self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
218
 
        branch.set_public_branch(None)
219
 
        self.assertEqual(branch.get_public_branch(), None)
220
 
 
221
 
    def test_record_initial_ghost(self):
222
 
        """Branches should support having ghosts."""
223
 
        wt = self.make_branch_and_tree('.')
224
 
        wt.set_parent_ids(['non:existent@rev--ision--0--2'],
225
 
            allow_leftmost_as_ghost=True)
226
 
        rev_id = wt.commit('commit against a ghost first parent.')
227
 
        rev = wt.branch.repository.get_revision(rev_id)
228
 
        self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
 
129
    def test_record_initial_ghost_merge(self):
 
130
        """A pending merge with no revision present is still a merge."""
 
131
        branch = Branch.initialize('.')
 
132
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
 
133
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
 
134
        rev = branch.get_revision(branch.last_revision())
 
135
        self.assertEqual(len(rev.parent_ids), 1)
229
136
        # parent_sha1s is not populated now, WTF. rbc 20051003
230
137
        self.assertEqual(len(rev.parent_sha1s), 0)
231
 
 
232
 
    def test_record_two_ghosts(self):
233
 
        """Recording with all ghosts works."""
234
 
        wt = self.make_branch_and_tree('.')
235
 
        wt.set_parent_ids([
236
 
                'foo@azkhazan-123123-abcabc',
237
 
                'wibble@fofof--20050401--1928390812',
238
 
            ],
239
 
            allow_leftmost_as_ghost=True)
240
 
        rev_id = wt.commit("commit from ghost base with one merge")
241
 
        # the revision should have been committed with two parents
242
 
        rev = wt.branch.repository.get_revision(rev_id)
243
 
        self.assertEqual(['foo@azkhazan-123123-abcabc',
244
 
            'wibble@fofof--20050401--1928390812'],
245
 
            rev.parent_ids)
 
138
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
246
139
 
247
140
    def test_bad_revision(self):
248
 
        self.assertRaises(errors.InvalidRevisionId,
249
 
                          self.get_branch().repository.get_revision,
250
 
                          None)
 
141
        branch = Branch.initialize('.')
 
142
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
251
143
 
252
144
# TODO 20051003 RBC:
253
145
# compare the gpg-to-sign info for a commit with a ghost and 
254
146
#     an identical tree without a ghost
255
147
# fetch missing should rewrite the TOC of weaves to list newly available parents.
256
148
        
 
149
    def test_pending_merges(self):
 
150
        """Tracking pending-merged revisions."""
 
151
        b = Branch.initialize('.')
 
152
        wt = b.working_tree()
 
153
        self.assertEquals(wt.pending_merges(), [])
 
154
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
155
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
156
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
157
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
158
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
 
159
        self.assertEquals(wt.pending_merges(),
 
160
                          ['foo@azkhazan-123123-abcabc',
 
161
                           'wibble@fofof--20050401--1928390812'])
 
162
        b.working_tree().commit("commit from base with two merges")
 
163
        rev = b.get_revision(b.revision_history()[0])
 
164
        self.assertEquals(len(rev.parent_ids), 2)
 
165
        self.assertEquals(rev.parent_ids[0],
 
166
                          'foo@azkhazan-123123-abcabc')
 
167
        self.assertEquals(rev.parent_ids[1],
 
168
                           'wibble@fofof--20050401--1928390812')
 
169
        # list should be cleared when we do a commit
 
170
        self.assertEquals(wt.pending_merges(), [])
 
171
 
257
172
    def test_sign_existing_revision(self):
258
 
        wt = self.make_branch_and_tree('.')
259
 
        branch = wt.branch
260
 
        wt.commit("base", allow_pointless=True, rev_id='A')
 
173
        branch = Branch.initialize('.')
 
174
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
261
175
        from bzrlib.testament import Testament
262
 
        strategy = gpg.LoopbackGPGStrategy(None)
263
 
        branch.repository.sign_revision('A', strategy)
264
 
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
265
 
                         Testament.from_revision(branch.repository,
266
 
                         'A').as_short_text() +
267
 
                         '-----END PSEUDO-SIGNED CONTENT-----\n',
268
 
                         branch.repository.get_signature_text('A'))
 
176
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
177
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
178
                         branch.revision_store.get('A', 'sig').read())
269
179
 
270
180
    def test_store_signature(self):
271
 
        wt = self.make_branch_and_tree('.')
272
 
        branch = wt.branch
273
 
        branch.repository.store_revision_signature(
274
 
            gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
275
 
        self.assertRaises(errors.NoSuchRevision,
276
 
                          branch.repository.has_signature_for_revision_id,
277
 
                          'A')
278
 
        wt.commit("base", allow_pointless=True, rev_id='A')
279
 
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
280
 
                         'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
281
 
                         branch.repository.get_signature_text('A'))
 
181
        branch = Branch.initialize('.')
 
182
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
 
183
                                        'FOO', 'A')
 
184
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
282
185
 
283
 
    def test_branch_keeps_signatures(self):
284
 
        wt = self.make_branch_and_tree('source')
285
 
        wt.commit('A', allow_pointless=True, rev_id='A')
286
 
        repo = wt.branch.repository
287
 
        repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
288
 
        #FIXME: clone should work to urls,
289
 
        # wt.clone should work to disks.
290
 
        self.build_tree(['target/'])
291
 
        d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
292
 
        self.assertEqual(repo.get_signature_text('A'),
293
 
                         d2.open_repository().get_signature_text('A'))
 
186
    def test__relcontrolfilename(self):
 
187
        branch = Branch.initialize('.')
 
188
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
 
189
        
 
190
    def test__relcontrolfilename_empty(self):
 
191
        branch = Branch.initialize('.')
 
192
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
294
193
 
295
194
    def test_nicks(self):
296
 
        """Test explicit and implicit branch nicknames.
297
 
        
298
 
        Nicknames are implicitly the name of the branch's directory, unless an
299
 
        explicit nickname is set.  That is, an explicit nickname always
300
 
        overrides the implicit one.
301
 
        """
302
 
        t = get_transport(self.get_url())
303
 
        branch = self.make_branch('bzr.dev')
304
 
        # The nick will be 'bzr.dev', because there is no explicit nick set.
 
195
        """Branch nicknames"""
 
196
        os.mkdir('bzr.dev')
 
197
        branch = Branch.initialize('bzr.dev')
305
198
        self.assertEqual(branch.nick, 'bzr.dev')
306
 
        # Move the branch to a different directory, 'bzr.ab'.  Now that branch
307
 
        # will report its nick as 'bzr.ab'.
308
 
        t.move('bzr.dev', 'bzr.ab')
309
 
        branch = Branch.open(self.get_url('bzr.ab'))
 
199
        os.rename('bzr.dev', 'bzr.ab')
 
200
        branch = Branch.open('bzr.ab')
310
201
        self.assertEqual(branch.nick, 'bzr.ab')
311
 
        # Set the branch nick explicitly.  This will ensure there's a branch
312
 
        # config file in the branch.
313
 
        branch.nick = "Aaron's branch"
314
 
        branch.nick = "Aaron's branch"
315
 
        if not isinstance(branch, remote.RemoteBranch):
316
 
            controlfilename = branch.control_files.controlfilename
317
 
            self.failUnless(t.has(t.relpath(controlfilename("branch.conf"))))
318
 
        # Because the nick has been set explicitly, the nick is now always
319
 
        # "Aaron's branch", regardless of directory name.
 
202
        branch.nick = "Aaron's branch"
 
203
        branch.nick = "Aaron's branch"
 
204
        self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
320
205
        self.assertEqual(branch.nick, "Aaron's branch")
321
 
        t.move('bzr.ab', 'integration')
322
 
        branch = Branch.open(self.get_url('integration'))
 
206
        os.rename('bzr.ab', 'integration')
 
207
        branch = Branch.open('integration')
323
208
        self.assertEqual(branch.nick, "Aaron's branch")
324
209
        branch.nick = u"\u1234"
325
210
        self.assertEqual(branch.nick, u"\u1234")
326
211
 
327
212
    def test_commit_nicks(self):
328
213
        """Nicknames are committed to the revision"""
329
 
        wt = self.make_branch_and_tree('bzr.dev')
330
 
        branch = wt.branch
 
214
        os.mkdir('bzr.dev')
 
215
        branch = Branch.initialize('bzr.dev')
331
216
        branch.nick = "My happy branch"
332
 
        wt.commit('My commit respect da nick.')
333
 
        committed = branch.repository.get_revision(branch.last_revision())
334
 
        self.assertEqual(committed.properties["branch-nick"],
 
217
        branch.working_tree().commit('My commit respect da nick.')
 
218
        committed = branch.get_revision(branch.last_revision())
 
219
        self.assertEqual(committed.properties["branch-nick"], 
335
220
                         "My happy branch")
336
221
 
337
 
    def test_create_open_branch_uses_repository(self):
338
 
        try:
339
 
            repo = self.make_repository('.', shared=True)
340
 
        except errors.IncompatibleFormat:
341
 
            return
342
 
        child_transport = repo.bzrdir.root_transport.clone('child')
343
 
        child_transport.mkdir('.')
344
 
        child_dir = self.bzrdir_format.initialize_on_transport(child_transport)
345
 
        try:
346
 
            child_branch = self.branch_format.initialize(child_dir)
347
 
        except errors.UninitializableFormat:
348
 
            # branch references are not default init'able.
349
 
            return
350
 
        self.assertEqual(repo.bzrdir.root_transport.base,
351
 
                         child_branch.repository.bzrdir.root_transport.base)
352
 
        child_branch = branch.Branch.open(self.get_url('child'))
353
 
        self.assertEqual(repo.bzrdir.root_transport.base,
354
 
                         child_branch.repository.bzrdir.root_transport.base)
355
 
 
356
 
    def test_format_description(self):
357
 
        tree = self.make_branch_and_tree('tree')
358
 
        text = tree.branch._format.get_format_description()
359
 
        self.failUnless(len(text))
360
 
 
361
 
    def test_check_branch_report_results(self):
362
 
        """Checking a branch produces results which can be printed"""
363
 
        branch = self.make_branch('.')
364
 
        result = branch.check()
365
 
        # reports results through logging
366
 
        result.report_results(verbose=True)
367
 
        result.report_results(verbose=False)
368
 
 
369
 
    def test_get_commit_builder(self):
370
 
        self.assertIsInstance(self.make_branch(".").get_commit_builder([]), 
371
 
            repository.CommitBuilder)
372
 
 
373
 
    def test_generate_revision_history(self):
374
 
        """Create a fake revision history easily."""
375
 
        tree = self.make_branch_and_tree('.')
376
 
        rev1 = tree.commit('foo')
377
 
        orig_history = tree.branch.revision_history()
378
 
        rev2 = tree.commit('bar', allow_pointless=True)
379
 
        tree.branch.generate_revision_history(rev1)
380
 
        self.assertEqual(orig_history, tree.branch.revision_history())
381
 
 
382
 
    def test_generate_revision_history_NULL_REVISION(self):
383
 
        tree = self.make_branch_and_tree('.')
384
 
        rev1 = tree.commit('foo')
385
 
        tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
386
 
        self.assertEqual([], tree.branch.revision_history())
387
 
 
388
 
    def test_create_checkout(self):
389
 
        tree_a = self.make_branch_and_tree('a')
390
 
        branch_a = tree_a.branch
391
 
        checkout_b = branch_a.create_checkout('b')
392
 
        self.assertEqual(None, checkout_b.last_revision())
393
 
        checkout_b.commit('rev1', rev_id='rev1')
394
 
        self.assertEqual('rev1', branch_a.last_revision())
395
 
        self.assertNotEqual(checkout_b.branch.base, branch_a.base)
396
 
 
397
 
        checkout_c = branch_a.create_checkout('c', lightweight=True)
398
 
        self.assertEqual('rev1', checkout_c.last_revision())
399
 
        checkout_c.commit('rev2', rev_id='rev2')
400
 
        self.assertEqual('rev2', branch_a.last_revision())
401
 
        self.assertEqual(checkout_c.branch.base, branch_a.base)
402
 
 
403
 
        os.mkdir('d')
404
 
        checkout_d = branch_a.create_checkout('d', lightweight=True)
405
 
        self.assertEqual('rev2', checkout_d.last_revision())
406
 
        os.mkdir('e')
407
 
        checkout_e = branch_a.create_checkout('e')
408
 
        self.assertEqual('rev2', checkout_e.last_revision())
409
 
 
410
 
    def test_create_anonymous_lightweight_checkout(self):
411
 
        """A lightweight checkout from a readonly branch should succeed."""
412
 
        tree_a = self.make_branch_and_tree('a')
413
 
        rev_id = tree_a.commit('put some content in the branch')
414
 
        # open the branch via a readonly transport
415
 
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
416
 
        # sanity check that the test will be valid
417
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
418
 
            source_branch.lock_write)
419
 
        checkout = source_branch.create_checkout('c', lightweight=True)
420
 
        self.assertEqual(rev_id, checkout.last_revision())
421
 
 
422
 
    def test_create_anonymous_heavyweight_checkout(self):
423
 
        """A regular checkout from a readonly branch should succeed."""
424
 
        tree_a = self.make_branch_and_tree('a')
425
 
        rev_id = tree_a.commit('put some content in the branch')
426
 
        # open the branch via a readonly transport
427
 
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
428
 
        # sanity check that the test will be valid
429
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
430
 
            source_branch.lock_write)
431
 
        checkout = source_branch.create_checkout('c')
432
 
        self.assertEqual(rev_id, checkout.last_revision())
433
 
 
434
 
    def test_set_revision_history(self):
435
 
        tree = self.make_branch_and_tree('a')
436
 
        tree.commit('a commit', rev_id='rev1')
437
 
        br = tree.branch
438
 
        br.set_revision_history(["rev1"])
439
 
        self.assertEquals(br.revision_history(), ["rev1"])
440
 
        br.set_revision_history([])
441
 
        self.assertEquals(br.revision_history(), [])
442
 
 
443
 
 
444
 
class ChrootedTests(TestCaseWithBranch):
445
 
    """A support class that provides readonly urls outside the local namespace.
446
 
 
447
 
    This is done by checking if self.transport_server is a MemoryServer. if it
448
 
    is then we are chrooted already, if it is not then an HttpServer is used
449
 
    for readonly urls.
450
 
    """
451
 
 
452
 
    def setUp(self):
453
 
        super(ChrootedTests, self).setUp()
454
 
        if not self.vfs_transport_factory == MemoryServer:
455
 
            self.transport_readonly_server = HttpServer
 
222
 
 
223
class TestRemote(TestCaseWithWebserver):
456
224
 
457
225
    def test_open_containing(self):
458
226
        self.assertRaises(NotBranchError, Branch.open_containing,
459
 
                          self.get_readonly_url(''))
 
227
                          self.get_remote_url(''))
460
228
        self.assertRaises(NotBranchError, Branch.open_containing,
461
 
                          self.get_readonly_url('g/p/q'))
462
 
        branch = self.make_branch('.')
463
 
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
 
229
                          self.get_remote_url('g/p/q'))
 
230
        b = Branch.initialize('.')
 
231
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
464
232
        self.assertEqual('', relpath)
465
 
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
 
233
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
466
234
        self.assertEqual('g/p/q', relpath)
467
235
        
 
236
# TODO: rewrite the commented-out doctest below as a regular unittest that does not rely on the displayed output
 
237
#         >>> from bzrlib.commit import commit
 
238
#         >>> bzrlib.trace.silent = True
 
239
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
240
#         >>> br1.working_tree().add('foo')
 
241
#         >>> br1.working_tree().add('bar')
 
242
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
243
#         >>> br2 = ScratchBranch()
 
244
#         >>> br2.update_revisions(br1)
 
245
#         Added 2 texts.
 
246
#         Added 1 inventories.
 
247
#         Added 1 revisions.
 
248
#         >>> br2.revision_history()
 
249
#         [u'REVISION-ID-1']
 
250
#         >>> br2.update_revisions(br1)
 
251
#         Added 0 revisions.
 
252
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
253
#         True
468
254
 
469
255
class InstrumentedTransaction(object):
470
256
 
529
315
        self.assertEqual(['lw', 'ul'], branch._calls)
530
316
 
531
317
 
532
 
class TestBranchPushLocations(TestCaseWithBranch):
533
 
 
 
318
class TestBranchTransaction(TestCaseInTempDir):
 
319
 
 
320
    def setUp(self):
 
321
        super(TestBranchTransaction, self).setUp()
 
322
        self.branch = Branch.initialize('.')
 
323
        
 
324
    def test_default_get_transaction(self):
 
325
        """branch.get_transaction on a new branch should give a PassThrough."""
 
326
        self.failUnless(isinstance(self.branch.get_transaction(),
 
327
                                   transactions.PassThroughTransaction))
 
328
 
 
329
    def test__set_new_transaction(self):
 
330
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
331
 
 
332
    def test__set_over_existing_transaction_raises(self):
 
333
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
334
        self.assertRaises(errors.LockError,
 
335
                          self.branch._set_transaction,
 
336
                          transactions.ReadOnlyTransaction())
 
337
 
 
338
    def test_finish_no_transaction_raises(self):
 
339
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
 
340
 
 
341
    def test_finish_readonly_transaction_works(self):
 
342
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
343
        self.branch._finish_transaction()
 
344
        self.assertEqual(None, self.branch._transaction)
 
345
 
 
346
    def test_unlock_calls_finish(self):
 
347
        self.branch.lock_read()
 
348
        transaction = InstrumentedTransaction()
 
349
        self.branch._transaction = transaction
 
350
        self.branch.unlock()
 
351
        self.assertEqual(['finish'], transaction.calls)
 
352
 
 
353
    def test_lock_read_acquires_ro_transaction(self):
 
354
        self.branch.lock_read()
 
355
        self.failUnless(isinstance(self.branch.get_transaction(),
 
356
                                   transactions.ReadOnlyTransaction))
 
357
        self.branch.unlock()
 
358
        
 
359
    def test_lock_write_acquires_passthrough_transaction(self):
 
360
        self.branch.lock_write()
 
361
        # cannot use get_transaction as it's magic
 
362
        self.failUnless(isinstance(self.branch._transaction,
 
363
                                   transactions.PassThroughTransaction))
 
364
        self.branch.unlock()
 
365
 
 
366
 
 
367
class TestBranchPushLocations(TestCaseInTempDir):
 
368
 
 
369
    def setUp(self):
 
370
        super(TestBranchPushLocations, self).setUp()
 
371
        self.branch = Branch.initialize('.')
 
372
        
534
373
    def test_get_push_location_unset(self):
535
 
        self.assertEqual(None, self.get_branch().get_push_location())
 
374
        self.assertEqual(None, self.branch.get_push_location())
536
375
 
537
376
    def test_get_push_location_exact(self):
538
 
        from bzrlib.config import (locations_config_filename,
539
 
                                   ensure_config_dir_exists)
540
 
        ensure_config_dir_exists()
541
 
        fn = locations_config_filename()
542
 
        print >> open(fn, 'wt'), ("[%s]\n"
543
 
                                  "push_location=foo" %
544
 
                                  self.get_branch().base[:-1])
545
 
        self.assertEqual("foo", self.get_branch().get_push_location())
 
377
        self.build_tree(['.bazaar/'])
 
378
        print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
 
379
                                                       "push_location=foo" %
 
380
                                                       os.getcwdu())
 
381
        self.assertEqual("foo", self.branch.get_push_location())
546
382
 
547
383
    def test_set_push_location(self):
548
 
        branch = self.get_branch()
549
 
        branch.set_push_location('foo')
550
 
        self.assertEqual('foo', branch.get_push_location())
551
 
 
552
 
 
553
 
class TestFormat(TestCaseWithBranch):
554
 
    """Tests for the format itself."""
555
 
 
556
 
    def test_get_reference(self):
557
 
        """get_reference on all regular branches should return None."""
558
 
        if not self.branch_format.is_supported():
559
 
            # unsupported formats are not loopback testable
560
 
            # because the default open will not open them and
561
 
            # they may not be initializable.
562
 
            return
563
 
        made_branch = self.make_branch('.')
564
 
        self.assertEqual(None,
565
 
            made_branch._format.get_reference(made_branch.bzrdir))
566
 
 
567
 
    def test_format_initialize_find_open(self):
568
 
        # loopback test to check the current format initializes to itself.
569
 
        if not self.branch_format.is_supported():
570
 
            # unsupported formats are not loopback testable
571
 
            # because the default open will not open them and
572
 
            # they may not be initializable.
573
 
            return
574
 
        # supported formats must be able to init and open
575
 
        t = get_transport(self.get_url())
576
 
        readonly_t = get_transport(self.get_readonly_url())
577
 
        made_branch = self.make_branch('.')
578
 
        self.failUnless(isinstance(made_branch, branch.Branch))
579
 
 
580
 
        # find it via bzrdir opening:
581
 
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
582
 
        direct_opened_branch = opened_control.open_branch()
583
 
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
584
 
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
585
 
        self.failUnless(isinstance(direct_opened_branch._format,
586
 
                        self.branch_format.__class__))
587
 
 
588
 
        # find it via Branch.open
589
 
        opened_branch = branch.Branch.open(readonly_t.base)
590
 
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
591
 
        self.assertEqual(made_branch._format.__class__,
592
 
                         opened_branch._format.__class__)
593
 
        # if it has a unique id string, can we probe for it?
594
 
        try:
595
 
            self.branch_format.get_format_string()
596
 
        except NotImplementedError:
597
 
            return
598
 
        self.assertEqual(self.branch_format,
599
 
                         opened_control.find_branch_format())
600
 
 
601
 
 
602
 
class TestBound(TestCaseWithBranch):
603
 
 
604
 
    def test_bind_unbind(self):
605
 
        branch = self.make_branch('1')
606
 
        branch2 = self.make_branch('2')
607
 
        try:
608
 
            branch.bind(branch2)
609
 
        except errors.UpgradeRequired:
610
 
            raise TestSkipped('Format does not support binding')
611
 
        self.assertTrue(branch.unbind())
612
 
        self.assertFalse(branch.unbind())
613
 
        self.assertIs(None, branch.get_bound_location())
614
 
 
615
 
    def test_old_bound_location(self):
616
 
        branch = self.make_branch('branch1')
617
 
        try:
618
 
            self.assertIs(None, branch.get_old_bound_location())
619
 
        except errors.UpgradeRequired:
620
 
            raise TestSkipped('Format does not store old bound locations')
621
 
        branch2 = self.make_branch('branch2')
622
 
        branch.bind(branch2)
623
 
        self.assertIs(None, branch.get_old_bound_location())
624
 
        branch.unbind()
625
 
        self.assertContainsRe(branch.get_old_bound_location(), '\/branch2\/$')
626
 
 
627
 
 
628
 
class TestStrict(TestCaseWithBranch):
629
 
 
630
 
    def test_strict_history(self):
631
 
        tree1 = self.make_branch_and_tree('tree1')
632
 
        try:
633
 
            tree1.branch.set_append_revisions_only(True)
634
 
        except errors.UpgradeRequired:
635
 
            raise TestSkipped('Format does not support strict history')
636
 
        tree1.commit('empty commit')
637
 
        tree2 = tree1.bzrdir.sprout('tree2').open_workingtree()
638
 
        tree2.commit('empty commit 2')
639
 
        tree1.pull(tree2.branch)
640
 
        tree1.commit('empty commit 3')
641
 
        tree2.commit('empty commit 4')
642
 
        self.assertRaises(errors.DivergedBranches, tree1.pull, tree2.branch)
643
 
        tree2.merge_from_branch(tree1.branch)
644
 
        tree2.commit('empty commit 5')
645
 
        self.assertRaises(errors.AppendRevisionsOnlyViolation, tree1.pull,
646
 
                          tree2.branch)
647
 
        tree3 = tree1.bzrdir.sprout('tree3').open_workingtree()
648
 
        tree3.merge_from_branch(tree2.branch)
649
 
        tree3.commit('empty commit 6')
650
 
        tree2.pull(tree3.branch)
 
384
        self.branch.set_push_location('foo')
 
385
        self.assertFileEqual("[%s]\n"
 
386
                             "push_location = foo" % os.getcwdu(),
 
387
                             '.bazaar/branches.conf')
 
388
 
 
389
    # TODO RBC 20051029 test getting a push location from a branch in a 
 
390
    # recursive section - that is, it appends the branch name.