~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

  • Committer: Robert Collins
  • Date: 2005-10-17 11:41:07 UTC
  • mfrom: (1442.1.60)
  • Revision ID: robertc@robertcollins.net-20051017114107-f5586285d825c105
Merge in first part of GPG support.

This adds check_signatures config support, trims back the transport API
to be leaner and easier to hook suffixes into (suffixes are non-primary streams
in the store, associated with the file id that the primary data is stored under),
and adds a gpg module which will encapsulate all signing and checking operations.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
2
 
#
 
1
# (C) 2005 Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for branch implementations - tests a branch format."""
18
 
 
19
 
from bzrlib import (
20
 
    branch as _mod_branch,
21
 
    bzrdir,
22
 
    config,
23
 
    delta as _mod_delta,
24
 
    errors,
25
 
    gpg,
26
 
    merge,
27
 
    urlutils,
28
 
    transport,
29
 
    remote,
30
 
    repository,
31
 
    revision,
32
 
    symbol_versioning,
33
 
    tests,
34
 
    )
35
 
from bzrlib.tests import (
36
 
    per_branch,
37
 
    )
38
 
from bzrlib.tests.http_server import HttpServer
39
 
from bzrlib.transport import memory
40
 
 
41
 
 
42
 
class TestTestCaseWithBranch(per_branch.TestCaseWithBranch):
43
 
 
44
 
    def test_branch_format_matches_bzrdir_branch_format(self):
45
 
        bzrdir_branch_format = self.bzrdir_format.get_branch_format()
46
 
        self.assertIs(self.branch_format.__class__,
47
 
                      bzrdir_branch_format.__class__)
48
 
 
49
 
    def test_make_branch_gets_expected_format(self):
50
 
        branch = self.make_branch('.')
51
 
        self.assertIs(self.branch_format.__class__,
52
 
            branch._format.__class__)
53
 
 
54
 
 
55
 
class TestBranch(per_branch.TestCaseWithBranch):
56
 
 
57
 
    def test_create_tree_with_merge(self):
58
 
        tree = self.create_tree_with_merge()
59
 
        tree.lock_read()
60
 
        self.addCleanup(tree.unlock)
61
 
        graph = tree.branch.repository.get_graph()
62
 
        ancestry_graph = graph.get_parent_map(
63
 
            tree.branch.repository.all_revision_ids())
64
 
        self.assertEqual({'rev-1':('null:',),
65
 
                          'rev-2':('rev-1', ),
66
 
                          'rev-1.1.1':('rev-1', ),
67
 
                          'rev-3':('rev-2', 'rev-1.1.1', ),
68
 
                         }, ancestry_graph)
69
 
 
70
 
    def test_revision_ids_are_utf8(self):
71
 
        wt = self.make_branch_and_tree('tree')
72
 
        wt.commit('f', rev_id='rev1')
73
 
        wt.commit('f', rev_id='rev2')
74
 
        wt.commit('f', rev_id='rev3')
75
 
 
76
 
        br = self.get_branch()
77
 
        br.fetch(wt.branch)
78
 
        br.generate_revision_history('rev3')
79
 
        rh = br.revision_history()
80
 
        self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
81
 
        for revision_id in rh:
82
 
            self.assertIsInstance(revision_id, str)
83
 
        last = br.last_revision()
84
 
        self.assertEqual('rev3', last)
85
 
        self.assertIsInstance(last, str)
86
 
        revno, last = br.last_revision_info()
87
 
        self.assertEqual(3, revno)
88
 
        self.assertEqual('rev3', last)
89
 
        self.assertIsInstance(last, str)
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
import os
 
18
from bzrlib.branch import Branch
 
19
from bzrlib.clone import copy_branch
 
20
from bzrlib.commit import commit
 
21
import bzrlib.errors as errors
 
22
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
23
from bzrlib.selftest import TestCaseInTempDir
 
24
from bzrlib.trace import mutter
 
25
import bzrlib.transactions as transactions
 
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
 
27
 
 
28
class TestBranch(TestCaseInTempDir):
 
29
 
 
30
    def test_append_revisions(self):
 
31
        """Test appending more than one revision"""
 
32
        br = Branch.initialize(".")
 
33
        br.append_revision("rev1")
 
34
        self.assertEquals(br.revision_history(), ["rev1",])
 
35
        br.append_revision("rev2", "rev3")
 
36
        self.assertEquals(br.revision_history(), ["rev1", "rev2", "rev3"])
90
37
 
91
38
    def test_fetch_revisions(self):
92
39
        """Test fetch-revision operation."""
93
 
        wt = self.make_branch_and_tree('b1')
94
 
        b1 = wt.branch
95
 
        self.build_tree_contents([('b1/foo', 'hello')])
96
 
        wt.add(['foo'], ['foo-id'])
97
 
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
98
 
 
99
 
        b2 = self.make_branch('b2')
100
 
        b2.fetch(b1)
101
 
 
102
 
        rev = b2.repository.get_revision('revision-1')
103
 
        tree = b2.repository.revision_tree('revision-1')
104
 
        tree.lock_read()
105
 
        self.addCleanup(tree.unlock)
106
 
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
107
 
 
108
 
    def test_get_revision_delta(self):
109
 
        tree_a = self.make_branch_and_tree('a')
110
 
        self.build_tree(['a/foo'])
111
 
        tree_a.add('foo', 'file1')
112
 
        tree_a.commit('rev1', rev_id='rev1')
113
 
        self.build_tree(['a/vla'])
114
 
        tree_a.add('vla', 'file2')
115
 
        tree_a.commit('rev2', rev_id='rev2')
116
 
 
117
 
        delta = tree_a.branch.get_revision_delta(1)
118
 
        self.assertIsInstance(delta, _mod_delta.TreeDelta)
119
 
        self.assertEqual([('foo', 'file1', 'file')], delta.added)
120
 
        delta = tree_a.branch.get_revision_delta(2)
121
 
        self.assertIsInstance(delta, _mod_delta.TreeDelta)
122
 
        self.assertEqual([('vla', 'file2', 'file')], delta.added)
123
 
 
124
 
    def get_unbalanced_tree_pair(self):
125
 
        """Return two branches, a and b, with one file in a."""
126
 
        tree_a = self.make_branch_and_tree('a')
127
 
        self.build_tree_contents([('a/b', 'b')])
128
 
        tree_a.add('b')
129
 
        tree_a.commit("silly commit", rev_id='A')
130
 
 
131
 
        tree_b = self.make_branch_and_tree('b')
132
 
        return tree_a, tree_b
133
 
 
134
 
    def get_balanced_branch_pair(self):
135
 
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
136
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
137
 
        tree_b.branch.repository.fetch(tree_a.branch.repository)
138
 
        return tree_a, tree_b
139
 
 
140
 
    def test_clone_partial(self):
 
40
        from bzrlib.fetch import Fetcher
 
41
        os.mkdir('b1')
 
42
        os.mkdir('b2')
 
43
        b1 = Branch.initialize('b1')
 
44
        b2 = Branch.initialize('b2')
 
45
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
 
46
        b1.add(['foo'], ['foo-id'])
 
47
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
48
 
 
49
        mutter('start fetch')
 
50
        f = Fetcher(from_branch=b1, to_branch=b2)
 
51
        eq = self.assertEquals
 
52
        eq(f.count_copied, 1)
 
53
        eq(f.last_revision, 'revision-1')
 
54
 
 
55
        rev = b2.get_revision('revision-1')
 
56
        tree = b2.revision_tree('revision-1')
 
57
        eq(tree.get_file_text('foo-id'), 'hello')
 
58
 
 
59
    def test_push_stores(self):
 
60
        """Copy the stores from one branch to another"""
 
61
        os.mkdir('a')
 
62
        br_a = Branch.initialize("a")
 
63
        file('a/b', 'wb').write('b')
 
64
        br_a.add('b')
 
65
        commit(br_a, "silly commit")
 
66
 
 
67
        os.mkdir('b')
 
68
        br_b = Branch.initialize("b")
 
69
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
70
                          br_a.revision_history()[0])
 
71
        br_a.push_stores(br_b)
 
72
        rev = br_b.get_revision(br_a.revision_history()[0])
 
73
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
74
        for file_id in tree:
 
75
            if tree.inventory[file_id].kind == "file":
 
76
                tree.get_file(file_id).read()
 
77
        return br_a, br_b
 
78
 
 
79
    def test_copy_branch(self):
 
80
        """Copy the stores from one branch to another"""
 
81
        br_a, br_b = self.test_push_stores()
 
82
        commit(br_b, "silly commit")
 
83
        os.mkdir('c')
 
84
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
 
85
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
 
86
        ## # basis branches currently disabled for weave format
 
87
        ## self.assertFalse(br_b.last_revision() in br_c.revision_history())
 
88
        ## br_c.get_revision(br_b.last_revision())
 
89
 
 
90
    def test_copy_partial(self):
141
91
        """Copy only part of the history of a branch."""
142
 
        # TODO: RBC 20060208 test with a revision not on revision-history.
143
 
        #       what should that behaviour be ? Emailed the list.
144
 
        # First, make a branch with two commits.
145
 
        wt_a = self.make_branch_and_tree('a')
146
 
        self.build_tree(['a/one'])
147
 
        wt_a.add(['one'])
148
 
        wt_a.commit('commit one', rev_id='1')
 
92
        self.build_tree(['a/', 'a/one'])
 
93
        br_a = Branch.initialize('a')
 
94
        br_a.add(['one'])
 
95
        br_a.commit('commit one', rev_id='u@d-1')
149
96
        self.build_tree(['a/two'])
150
 
        wt_a.add(['two'])
151
 
        wt_a.commit('commit two', rev_id='2')
152
 
        # Now make a copy of the repository.
153
 
        repo_b = self.make_repository('b')
154
 
        wt_a.branch.repository.copy_content_into(repo_b)
155
 
        # wt_a might be a lightweight checkout, so get a hold of the actual
156
 
        # branch (because you can't do a partial clone of a lightweight
157
 
        # checkout).
158
 
        branch = wt_a.branch.bzrdir.open_branch()
159
 
        # Then make a branch where the new repository is, but specify a revision
160
 
        # ID.  The new branch's history will stop at the specified revision.
161
 
        br_b = branch.clone(repo_b.bzrdir, revision_id='1')
162
 
        self.assertEqual('1', br_b.last_revision())
163
 
 
164
 
    def get_parented_branch(self):
165
 
        wt_a = self.make_branch_and_tree('a')
166
 
        self.build_tree(['a/one'])
167
 
        wt_a.add(['one'])
168
 
        wt_a.commit('commit one', rev_id='1')
169
 
 
170
 
        branch_b = wt_a.branch.bzrdir.sprout('b', revision_id='1').open_branch()
171
 
        self.assertEqual(wt_a.branch.base, branch_b.get_parent())
172
 
        return branch_b
173
 
 
174
 
    def test_clone_branch_nickname(self):
175
 
        # test the nick name is preserved always
176
 
        raise tests.TestSkipped('XXX branch cloning is not yet tested.')
177
 
 
178
 
    def test_clone_branch_parent(self):
179
 
        # test the parent is preserved always
180
 
        branch_b = self.get_parented_branch()
181
 
        repo_c = self.make_repository('c')
182
 
        branch_b.repository.copy_content_into(repo_c)
183
 
        branch_c = branch_b.clone(repo_c.bzrdir)
184
 
        self.assertNotEqual(None, branch_c.get_parent())
185
 
        self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
186
 
 
187
 
        # We can also set a specific parent, and it should be honored
188
 
        random_parent = 'http://example.com/path/to/branch'
189
 
        branch_b.set_parent(random_parent)
190
 
        repo_d = self.make_repository('d')
191
 
        branch_b.repository.copy_content_into(repo_d)
192
 
        branch_d = branch_b.clone(repo_d.bzrdir)
193
 
        self.assertEqual(random_parent, branch_d.get_parent())
194
 
 
195
 
    def test_submit_branch(self):
196
 
        """Submit location can be queried and set"""
197
 
        branch = self.make_branch('branch')
198
 
        self.assertEqual(branch.get_submit_branch(), None)
199
 
        branch.set_submit_branch('sftp://example.com')
200
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
201
 
        branch.set_submit_branch('sftp://example.net')
202
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
203
 
 
204
 
    def test_public_branch(self):
205
 
        """public location can be queried and set"""
206
 
        branch = self.make_branch('branch')
207
 
        self.assertEqual(branch.get_public_branch(), None)
208
 
        branch.set_public_branch('sftp://example.com')
209
 
        self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
210
 
        branch.set_public_branch('sftp://example.net')
211
 
        self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
212
 
        branch.set_public_branch(None)
213
 
        self.assertEqual(branch.get_public_branch(), None)
214
 
 
215
 
    def test_record_initial_ghost(self):
216
 
        """Branches should support having ghosts."""
217
 
        wt = self.make_branch_and_tree('.')
218
 
        wt.set_parent_ids(['non:existent@rev--ision--0--2'],
219
 
            allow_leftmost_as_ghost=True)
220
 
        self.assertEqual(['non:existent@rev--ision--0--2'],
221
 
            wt.get_parent_ids())
222
 
        rev_id = wt.commit('commit against a ghost first parent.')
223
 
        rev = wt.branch.repository.get_revision(rev_id)
224
 
        self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
 
97
        br_a.add(['two'])
 
98
        br_a.commit('commit two', rev_id='u@d-2')
 
99
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
100
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
101
        self.assertTrue(os.path.exists('b/one'))
 
102
        self.assertFalse(os.path.exists('b/two'))
 
103
        
 
104
 
 
105
    def test_record_initial_ghost_merge(self):
 
106
        """A pending merge with no revision present is still a merge."""
 
107
        branch = Branch.initialize('.')
 
108
        branch.add_pending_merge('non:existent@rev--ision--0--2')
 
109
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
 
110
        rev = branch.get_revision(branch.last_revision())
 
111
        self.assertEqual(len(rev.parent_ids), 1)
225
112
        # parent_sha1s is not populated now, WTF. rbc 20051003
226
113
        self.assertEqual(len(rev.parent_sha1s), 0)
227
 
 
228
 
    def test_record_two_ghosts(self):
229
 
        """Recording with all ghosts works."""
230
 
        wt = self.make_branch_and_tree('.')
231
 
        wt.set_parent_ids([
232
 
                'foo@azkhazan-123123-abcabc',
233
 
                'wibble@fofof--20050401--1928390812',
234
 
            ],
235
 
            allow_leftmost_as_ghost=True)
236
 
        rev_id = wt.commit("commit from ghost base with one merge")
237
 
        # the revision should have been committed with two parents
238
 
        rev = wt.branch.repository.get_revision(rev_id)
239
 
        self.assertEqual(['foo@azkhazan-123123-abcabc',
240
 
            'wibble@fofof--20050401--1928390812'],
241
 
            rev.parent_ids)
242
 
 
243
 
    def test_bad_revision(self):
244
 
        self.assertRaises(errors.InvalidRevisionId,
245
 
                          self.get_branch().repository.get_revision,
246
 
                          None)
 
114
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
247
115
 
248
116
# TODO 20051003 RBC:
249
 
# compare the gpg-to-sign info for a commit with a ghost and
 
117
# compare the gpg-to-sign info for a commit with a ghost and 
250
118
#     an identical tree without a ghost
251
119
# fetch missing should rewrite the TOC of weaves to list newly available parents.
 
120
        
 
121
    def test_pending_merges(self):
 
122
        """Tracking pending-merged revisions."""
 
123
        b = Branch.initialize('.')
 
124
 
 
125
        self.assertEquals(b.pending_merges(), [])
 
126
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
 
127
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
128
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
 
129
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
130
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
 
131
        self.assertEquals(b.pending_merges(),
 
132
                          ['foo@azkhazan-123123-abcabc',
 
133
                           'wibble@fofof--20050401--1928390812'])
 
134
        b.commit("commit from base with two merges")
 
135
        rev = b.get_revision(b.revision_history()[0])
 
136
        self.assertEquals(len(rev.parent_ids), 2)
 
137
        self.assertEquals(rev.parent_ids[0],
 
138
                          'foo@azkhazan-123123-abcabc')
 
139
        self.assertEquals(rev.parent_ids[1],
 
140
                           'wibble@fofof--20050401--1928390812')
 
141
        # list should be cleared when we do a commit
 
142
        self.assertEquals(b.pending_merges(), [])
252
143
 
253
144
    def test_sign_existing_revision(self):
254
 
        wt = self.make_branch_and_tree('.')
255
 
        branch = wt.branch
256
 
        wt.commit("base", allow_pointless=True, rev_id='A')
 
145
        import bzrlib.gpg
 
146
        branch = Branch.initialize('.')
 
147
        branch.commit("base", allow_pointless=True, rev_id='A')
257
148
        from bzrlib.testament import Testament
258
 
        strategy = gpg.LoopbackGPGStrategy(None)
259
 
        branch.repository.lock_write()
260
 
        branch.repository.start_write_group()
261
 
        branch.repository.sign_revision('A', strategy)
262
 
        branch.repository.commit_write_group()
263
 
        branch.repository.unlock()
264
 
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
265
 
                         Testament.from_revision(branch.repository,
266
 
                         'A').as_short_text() +
267
 
                         '-----END PSEUDO-SIGNED CONTENT-----\n',
268
 
                         branch.repository.get_signature_text('A'))
269
 
 
270
 
    def test_store_signature(self):
271
 
        wt = self.make_branch_and_tree('.')
272
 
        branch = wt.branch
273
 
        branch.lock_write()
274
 
        try:
275
 
            branch.repository.start_write_group()
276
 
            try:
277
 
                branch.repository.store_revision_signature(
278
 
                    gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
279
 
            except:
280
 
                branch.repository.abort_write_group()
281
 
                raise
282
 
            else:
283
 
                branch.repository.commit_write_group()
284
 
        finally:
285
 
            branch.unlock()
286
 
        # A signature without a revision should not be accessible.
287
 
        self.assertRaises(errors.NoSuchRevision,
288
 
                          branch.repository.has_signature_for_revision_id,
289
 
                          'A')
290
 
        wt.commit("base", allow_pointless=True, rev_id='A')
291
 
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
292
 
                         'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
293
 
                         branch.repository.get_signature_text('A'))
294
 
 
295
 
    def test_branch_keeps_signatures(self):
296
 
        wt = self.make_branch_and_tree('source')
297
 
        wt.commit('A', allow_pointless=True, rev_id='A')
298
 
        repo = wt.branch.repository
299
 
        repo.lock_write()
300
 
        repo.start_write_group()
301
 
        repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
302
 
        repo.commit_write_group()
303
 
        repo.unlock()
304
 
        #FIXME: clone should work to urls,
305
 
        # wt.clone should work to disks.
306
 
        self.build_tree(['target/'])
307
 
        d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
308
 
        self.assertEqual(repo.get_signature_text('A'),
309
 
                         d2.open_repository().get_signature_text('A'))
310
 
 
311
 
    def test_nicks(self):
312
 
        """Test explicit and implicit branch nicknames.
313
 
 
314
 
        Nicknames are implicitly the name of the branch's directory, unless an
315
 
        explicit nickname is set.  That is, an explicit nickname always
316
 
        overrides the implicit one.
317
 
        """
318
 
        t = self.get_transport()
319
 
        branch = self.make_branch('bzr.dev')
320
 
        # The nick will be 'bzr.dev', because there is no explicit nick set.
321
 
        self.assertEqual(branch.nick, 'bzr.dev')
322
 
        # Move the branch to a different directory, 'bzr.ab'.  Now that branch
323
 
        # will report its nick as 'bzr.ab'.
324
 
        t.move('bzr.dev', 'bzr.ab')
325
 
        branch = _mod_branch.Branch.open(self.get_url('bzr.ab'))
326
 
        self.assertEqual(branch.nick, 'bzr.ab')
327
 
        # Set the branch nick explicitly.  This will ensure there's a branch
328
 
        # config file in the branch.
329
 
        branch.nick = "Aaron's branch"
330
 
        if not isinstance(branch, remote.RemoteBranch):
331
 
            self.assertTrue(branch._transport.has("branch.conf"))
332
 
        # Because the nick has been set explicitly, the nick is now always
333
 
        # "Aaron's branch", regardless of directory name.
334
 
        self.assertEqual(branch.nick, "Aaron's branch")
335
 
        t.move('bzr.ab', 'integration')
336
 
        branch = _mod_branch.Branch.open(self.get_url('integration'))
337
 
        self.assertEqual(branch.nick, "Aaron's branch")
338
 
        branch.nick = u"\u1234"
339
 
        self.assertEqual(branch.nick, u"\u1234")
340
 
 
341
 
    def test_commit_nicks(self):
342
 
        """Nicknames are committed to the revision"""
343
 
        wt = self.make_branch_and_tree('bzr.dev')
344
 
        branch = wt.branch
345
 
        branch.nick = "My happy branch"
346
 
        wt.commit('My commit respect da nick.')
347
 
        committed = branch.repository.get_revision(branch.last_revision())
348
 
        self.assertEqual(committed.properties["branch-nick"],
349
 
                         "My happy branch")
350
 
 
351
 
    def test_create_colocated(self):
352
 
        try:
353
 
            repo = self.make_repository('.', shared=True)
354
 
        except errors.IncompatibleFormat:
355
 
            return
356
 
        self.assertEquals(0, len(repo.bzrdir.list_branches()))
357
 
        try:
358
 
            child_branch1 = self.branch_format.initialize(repo.bzrdir, 
359
 
                name='branch1')
360
 
        except (errors.UninitializableFormat, errors.NoColocatedBranchSupport):
361
 
            # branch references are not default init'able and
362
 
            # not all bzrdirs support colocated branches.
363
 
            return
364
 
        self.assertEquals(1, len(repo.bzrdir.list_branches()))
365
 
        self.branch_format.initialize(repo.bzrdir, name='branch2')
366
 
        self.assertEquals(2, len(repo.bzrdir.list_branches()))
367
 
 
368
 
    def test_create_open_branch_uses_repository(self):
369
 
        try:
370
 
            repo = self.make_repository('.', shared=True)
371
 
        except errors.IncompatibleFormat:
372
 
            return
373
 
        child_transport = repo.bzrdir.root_transport.clone('child')
374
 
        child_transport.mkdir('.')
375
 
        child_dir = self.bzrdir_format.initialize_on_transport(child_transport)
376
 
        try:
377
 
            child_branch = self.branch_format.initialize(child_dir)
378
 
        except errors.UninitializableFormat:
379
 
            # branch references are not default init'able.
380
 
            return
381
 
        self.assertEqual(repo.bzrdir.root_transport.base,
382
 
                         child_branch.repository.bzrdir.root_transport.base)
383
 
        child_branch = _mod_branch.Branch.open(self.get_url('child'))
384
 
        self.assertEqual(repo.bzrdir.root_transport.base,
385
 
                         child_branch.repository.bzrdir.root_transport.base)
386
 
 
387
 
    def test_format_description(self):
388
 
        tree = self.make_branch_and_tree('tree')
389
 
        text = tree.branch._format.get_format_description()
390
 
        self.assertTrue(len(text))
391
 
 
392
 
    def test_get_commit_builder(self):
393
 
        branch = self.make_branch(".")
394
 
        branch.lock_write()
395
 
        builder = branch.get_commit_builder([])
396
 
        self.assertIsInstance(builder, repository.CommitBuilder)
397
 
        branch.repository.commit_write_group()
398
 
        branch.unlock()
399
 
 
400
 
    def test_generate_revision_history(self):
401
 
        """Create a fake revision history easily."""
402
 
        tree = self.make_branch_and_tree('.')
403
 
        rev1 = tree.commit('foo')
404
 
        orig_history = tree.branch.revision_history()
405
 
        rev2 = tree.commit('bar', allow_pointless=True)
406
 
        tree.branch.generate_revision_history(rev1)
407
 
        self.assertEqual(orig_history, tree.branch.revision_history())
408
 
 
409
 
    def test_generate_revision_history_NULL_REVISION(self):
410
 
        tree = self.make_branch_and_tree('.')
411
 
        rev1 = tree.commit('foo')
412
 
        tree.branch.generate_revision_history(revision.NULL_REVISION)
413
 
        self.assertEqual([], tree.branch.revision_history())
414
 
 
415
 
    def test_create_checkout(self):
416
 
        tree_a = self.make_branch_and_tree('a')
417
 
        branch_a = tree_a.branch
418
 
        checkout_b = branch_a.create_checkout('b')
419
 
        self.assertEqual('null:', checkout_b.last_revision())
420
 
        checkout_b.commit('rev1', rev_id='rev1')
421
 
        self.assertEqual('rev1', branch_a.last_revision())
422
 
        self.assertNotEqual(checkout_b.branch.base, branch_a.base)
423
 
 
424
 
        checkout_c = branch_a.create_checkout('c', lightweight=True)
425
 
        self.assertEqual('rev1', checkout_c.last_revision())
426
 
        checkout_c.commit('rev2', rev_id='rev2')
427
 
        self.assertEqual('rev2', branch_a.last_revision())
428
 
        self.assertEqual(checkout_c.branch.base, branch_a.base)
429
 
 
430
 
        checkout_d = branch_a.create_checkout('d', lightweight=True)
431
 
        self.assertEqual('rev2', checkout_d.last_revision())
432
 
        checkout_e = branch_a.create_checkout('e')
433
 
        self.assertEqual('rev2', checkout_e.last_revision())
434
 
 
435
 
    def test_create_anonymous_lightweight_checkout(self):
436
 
        """A lightweight checkout from a readonly branch should succeed."""
437
 
        tree_a = self.make_branch_and_tree('a')
438
 
        rev_id = tree_a.commit('put some content in the branch')
439
 
        # open the branch via a readonly transport
440
 
        source_branch = _mod_branch.Branch.open(self.get_readonly_url('a'))
441
 
        # sanity check that the test will be valid
442
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
443
 
            source_branch.lock_write)
444
 
        checkout = source_branch.create_checkout('c', lightweight=True)
445
 
        self.assertEqual(rev_id, checkout.last_revision())
446
 
 
447
 
    def test_create_anonymous_heavyweight_checkout(self):
448
 
        """A regular checkout from a readonly branch should succeed."""
449
 
        tree_a = self.make_branch_and_tree('a')
450
 
        rev_id = tree_a.commit('put some content in the branch')
451
 
        # open the branch via a readonly transport
452
 
        source_branch = _mod_branch.Branch.open(self.get_readonly_url('a'))
453
 
        # sanity check that the test will be valid
454
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
455
 
            source_branch.lock_write)
456
 
        checkout = source_branch.create_checkout('c')
457
 
        self.assertEqual(rev_id, checkout.last_revision())
458
 
 
459
 
    def test_set_revision_history(self):
460
 
        tree = self.make_branch_and_tree('a')
461
 
        tree.commit('a commit', rev_id='rev1')
462
 
        br = tree.branch
463
 
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
464
 
            br.set_revision_history, ["rev1"])
465
 
        self.assertEquals(br.revision_history(), ["rev1"])
466
 
        self.applyDeprecated(symbol_versioning.deprecated_in((2, 4, 0)),
467
 
            br.set_revision_history, [])
468
 
        self.assertEquals(br.revision_history(), [])
469
 
 
470
 
    def test_heads_to_fetch(self):
471
 
        # heads_to_fetch is a method that returns a collection of revids that
472
 
        # need to be fetched to copy this branch into another repo.  At a
473
 
        # minimum this will include the tip.
474
 
        # (In native formats, this is the tip + tags, but other formats may
475
 
        # have other revs needed)
476
 
        tree = self.make_branch_and_tree('a')
477
 
        tree.commit('first commit', rev_id='rev1')
478
 
        tree.commit('second commit', rev_id='rev2')
479
 
        must_fetch, should_fetch = tree.branch.heads_to_fetch()
480
 
        self.assertTrue('rev2' in must_fetch)
481
 
 
482
 
    def test_heads_to_fetch_not_null_revision(self):
483
 
        # NULL_REVISION does not appear in the result of heads_to_fetch, even
484
 
        # for an empty branch.
485
 
        tree = self.make_branch_and_tree('a')
486
 
        must_fetch, should_fetch = tree.branch.heads_to_fetch()
487
 
        self.assertFalse(revision.NULL_REVISION in must_fetch)
488
 
        self.assertFalse(revision.NULL_REVISION in should_fetch)
489
 
 
490
 
 
491
 
class TestBranchFormat(per_branch.TestCaseWithBranch):
492
 
 
493
 
    def test_branch_format_network_name(self):
494
 
        br = self.make_branch('.')
495
 
        format = br._format
496
 
        network_name = format.network_name()
497
 
        self.assertIsInstance(network_name, str)
498
 
        # We want to test that the network_name matches the actual format on
499
 
        # disk. For local branches that means that using network_name as a key
500
 
        # in the registry gives back the same format. For remote branches we
501
 
        # check that the network_name of the RemoteBranchFormat we have locally
502
 
        # matches the actual format present on disk.
503
 
        if isinstance(format, remote.RemoteBranchFormat):
504
 
            br._ensure_real()
505
 
            real_branch = br._real_branch
506
 
            self.assertEqual(real_branch._format.network_name(), network_name)
507
 
        else:
508
 
            registry = _mod_branch.network_format_registry
509
 
            looked_up_format = registry.get(network_name)
510
 
            self.assertEqual(format.__class__, looked_up_format.__class__)
511
 
 
512
 
 
513
 
class ChrootedTests(per_branch.TestCaseWithBranch):
514
 
    """A support class that provides readonly urls outside the local namespace.
515
 
 
516
 
    This is done by checking if self.transport_server is a MemoryServer. if it
517
 
    is then we are chrooted already, if it is not then an HttpServer is used
518
 
    for readonly urls.
519
 
    """
520
 
 
521
 
    def setUp(self):
522
 
        super(ChrootedTests, self).setUp()
523
 
        if not self.vfs_transport_factory == memory.MemoryServer:
524
 
            self.transport_readonly_server = HttpServer
 
149
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
150
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
151
                         branch.revision_store.get('A', 'sig').read())
 
152
 
 
153
 
 
154
class TestRemote(TestCaseWithWebserver):
525
155
 
526
156
    def test_open_containing(self):
527
 
        self.assertRaises(errors.NotBranchError,
528
 
                          _mod_branch.Branch.open_containing,
529
 
                          self.get_readonly_url(''))
530
 
        self.assertRaises(errors.NotBranchError,
531
 
                          _mod_branch.Branch.open_containing,
532
 
                          self.get_readonly_url('g/p/q'))
533
 
        branch = self.make_branch('.')
534
 
        branch, relpath = _mod_branch.Branch.open_containing(
535
 
            self.get_readonly_url(''))
536
 
        self.assertEqual('', relpath)
537
 
        branch, relpath = _mod_branch.Branch.open_containing(
538
 
            self.get_readonly_url('g/p/q'))
539
 
        self.assertEqual('g/p/q', relpath)
540
 
 
 
157
        self.assertRaises(NotBranchError, Branch.open_containing,
 
158
                          self.get_remote_url(''))
 
159
        self.assertRaises(NotBranchError, Branch.open_containing,
 
160
                          self.get_remote_url('g/p/q'))
 
161
        b = Branch.initialize('.')
 
162
        Branch.open_containing(self.get_remote_url(''))
 
163
        Branch.open_containing(self.get_remote_url('g/p/q'))
 
164
        
 
165
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
166
#         >>> from bzrlib.commit import commit
 
167
#         >>> bzrlib.trace.silent = True
 
168
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
169
#         >>> br1.add('foo')
 
170
#         >>> br1.add('bar')
 
171
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
172
#         >>> br2 = ScratchBranch()
 
173
#         >>> br2.update_revisions(br1)
 
174
#         Added 2 texts.
 
175
#         Added 1 inventories.
 
176
#         Added 1 revisions.
 
177
#         >>> br2.revision_history()
 
178
#         [u'REVISION-ID-1']
 
179
#         >>> br2.update_revisions(br1)
 
180
#         Added 0 revisions.
 
181
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
182
#         True
541
183
 
542
184
class InstrumentedTransaction(object):
543
185
 
548
190
        self.calls = []
549
191
 
550
192
 
551
 
class TestDecorator(object):
    """Stand-in lockable object that logs every lock call it receives.

    ``_calls`` accumulates 'lr', 'lw' and 'ul' for lock_read, lock_write and
    unlock respectively, in call order.
    """

    def __init__(self):
        self._calls = []

    def _record(self, tag):
        """Append tag to the call log."""
        self._calls.append(tag)

    def lock_read(self):
        self._record('lr')

    def lock_write(self):
        self._record('lw')

    def unlock(self):
        self._record('ul')

    @_mod_branch.needs_read_lock
    def do_with_read(self):
        """Read-lock-decorated method that succeeds."""
        return 1

    @_mod_branch.needs_read_lock
    def except_with_read(self):
        """Read-lock-decorated method that always raises."""
        raise RuntimeError

    @_mod_branch.needs_write_lock
    def do_with_write(self):
        """Write-lock-decorated method that succeeds."""
        return 2

    @_mod_branch.needs_write_lock
    def except_with_write(self):
        """Write-lock-decorated method that always raises."""
        raise RuntimeError
580
 
 
581
 
 
582
 
class TestDecorators(tests.TestCase):
    """Check the lock calls recorded around lock-decorated methods."""

    def test_needs_read_lock(self):
        decorated = TestDecorator()
        result = decorated.do_with_read()
        self.assertEqual(1, result)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        # The unlock must be recorded even though the method raises.
        decorated = TestDecorator()
        self.assertRaises(RuntimeError, decorated.except_with_read)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        decorated = TestDecorator()
        result = decorated.do_with_write()
        self.assertEqual(2, result)
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        # The unlock must be recorded even though the method raises.
        decorated = TestDecorator()
        self.assertRaises(RuntimeError, decorated.except_with_write)
        self.assertEqual(['lw', 'ul'], decorated._calls)
603
 
 
604
 
 
605
 
class TestBranchPushLocations(per_branch.TestCaseWithBranch):
    """Reading and writing the push location of a branch."""

    def test_get_push_location_unset(self):
        location = self.get_branch().get_push_location()
        self.assertEqual(None, location)

    def test_get_push_location_exact(self):
        # Save a location config section keyed on the branch's own base.
        base = self.get_branch().base
        config_text = '[%s]\npush_location=foo\n' % (base,)
        config.LocationConfig.from_string(config_text, base, save=True)
        location = self.get_branch().get_push_location()
        self.assertEqual("foo", location)

    def test_set_push_location(self):
        target = self.get_branch()
        target.set_push_location('foo')
        self.assertEqual('foo', target.get_push_location())
620
 
 
621
 
 
622
 
class TestChildSubmitFormats(per_branch.TestCaseWithBranch):
    """Reading the child_submit_format option through the branch."""

    def test_get_child_submit_format_default(self):
        submit_format = self.get_branch().get_child_submit_format()
        self.assertEqual(None, submit_format)

    def test_get_child_submit_format(self):
        branch_config = self.get_branch().get_config()
        branch_config.set_user_option('child_submit_format', '10')
        # Fetch the branch again before reading the option back.
        reopened = self.get_branch()
        self.assertEqual('10', reopened.get_child_submit_format())
632
 
 
633
 
 
634
 
class TestFormat(per_branch.TestCaseWithBranch):
    """Tests for the format itself."""

    def _format_is_loopback_testable(self):
        """Return the result of self.branch_format.is_supported().

        Unsupported formats are not loopback testable because the default
        open will not open them and they may not be initializable.
        """
        return self.branch_format.is_supported()

    def test_get_reference(self):
        """get_reference on all regular branches should return None."""
        if not self._format_is_loopback_testable():
            return
        made_branch = self.make_branch('.')
        reference = made_branch._format.get_reference(made_branch.bzrdir)
        self.assertEqual(None, reference)

    def test_set_reference(self):
        """set_reference on all regular branches should be callable."""
        if not self._format_is_loopback_testable():
            return
        this_branch = self.make_branch('this')
        other_branch = self.make_branch('other')
        try:
            this_branch._format.set_reference(this_branch.bzrdir, None,
                                              other_branch)
        except NotImplementedError:
            # that's ok
            return
        # The reference was accepted, so it must read back as the base of
        # the branch it was pointed at.
        ref = this_branch._format.get_reference(this_branch.bzrdir)
        self.assertEqual(ref, other_branch.base)

    def test_format_initialize_find_open(self):
        """Loopback test: the current format initializes to itself."""
        if not self._format_is_loopback_testable():
            return
        # supported formats must be able to init and open
        # NOTE(review): this transport is never used below; the call is kept
        # so the test performs the same steps as before.
        t = self.get_transport()
        readonly_url = self.get_readonly_url()
        readonly_t = transport.get_transport_from_url(readonly_url)
        made_branch = self.make_branch('.')
        self.assertIsInstance(made_branch, _mod_branch.Branch)

        # find it via bzrdir opening:
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
        direct_opened_branch = opened_control.open_branch()
        self.assertEqual(direct_opened_branch.__class__,
                         made_branch.__class__)
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
        self.assertIsInstance(direct_opened_branch._format,
                              self.branch_format.__class__)

        # find it via Branch.open
        opened_branch = _mod_branch.Branch.open(readonly_t.base)
        self.assertIsInstance(opened_branch, made_branch.__class__)
        self.assertEqual(made_branch._format.__class__,
                         opened_branch._format.__class__)

        # if it has a unique id string, can we probe for it ?
        try:
            self.branch_format.get_format_string()
        except NotImplementedError:
            return
        found_format = opened_control.find_branch_format()
        self.assertEqual(self.branch_format, found_format)
700
 
 
701
 
 
702
 
class TestBound(per_branch.TestCaseWithBranch):
    """Tests for binding a branch to a master branch and unbinding it."""

    def _bind_or_skip(self, branch, master):
        """Bind branch to master, or mark the test as not applicable.

        :param branch: the branch to bind.
        :param master: the branch to bind it to.
        :raises TestNotApplicable: if bind raises errors.UpgradeRequired.
        """
        try:
            branch.bind(master)
        except errors.UpgradeRequired:
            raise tests.TestNotApplicable('Format does not support binding')

    def test_bind_unbind(self):
        branch = self.make_branch('1')
        branch2 = self.make_branch('2')
        self._bind_or_skip(branch, branch2)
        # The first unbind reports success, a second one has nothing to do.
        self.assertTrue(branch.unbind())
        self.assertFalse(branch.unbind())
        self.assertIs(None, branch.get_bound_location())

    def test_old_bound_location(self):
        branch = self.make_branch('branch1')
        try:
            self.assertIs(None, branch.get_old_bound_location())
        except errors.UpgradeRequired:
            raise tests.TestNotApplicable(
                    'Format does not store old bound locations')
        branch2 = self.make_branch('branch2')
        branch.bind(branch2)
        self.assertIs(None, branch.get_old_bound_location())
        branch.unbind()
        # Raw string: '/' needs no escaping in a regex, and '\/' is an
        # invalid escape sequence in a plain string literal.
        self.assertContainsRe(branch.get_old_bound_location(),
                              r'/branch2/$')

    def test_bind_diverged(self):
        tree_a = self.make_branch_and_tree('tree_a')
        tree_a.commit('rev1a')
        tree_b = tree_a.bzrdir.sprout('tree_b').open_workingtree()
        # Commit separately on each side so the two branches diverge.
        tree_a.commit('rev2a')
        tree_b.commit('rev2b')
        self._bind_or_skip(tree_b.branch, tree_a.branch)

    def test_unbind_clears_cached_master_branch(self):
        """b.unbind clears any cached value of b.get_master_branch."""
        master = self.make_branch('master')
        branch = self.make_branch('branch')
        self._bind_or_skip(branch, master)
        self.addCleanup(branch.lock_write().unlock)
        self.assertNotEqual(None, branch.get_master_branch())
        branch.unbind()
        self.assertEqual(None, branch.get_master_branch())

    def test_unlocked_does_not_cache_master_branch(self):
        """Unlocked branches do not cache the result of get_master_branch."""
        master = self.make_branch('master')
        branch1 = self.make_branch('branch')
        self._bind_or_skip(branch1, master)
        # Open branch1 again
        branch2 = branch1.bzrdir.open_branch()
        self.assertNotEqual(None, branch1.get_master_branch())
        # Unbind the branch via branch2.  branch1 isn't locked so will
        # immediately return the new value for get_master_branch.
        branch2.unbind()
        self.assertEqual(None, branch1.get_master_branch())

    def test_bind_clears_cached_master_branch(self):
        """b.bind clears any cached value of b.get_master_branch."""
        master1 = self.make_branch('master1')
        master2 = self.make_branch('master2')
        branch = self.make_branch('branch')
        self._bind_or_skip(branch, master1)
        self.addCleanup(branch.lock_write().unlock)
        self.assertNotEqual(None, branch.get_master_branch())
        branch.bind(master2)
        # A relative url of '.' means the master now lives at master2's url.
        self.assertEqual('.', urlutils.relative_url(self.get_url('master2'),
                branch.get_master_branch().base))

    def test_set_bound_location_clears_cached_master_branch(self):
        """b.set_bound_location clears any cached value of b.get_master_branch.
        """
        master1 = self.make_branch('master1')
        master2 = self.make_branch('master2')
        branch = self.make_branch('branch')
        self._bind_or_skip(branch, master1)
        self.addCleanup(branch.lock_write().unlock)
        self.assertNotEqual(None, branch.get_master_branch())
        branch.set_bound_location(self.get_url('master2'))
        # A relative url of '.' means the master now lives at master2's url.
        self.assertEqual('.', urlutils.relative_url(self.get_url('master2'),
                branch.get_master_branch().base))
798
 
 
799
 
 
800
 
class TestStrict(per_branch.TestCaseWithBranch):
    """Pull behaviour of a branch set to append-revisions-only."""

    def test_strict_history(self):
        strict_tree = self.make_branch_and_tree('tree1')
        try:
            strict_tree.branch.set_append_revisions_only(True)
        except errors.UpgradeRequired:
            raise tests.TestSkipped('Format does not support strict history')
        strict_tree.commit('empty commit')
        fork = strict_tree.bzrdir.sprout('tree2').open_workingtree()
        fork.commit('empty commit 2')
        strict_tree.pull(fork.branch)

        # Commit on both sides; pulling must now report divergence.
        strict_tree.commit('empty commit 3')
        fork.commit('empty commit 4')
        self.assertRaises(errors.DivergedBranches,
                          strict_tree.pull, fork.branch)

        # After the fork merges the strict branch and commits, pulling it
        # back is refused with AppendRevisionsOnlyViolation.
        fork.merge_from_branch(strict_tree.branch)
        fork.commit('empty commit 5')
        self.assertRaises(errors.AppendRevisionsOnlyViolation,
                          strict_tree.pull, fork.branch)

        third = strict_tree.bzrdir.sprout('tree3').open_workingtree()
        third.merge_from_branch(fork.branch)
        third.commit('empty commit 6')
        fork.pull(third.branch)
823
 
 
824
 
 
825
 
class TestIgnoreFallbacksParameter(per_branch.TestCaseWithBranch):
    """Tests for the ignore_fallbacks argument of open_branch."""

    def make_branch_with_fallback(self):
        """Return branch 'stacked', stacked on a new branch 'fallback'.

        :raises TestNotApplicable: if the format does not support stacking.
        """
        fallback = self.make_branch('fallback')
        if fallback._format.supports_stacking():
            stacked = self.make_branch('stacked')
            stacked.set_stacked_on_url(fallback.base)
            return stacked
        raise tests.TestNotApplicable("format does not support stacking")

    def test_fallbacks_not_opened(self):
        stacked = self.make_branch_with_fallback()
        # Rename the fallback away before reopening the stacked branch.
        self.get_transport('').rename('fallback', 'moved')
        reopened = stacked.bzrdir.open_branch(ignore_fallbacks=True)
        fallbacks = reopened.repository._fallback_repositories
        self.assertEqual([], fallbacks)

    def test_fallbacks_are_opened(self):
        stacked = self.make_branch_with_fallback()
        reopened = stacked.bzrdir.open_branch(ignore_fallbacks=False)
        fallbacks = reopened.repository._fallback_repositories
        self.assertLength(1, fallbacks)
845
 
 
846
 
 
847
 
class TestReferenceLocation(per_branch.TestCaseWithBranch):
    """Tests for the reference locations a branch records per file id."""

    def _make_tree_referencing_subtree(self):
        """Make 'tree' and add a reference to the nested 'tree/subtree'.

        :return: a (tree, subtree) pair of working trees.
        :raises TestNotApplicable: if add_reference is unsupported.
        """
        tree = self.make_branch_and_tree('tree')
        subtree = self.make_branch_and_tree('tree/subtree')
        subtree.set_root_id('subtree-id')
        try:
            tree.add_reference(subtree)
        except errors.UnsupportedOperation:
            raise tests.TestNotApplicable('Tree cannot hold references.')
        return tree, subtree

    def test_reference_parent(self):
        tree, subtree = self._make_tree_referencing_subtree()
        parent = tree.branch.reference_parent('subtree-id', 'subtree')
        self.assertEqual(subtree.branch.base, parent.base)

    def test_reference_parent_accepts_possible_transports(self):
        # Only checks that the keyword argument is accepted.
        tree, subtree = self._make_tree_referencing_subtree()
        reference_parent = tree.branch.reference_parent(
            'subtree-id', 'subtree',
            possible_transports=[subtree.bzrdir.root_transport])

    def test_get_reference_info(self):
        branch = self.make_branch('branch')
        try:
            path, loc = branch.get_reference_info('file-id')
        except errors.UnsupportedOperation:
            raise tests.TestNotApplicable('Branch cannot hold references.')
        for value in (path, loc):
            self.assertIs(None, value)

    def test_set_reference_info(self):
        branch = self.make_branch('branch')
        try:
            branch.set_reference_info(
                'file-id', 'path/to/location', 'path/to/file')
        except errors.UnsupportedOperation:
            raise tests.TestNotApplicable('Branch cannot hold references.')

    def test_set_get_reference_info(self):
        self.make_branch_with_reference('branch', 'path/to/location')
        # Create a new instance to ensure storage is permanent
        reopened = _mod_branch.Branch.open('branch')
        tree_path, branch_location = reopened.get_reference_info('file-id')
        self.assertEqual('path/to/location', branch_location)

    def test_set_null_reference_info(self):
        branch = self.make_branch_with_reference('branch',
                                                 'path/to/location')
        branch.set_reference_info('file-id', None, None)
        tree_path, branch_location = branch.get_reference_info('file-id')
        self.assertIs(None, tree_path)
        self.assertIs(None, branch_location)

    def test_set_null_reference_info_when_null(self):
        branch = self.make_branch('branch')
        try:
            tree_path, branch_location = branch.get_reference_info('file-id')
        except errors.UnsupportedOperation:
            raise tests.TestNotApplicable('Branch cannot hold references.')
        self.assertIs(None, tree_path)
        self.assertIs(None, branch_location)
        # Setting an already-null reference to null must be accepted.
        branch.set_reference_info('file-id', None, None)

    def test_set_null_requires_two_nones(self):
        branch = self.make_branch('branch')
        try:
            e = self.assertRaises(ValueError, branch.set_reference_info,
                                  'file-id', 'path', None)
        except errors.UnsupportedOperation:
            raise tests.TestNotApplicable('Branch cannot hold references.')
        self.assertEqual('tree_path must be None when branch_location is'
                         ' None.', str(e))
        e = self.assertRaises(ValueError, branch.set_reference_info,
                              'file-id', None, 'location')
        self.assertEqual('branch_location must be None when tree_path is'
                         ' None.', str(e))

    def make_branch_with_reference(self, location, reference_location,
                                   file_id='file-id'):
        """Make a branch whose file_id references reference_location.

        :param location: where to make the branch.
        :param reference_location: branch location stored for file_id.
        :param file_id: file id the reference is recorded against; the tree
            path stored with it is always 'path/to/file'.
        :return: the new branch.
        :raises TestNotApplicable: if set_reference_info is unsupported.
        """
        new_branch = self.make_branch(location)
        try:
            new_branch.set_reference_info(file_id, 'path/to/file',
                                          reference_location)
        except errors.UnsupportedOperation:
            raise tests.TestNotApplicable('Branch cannot hold references.')
        return new_branch

    def test_reference_parent_from_reference_info_(self):
        referenced_branch = self.make_branch('reference_branch')
        branch = self.make_branch_with_reference('branch',
                                                 referenced_branch.base)
        parent = branch.reference_parent('file-id', 'path/to/file')
        self.assertEqual(parent.base, referenced_branch.base)

    def test_branch_relative_reference_location(self):
        # The location is stored before the referenced branch is made.
        branch = self.make_branch_with_reference('branch',
                                                 '../reference_branch')
        referenced_branch = self.make_branch('reference_branch')
        parent = branch.reference_parent('file-id', 'path/to/file')
        self.assertEqual(parent.base, referenced_branch.base)

    def test_sprout_copies_reference_location(self):
        branch = self.make_branch_with_reference('branch', '../reference')
        new_branch = branch.bzrdir.sprout('new-branch').open_branch()
        location = new_branch.get_reference_info('file-id')[1]
        self.assertEqual('../reference', location)

    def test_clone_copies_reference_location(self):
        branch = self.make_branch_with_reference('branch', '../reference')
        new_branch = branch.bzrdir.clone('new-branch').open_branch()
        location = new_branch.get_reference_info('file-id')[1]
        self.assertEqual('../reference', location)

    def test_copied_locations_are_rebased(self):
        branch = self.make_branch_with_reference('branch', 'reference')
        # The copy lives one directory below the original branch.
        new_branch = branch.bzrdir.sprout('branch/new-branch').open_branch()
        location = new_branch.get_reference_info('file-id')[1]
        self.assertEqual('../reference', location)

    def test_update_references_retains_old_references(self):
        branch = self.make_branch_with_reference('branch', 'reference')
        new_branch = self.make_branch_with_reference(
            'new_branch', 'reference', 'file-id2')
        new_branch.update_references(branch)
        location = branch.get_reference_info('file-id')[1]
        self.assertEqual('reference', location)

    def test_update_references_retains_known_references(self):
        branch = self.make_branch_with_reference('branch', 'reference')
        new_branch = self.make_branch_with_reference(
            'new_branch', 'reference2')
        new_branch.update_references(branch)
        location = branch.get_reference_info('file-id')[1]
        self.assertEqual('reference', location)

    def test_update_references_skips_known_references(self):
        branch = self.make_branch_with_reference('branch', 'reference')
        new_branch = branch.bzrdir.sprout('branch/new-branch').open_branch()
        new_branch.set_reference_info('file-id', '../foo', '../foo')
        new_branch.update_references(branch)
        location = branch.get_reference_info('file-id')[1]
        self.assertEqual('reference', location)

    def test_pull_updates_references(self):
        branch = self.make_branch_with_reference('branch', 'reference')
        new_branch = branch.bzrdir.sprout('branch/new-branch').open_branch()
        new_branch.set_reference_info('file-id2', '../foo', '../foo')
        branch.pull(new_branch)
        location = branch.get_reference_info('file-id2')[1]
        self.assertEqual('foo', location)

    def test_push_updates_references(self):
        branch = self.make_branch_with_reference('branch', 'reference')
        new_branch = branch.bzrdir.sprout('branch/new-branch').open_branch()
        new_branch.set_reference_info('file-id2', '../foo', '../foo')
        new_branch.push(branch)
        location = branch.get_reference_info('file-id2')[1]
        self.assertEqual('foo', location)

    def test_merge_updates_references(self):
        branch = self.make_branch_with_reference('branch', 'reference')
        tree = self.make_branch_and_tree('tree')
        tree.commit('foo')
        branch.pull(tree.branch)
        checkout = branch.create_checkout('checkout', lightweight=True)
        checkout.commit('bar')
        tree.lock_write()
        self.addCleanup(tree.unlock)
        # Merge the branch's last revision into the locked tree.
        merger = merge.Merger.from_revision_ids(
            None, tree, branch.last_revision(), other_branch=branch)
        merger.merge_type = merge.Merge3Merger
        merger.do_merge()
        location = tree.branch.get_reference_info('file-id')[1]
        self.assertEqual('../branch/reference', location)
1039
 
 
1040
 
 
1041
 
class TestBranchControlComponent(per_branch.TestCaseWithBranch):
    """Branch implementations adequately implement ControlComponent."""

    def test_urls(self):
        """user_url and control_url agree with their transports' bases."""
        br = self.make_branch('branch')
        self.assertIsInstance(br.user_url, str)
        self.assertEqual(br.user_url, br.user_transport.base)
        # for all current bzrdir implementations the user dir must be
        # above the control dir but we might need to relax that?
        prefix_offset = br.control_url.find(br.user_url)
        self.assertEqual(prefix_offset, 0)
        self.assertEqual(br.control_url, br.control_transport.base)
 
193
class TestBranchTransaction(TestCaseInTempDir):
    """Tests for the transaction handling of a branch."""

    def setUp(self):
        """Initialize a fresh branch in the current directory."""
        super(TestBranchTransaction, self).setUp()
        self.branch = Branch.initialize('.')

    def test_default_get_transaction(self):
        """branch.get_transaction on a new branch should give a PassThrough."""
        self.assertTrue(isinstance(self.branch.get_transaction(),
                                   transactions.PassThroughTransaction))

    def test__set_new_transaction(self):
        self.branch._set_transaction(transactions.ReadOnlyTransaction())

    def test__set_over_existing_transaction_raises(self):
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.assertRaises(errors.LockError,
                          self.branch._set_transaction,
                          transactions.ReadOnlyTransaction())

    def test_finish_no_transaction_raises(self):
        self.assertRaises(errors.LockError, self.branch._finish_transaction)

    def test_finish_readonly_transaction_works(self):
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
        self.branch._finish_transaction()
        self.assertEqual(None, self.branch._transaction)

    def test_unlock_calls_finish(self):
        self.branch.lock_read()
        # Swap in an instrumented transaction so unlock's calls are recorded.
        transaction = InstrumentedTransaction()
        self.branch._transaction = transaction
        self.branch.unlock()
        self.assertEqual(['finish'], transaction.calls)

    def test_lock_read_acquires_ro_transaction(self):
        self.branch.lock_read()
        try:
            self.assertTrue(isinstance(self.branch.get_transaction(),
                                       transactions.ReadOnlyTransaction))
        finally:
            # Release the lock even when the assertion fails.
            self.branch.unlock()

    def test_lock_write_acquires_passthrough_transaction(self):
        self.branch.lock_write()
        try:
            # cannot use get_transaction as it's magic
            self.assertTrue(isinstance(self.branch._transaction,
                                       transactions.PassThroughTransaction))
        finally:
            # Release the lock even when the assertion fails.
            self.branch.unlock()