~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_branch.py

  • Committer: John Arbash Meinel
  • Date: 2005-11-30 15:43:57 UTC
  • mto: (1185.50.1 jam-integration)
  • mto: This revision was merged to the branch mainline in revision 1518.
  • Revision ID: john@arbash-meinel.com-20051130154357-614206b3a7b83cd0
Refactored bzrlib/ui.py into a module with the possibility for multiple ui forms.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
2
 
#
 
1
# (C) 2005 Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Tests for branch implementations - tests a branch format."""
18
 
 
19
17
import os
20
 
import sys
21
18
 
22
 
from bzrlib import (
23
 
    branch,
24
 
    bzrdir,
25
 
    errors,
26
 
    gpg,
27
 
    urlutils,
28
 
    transactions,
29
 
    remote,
30
 
    repository,
31
 
    tests,
32
 
    )
33
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
34
 
from bzrlib.delta import TreeDelta
35
 
from bzrlib.errors import (FileExists,
36
 
                           NoSuchRevision,
37
 
                           NoSuchFile,
38
 
                           UninitializableFormat,
39
 
                           NotBranchError,
40
 
                           )
41
 
from bzrlib.osutils import getcwd
42
 
import bzrlib.revision
43
 
from bzrlib.symbol_versioning import deprecated_in
44
 
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
45
 
from bzrlib.tests.branch_implementations import TestCaseWithBranch
46
 
from bzrlib.tests.http_server import HttpServer
 
20
from bzrlib.clone import copy_branch
 
21
from bzrlib.commit import commit
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
24
import bzrlib.gpg
 
25
from bzrlib.tests import TestCase, TestCaseInTempDir
 
26
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
47
27
from bzrlib.trace import mutter
48
 
from bzrlib.transport import get_transport
49
 
from bzrlib.transport.memory import MemoryServer
50
 
from bzrlib.upgrade import upgrade
51
 
from bzrlib.workingtree import WorkingTree
52
 
 
53
 
 
54
 
class TestTestCaseWithBranch(TestCaseWithBranch):
55
 
 
56
 
    def test_branch_format_matches_bzrdir_branch_format(self):
57
 
        bzrdir_branch_format = self.bzrdir_format.get_branch_format()
58
 
        self.assertIs(self.branch_format.__class__,
59
 
                      bzrdir_branch_format.__class__)
60
 
 
61
 
 
62
 
class TestBranch(TestCaseWithBranch):
63
 
 
64
 
    def test_create_tree_with_merge(self):
65
 
        tree = self.create_tree_with_merge()
66
 
        tree.lock_read()
67
 
        self.addCleanup(tree.unlock)
68
 
        graph = tree.branch.repository.get_graph()
69
 
        ancestry_graph = graph.get_parent_map(
70
 
            tree.branch.repository.all_revision_ids())
71
 
        self.assertEqual({'rev-1':('null:',),
72
 
                          'rev-2':('rev-1', ),
73
 
                          'rev-1.1.1':('rev-1', ),
74
 
                          'rev-3':('rev-2', 'rev-1.1.1', ),
75
 
                         }, ancestry_graph)
76
 
 
77
 
    def test_revision_ids_are_utf8(self):
78
 
        wt = self.make_branch_and_tree('tree')
79
 
        wt.commit('f', rev_id='rev1')
80
 
        wt.commit('f', rev_id='rev2')
81
 
        wt.commit('f', rev_id='rev3')
82
 
 
83
 
        br = self.get_branch()
84
 
        br.fetch(wt.branch)
85
 
        br.set_revision_history(['rev1', 'rev2', 'rev3'])
86
 
        rh = br.revision_history()
87
 
        self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
88
 
        for revision_id in rh:
89
 
            self.assertIsInstance(revision_id, str)
90
 
        last = br.last_revision()
91
 
        self.assertEqual('rev3', last)
92
 
        self.assertIsInstance(last, str)
93
 
        revno, last = br.last_revision_info()
94
 
        self.assertEqual(3, revno)
95
 
        self.assertEqual('rev3', last)
96
 
        self.assertIsInstance(last, str)
 
28
import bzrlib.transactions as transactions
 
29
from bzrlib.revision import NULL_REVISION
 
30
 
 
31
# TODO: Make a branch using basis branch, and check that it 
 
32
# doesn't request any files that could have been avoided, by 
 
33
# hooking into the Transport.
 
34
 
 
35
class TestBranch(TestCaseInTempDir):
 
36
 
 
37
    def test_append_revisions(self):
 
38
        """Test appending more than one revision"""
 
39
        br = Branch.initialize(u".")
 
40
        br.append_revision("rev1")
 
41
        self.assertEquals(br.revision_history(), ["rev1",])
 
42
        br.append_revision("rev2", "rev3")
 
43
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
97
44
 
98
45
    def test_fetch_revisions(self):
99
46
        """Test fetch-revision operation."""
100
 
        wt = self.make_branch_and_tree('b1')
101
 
        b1 = wt.branch
102
 
        self.build_tree_contents([('b1/foo', 'hello')])
103
 
        wt.add(['foo'], ['foo-id'])
104
 
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
105
 
 
106
 
        b2 = self.make_branch('b2')
107
 
        self.assertEqual((1, []), b2.fetch(b1))
108
 
 
109
 
        rev = b2.repository.get_revision('revision-1')
110
 
        tree = b2.repository.revision_tree('revision-1')
111
 
        tree.lock_read()
112
 
        self.addCleanup(tree.unlock)
113
 
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
114
 
 
115
 
    def test_get_revision_delta(self):
116
 
        tree_a = self.make_branch_and_tree('a')
117
 
        self.build_tree(['a/foo'])
118
 
        tree_a.add('foo', 'file1')
119
 
        tree_a.commit('rev1', rev_id='rev1')
120
 
        self.build_tree(['a/vla'])
121
 
        tree_a.add('vla', 'file2')
122
 
        tree_a.commit('rev2', rev_id='rev2')
123
 
 
124
 
        delta = tree_a.branch.get_revision_delta(1)
125
 
        self.assertIsInstance(delta, TreeDelta)
126
 
        self.assertEqual([('foo', 'file1', 'file')], delta.added)
127
 
        delta = tree_a.branch.get_revision_delta(2)
128
 
        self.assertIsInstance(delta, TreeDelta)
129
 
        self.assertEqual([('vla', 'file2', 'file')], delta.added)
130
 
 
131
 
    def get_unbalanced_tree_pair(self):
 
47
        from bzrlib.fetch import Fetcher
 
48
        os.mkdir('b1')
 
49
        os.mkdir('b2')
 
50
        b1 = Branch.initialize('b1')
 
51
        b2 = Branch.initialize('b2')
 
52
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
 
53
        b1.working_tree().add(['foo'], ['foo-id'])
 
54
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
 
55
 
 
56
        mutter('start fetch')
 
57
        f = Fetcher(from_branch=b1, to_branch=b2)
 
58
        eq = self.assertEquals
 
59
        eq(f.count_copied, 1)
 
60
        eq(f.last_revision, 'revision-1')
 
61
 
 
62
        rev = b2.get_revision('revision-1')
 
63
        tree = b2.revision_tree('revision-1')
 
64
        eq(tree.get_file_text('foo-id'), 'hello')
 
65
 
 
66
    def test_revision_tree(self):
 
67
        b1 = Branch.initialize(u'.')
 
68
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
 
69
        tree = b1.revision_tree('revision-1')
 
70
        tree = b1.revision_tree(None)
 
71
        self.assertEqual(len(tree.list_files()), 0)
 
72
        tree = b1.revision_tree(NULL_REVISION)
 
73
        self.assertEqual(len(tree.list_files()), 0)
 
74
 
 
75
    def get_unbalanced_branch_pair(self):
132
76
        """Return two branches, a and b, with one file in a."""
133
 
        tree_a = self.make_branch_and_tree('a')
134
 
        self.build_tree_contents([('a/b', 'b')])
135
 
        tree_a.add('b')
136
 
        tree_a.commit("silly commit", rev_id='A')
137
 
 
138
 
        tree_b = self.make_branch_and_tree('b')
139
 
        return tree_a, tree_b
 
77
        os.mkdir('a')
 
78
        br_a = Branch.initialize("a")
 
79
        file('a/b', 'wb').write('b')
 
80
        br_a.working_tree().add('b')
 
81
        commit(br_a, "silly commit", rev_id='A')
 
82
        os.mkdir('b')
 
83
        br_b = Branch.initialize("b")
 
84
        return br_a, br_b
140
85
 
141
86
    def get_balanced_branch_pair(self):
142
87
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
143
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
144
 
        tree_b.branch.repository.fetch(tree_a.branch.repository)
145
 
        return tree_a, tree_b
146
 
 
147
 
    def test_clone_partial(self):
 
88
        br_a, br_b = self.get_unbalanced_branch_pair()
 
89
        br_a.push_stores(br_b)
 
90
        return br_a, br_b
 
91
 
 
92
    def test_push_stores(self):
 
93
        """Copy the stores from one branch to another"""
 
94
        br_a, br_b = self.get_unbalanced_branch_pair()
 
95
        # ensure the revision is missing.
 
96
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
97
                          br_a.revision_history()[0])
 
98
        br_a.push_stores(br_b)
 
99
        # check that b now has all the data from a's first commit.
 
100
        rev = br_b.get_revision(br_a.revision_history()[0])
 
101
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
102
        for file_id in tree:
 
103
            if tree.inventory[file_id].kind == "file":
 
104
                tree.get_file(file_id).read()
 
105
        return br_a, br_b
 
106
 
 
107
    def test_copy_branch(self):
 
108
        """Copy the stores from one branch to another"""
 
109
        br_a, br_b = self.get_balanced_branch_pair()
 
110
        commit(br_b, "silly commit")
 
111
        os.mkdir('c')
 
112
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
 
113
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
 
114
 
 
115
    def test_copy_partial(self):
148
116
        """Copy only part of the history of a branch."""
149
 
        # TODO: RBC 20060208 test with a revision not on revision-history.
150
 
        #       what should that behaviour be ? Emailed the list.
151
 
        # First, make a branch with two commits.
152
 
        wt_a = self.make_branch_and_tree('a')
153
 
        self.build_tree(['a/one'])
154
 
        wt_a.add(['one'])
155
 
        wt_a.commit('commit one', rev_id='1')
 
117
        self.build_tree(['a/', 'a/one'])
 
118
        br_a = Branch.initialize('a')
 
119
        br_a.working_tree().add(['one'])
 
120
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
156
121
        self.build_tree(['a/two'])
157
 
        wt_a.add(['two'])
158
 
        wt_a.commit('commit two', rev_id='2')
159
 
        # Now make a copy of the repository.
160
 
        repo_b = self.make_repository('b')
161
 
        wt_a.branch.repository.copy_content_into(repo_b)
162
 
        # wt_a might be a lightweight checkout, so get a hold of the actual
163
 
        # branch (because you can't do a partial clone of a lightweight
164
 
        # checkout).
165
 
        branch = wt_a.branch.bzrdir.open_branch()
166
 
        # Then make a branch where the new repository is, but specify a revision
167
 
        # ID.  The new branch's history will stop at the specified revision.
168
 
        br_b = branch.clone(repo_b.bzrdir, revision_id='1')
169
 
        self.assertEqual('1', br_b.last_revision())
170
 
 
171
 
    def get_parented_branch(self):
172
 
        wt_a = self.make_branch_and_tree('a')
173
 
        self.build_tree(['a/one'])
174
 
        wt_a.add(['one'])
175
 
        wt_a.commit('commit one', rev_id='1')
176
 
 
177
 
        branch_b = wt_a.branch.bzrdir.sprout('b', revision_id='1').open_branch()
178
 
        self.assertEqual(wt_a.branch.base, branch_b.get_parent())
179
 
        return branch_b
180
 
 
181
 
    def test_clone_branch_nickname(self):
182
 
        # test the nick name is preserved always
183
 
        raise TestSkipped('XXX branch cloning is not yet tested.')
184
 
 
185
 
    def test_clone_branch_parent(self):
186
 
        # test the parent is preserved always
187
 
        branch_b = self.get_parented_branch()
188
 
        repo_c = self.make_repository('c')
189
 
        branch_b.repository.copy_content_into(repo_c)
190
 
        branch_c = branch_b.clone(repo_c.bzrdir)
191
 
        self.assertNotEqual(None, branch_c.get_parent())
192
 
        self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
193
 
 
194
 
        # We can also set a specific parent, and it should be honored
195
 
        random_parent = 'http://bazaar-vcs.org/path/to/branch'
196
 
        branch_b.set_parent(random_parent)
197
 
        repo_d = self.make_repository('d')
198
 
        branch_b.repository.copy_content_into(repo_d)
199
 
        branch_d = branch_b.clone(repo_d.bzrdir)
200
 
        self.assertEqual(random_parent, branch_d.get_parent())
201
 
 
202
 
    def test_submit_branch(self):
203
 
        """Submit location can be queried and set"""
204
 
        branch = self.make_branch('branch')
205
 
        self.assertEqual(branch.get_submit_branch(), None)
206
 
        branch.set_submit_branch('sftp://example.com')
207
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
208
 
        branch.set_submit_branch('sftp://example.net')
209
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
 
122
        br_a.working_tree().add(['two'])
 
123
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
 
124
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
125
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
126
        self.assertTrue(os.path.exists('b/one'))
 
127
        self.assertFalse(os.path.exists('b/two'))
210
128
        
211
 
    def test_public_branch(self):
212
 
        """public location can be queried and set"""
213
 
        branch = self.make_branch('branch')
214
 
        self.assertEqual(branch.get_public_branch(), None)
215
 
        branch.set_public_branch('sftp://example.com')
216
 
        self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
217
 
        branch.set_public_branch('sftp://example.net')
218
 
        self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
219
 
        branch.set_public_branch(None)
220
 
        self.assertEqual(branch.get_public_branch(), None)
221
 
 
222
 
    def test_record_initial_ghost(self):
223
 
        """Branches should support having ghosts."""
224
 
        wt = self.make_branch_and_tree('.')
225
 
        wt.set_parent_ids(['non:existent@rev--ision--0--2'],
226
 
            allow_leftmost_as_ghost=True)
227
 
        self.assertEqual(['non:existent@rev--ision--0--2'],
228
 
            wt.get_parent_ids())
229
 
        rev_id = wt.commit('commit against a ghost first parent.')
230
 
        rev = wt.branch.repository.get_revision(rev_id)
231
 
        self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
 
129
    def test_record_initial_ghost_merge(self):
 
130
        """A pending merge with no revision present is still a merge."""
 
131
        branch = Branch.initialize(u'.')
 
132
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
 
133
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
 
134
        rev = branch.get_revision(branch.last_revision())
 
135
        self.assertEqual(len(rev.parent_ids), 1)
232
136
        # parent_sha1s is not populated now, WTF. rbc 20051003
233
137
        self.assertEqual(len(rev.parent_sha1s), 0)
234
 
 
235
 
    def test_record_two_ghosts(self):
236
 
        """Recording with all ghosts works."""
237
 
        wt = self.make_branch_and_tree('.')
238
 
        wt.set_parent_ids([
239
 
                'foo@azkhazan-123123-abcabc',
240
 
                'wibble@fofof--20050401--1928390812',
241
 
            ],
242
 
            allow_leftmost_as_ghost=True)
243
 
        rev_id = wt.commit("commit from ghost base with one merge")
244
 
        # the revision should have been committed with two parents
245
 
        rev = wt.branch.repository.get_revision(rev_id)
246
 
        self.assertEqual(['foo@azkhazan-123123-abcabc',
247
 
            'wibble@fofof--20050401--1928390812'],
248
 
            rev.parent_ids)
 
138
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
249
139
 
250
140
    def test_bad_revision(self):
251
 
        self.assertRaises(errors.InvalidRevisionId,
252
 
                          self.get_branch().repository.get_revision,
253
 
                          None)
 
141
        branch = Branch.initialize(u'.')
 
142
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
254
143
 
255
144
# TODO 20051003 RBC:
256
145
# compare the gpg-to-sign info for a commit with a ghost and 
257
146
#     an identical tree without a ghost
258
147
# fetch missing should rewrite the TOC of weaves to list newly available parents.
259
148
        
 
149
    def test_pending_merges(self):
 
150
        """Tracking pending-merged revisions."""
 
151
        b = Branch.initialize(u'.')
 
152
        wt = b.working_tree()
 
153
        self.assertEquals(wt.pending_merges(), [])
 
154
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
155
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
156
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
 
157
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
158
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
 
159
        self.assertEquals(wt.pending_merges(),
 
160
                          ['foo@azkhazan-123123-abcabc',
 
161
                           'wibble@fofof--20050401--1928390812'])
 
162
        b.working_tree().commit("commit from base with two merges")
 
163
        rev = b.get_revision(b.revision_history()[0])
 
164
        self.assertEquals(len(rev.parent_ids), 2)
 
165
        self.assertEquals(rev.parent_ids[0],
 
166
                          'foo@azkhazan-123123-abcabc')
 
167
        self.assertEquals(rev.parent_ids[1],
 
168
                           'wibble@fofof--20050401--1928390812')
 
169
        # list should be cleared when we do a commit
 
170
        self.assertEquals(wt.pending_merges(), [])
 
171
 
260
172
    def test_sign_existing_revision(self):
261
 
        wt = self.make_branch_and_tree('.')
262
 
        branch = wt.branch
263
 
        wt.commit("base", allow_pointless=True, rev_id='A')
 
173
        branch = Branch.initialize(u'.')
 
174
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
264
175
        from bzrlib.testament import Testament
265
 
        strategy = gpg.LoopbackGPGStrategy(None)
266
 
        branch.repository.lock_write()
267
 
        branch.repository.start_write_group()
268
 
        branch.repository.sign_revision('A', strategy)
269
 
        branch.repository.commit_write_group()
270
 
        branch.repository.unlock()
271
 
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
272
 
                         Testament.from_revision(branch.repository,
273
 
                         'A').as_short_text() +
274
 
                         '-----END PSEUDO-SIGNED CONTENT-----\n',
275
 
                         branch.repository.get_signature_text('A'))
 
176
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
177
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
178
                         branch.revision_store.get('A', 'sig').read())
276
179
 
277
180
    def test_store_signature(self):
278
 
        wt = self.make_branch_and_tree('.')
279
 
        branch = wt.branch
280
 
        branch.lock_write()
281
 
        try:
282
 
            branch.repository.start_write_group()
283
 
            try:
284
 
                branch.repository.store_revision_signature(
285
 
                    gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
286
 
            except:
287
 
                branch.repository.abort_write_group()
288
 
                raise
289
 
            else:
290
 
                branch.repository.commit_write_group()
291
 
        finally:
292
 
            branch.unlock()
293
 
        # A signature without a revision should not be accessible.
294
 
        self.assertRaises(errors.NoSuchRevision,
295
 
                          branch.repository.has_signature_for_revision_id,
296
 
                          'A')
297
 
        wt.commit("base", allow_pointless=True, rev_id='A')
298
 
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
299
 
                         'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
300
 
                         branch.repository.get_signature_text('A'))
301
 
 
302
 
    def test_branch_keeps_signatures(self):
303
 
        wt = self.make_branch_and_tree('source')
304
 
        wt.commit('A', allow_pointless=True, rev_id='A')
305
 
        repo = wt.branch.repository
306
 
        repo.lock_write()
307
 
        repo.start_write_group()
308
 
        repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
309
 
        repo.commit_write_group()
310
 
        repo.unlock()
311
 
        #FIXME: clone should work to urls,
312
 
        # wt.clone should work to disks.
313
 
        self.build_tree(['target/'])
314
 
        d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
315
 
        self.assertEqual(repo.get_signature_text('A'),
316
 
                         d2.open_repository().get_signature_text('A'))
317
 
 
318
 
    def test_missing_revisions(self):
319
 
        t1 = self.make_branch_and_tree('b1')
320
 
        rev1 = t1.commit('one')
321
 
        t2 = t1.bzrdir.sprout('b2').open_workingtree()
322
 
        rev2 = t1.commit('two')
323
 
        rev3 = t1.commit('three')
324
 
 
325
 
        self.assertEqual([rev2, rev3],
326
 
            self.applyDeprecated(deprecated_in((1, 6, 0)),
327
 
            t2.branch.missing_revisions, t1.branch))
328
 
 
329
 
        self.assertEqual([],
330
 
            self.applyDeprecated(deprecated_in((1, 6, 0)),
331
 
            t2.branch.missing_revisions, t1.branch, stop_revision=1))
332
 
        self.assertEqual([rev2],
333
 
            self.applyDeprecated(deprecated_in((1, 6, 0)),
334
 
            t2.branch.missing_revisions, t1.branch, stop_revision=2))
335
 
        self.assertEqual([rev2, rev3],
336
 
            self.applyDeprecated(deprecated_in((1, 6, 0)),
337
 
            t2.branch.missing_revisions, t1.branch, stop_revision=3))
338
 
 
339
 
        self.assertRaises(errors.NoSuchRevision,
340
 
            self.applyDeprecated, deprecated_in((1, 6, 0)),
341
 
            t2.branch.missing_revisions, t1.branch, stop_revision=4)
342
 
 
343
 
        rev4 = t2.commit('four')
344
 
        self.assertRaises(errors.DivergedBranches,
345
 
            self.applyDeprecated, deprecated_in((1, 6, 0)),
346
 
            t2.branch.missing_revisions, t1.branch)
 
181
        branch = Branch.initialize(u'.')
 
182
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
 
183
                                        'FOO', 'A')
 
184
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
 
185
 
 
186
    def test__relcontrolfilename(self):
 
187
        branch = Branch.initialize(u'.')
 
188
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
 
189
        
 
190
    def test__relcontrolfilename_empty(self):
 
191
        branch = Branch.initialize(u'.')
 
192
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
347
193
 
348
194
    def test_nicks(self):
349
 
        """Test explicit and implicit branch nicknames.
350
 
        
351
 
        Nicknames are implicitly the name of the branch's directory, unless an
352
 
        explicit nickname is set.  That is, an explicit nickname always
353
 
        overrides the implicit one.
354
 
        """
355
 
        t = get_transport(self.get_url())
356
 
        branch = self.make_branch('bzr.dev')
357
 
        # The nick will be 'bzr.dev', because there is no explicit nick set.
 
195
        """Branch nicknames"""
 
196
        os.mkdir('bzr.dev')
 
197
        branch = Branch.initialize('bzr.dev')
358
198
        self.assertEqual(branch.nick, 'bzr.dev')
359
 
        # Move the branch to a different directory, 'bzr.ab'.  Now that branch
360
 
        # will report its nick as 'bzr.ab'.
361
 
        t.move('bzr.dev', 'bzr.ab')
362
 
        branch = Branch.open(self.get_url('bzr.ab'))
 
199
        os.rename('bzr.dev', 'bzr.ab')
 
200
        branch = Branch.open('bzr.ab')
363
201
        self.assertEqual(branch.nick, 'bzr.ab')
364
 
        # Set the branch nick explicitly.  This will ensure there's a branch
365
 
        # config file in the branch.
366
 
        branch.nick = "Aaron's branch"
367
 
        if not isinstance(branch, remote.RemoteBranch):
368
 
            self.failUnless(branch._transport.has("branch.conf"))
369
 
        # Because the nick has been set explicitly, the nick is now always
370
 
        # "Aaron's branch", regardless of directory name.
 
202
        branch.nick = "Aaron's branch"
 
203
        branch.nick = "Aaron's branch"
 
204
        self.failUnless(os.path.exists(branch.controlfilename("branch.conf")))
371
205
        self.assertEqual(branch.nick, "Aaron's branch")
372
 
        t.move('bzr.ab', 'integration')
373
 
        branch = Branch.open(self.get_url('integration'))
 
206
        os.rename('bzr.ab', 'integration')
 
207
        branch = Branch.open('integration')
374
208
        self.assertEqual(branch.nick, "Aaron's branch")
375
209
        branch.nick = u"\u1234"
376
210
        self.assertEqual(branch.nick, u"\u1234")
377
211
 
378
212
    def test_commit_nicks(self):
379
213
        """Nicknames are committed to the revision"""
380
 
        wt = self.make_branch_and_tree('bzr.dev')
381
 
        branch = wt.branch
 
214
        os.mkdir('bzr.dev')
 
215
        branch = Branch.initialize('bzr.dev')
382
216
        branch.nick = "My happy branch"
383
 
        wt.commit('My commit respect da nick.')
384
 
        committed = branch.repository.get_revision(branch.last_revision())
385
 
        self.assertEqual(committed.properties["branch-nick"],
 
217
        branch.working_tree().commit('My commit respect da nick.')
 
218
        committed = branch.get_revision(branch.last_revision())
 
219
        self.assertEqual(committed.properties["branch-nick"], 
386
220
                         "My happy branch")
387
221
 
388
 
    def test_create_open_branch_uses_repository(self):
389
 
        try:
390
 
            repo = self.make_repository('.', shared=True)
391
 
        except errors.IncompatibleFormat:
392
 
            return
393
 
        child_transport = repo.bzrdir.root_transport.clone('child')
394
 
        child_transport.mkdir('.')
395
 
        child_dir = self.bzrdir_format.initialize_on_transport(child_transport)
396
 
        try:
397
 
            child_branch = self.branch_format.initialize(child_dir)
398
 
        except errors.UninitializableFormat:
399
 
            # branch references are not default init'able.
400
 
            return
401
 
        self.assertEqual(repo.bzrdir.root_transport.base,
402
 
                         child_branch.repository.bzrdir.root_transport.base)
403
 
        child_branch = branch.Branch.open(self.get_url('child'))
404
 
        self.assertEqual(repo.bzrdir.root_transport.base,
405
 
                         child_branch.repository.bzrdir.root_transport.base)
406
 
 
407
 
    def test_format_description(self):
408
 
        tree = self.make_branch_and_tree('tree')
409
 
        text = tree.branch._format.get_format_description()
410
 
        self.failUnless(len(text))
411
 
 
412
 
    def test_get_commit_builder(self):
413
 
        branch = self.make_branch(".")
414
 
        branch.lock_write()
415
 
        builder = branch.get_commit_builder([])
416
 
        self.assertIsInstance(builder, repository.CommitBuilder)
417
 
        branch.repository.commit_write_group()
418
 
        branch.unlock()
419
 
 
420
 
    def test_generate_revision_history(self):
421
 
        """Create a fake revision history easily."""
422
 
        tree = self.make_branch_and_tree('.')
423
 
        rev1 = tree.commit('foo')
424
 
        orig_history = tree.branch.revision_history()
425
 
        rev2 = tree.commit('bar', allow_pointless=True)
426
 
        tree.branch.generate_revision_history(rev1)
427
 
        self.assertEqual(orig_history, tree.branch.revision_history())
428
 
 
429
 
    def test_generate_revision_history_NULL_REVISION(self):
430
 
        tree = self.make_branch_and_tree('.')
431
 
        rev1 = tree.commit('foo')
432
 
        tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
433
 
        self.assertEqual([], tree.branch.revision_history())
434
 
 
435
 
    def test_create_checkout(self):
436
 
        tree_a = self.make_branch_and_tree('a')
437
 
        branch_a = tree_a.branch
438
 
        checkout_b = branch_a.create_checkout('b')
439
 
        self.assertEqual('null:', checkout_b.last_revision())
440
 
        checkout_b.commit('rev1', rev_id='rev1')
441
 
        self.assertEqual('rev1', branch_a.last_revision())
442
 
        self.assertNotEqual(checkout_b.branch.base, branch_a.base)
443
 
 
444
 
        checkout_c = branch_a.create_checkout('c', lightweight=True)
445
 
        self.assertEqual('rev1', checkout_c.last_revision())
446
 
        checkout_c.commit('rev2', rev_id='rev2')
447
 
        self.assertEqual('rev2', branch_a.last_revision())
448
 
        self.assertEqual(checkout_c.branch.base, branch_a.base)
449
 
 
450
 
        os.mkdir('d')
451
 
        checkout_d = branch_a.create_checkout('d', lightweight=True)
452
 
        self.assertEqual('rev2', checkout_d.last_revision())
453
 
        os.mkdir('e')
454
 
        checkout_e = branch_a.create_checkout('e')
455
 
        self.assertEqual('rev2', checkout_e.last_revision())
456
 
 
457
 
    def test_create_anonymous_lightweight_checkout(self):
458
 
        """A lightweight checkout from a readonly branch should succeed."""
459
 
        tree_a = self.make_branch_and_tree('a')
460
 
        rev_id = tree_a.commit('put some content in the branch')
461
 
        # open the branch via a readonly transport
462
 
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
463
 
        # sanity check that the test will be valid
464
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
465
 
            source_branch.lock_write)
466
 
        checkout = source_branch.create_checkout('c', lightweight=True)
467
 
        self.assertEqual(rev_id, checkout.last_revision())
468
 
 
469
 
    def test_create_anonymous_heavyweight_checkout(self):
470
 
        """A regular checkout from a readonly branch should succeed."""
471
 
        tree_a = self.make_branch_and_tree('a')
472
 
        rev_id = tree_a.commit('put some content in the branch')
473
 
        # open the branch via a readonly transport
474
 
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
475
 
        # sanity check that the test will be valid
476
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
477
 
            source_branch.lock_write)
478
 
        checkout = source_branch.create_checkout('c')
479
 
        self.assertEqual(rev_id, checkout.last_revision())
480
 
 
481
 
    def test_set_revision_history(self):
482
 
        tree = self.make_branch_and_tree('a')
483
 
        tree.commit('a commit', rev_id='rev1')
484
 
        br = tree.branch
485
 
        br.set_revision_history(["rev1"])
486
 
        self.assertEquals(br.revision_history(), ["rev1"])
487
 
        br.set_revision_history([])
488
 
        self.assertEquals(br.revision_history(), [])
489
 
 
490
 
 
491
 
class ChrootedTests(TestCaseWithBranch):
492
 
    """A support class that provides readonly urls outside the local namespace.
493
 
 
494
 
    This is done by checking if self.transport_server is a MemoryServer. if it
495
 
    is then we are chrooted already, if it is not then an HttpServer is used
496
 
    for readonly urls.
497
 
    """
498
 
 
499
 
    def setUp(self):
500
 
        super(ChrootedTests, self).setUp()
501
 
        if not self.vfs_transport_factory == MemoryServer:
502
 
            self.transport_readonly_server = HttpServer
 
222
 
 
223
class TestRemote(TestCaseWithWebserver):
503
224
 
504
225
    def test_open_containing(self):
505
226
        self.assertRaises(NotBranchError, Branch.open_containing,
506
 
                          self.get_readonly_url(''))
 
227
                          self.get_remote_url(''))
507
228
        self.assertRaises(NotBranchError, Branch.open_containing,
508
 
                          self.get_readonly_url('g/p/q'))
509
 
        branch = self.make_branch('.')
510
 
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
 
229
                          self.get_remote_url('g/p/q'))
 
230
        b = Branch.initialize(u'.')
 
231
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
511
232
        self.assertEqual('', relpath)
512
 
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
 
233
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
513
234
        self.assertEqual('g/p/q', relpath)
514
 
 
 
235
        
 
236
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
237
#         >>> from bzrlib.commit import commit
 
238
#         >>> bzrlib.trace.silent = True
 
239
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
240
#         >>> br1.working_tree().add('foo')
 
241
#         >>> br1.working_tree().add('bar')
 
242
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
243
#         >>> br2 = ScratchBranch()
 
244
#         >>> br2.update_revisions(br1)
 
245
#         Added 2 texts.
 
246
#         Added 1 inventories.
 
247
#         Added 1 revisions.
 
248
#         >>> br2.revision_history()
 
249
#         [u'REVISION-ID-1']
 
250
#         >>> br2.update_revisions(br1)
 
251
#         Added 0 revisions.
 
252
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
253
#         True
515
254
 
516
255
class InstrumentedTransaction(object):
517
256
 
576
315
        self.assertEqual(['lw', 'ul'], branch._calls)
577
316
 
578
317
 
579
 
class TestBranchPushLocations(TestCaseWithBranch):
580
 
 
 
318
class TestBranchTransaction(TestCaseInTempDir):
 
319
 
 
320
    def setUp(self):
 
321
        super(TestBranchTransaction, self).setUp()
 
322
        self.branch = Branch.initialize(u'.')
 
323
        
 
324
    def test_default_get_transaction(self):
 
325
        """branch.get_transaction on a new branch should give a PassThrough."""
 
326
        self.failUnless(isinstance(self.branch.get_transaction(),
 
327
                                   transactions.PassThroughTransaction))
 
328
 
 
329
    def test__set_new_transaction(self):
 
330
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
331
 
 
332
    def test__set_over_existing_transaction_raises(self):
 
333
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
334
        self.assertRaises(errors.LockError,
 
335
                          self.branch._set_transaction,
 
336
                          transactions.ReadOnlyTransaction())
 
337
 
 
338
    def test_finish_no_transaction_raises(self):
 
339
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
 
340
 
 
341
    def test_finish_readonly_transaction_works(self):
 
342
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
343
        self.branch._finish_transaction()
 
344
        self.assertEqual(None, self.branch._transaction)
 
345
 
 
346
    def test_unlock_calls_finish(self):
 
347
        self.branch.lock_read()
 
348
        transaction = InstrumentedTransaction()
 
349
        self.branch._transaction = transaction
 
350
        self.branch.unlock()
 
351
        self.assertEqual(['finish'], transaction.calls)
 
352
 
 
353
    def test_lock_read_acquires_ro_transaction(self):
 
354
        self.branch.lock_read()
 
355
        self.failUnless(isinstance(self.branch.get_transaction(),
 
356
                                   transactions.ReadOnlyTransaction))
 
357
        self.branch.unlock()
 
358
        
 
359
    def test_lock_write_acquires_passthrough_transaction(self):
 
360
        self.branch.lock_write()
 
361
        # cannot use get_transaction as its magic
 
362
        self.failUnless(isinstance(self.branch._transaction,
 
363
                                   transactions.PassThroughTransaction))
 
364
        self.branch.unlock()
 
365
 
 
366
 
 
367
class TestBranchPushLocations(TestCaseInTempDir):
 
368
 
 
369
    def setUp(self):
 
370
        super(TestBranchPushLocations, self).setUp()
 
371
        self.branch = Branch.initialize(u'.')
 
372
        
581
373
    def test_get_push_location_unset(self):
582
 
        self.assertEqual(None, self.get_branch().get_push_location())
 
374
        self.assertEqual(None, self.branch.get_push_location())
583
375
 
584
376
    def test_get_push_location_exact(self):
585
 
        from bzrlib.config import (locations_config_filename,
586
 
                                   ensure_config_dir_exists)
587
 
        ensure_config_dir_exists()
588
 
        fn = locations_config_filename()
589
 
        open(fn, 'wt').write(("[%s]\n"
590
 
                                  "push_location=foo\n" %
591
 
                                  self.get_branch().base[:-1]))
592
 
        self.assertEqual("foo", self.get_branch().get_push_location())
 
377
        self.build_tree(['.bazaar/'])
 
378
        print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
 
379
                                                       "push_location=foo" %
 
380
                                                       os.getcwdu())
 
381
        self.assertEqual("foo", self.branch.get_push_location())
593
382
 
594
383
    def test_set_push_location(self):
595
 
        branch = self.get_branch()
596
 
        branch.set_push_location('foo')
597
 
        self.assertEqual('foo', branch.get_push_location())
598
 
 
599
 
 
600
 
class TestFormat(TestCaseWithBranch):
601
 
    """Tests for the format itself."""
602
 
 
603
 
    def test_get_reference(self):
604
 
        """get_reference on all regular branches should return None."""
605
 
        if not self.branch_format.is_supported():
606
 
            # unsupported formats are not loopback testable
607
 
            # because the default open will not open them and
608
 
            # they may not be initializable.
609
 
            return
610
 
        made_branch = self.make_branch('.')
611
 
        self.assertEqual(None,
612
 
            made_branch._format.get_reference(made_branch.bzrdir))
613
 
 
614
 
    def test_set_reference(self):
615
 
        """set_reference on all regular branches should be callable."""
616
 
        if not self.branch_format.is_supported():
617
 
            # unsupported formats are not loopback testable
618
 
            # because the default open will not open them and
619
 
            # they may not be initializable.
620
 
            return
621
 
        this_branch = self.make_branch('this')
622
 
        other_branch = self.make_branch('other')
623
 
        try:
624
 
            this_branch._format.set_reference(this_branch.bzrdir, other_branch)
625
 
        except NotImplementedError:
626
 
            # that's ok
627
 
            pass
628
 
        else:
629
 
            ref = this_branch._format.get_reference(this_branch.bzrdir)
630
 
            self.assertEqual(ref, other_branch.base)
631
 
 
632
 
    def test_format_initialize_find_open(self):
633
 
        # loopback test to check the current format initializes to itself.
634
 
        if not self.branch_format.is_supported():
635
 
            # unsupported formats are not loopback testable
636
 
            # because the default open will not open them and
637
 
            # they may not be initializable.
638
 
            return
639
 
        # supported formats must be able to init and open
640
 
        t = get_transport(self.get_url())
641
 
        readonly_t = get_transport(self.get_readonly_url())
642
 
        made_branch = self.make_branch('.')
643
 
        self.failUnless(isinstance(made_branch, branch.Branch))
644
 
 
645
 
        # find it via bzrdir opening:
646
 
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
647
 
        direct_opened_branch = opened_control.open_branch()
648
 
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
649
 
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
650
 
        self.failUnless(isinstance(direct_opened_branch._format,
651
 
                        self.branch_format.__class__))
652
 
 
653
 
        # find it via Branch.open
654
 
        opened_branch = branch.Branch.open(readonly_t.base)
655
 
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
656
 
        self.assertEqual(made_branch._format.__class__,
657
 
                         opened_branch._format.__class__)
658
 
        # if it has a unique id string, can we probe for it ?
659
 
        try:
660
 
            self.branch_format.get_format_string()
661
 
        except NotImplementedError:
662
 
            return
663
 
        self.assertEqual(self.branch_format,
664
 
                         opened_control.find_branch_format())
665
 
 
666
 
 
667
 
class TestBound(TestCaseWithBranch):
668
 
 
669
 
    def test_bind_unbind(self):
670
 
        branch = self.make_branch('1')
671
 
        branch2 = self.make_branch('2')
672
 
        try:
673
 
            branch.bind(branch2)
674
 
        except errors.UpgradeRequired:
675
 
            raise tests.TestNotApplicable('Format does not support binding')
676
 
        self.assertTrue(branch.unbind())
677
 
        self.assertFalse(branch.unbind())
678
 
        self.assertIs(None, branch.get_bound_location())
679
 
 
680
 
    def test_old_bound_location(self):
681
 
        branch = self.make_branch('branch1')
682
 
        try:
683
 
            self.assertIs(None, branch.get_old_bound_location())
684
 
        except errors.UpgradeRequired:
685
 
            raise tests.TestNotApplicable(
686
 
                    'Format does not store old bound locations')
687
 
        branch2 = self.make_branch('branch2')
688
 
        branch.bind(branch2)
689
 
        self.assertIs(None, branch.get_old_bound_location())
690
 
        branch.unbind()
691
 
        self.assertContainsRe(branch.get_old_bound_location(), '\/branch2\/$')
692
 
 
693
 
    def test_bind_diverged(self):
694
 
        tree_a = self.make_branch_and_tree('tree_a')
695
 
        tree_a.commit('rev1a')
696
 
        tree_b = tree_a.bzrdir.sprout('tree_b').open_workingtree()
697
 
        tree_a.commit('rev2a')
698
 
        tree_b.commit('rev2b')
699
 
        try:
700
 
            tree_b.branch.bind(tree_a.branch)
701
 
        except errors.UpgradeRequired:
702
 
            raise tests.TestNotApplicable('Format does not support binding')
703
 
 
704
 
 
705
 
class TestStrict(TestCaseWithBranch):
706
 
 
707
 
    def test_strict_history(self):
708
 
        tree1 = self.make_branch_and_tree('tree1')
709
 
        try:
710
 
            tree1.branch.set_append_revisions_only(True)
711
 
        except errors.UpgradeRequired:
712
 
            raise TestSkipped('Format does not support strict history')
713
 
        tree1.commit('empty commit')
714
 
        tree2 = tree1.bzrdir.sprout('tree2').open_workingtree()
715
 
        tree2.commit('empty commit 2')
716
 
        tree1.pull(tree2.branch)
717
 
        tree1.commit('empty commit 3')
718
 
        tree2.commit('empty commit 4')
719
 
        self.assertRaises(errors.DivergedBranches, tree1.pull, tree2.branch)
720
 
        tree2.merge_from_branch(tree1.branch)
721
 
        tree2.commit('empty commit 5')
722
 
        self.assertRaises(errors.AppendRevisionsOnlyViolation, tree1.pull,
723
 
                          tree2.branch)
724
 
        tree3 = tree1.bzrdir.sprout('tree3').open_workingtree()
725
 
        tree3.merge_from_branch(tree2.branch)
726
 
        tree3.commit('empty commit 6')
727
 
        tree2.pull(tree3.branch)
 
384
        self.branch.set_push_location('foo')
 
385
        self.assertFileEqual("[%s]\n"
 
386
                             "push_location = foo" % os.getcwdu(),
 
387
                             '.bazaar/branches.conf')
 
388
 
 
389
    # TODO RBC 20051029 test getting a push location from a branch in a 
 
390
    # recursive section - that is, it appends the branch name.