~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

  • Committer: Robey Pointer
  • Date: 2005-11-07 05:02:04 UTC
  • mto: (1185.41.1 bzr.sftp)
  • mto: This revision was merged to the branch mainline in revision 1512.
  • Revision ID: robey@lag.net-20051107050204-885fb734a3dd3906
prefetch files under paramiko 1.5.1 for improved speed

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
 
1
# (C) 2005 Canonical Ltd
2
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Tests for branch implementations - tests a branch format."""
18
 
 
19
17
import os
20
 
import sys
21
18
 
22
 
from bzrlib import (
23
 
    branch,
24
 
    bzrdir,
25
 
    errors,
26
 
    gpg,
27
 
    urlutils,
28
 
    transactions,
29
 
    repository,
30
 
    )
31
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
32
 
from bzrlib.delta import TreeDelta
33
 
from bzrlib.errors import (FileExists,
34
 
                           NoSuchRevision,
35
 
                           NoSuchFile,
36
 
                           UninitializableFormat,
37
 
                           NotBranchError,
38
 
                           )
39
 
from bzrlib.osutils import getcwd
40
 
import bzrlib.revision
41
 
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
42
 
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
 
20
from bzrlib.clone import copy_branch
 
21
from bzrlib.commit import commit
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
24
import bzrlib.gpg
 
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
43
27
from bzrlib.trace import mutter
44
 
from bzrlib.transport import get_transport
45
 
from bzrlib.transport.http import HttpServer
46
 
from bzrlib.transport.memory import MemoryServer
47
 
from bzrlib.upgrade import upgrade
48
 
from bzrlib.workingtree import WorkingTree
49
 
 
 
28
import bzrlib.transactions as transactions
 
29
from bzrlib.revision import NULL_REVISION
50
30
 
51
31
# TODO: Make a branch using basis branch, and check that it 
52
32
# doesn't request any files that could have been avoided, by 
53
33
# hooking into the Transport.
54
34
 
55
 
 
56
 
class TestCaseWithBranch(TestCaseWithBzrDir):
57
 
 
58
 
    def setUp(self):
59
 
        super(TestCaseWithBranch, self).setUp()
60
 
        self.branch = None
61
 
 
62
 
    def get_branch(self):
63
 
        if self.branch is None:
64
 
            self.branch = self.make_branch('')
65
 
        return self.branch
66
 
 
67
 
    def make_branch(self, relpath, format=None):
68
 
        repo = self.make_repository(relpath, format=format)
69
 
        # fixme RBC 20060210 this isn't necessarily a fixable thing,
70
 
        # Skipped is the wrong exception to raise.
71
 
        try:
72
 
            return self.branch_format.initialize(repo.bzrdir)
73
 
        except errors.UninitializableFormat:
74
 
            raise TestSkipped('Uninitializable branch format')
75
 
 
76
 
    def make_repository(self, relpath, shared=False, format=None):
77
 
        made_control = self.make_bzrdir(relpath, format=format)
78
 
        return made_control.create_repository(shared=shared)
79
 
 
80
 
 
81
 
class TestBranch(TestCaseWithBranch):
 
35
class TestBranch(TestCaseInTempDir):
82
36
 
83
37
    def test_append_revisions(self):
84
38
        """Test appending more than one revision"""
85
 
        br = self.get_branch()
 
39
        br = Branch.initialize(".")
86
40
        br.append_revision("rev1")
87
41
        self.assertEquals(br.revision_history(), ["rev1",])
88
42
        br.append_revision("rev2", "rev3")
90
44
 
91
45
    def test_fetch_revisions(self):
92
46
        """Test fetch-revision operation."""
93
 
        get_transport(self.get_url()).mkdir('b1')
94
 
        get_transport(self.get_url()).mkdir('b2')
95
 
        wt = self.make_branch_and_tree('b1')
96
 
        b1 = wt.branch
97
 
        b2 = self.make_branch('b2')
98
 
        file('b1/foo', 'w').write('hello')
99
 
        wt.add(['foo'], ['foo-id'])
100
 
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
47
        from bzrlib.fetch import Fetcher
 
48
        os.mkdir('b1')
 
49
        os.mkdir('b2')
 
50
        b1 = Branch.initialize('b1')
 
51
        b2 = Branch.initialize('b2')
 
52
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
 
53
        b1.add(['foo'], ['foo-id'])
 
54
        b1.commit('lala!', rev_id='revision-1', allow_pointless=False)
101
55
 
102
56
        mutter('start fetch')
103
 
        self.assertEqual((1, []), b2.fetch(b1))
104
 
 
105
 
        rev = b2.repository.get_revision('revision-1')
106
 
        tree = b2.repository.revision_tree('revision-1')
107
 
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
108
 
 
109
 
    def test_get_revision_delta(self):
110
 
        tree_a = self.make_branch_and_tree('a')
111
 
        self.build_tree(['a/foo'])
112
 
        tree_a.add('foo', 'file1')
113
 
        tree_a.commit('rev1', rev_id='rev1')
114
 
        self.build_tree(['a/vla'])
115
 
        tree_a.add('vla', 'file2')
116
 
        tree_a.commit('rev2', rev_id='rev2')
117
 
 
118
 
        delta = tree_a.branch.get_revision_delta(1)
119
 
        self.assertIsInstance(delta, TreeDelta)
120
 
        self.assertEqual([('foo', 'file1', 'file')], delta.added)
121
 
        delta = tree_a.branch.get_revision_delta(2)
122
 
        self.assertIsInstance(delta, TreeDelta)
123
 
        self.assertEqual([('vla', 'file2', 'file')], delta.added)
124
 
 
125
 
    def get_unbalanced_tree_pair(self):
 
57
        f = Fetcher(from_branch=b1, to_branch=b2)
 
58
        eq = self.assertEquals
 
59
        eq(f.count_copied, 1)
 
60
        eq(f.last_revision, 'revision-1')
 
61
 
 
62
        rev = b2.get_revision('revision-1')
 
63
        tree = b2.revision_tree('revision-1')
 
64
        eq(tree.get_file_text('foo-id'), 'hello')
 
65
 
 
66
    def test_revision_tree(self):
 
67
        b1 = Branch.initialize('.')
 
68
        b1.commit('lala!', rev_id='revision-1', allow_pointless=True)
 
69
        tree = b1.revision_tree('revision-1')
 
70
        tree = b1.revision_tree(None)
 
71
        self.assertEqual(len(tree.list_files()), 0)
 
72
        tree = b1.revision_tree(NULL_REVISION)
 
73
        self.assertEqual(len(tree.list_files()), 0)
 
74
 
 
75
    def get_unbalanced_branch_pair(self):
126
76
        """Return two branches, a and b, with one file in a."""
127
 
        get_transport(self.get_url()).mkdir('a')
128
 
        tree_a = self.make_branch_and_tree('a')
 
77
        os.mkdir('a')
 
78
        br_a = Branch.initialize("a")
129
79
        file('a/b', 'wb').write('b')
130
 
        tree_a.add('b')
131
 
        tree_a.commit("silly commit", rev_id='A')
132
 
 
133
 
        get_transport(self.get_url()).mkdir('b')
134
 
        tree_b = self.make_branch_and_tree('b')
135
 
        return tree_a, tree_b
 
80
        br_a.add('b')
 
81
        commit(br_a, "silly commit", rev_id='A')
 
82
        os.mkdir('b')
 
83
        br_b = Branch.initialize("b")
 
84
        return br_a, br_b
136
85
 
137
86
    def get_balanced_branch_pair(self):
138
87
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
139
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
140
 
        tree_b.branch.repository.fetch(tree_a.branch.repository)
141
 
        return tree_a, tree_b
142
 
 
143
 
    def test_clone_branch(self):
144
 
        """Copy the stores from one branch to another"""
145
 
        tree_a, tree_b = self.get_balanced_branch_pair()
146
 
        tree_b.commit("silly commit")
 
88
        br_a, br_b = self.get_unbalanced_branch_pair()
 
89
        br_a.push_stores(br_b)
 
90
        return br_a, br_b
 
91
 
 
92
    def test_push_stores(self):
 
93
        """Copy the stores from one branch to another"""
 
94
        br_a, br_b = self.get_unbalanced_branch_pair()
 
95
        # ensure the revision is missing.
 
96
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
97
                          br_a.revision_history()[0])
 
98
        br_a.push_stores(br_b)
 
99
        # check that b now has all the data from a's first commit.
 
100
        rev = br_b.get_revision(br_a.revision_history()[0])
 
101
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
102
        for file_id in tree:
 
103
            if tree.inventory[file_id].kind == "file":
 
104
                tree.get_file(file_id).read()
 
105
        return br_a, br_b
 
106
 
 
107
    def test_copy_branch(self):
 
108
        """Copy the stores from one branch to another"""
 
109
        br_a, br_b = self.get_balanced_branch_pair()
 
110
        commit(br_b, "silly commit")
147
111
        os.mkdir('c')
148
 
        # this fails to test that the history from a was not used.
149
 
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
150
 
        self.assertEqual(tree_a.branch.revision_history(),
151
 
                         dir_c.open_branch().revision_history())
 
112
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
 
113
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
152
114
 
153
 
    def test_clone_partial(self):
 
115
    def test_copy_partial(self):
154
116
        """Copy only part of the history of a branch."""
155
 
        # TODO: RBC 20060208 test with a revision not on revision-history.
156
 
        #       what should that behaviour be? Emailed the list.
157
 
        wt_a = self.make_branch_and_tree('a')
158
 
        self.build_tree(['a/one'])
159
 
        wt_a.add(['one'])
160
 
        wt_a.commit('commit one', rev_id='1')
161
 
        self.build_tree(['a/two'])
162
 
        wt_a.add(['two'])
163
 
        wt_a.commit('commit two', rev_id='2')
164
 
        repo_b = self.make_repository('b')
165
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
166
 
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
167
 
        self.assertEqual(br_b.last_revision(), '1')
168
 
 
169
 
    def test_sprout_partial(self):
170
 
        # test sprouting with a prefix of the revision-history.
171
 
        # also needs not-on-revision-history behaviour defined.
172
 
        wt_a = self.make_branch_and_tree('a')
173
 
        self.build_tree(['a/one'])
174
 
        wt_a.add(['one'])
175
 
        wt_a.commit('commit one', rev_id='1')
176
 
        self.build_tree(['a/two'])
177
 
        wt_a.add(['two'])
178
 
        wt_a.commit('commit two', rev_id='2')
179
 
        repo_b = self.make_repository('b')
180
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
181
 
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
182
 
        self.assertEqual(br_b.last_revision(), '1')
183
 
 
184
 
    def test_clone_branch_nickname(self):
185
 
        # test the nick name is preserved always
186
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
187
 
 
188
 
    def test_clone_branch_parent(self):
189
 
        # test the parent is preserved always
190
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
191
 
        
192
 
    def test_sprout_branch_nickname(self):
193
 
        # test the nick name is reset always
194
 
        raise TestSkipped('XXX branch sprouting is not yet tested..')
195
 
 
196
 
    def test_sprout_branch_parent(self):
197
 
        source = self.make_branch('source')
198
 
        target = source.bzrdir.sprout(self.get_url('target')).open_branch()
199
 
        self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
200
 
 
201
 
    def test_submit_branch(self):
202
 
        """Submit location can be queried and set"""
203
 
        branch = self.make_branch('branch')
204
 
        self.assertEqual(branch.get_submit_branch(), None)
205
 
        branch.set_submit_branch('sftp://example.com')
206
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
207
 
        branch.set_submit_branch('sftp://example.net')
208
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
 
117
        self.build_tree(['a/', 'a/one'])
 
118
        br_a = Branch.initialize('a')
 
119
        br_a.add(['one'])
 
120
        br_a.commit('commit one', rev_id='u@d-1')
 
121
        self.build_tree(['a/two'])
 
122
        br_a.add(['two'])
 
123
        br_a.commit('commit two', rev_id='u@d-2')
 
124
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
125
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
126
        self.assertTrue(os.path.exists('b/one'))
 
127
        self.assertFalse(os.path.exists('b/two'))
209
128
        
210
129
    def test_record_initial_ghost_merge(self):
211
130
        """A pending merge with no revision present is still a merge."""
212
 
        wt = self.make_branch_and_tree('.')
213
 
        branch = wt.branch
214
 
        wt.add_pending_merge('non:existent@rev--ision--0--2')
215
 
        wt.commit('pretend to merge nonexistent-revision', rev_id='first')
216
 
        rev = branch.repository.get_revision(branch.last_revision())
 
131
        branch = Branch.initialize('.')
 
132
        branch.add_pending_merge('non:existent@rev--ision--0--2')
 
133
        branch.commit('pretend to merge nonexistent-revision', rev_id='first')
 
134
        rev = branch.get_revision(branch.last_revision())
217
135
        self.assertEqual(len(rev.parent_ids), 1)
218
136
        # parent_sha1s is not populated now, WTF. rbc 20051003
219
137
        self.assertEqual(len(rev.parent_sha1s), 0)
220
138
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
221
139
 
222
140
    def test_bad_revision(self):
223
 
        self.assertRaises(errors.InvalidRevisionId,
224
 
                          self.get_branch().repository.get_revision,
225
 
                          None)
 
141
        branch = Branch.initialize('.')
 
142
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
226
143
 
227
144
# TODO 20051003 RBC:
228
145
# compare the gpg-to-sign info for a commit with a ghost and 
231
148
        
232
149
    def test_pending_merges(self):
233
150
        """Tracking pending-merged revisions."""
234
 
        wt = self.make_branch_and_tree('.')
235
 
        b = wt.branch
236
 
        self.assertEquals(wt.pending_merges(), [])
237
 
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
238
 
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
239
 
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
240
 
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
241
 
        wt.add_pending_merge('wibble@fofof--20050401--1928390812')
242
 
        self.assertEquals(wt.pending_merges(),
 
151
        b = Branch.initialize('.')
 
152
 
 
153
        self.assertEquals(b.pending_merges(), [])
 
154
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
 
155
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
156
        b.add_pending_merge('foo@azkhazan-123123-abcabc')
 
157
        self.assertEquals(b.pending_merges(), ['foo@azkhazan-123123-abcabc'])
 
158
        b.add_pending_merge('wibble@fofof--20050401--1928390812')
 
159
        self.assertEquals(b.pending_merges(),
243
160
                          ['foo@azkhazan-123123-abcabc',
244
161
                           'wibble@fofof--20050401--1928390812'])
245
 
        wt.commit("commit from base with two merges")
246
 
        rev = b.repository.get_revision(b.revision_history()[0])
 
162
        b.commit("commit from base with two merges")
 
163
        rev = b.get_revision(b.revision_history()[0])
247
164
        self.assertEquals(len(rev.parent_ids), 2)
248
165
        self.assertEquals(rev.parent_ids[0],
249
166
                          'foo@azkhazan-123123-abcabc')
250
167
        self.assertEquals(rev.parent_ids[1],
251
168
                           'wibble@fofof--20050401--1928390812')
252
169
        # list should be cleared when we do a commit
253
 
        self.assertEquals(wt.pending_merges(), [])
 
170
        self.assertEquals(b.pending_merges(), [])
254
171
 
255
172
    def test_sign_existing_revision(self):
256
 
        wt = self.make_branch_and_tree('.')
257
 
        branch = wt.branch
258
 
        wt.commit("base", allow_pointless=True, rev_id='A')
 
173
        branch = Branch.initialize('.')
 
174
        branch.commit("base", allow_pointless=True, rev_id='A')
259
175
        from bzrlib.testament import Testament
260
 
        strategy = gpg.LoopbackGPGStrategy(None)
261
 
        branch.repository.sign_revision('A', strategy)
262
 
        self.assertEqual(Testament.from_revision(branch.repository, 
263
 
                         'A').as_short_text(),
264
 
                         branch.repository.get_signature_text('A'))
 
176
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
177
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
178
                         branch.revision_store.get('A', 'sig').read())
265
179
 
266
180
    def test_store_signature(self):
267
 
        wt = self.make_branch_and_tree('.')
268
 
        branch = wt.branch
269
 
        branch.repository.store_revision_signature(
270
 
            gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
271
 
        self.assertRaises(errors.NoSuchRevision,
272
 
                          branch.repository.has_signature_for_revision_id,
273
 
                          'A')
274
 
        wt.commit("base", allow_pointless=True, rev_id='A')
275
 
        self.assertEqual('FOO', 
276
 
                         branch.repository.get_signature_text('A'))
277
 
 
278
 
    def test_branch_keeps_signatures(self):
279
 
        wt = self.make_branch_and_tree('source')
280
 
        wt.commit('A', allow_pointless=True, rev_id='A')
281
 
        wt.branch.repository.sign_revision('A',
282
 
            gpg.LoopbackGPGStrategy(None))
283
 
        #FIXME: clone should work to urls,
284
 
        # wt.clone should work to disks.
285
 
        self.build_tree(['target/'])
286
 
        d2 = wt.bzrdir.clone('target')
287
 
        self.assertEqual(wt.branch.repository.get_signature_text('A'),
288
 
                         d2.open_repository().get_signature_text('A'))
289
 
 
290
 
    def test_nicks(self):
291
 
        """Branch nicknames"""
292
 
        t = get_transport(self.get_url())
293
 
        t.mkdir('bzr.dev')
294
 
        branch = self.make_branch('bzr.dev')
295
 
        self.assertEqual(branch.nick, 'bzr.dev')
296
 
        t.move('bzr.dev', 'bzr.ab')
297
 
        branch = Branch.open(self.get_url('bzr.ab'))
298
 
        self.assertEqual(branch.nick, 'bzr.ab')
299
 
        branch.nick = "Aaron's branch"
300
 
        branch.nick = "Aaron's branch"
301
 
        self.failUnless(
302
 
            t.has(
303
 
                t.relpath(
304
 
                    branch.control_files.controlfilename("branch.conf")
305
 
                    )
306
 
                )
307
 
            )
308
 
        self.assertEqual(branch.nick, "Aaron's branch")
309
 
        t.move('bzr.ab', 'integration')
310
 
        branch = Branch.open(self.get_url('integration'))
311
 
        self.assertEqual(branch.nick, "Aaron's branch")
312
 
        branch.nick = u"\u1234"
313
 
        self.assertEqual(branch.nick, u"\u1234")
314
 
 
315
 
    def test_commit_nicks(self):
316
 
        """Nicknames are committed to the revision"""
317
 
        get_transport(self.get_url()).mkdir('bzr.dev')
318
 
        wt = self.make_branch_and_tree('bzr.dev')
319
 
        branch = wt.branch
320
 
        branch.nick = "My happy branch"
321
 
        wt.commit('My commit respect da nick.')
322
 
        committed = branch.repository.get_revision(branch.last_revision())
323
 
        self.assertEqual(committed.properties["branch-nick"], 
324
 
                         "My happy branch")
325
 
 
326
 
    def test_create_open_branch_uses_repository(self):
327
 
        try:
328
 
            repo = self.make_repository('.', shared=True)
329
 
        except errors.IncompatibleFormat:
330
 
            return
331
 
        repo.bzrdir.root_transport.mkdir('child')
332
 
        child_dir = self.bzrdir_format.initialize('child')
333
 
        try:
334
 
            child_branch = self.branch_format.initialize(child_dir)
335
 
        except errors.UninitializableFormat:
336
 
            # branch references are not default init'able.
337
 
            return
338
 
        self.assertEqual(repo.bzrdir.root_transport.base,
339
 
                         child_branch.repository.bzrdir.root_transport.base)
340
 
        child_branch = branch.Branch.open(self.get_url('child'))
341
 
        self.assertEqual(repo.bzrdir.root_transport.base,
342
 
                         child_branch.repository.bzrdir.root_transport.base)
343
 
 
344
 
    def test_format_description(self):
345
 
        tree = self.make_branch_and_tree('tree')
346
 
        text = tree.branch._format.get_format_description()
347
 
        self.failUnless(len(text))
348
 
 
349
 
    def test_check_branch_report_results(self):
350
 
        """Checking a branch produces results which can be printed"""
351
 
        branch = self.make_branch('.')
352
 
        result = branch.check()
353
 
        # reports results through logging
354
 
        result.report_results(verbose=True)
355
 
        result.report_results(verbose=False)
356
 
 
357
 
    def test_get_commit_builder(self):
358
 
        self.assertIsInstance(self.make_branch(".").get_commit_builder([]), 
359
 
            repository.CommitBuilder)
360
 
 
361
 
    def test_generate_revision_history(self):
362
 
        """Create a fake revision history easily."""
363
 
        tree = self.make_branch_and_tree('.')
364
 
        rev1 = tree.commit('foo')
365
 
        orig_history = tree.branch.revision_history()
366
 
        rev2 = tree.commit('bar', allow_pointless=True)
367
 
        tree.branch.generate_revision_history(rev1)
368
 
        self.assertEqual(orig_history, tree.branch.revision_history())
369
 
 
370
 
    def test_generate_revision_history_NULL_REVISION(self):
371
 
        tree = self.make_branch_and_tree('.')
372
 
        rev1 = tree.commit('foo')
373
 
        tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
374
 
        self.assertEqual([], tree.branch.revision_history())
375
 
 
376
 
 
377
 
class ChrootedTests(TestCaseWithBranch):
378
 
    """A support class that provides readonly urls outside the local namespace.
379
 
 
380
 
    This is done by checking if self.transport_server is a MemoryServer. If it
381
 
    is, then we are chrooted already; if it is not, then an HttpServer is used
382
 
    for readonly urls.
383
 
    """
384
 
 
385
 
    def setUp(self):
386
 
        super(ChrootedTests, self).setUp()
387
 
        if not self.transport_server == MemoryServer:
388
 
            self.transport_readonly_server = HttpServer
 
181
        branch = Branch.initialize('.')
 
182
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
 
183
                                        'FOO', 'A')
 
184
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
 
185
 
 
186
    def test__relcontrolfilename(self):
 
187
        branch = Branch.initialize('.')
 
188
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
 
189
        
 
190
    def test__relcontrolfilename_empty(self):
 
191
        branch = Branch.initialize('.')
 
192
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
 
193
 
 
194
 
 
195
class TestRemote(TestCaseWithWebserver):
389
196
 
390
197
    def test_open_containing(self):
391
198
        self.assertRaises(NotBranchError, Branch.open_containing,
392
 
                          self.get_readonly_url(''))
 
199
                          self.get_remote_url(''))
393
200
        self.assertRaises(NotBranchError, Branch.open_containing,
394
 
                          self.get_readonly_url('g/p/q'))
395
 
        branch = self.make_branch('.')
396
 
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
 
201
                          self.get_remote_url('g/p/q'))
 
202
        b = Branch.initialize('.')
 
203
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
397
204
        self.assertEqual('', relpath)
398
 
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
 
205
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
399
206
        self.assertEqual('g/p/q', relpath)
400
207
        
 
208
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
209
#         >>> from bzrlib.commit import commit
 
210
#         >>> bzrlib.trace.silent = True
 
211
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
212
#         >>> br1.add('foo')
 
213
#         >>> br1.add('bar')
 
214
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
215
#         >>> br2 = ScratchBranch()
 
216
#         >>> br2.update_revisions(br1)
 
217
#         Added 2 texts.
 
218
#         Added 1 inventories.
 
219
#         Added 1 revisions.
 
220
#         >>> br2.revision_history()
 
221
#         [u'REVISION-ID-1']
 
222
#         >>> br2.update_revisions(br1)
 
223
#         Added 0 revisions.
 
224
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
225
#         True
401
226
 
402
227
class InstrumentedTransaction(object):
403
228
 
462
287
        self.assertEqual(['lw', 'ul'], branch._calls)
463
288
 
464
289
 
465
 
class TestBranchTransaction(TestCaseWithBranch):
 
290
class TestBranchTransaction(TestCaseInTempDir):
466
291
 
467
292
    def setUp(self):
468
293
        super(TestBranchTransaction, self).setUp()
469
 
        self.branch = None
 
294
        self.branch = Branch.initialize('.')
470
295
        
471
296
    def test_default_get_transaction(self):
472
297
        """branch.get_transaction on a new branch should give a PassThrough."""
473
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
 
298
        self.failUnless(isinstance(self.branch.get_transaction(),
474
299
                                   transactions.PassThroughTransaction))
475
300
 
476
301
    def test__set_new_transaction(self):
477
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
302
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
478
303
 
479
304
    def test__set_over_existing_transaction_raises(self):
480
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
305
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
481
306
        self.assertRaises(errors.LockError,
482
 
                          self.get_branch()._set_transaction,
 
307
                          self.branch._set_transaction,
483
308
                          transactions.ReadOnlyTransaction())
484
309
 
485
310
    def test_finish_no_transaction_raises(self):
486
 
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
 
311
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
487
312
 
488
313
    def test_finish_readonly_transaction_works(self):
489
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
490
 
        self.get_branch()._finish_transaction()
491
 
        self.assertEqual(None, self.get_branch().control_files._transaction)
 
314
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
315
        self.branch._finish_transaction()
 
316
        self.assertEqual(None, self.branch._transaction)
492
317
 
493
318
    def test_unlock_calls_finish(self):
494
 
        self.get_branch().lock_read()
 
319
        self.branch.lock_read()
495
320
        transaction = InstrumentedTransaction()
496
 
        self.get_branch().control_files._transaction = transaction
497
 
        self.get_branch().unlock()
 
321
        self.branch._transaction = transaction
 
322
        self.branch.unlock()
498
323
        self.assertEqual(['finish'], transaction.calls)
499
324
 
500
325
    def test_lock_read_acquires_ro_transaction(self):
501
 
        self.get_branch().lock_read()
502
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
 
326
        self.branch.lock_read()
 
327
        self.failUnless(isinstance(self.branch.get_transaction(),
503
328
                                   transactions.ReadOnlyTransaction))
504
 
        self.get_branch().unlock()
 
329
        self.branch.unlock()
505
330
        
506
 
    def test_lock_write_acquires_write_transaction(self):
507
 
        self.get_branch().lock_write()
 
331
    def test_lock_write_acquires_passthrough_transaction(self):
 
332
        self.branch.lock_write()
508
333
        # cannot use get_transaction as it's magic
509
 
        self.failUnless(isinstance(self.get_branch().control_files._transaction,
510
 
                                   transactions.WriteTransaction))
511
 
        self.get_branch().unlock()
512
 
 
513
 
 
514
 
class TestBranchPushLocations(TestCaseWithBranch):
515
 
 
 
334
        self.failUnless(isinstance(self.branch._transaction,
 
335
                                   transactions.PassThroughTransaction))
 
336
        self.branch.unlock()
 
337
 
 
338
 
 
339
class TestBranchPushLocations(TestCaseInTempDir):
 
340
 
 
341
    def setUp(self):
 
342
        super(TestBranchPushLocations, self).setUp()
 
343
        self.branch = Branch.initialize('.')
 
344
        
516
345
    def test_get_push_location_unset(self):
517
 
        self.assertEqual(None, self.get_branch().get_push_location())
 
346
        self.assertEqual(None, self.branch.get_push_location())
518
347
 
519
348
    def test_get_push_location_exact(self):
520
 
        from bzrlib.config import (locations_config_filename,
521
 
                                   ensure_config_dir_exists)
522
 
        ensure_config_dir_exists()
523
 
        fn = locations_config_filename()
524
 
        print >> open(fn, 'wt'), ("[%s]\n"
525
 
                                  "push_location=foo" %
526
 
                                  self.get_branch().base[:-1])
527
 
        self.assertEqual("foo", self.get_branch().get_push_location())
 
349
        self.build_tree(['.bazaar/'])
 
350
        print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
 
351
                                                       "push_location=foo" %
 
352
                                                       os.getcwdu())
 
353
        self.assertEqual("foo", self.branch.get_push_location())
528
354
 
529
355
    def test_set_push_location(self):
530
 
        from bzrlib.config import (locations_config_filename,
531
 
                                   ensure_config_dir_exists)
532
 
        ensure_config_dir_exists()
533
 
        fn = locations_config_filename()
534
 
        branch = self.get_branch()
535
 
        branch.set_push_location('foo')
536
 
        local_path = urlutils.local_path_from_url(branch.base[:-1])
 
356
        self.branch.set_push_location('foo')
537
357
        self.assertFileEqual("[%s]\n"
538
 
                             "push_location = foo" % local_path,
539
 
                             fn)
 
358
                             "push_location = foo" % os.getcwdu(),
 
359
                             '.bazaar/branches.conf')
540
360
 
541
361
    # TODO RBC 20051029 test getting a push location from a branch in a 
542
362
    # recursive section - that is, it appends the branch name.
543
 
 
544
 
 
545
 
class TestFormat(TestCaseWithBranch):
546
 
    """Tests for the format itself."""
547
 
 
548
 
    def test_format_initialize_find_open(self):
549
 
        # loopback test to check the current format initializes to itself.
550
 
        if not self.branch_format.is_supported():
551
 
            # unsupported formats are not loopback testable
552
 
            # because the default open will not open them and
553
 
            # they may not be initializable.
554
 
            return
555
 
        # supported formats must be able to init and open
556
 
        t = get_transport(self.get_url())
557
 
        readonly_t = get_transport(self.get_readonly_url())
558
 
        made_branch = self.make_branch('.')
559
 
        self.failUnless(isinstance(made_branch, branch.Branch))
560
 
 
561
 
        # find it via bzrdir opening:
562
 
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
563
 
        direct_opened_branch = opened_control.open_branch()
564
 
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
565
 
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
566
 
        self.failUnless(isinstance(direct_opened_branch._format,
567
 
                        self.branch_format.__class__))
568
 
 
569
 
        # find it via Branch.open
570
 
        opened_branch = branch.Branch.open(readonly_t.base)
571
 
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
572
 
        self.assertEqual(made_branch._format.__class__,
573
 
                         opened_branch._format.__class__)
574
 
        # if it has a unique id string, can we probe for it?
575
 
        try:
576
 
            self.branch_format.get_format_string()
577
 
        except NotImplementedError:
578
 
            return
579
 
        self.assertEqual(self.branch_format,
580
 
                         branch.BranchFormat.find_format(opened_control))
581
 
 
582