~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

- properties are retrieved when revisions are loaded

- test for this

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# (C) 2005, 2006 Canonical Ltd
 
1
# (C) 2005 Canonical Ltd
2
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Tests for branch implementations - tests a branch format."""
18
 
 
19
17
import os
20
 
import sys
21
 
 
22
 
import bzrlib.branch
23
 
import bzrlib.bzrdir as bzrdir
24
 
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
 
18
from bzrlib.branch import Branch
 
19
from bzrlib.clone import copy_branch
25
20
from bzrlib.commit import commit
26
21
import bzrlib.errors as errors
27
 
from bzrlib.errors import (FileExists,
28
 
                           NoSuchRevision,
29
 
                           NoSuchFile,
30
 
                           UninitializableFormat,
31
 
                           NotBranchError,
32
 
                           )
33
 
import bzrlib.gpg
34
 
from bzrlib.osutils import getcwd
35
 
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
36
 
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
 
22
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
23
from bzrlib.selftest import TestCaseInTempDir
37
24
from bzrlib.trace import mutter
38
25
import bzrlib.transactions as transactions
39
 
from bzrlib.transport import get_transport
40
 
from bzrlib.transport.http import HttpServer
41
 
from bzrlib.transport.memory import MemoryServer
42
 
from bzrlib.upgrade import upgrade
43
 
from bzrlib.workingtree import WorkingTree
44
 
 
45
 
 
46
 
# TODO: Make a branch using basis branch, and check that it 
47
 
# doesn't request any files that could have been avoided, by 
48
 
# hooking into the Transport.
49
 
 
50
 
 
51
 
class TestCaseWithBranch(TestCaseWithBzrDir):
52
 
 
53
 
    def setUp(self):
54
 
        super(TestCaseWithBranch, self).setUp()
55
 
        self.branch = None
56
 
 
57
 
    def get_branch(self):
58
 
        if self.branch is None:
59
 
            self.branch = self.make_branch('')
60
 
        return self.branch
61
 
 
62
 
    def make_branch(self, relpath, format=None):
63
 
        repo = self.make_repository(relpath, format=format)
64
 
        # fixme RBC 20060210 this isnt necessarily a fixable thing,
65
 
        # Skipped is the wrong exception to raise.
66
 
        try:
67
 
            return self.branch_format.initialize(repo.bzrdir)
68
 
        except errors.UninitializableFormat:
69
 
            raise TestSkipped('Uninitializable branch format')
70
 
 
71
 
    def make_repository(self, relpath, shared=False, format=None):
72
 
        made_control = self.make_bzrdir(relpath, format=format)
73
 
        return made_control.create_repository(shared=shared)
74
 
 
75
 
 
76
 
class TestBranch(TestCaseWithBranch):
 
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
 
27
 
 
28
class TestBranch(TestCaseInTempDir):
77
29
 
78
30
    def test_append_revisions(self):
79
31
        """Test appending more than one revision"""
80
 
        br = self.get_branch()
 
32
        br = Branch.initialize(".")
81
33
        br.append_revision("rev1")
82
34
        self.assertEquals(br.revision_history(), ["rev1",])
83
35
        br.append_revision("rev2", "rev3")
85
37
 
86
38
    def test_fetch_revisions(self):
87
39
        """Test fetch-revision operation."""
88
 
        get_transport(self.get_url()).mkdir('b1')
89
 
        get_transport(self.get_url()).mkdir('b2')
90
 
        wt = self.make_branch_and_tree('b1')
91
 
        b1 = wt.branch
92
 
        b2 = self.make_branch('b2')
93
 
        file('b1/foo', 'w').write('hello')
94
 
        wt.add(['foo'], ['foo-id'])
95
 
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
40
        from bzrlib.fetch import Fetcher
 
41
        os.mkdir('b1')
 
42
        os.mkdir('b2')
 
43
        b1 = Branch.initialize('b1')
 
44
        b2 = Branch.initialize('b2')
 
45
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
 
46
        b1.add(['foo'], ['foo-id'])
 
47
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
96
48
 
97
49
        mutter('start fetch')
98
 
        self.assertEqual((1, []), b2.fetch(b1))
99
 
 
100
 
        rev = b2.repository.get_revision('revision-1')
101
 
        tree = b2.repository.revision_tree('revision-1')
102
 
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
103
 
 
104
 
    def get_unbalanced_tree_pair(self):
105
 
        """Return two branches, a and b, with one file in a."""
106
 
        get_transport(self.get_url()).mkdir('a')
107
 
        tree_a = self.make_branch_and_tree('a')
 
50
        f = Fetcher(from_branch=b1, to_branch=b2)
 
51
        eq = self.assertEquals
 
52
        eq(f.count_copied, 1)
 
53
        eq(f.last_revision, 'revision-1')
 
54
 
 
55
        rev = b2.get_revision('revision-1')
 
56
        tree = b2.revision_tree('revision-1')
 
57
        eq(tree.get_file_text('foo-id'), 'hello')
 
58
 
 
59
    def test_push_stores(self):
 
60
        """Copy the stores from one branch to another"""
 
61
        os.mkdir('a')
 
62
        br_a = Branch.initialize("a")
108
63
        file('a/b', 'wb').write('b')
109
 
        tree_a.add('b')
110
 
        tree_a.commit("silly commit", rev_id='A')
111
 
 
112
 
        get_transport(self.get_url()).mkdir('b')
113
 
        tree_b = self.make_branch_and_tree('b')
114
 
        return tree_a, tree_b
115
 
 
116
 
    def get_balanced_branch_pair(self):
117
 
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
118
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
119
 
        tree_b.branch.repository.fetch(tree_a.branch.repository)
120
 
        return tree_a, tree_b
121
 
 
122
 
    def test_clone_branch(self):
 
64
        br_a.add('b')
 
65
        commit(br_a, "silly commit")
 
66
 
 
67
        os.mkdir('b')
 
68
        br_b = Branch.initialize("b")
 
69
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
70
                          br_a.revision_history()[0])
 
71
        br_a.push_stores(br_b)
 
72
        rev = br_b.get_revision(br_a.revision_history()[0])
 
73
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
74
        for file_id in tree:
 
75
            if tree.inventory[file_id].kind == "file":
 
76
                tree.get_file(file_id).read()
 
77
        return br_a, br_b
 
78
 
 
79
    def test_copy_branch(self):
123
80
        """Copy the stores from one branch to another"""
124
 
        tree_a, tree_b = self.get_balanced_branch_pair()
125
 
        tree_b.commit("silly commit")
 
81
        br_a, br_b = self.test_push_stores()
 
82
        commit(br_b, "silly commit")
126
83
        os.mkdir('c')
127
 
        # this fails to test that the history from a was not used.
128
 
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
129
 
        self.assertEqual(tree_a.branch.revision_history(),
130
 
                         dir_c.open_branch().revision_history())
 
84
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
 
85
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
 
86
        ## # basis branches currently disabled for weave format
 
87
        ## self.assertFalse(br_b.last_revision() in br_c.revision_history())
 
88
        ## br_c.get_revision(br_b.last_revision())
131
89
 
132
 
    def test_clone_partial(self):
 
90
    def test_copy_partial(self):
133
91
        """Copy only part of the history of a branch."""
134
 
        # TODO: RBC 20060208 test with a revision not on revision-history.
135
 
        #       what should that behaviour be ? Emailed the list.
136
 
        wt_a = self.make_branch_and_tree('a')
137
 
        self.build_tree(['a/one'])
138
 
        wt_a.add(['one'])
139
 
        wt_a.commit('commit one', rev_id='1')
140
 
        self.build_tree(['a/two'])
141
 
        wt_a.add(['two'])
142
 
        wt_a.commit('commit two', rev_id='2')
143
 
        repo_b = self.make_repository('b')
144
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
145
 
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
146
 
        self.assertEqual(br_b.last_revision(), '1')
147
 
 
148
 
    def test_sprout_partial(self):
149
 
        # test sprouting with a prefix of the revision-history.
150
 
        # also needs not-on-revision-history behaviour defined.
151
 
        wt_a = self.make_branch_and_tree('a')
152
 
        self.build_tree(['a/one'])
153
 
        wt_a.add(['one'])
154
 
        wt_a.commit('commit one', rev_id='1')
155
 
        self.build_tree(['a/two'])
156
 
        wt_a.add(['two'])
157
 
        wt_a.commit('commit two', rev_id='2')
158
 
        repo_b = self.make_repository('b')
159
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
160
 
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
161
 
        self.assertEqual(br_b.last_revision(), '1')
162
 
 
163
 
    def test_clone_branch_nickname(self):
164
 
        # test the nick name is preserved always
165
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
166
 
 
167
 
    def test_clone_branch_parent(self):
168
 
        # test the parent is preserved always
169
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
170
 
        
171
 
    def test_sprout_branch_nickname(self):
172
 
        # test the nick name is reset always
173
 
        raise TestSkipped('XXX branch sprouting is not yet tested..')
174
 
 
175
 
    def test_sprout_branch_parent(self):
176
 
        source = self.make_branch('source')
177
 
        target = source.bzrdir.sprout(self.get_url('target')).open_branch()
178
 
        self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
179
 
        
 
92
        self.build_tree(['a/', 'a/one'])
 
93
        br_a = Branch.initialize('a')
 
94
        br_a.add(['one'])
 
95
        br_a.commit('commit one', rev_id='u@d-1')
 
96
        self.build_tree(['a/two'])
 
97
        br_a.add(['two'])
 
98
        br_a.commit('commit two', rev_id='u@d-2')
 
99
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
100
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
101
        self.assertTrue(os.path.exists('b/one'))
 
102
        self.assertFalse(os.path.exists('b/two'))
 
103
        
 
104
 
180
105
    def test_record_initial_ghost_merge(self):
181
106
        """A pending merge with no revision present is still a merge."""
182
 
        wt = self.make_branch_and_tree('.')
183
 
        branch = wt.branch
184
 
        wt.add_pending_merge('non:existent@rev--ision--0--2')
185
 
        wt.commit('pretend to merge nonexistent-revision', rev_id='first')
186
 
        rev = branch.repository.get_revision(branch.last_revision())
 
107
        branch = Branch.initialize('.')
 
108
        branch.add_pending_merge('non:existent@rev--ision--0--2')
 
109
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
 
110
        rev = branch.get_revision(branch.last_revision())
187
111
        self.assertEqual(len(rev.parent_ids), 1)
188
112
        # parent_sha1s is not populated now, WTF. rbc 20051003
189
113
        self.assertEqual(len(rev.parent_sha1s), 0)
190
114
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
191
115
 
192
 
    def test_bad_revision(self):
193
 
        self.assertRaises(errors.InvalidRevisionId,
194
 
                          self.get_branch().repository.get_revision,
195
 
                          None)
196
 
 
197
116
# TODO 20051003 RBC:
198
117
# compare the gpg-to-sign info for a commit with a ghost and 
199
118
#     an identical tree without a ghost
201
120
        
202
121
    def test_pending_merges(self):
203
122
        """Tracking pending-merged revisions."""
204
 
        wt = self.make_branch_and_tree('.')
205
 
        b = wt.branch
206
 
        self.assertEquals(wt.pending_merges(), [])
207
 
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
208
 
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
209
 
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
210
 
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
211
 
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
212
 
        self.assertEquals(wt.pending_merges(),
 
123
        b = Branch.initialize('.')
 
124
 
 
125
        self.assertEquals(b.pending_merges(), [])
 
126
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
 
127
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
128
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
 
129
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
130
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
 
131
        self.assertEquals(b.pending_merges(),
213
132
                          ['foo@azkhazan-123123-abcabc',
214
133
                           'wibble@fofof--20050401--1928390812'])
215
 
        wt.commit("commit from base with two merges")
216
 
        rev = b.repository.get_revision(b.revision_history()[0])
 
134
        b.commit("commit from base with two merges")
 
135
        rev = b.get_revision(b.revision_history()[0])
217
136
        self.assertEquals(len(rev.parent_ids), 2)
218
137
        self.assertEquals(rev.parent_ids[0],
219
138
                          'foo@azkhazan-123123-abcabc')
220
139
        self.assertEquals(rev.parent_ids[1],
221
140
                           'wibble@fofof--20050401--1928390812')
222
141
        # list should be cleared when we do a commit
223
 
        self.assertEquals(wt.pending_merges(), [])
224
 
 
225
 
    def test_sign_existing_revision(self):
226
 
        wt = self.make_branch_and_tree('.')
227
 
        branch = wt.branch
228
 
        wt.commit("base", allow_pointless=True, rev_id='A')
229
 
        from bzrlib.testament import Testament
230
 
        strategy = bzrlib.gpg.LoopbackGPGStrategy(None)
231
 
        branch.repository.sign_revision('A', strategy)
232
 
        self.assertEqual(Testament.from_revision(branch.repository, 
233
 
                         'A').as_short_text(),
234
 
                         branch.repository.get_signature_text('A'))
235
 
 
236
 
    def test_store_signature(self):
237
 
        wt = self.make_branch_and_tree('.')
238
 
        branch = wt.branch
239
 
        branch.repository.store_revision_signature(
240
 
            bzrlib.gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
241
 
        self.assertRaises(errors.NoSuchRevision,
242
 
                          branch.repository.has_signature_for_revision_id,
243
 
                          'A')
244
 
        wt.commit("base", allow_pointless=True, rev_id='A')
245
 
        self.assertEqual('FOO', 
246
 
                         branch.repository.get_signature_text('A'))
247
 
 
248
 
    def test_branch_keeps_signatures(self):
249
 
        wt = self.make_branch_and_tree('source')
250
 
        wt.commit('A', allow_pointless=True, rev_id='A')
251
 
        wt.branch.repository.sign_revision('A',
252
 
            bzrlib.gpg.LoopbackGPGStrategy(None))
253
 
        #FIXME: clone should work to urls,
254
 
        # wt.clone should work to disks.
255
 
        self.build_tree(['target/'])
256
 
        d2 = wt.bzrdir.clone('target')
257
 
        self.assertEqual(wt.branch.repository.get_signature_text('A'),
258
 
                         d2.open_repository().get_signature_text('A'))
259
 
 
260
 
    def test_nicks(self):
261
 
        """Branch nicknames"""
262
 
        t = get_transport(self.get_url())
263
 
        t.mkdir('bzr.dev')
264
 
        branch = self.make_branch('bzr.dev')
265
 
        self.assertEqual(branch.nick, 'bzr.dev')
266
 
        t.move('bzr.dev', 'bzr.ab')
267
 
        branch = Branch.open(self.get_url('bzr.ab'))
268
 
        self.assertEqual(branch.nick, 'bzr.ab')
269
 
        branch.nick = "Aaron's branch"
270
 
        branch.nick = "Aaron's branch"
271
 
        self.failUnless(
272
 
            t.has(
273
 
                t.relpath(
274
 
                    branch.control_files.controlfilename("branch.conf")
275
 
                    )
276
 
                )
277
 
            )
278
 
        self.assertEqual(branch.nick, "Aaron's branch")
279
 
        t.move('bzr.ab', 'integration')
280
 
        branch = Branch.open(self.get_url('integration'))
281
 
        self.assertEqual(branch.nick, "Aaron's branch")
282
 
        branch.nick = u"\u1234"
283
 
        self.assertEqual(branch.nick, u"\u1234")
284
 
 
285
 
    def test_commit_nicks(self):
286
 
        """Nicknames are committed to the revision"""
287
 
        get_transport(self.get_url()).mkdir('bzr.dev')
288
 
        wt = self.make_branch_and_tree('bzr.dev')
289
 
        branch = wt.branch
290
 
        branch.nick = "My happy branch"
291
 
        wt.commit('My commit respect da nick.')
292
 
        committed = branch.repository.get_revision(branch.last_revision())
293
 
        self.assertEqual(committed.properties["branch-nick"], 
294
 
                         "My happy branch")
295
 
 
296
 
    def test_create_open_branch_uses_repository(self):
297
 
        try:
298
 
            repo = self.make_repository('.', shared=True)
299
 
        except errors.IncompatibleFormat:
300
 
            return
301
 
        repo.bzrdir.root_transport.mkdir('child')
302
 
        child_dir = self.bzrdir_format.initialize('child')
303
 
        try:
304
 
            child_branch = self.branch_format.initialize(child_dir)
305
 
        except errors.UninitializableFormat:
306
 
            # branch references are not default init'able.
307
 
            return
308
 
        self.assertEqual(repo.bzrdir.root_transport.base,
309
 
                         child_branch.repository.bzrdir.root_transport.base)
310
 
        child_branch = bzrlib.branch.Branch.open(self.get_url('child'))
311
 
        self.assertEqual(repo.bzrdir.root_transport.base,
312
 
                         child_branch.repository.bzrdir.root_transport.base)
313
 
 
314
 
    def test_format_description(self):
315
 
        tree = self.make_branch_and_tree('tree')
316
 
        text = tree.branch._format.get_format_description()
317
 
        self.failUnless(len(text))
318
 
 
319
 
 
320
 
class ChrootedTests(TestCaseWithBranch):
321
 
    """A support class that provides readonly urls outside the local namespace.
322
 
 
323
 
    This is done by checking if self.transport_server is a MemoryServer. if it
324
 
    is then we are chrooted already, if it is not then an HttpServer is used
325
 
    for readonly urls.
326
 
    """
327
 
 
328
 
    def setUp(self):
329
 
        super(ChrootedTests, self).setUp()
330
 
        if not self.transport_server == MemoryServer:
331
 
            self.transport_readonly_server = HttpServer
 
142
        self.assertEquals(b.pending_merges(), [])
 
143
 
 
144
 
 
145
class TestRemote(TestCaseWithWebserver):
332
146
 
333
147
    def test_open_containing(self):
334
148
        self.assertRaises(NotBranchError, Branch.open_containing,
335
 
                          self.get_readonly_url(''))
 
149
                          self.get_remote_url(''))
336
150
        self.assertRaises(NotBranchError, Branch.open_containing,
337
 
                          self.get_readonly_url('g/p/q'))
338
 
        branch = self.make_branch('.')
339
 
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
340
 
        self.assertEqual('', relpath)
341
 
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
342
 
        self.assertEqual('g/p/q', relpath)
 
151
                          self.get_remote_url('g/p/q'))
 
152
        b = Branch.initialize('.')
 
153
        Branch.open_containing(self.get_remote_url(''))
 
154
        Branch.open_containing(self.get_remote_url('g/p/q'))
343
155
        
344
156
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
345
157
#         >>> from bzrlib.commit import commit
346
158
#         >>> bzrlib.trace.silent = True
347
159
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
348
 
#         >>> br1.working_tree().add('foo')
349
 
#         >>> br1.working_tree().add('bar')
 
160
#         >>> br1.add('foo')
 
161
#         >>> br1.add('bar')
350
162
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
351
163
#         >>> br2 = ScratchBranch()
352
164
#         >>> br2.update_revisions(br1)
369
181
        self.calls = []
370
182
 
371
183
 
372
 
class TestDecorator(object):
373
 
 
374
 
    def __init__(self):
375
 
        self._calls = []
376
 
 
377
 
    def lock_read(self):
378
 
        self._calls.append('lr')
379
 
 
380
 
    def lock_write(self):
381
 
        self._calls.append('lw')
382
 
 
383
 
    def unlock(self):
384
 
        self._calls.append('ul')
385
 
 
386
 
    @needs_read_lock
387
 
    def do_with_read(self):
388
 
        return 1
389
 
 
390
 
    @needs_read_lock
391
 
    def except_with_read(self):
392
 
        raise RuntimeError
393
 
 
394
 
    @needs_write_lock
395
 
    def do_with_write(self):
396
 
        return 2
397
 
 
398
 
    @needs_write_lock
399
 
    def except_with_write(self):
400
 
        raise RuntimeError
401
 
 
402
 
 
403
 
class TestDecorators(TestCase):
404
 
 
405
 
    def test_needs_read_lock(self):
406
 
        branch = TestDecorator()
407
 
        self.assertEqual(1, branch.do_with_read())
408
 
        self.assertEqual(['lr', 'ul'], branch._calls)
409
 
 
410
 
    def test_excepts_in_read_lock(self):
411
 
        branch = TestDecorator()
412
 
        self.assertRaises(RuntimeError, branch.except_with_read)
413
 
        self.assertEqual(['lr', 'ul'], branch._calls)
414
 
 
415
 
    def test_needs_write_lock(self):
416
 
        branch = TestDecorator()
417
 
        self.assertEqual(2, branch.do_with_write())
418
 
        self.assertEqual(['lw', 'ul'], branch._calls)
419
 
 
420
 
    def test_excepts_in_write_lock(self):
421
 
        branch = TestDecorator()
422
 
        self.assertRaises(RuntimeError, branch.except_with_write)
423
 
        self.assertEqual(['lw', 'ul'], branch._calls)
424
 
 
425
 
 
426
 
class TestBranchTransaction(TestCaseWithBranch):
 
184
class TestBranchTransaction(TestCaseInTempDir):
427
185
 
428
186
    def setUp(self):
429
187
        super(TestBranchTransaction, self).setUp()
430
 
        self.branch = None
 
188
        self.branch = Branch.initialize('.')
431
189
        
432
190
    def test_default_get_transaction(self):
433
191
        """branch.get_transaction on a new branch should give a PassThrough."""
434
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
 
192
        self.failUnless(isinstance(self.branch.get_transaction(),
435
193
                                   transactions.PassThroughTransaction))
436
194
 
437
195
    def test__set_new_transaction(self):
438
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
196
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
439
197
 
440
198
    def test__set_over_existing_transaction_raises(self):
441
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
199
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
442
200
        self.assertRaises(errors.LockError,
443
 
                          self.get_branch()._set_transaction,
 
201
                          self.branch._set_transaction,
444
202
                          transactions.ReadOnlyTransaction())
445
203
 
446
204
    def test_finish_no_transaction_raises(self):
447
 
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
 
205
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
448
206
 
449
207
    def test_finish_readonly_transaction_works(self):
450
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
451
 
        self.get_branch()._finish_transaction()
452
 
        self.assertEqual(None, self.get_branch().control_files._transaction)
 
208
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
209
        self.branch._finish_transaction()
 
210
        self.assertEqual(None, self.branch._transaction)
453
211
 
454
212
    def test_unlock_calls_finish(self):
455
 
        self.get_branch().lock_read()
 
213
        self.branch.lock_read()
456
214
        transaction = InstrumentedTransaction()
457
 
        self.get_branch().control_files._transaction = transaction
458
 
        self.get_branch().unlock()
 
215
        self.branch._transaction = transaction
 
216
        self.branch.unlock()
459
217
        self.assertEqual(['finish'], transaction.calls)
460
218
 
461
219
    def test_lock_read_acquires_ro_transaction(self):
462
 
        self.get_branch().lock_read()
463
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
 
220
        self.branch.lock_read()
 
221
        self.failUnless(isinstance(self.branch.get_transaction(),
464
222
                                   transactions.ReadOnlyTransaction))
465
 
        self.get_branch().unlock()
 
223
        self.branch.unlock()
466
224
        
467
 
    def test_lock_write_acquires_write_transaction(self):
468
 
        self.get_branch().lock_write()
 
225
    def test_lock_write_acquires_passthrough_transaction(self):
 
226
        self.branch.lock_write()
469
227
        # cannot use get_transaction as its magic
470
 
        self.failUnless(isinstance(self.get_branch().control_files._transaction,
471
 
                                   transactions.WriteTransaction))
472
 
        self.get_branch().unlock()
473
 
 
474
 
 
475
 
class TestBranchPushLocations(TestCaseWithBranch):
476
 
 
477
 
    def test_get_push_location_unset(self):
478
 
        self.assertEqual(None, self.get_branch().get_push_location())
479
 
 
480
 
    def test_get_push_location_exact(self):
481
 
        from bzrlib.config import (branches_config_filename,
482
 
                                   ensure_config_dir_exists)
483
 
        ensure_config_dir_exists()
484
 
        fn = branches_config_filename()
485
 
        print >> open(fn, 'wt'), ("[%s]\n"
486
 
                                  "push_location=foo" %
487
 
                                  self.get_branch().base[:-1])
488
 
        self.assertEqual("foo", self.get_branch().get_push_location())
489
 
 
490
 
    def test_set_push_location(self):
491
 
        from bzrlib.config import (branches_config_filename,
492
 
                                   ensure_config_dir_exists)
493
 
        ensure_config_dir_exists()
494
 
        fn = branches_config_filename()
495
 
        self.get_branch().set_push_location('foo')
496
 
        self.assertFileEqual("[%s]\n"
497
 
                             "push_location = foo" % self.get_branch().base[:-1],
498
 
                             fn)
499
 
 
500
 
    # TODO RBC 20051029 test getting a push location from a branch in a 
501
 
    # recursive section - that is, it appends the branch name.
502
 
 
503
 
 
504
 
class TestFormat(TestCaseWithBranch):
505
 
    """Tests for the format itself."""
506
 
 
507
 
    def test_format_initialize_find_open(self):
508
 
        # loopback test to check the current format initializes to itself.
509
 
        if not self.branch_format.is_supported():
510
 
            # unsupported formats are not loopback testable
511
 
            # because the default open will not open them and
512
 
            # they may not be initializable.
513
 
            return
514
 
        # supported formats must be able to init and open
515
 
        t = get_transport(self.get_url())
516
 
        readonly_t = get_transport(self.get_readonly_url())
517
 
        made_branch = self.make_branch('.')
518
 
        self.failUnless(isinstance(made_branch, bzrlib.branch.Branch))
519
 
 
520
 
        # find it via bzrdir opening:
521
 
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
522
 
        direct_opened_branch = opened_control.open_branch()
523
 
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
524
 
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
525
 
        self.failUnless(isinstance(direct_opened_branch._format,
526
 
                        self.branch_format.__class__))
527
 
 
528
 
        # find it via Branch.open
529
 
        opened_branch = bzrlib.branch.Branch.open(readonly_t.base)
530
 
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
531
 
        self.assertEqual(made_branch._format.__class__,
532
 
                         opened_branch._format.__class__)
533
 
        # if it has a unique id string, can we probe for it ?
534
 
        try:
535
 
            self.branch_format.get_format_string()
536
 
        except NotImplementedError:
537
 
            return
538
 
        self.assertEqual(self.branch_format,
539
 
                         bzrlib.branch.BranchFormat.find_format(opened_control))
 
228
        self.failUnless(isinstance(self.branch._transaction,
 
229
                                   transactions.PassThroughTransaction))
 
230
        self.branch.unlock()