~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

  • Committer: John Arbash Meinel
  • Date: 2005-11-14 17:02:35 UTC
  • mto: (1587.1.6 bound-branches)
  • mto: This revision was merged to the branch mainline in revision 1590.
  • Revision ID: john@arbash-meinel.com-20051114170235-f85afa458bae956e
Fixing up the error message for a failed bind.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 Canonical Ltd
 
1
# (C) 2005 Canonical Ltd
2
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Tests for branch implementations - tests a branch format."""
18
 
 
19
17
import os
20
 
import sys
21
18
 
22
 
from bzrlib import branch, bzrdir, errors, gpg, transactions, repository
23
19
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
24
 
from bzrlib.delta import TreeDelta
25
 
from bzrlib.errors import (FileExists,
26
 
                           NoSuchRevision,
27
 
                           NoSuchFile,
28
 
                           UninitializableFormat,
29
 
                           NotBranchError,
30
 
                           )
31
 
from bzrlib.osutils import getcwd
32
 
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
33
 
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
 
20
from bzrlib.clone import copy_branch
 
21
from bzrlib.commit import commit
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import NoSuchRevision, UnlistableBranch, NotBranchError
 
24
import bzrlib.gpg
 
25
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
26
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
34
27
from bzrlib.trace import mutter
35
 
from bzrlib.transport import get_transport
36
 
from bzrlib.transport.http import HttpServer
37
 
from bzrlib.transport.memory import MemoryServer
38
 
from bzrlib.upgrade import upgrade
39
 
from bzrlib.workingtree import WorkingTree
40
 
 
 
28
import bzrlib.transactions as transactions
 
29
from bzrlib.revision import NULL_REVISION
41
30
 
42
31
# TODO: Make a branch using basis branch, and check that it 
43
32
# doesn't request any files that could have been avoided, by 
44
33
# hooking into the Transport.
45
34
 
46
 
 
47
 
class TestCaseWithBranch(TestCaseWithBzrDir):
48
 
 
49
 
    def setUp(self):
50
 
        super(TestCaseWithBranch, self).setUp()
51
 
        self.branch = None
52
 
 
53
 
    def get_branch(self):
54
 
        if self.branch is None:
55
 
            self.branch = self.make_branch('')
56
 
        return self.branch
57
 
 
58
 
    def make_branch(self, relpath, format=None):
59
 
        repo = self.make_repository(relpath, format=format)
60
 
        # fixme RBC 20060210 this isnt necessarily a fixable thing,
61
 
        # Skipped is the wrong exception to raise.
62
 
        try:
63
 
            return self.branch_format.initialize(repo.bzrdir)
64
 
        except errors.UninitializableFormat:
65
 
            raise TestSkipped('Uninitializable branch format')
66
 
 
67
 
    def make_repository(self, relpath, shared=False, format=None):
68
 
        made_control = self.make_bzrdir(relpath, format=format)
69
 
        return made_control.create_repository(shared=shared)
70
 
 
71
 
 
72
 
class TestBranch(TestCaseWithBranch):
 
35
class TestBranch(TestCaseInTempDir):
73
36
 
74
37
    def test_append_revisions(self):
75
38
        """Test appending more than one revision"""
76
 
        br = self.get_branch()
 
39
        br = Branch.initialize(".")
77
40
        br.append_revision("rev1")
78
41
        self.assertEquals(br.revision_history(), ["rev1",])
79
42
        br.append_revision("rev2", "rev3")
81
44
 
82
45
    def test_fetch_revisions(self):
83
46
        """Test fetch-revision operation."""
84
 
        get_transport(self.get_url()).mkdir('b1')
85
 
        get_transport(self.get_url()).mkdir('b2')
86
 
        wt = self.make_branch_and_tree('b1')
87
 
        b1 = wt.branch
88
 
        b2 = self.make_branch('b2')
89
 
        file('b1/foo', 'w').write('hello')
90
 
        wt.add(['foo'], ['foo-id'])
91
 
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
 
47
        from bzrlib.fetch import Fetcher
 
48
        os.mkdir('b1')
 
49
        os.mkdir('b2')
 
50
        b1 = Branch.initialize('b1')
 
51
        b2 = Branch.initialize('b2')
 
52
        file(os.sep.join(['b1', 'foo']), 'w').write('hello')
 
53
        b1.add(['foo'], ['foo-id'])
 
54
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=False)
92
55
 
93
56
        mutter('start fetch')
94
 
        self.assertEqual((1, []), b2.fetch(b1))
95
 
 
96
 
        rev = b2.repository.get_revision('revision-1')
97
 
        tree = b2.repository.revision_tree('revision-1')
98
 
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
99
 
 
100
 
    def test_get_revision_delta(self):
101
 
        tree_a = self.make_branch_and_tree('a')
102
 
        self.build_tree(['a/foo'])
103
 
        tree_a.add('foo', 'file1')
104
 
        tree_a.commit('rev1', rev_id='rev1')
105
 
        self.build_tree(['a/vla'])
106
 
        tree_a.add('vla', 'file2')
107
 
        tree_a.commit('rev2', rev_id='rev2')
108
 
 
109
 
        delta = tree_a.branch.get_revision_delta(1)
110
 
        self.assertIsInstance(delta, TreeDelta)
111
 
        self.assertEqual([('foo', 'file1', 'file')], delta.added)
112
 
        delta = tree_a.branch.get_revision_delta(2)
113
 
        self.assertIsInstance(delta, TreeDelta)
114
 
        self.assertEqual([('vla', 'file2', 'file')], delta.added)
115
 
 
116
 
    def get_unbalanced_tree_pair(self):
 
57
        f = Fetcher(from_branch=b1, to_branch=b2)
 
58
        eq = self.assertEquals
 
59
        eq(f.count_copied, 1)
 
60
        eq(f.last_revision, 'revision-1')
 
61
 
 
62
        rev = b2.get_revision('revision-1')
 
63
        tree = b2.revision_tree('revision-1')
 
64
        eq(tree.get_file_text('foo-id'), 'hello')
 
65
 
 
66
    def test_revision_tree(self):
 
67
        b1 = Branch.initialize('.')
 
68
        b1.working_tree().commit('lala!', rev_id='revision-1', allow_pointless=True)
 
69
        tree = b1.revision_tree('revision-1')
 
70
        tree = b1.revision_tree(None)
 
71
        self.assertEqual(len(tree.list_files()), 0)
 
72
        tree = b1.revision_tree(NULL_REVISION)
 
73
        self.assertEqual(len(tree.list_files()), 0)
 
74
 
 
75
    def get_unbalanced_branch_pair(self):
117
76
        """Return two branches, a and b, with one file in a."""
118
 
        get_transport(self.get_url()).mkdir('a')
119
 
        tree_a = self.make_branch_and_tree('a')
 
77
        os.mkdir('a')
 
78
        br_a = Branch.initialize("a")
120
79
        file('a/b', 'wb').write('b')
121
 
        tree_a.add('b')
122
 
        tree_a.commit("silly commit", rev_id='A')
123
 
 
124
 
        get_transport(self.get_url()).mkdir('b')
125
 
        tree_b = self.make_branch_and_tree('b')
126
 
        return tree_a, tree_b
 
80
        br_a.add('b')
 
81
        commit(br_a, "silly commit", rev_id='A')
 
82
        os.mkdir('b')
 
83
        br_b = Branch.initialize("b")
 
84
        return br_a, br_b
127
85
 
128
86
    def get_balanced_branch_pair(self):
129
87
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
130
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
131
 
        tree_b.branch.repository.fetch(tree_a.branch.repository)
132
 
        return tree_a, tree_b
133
 
 
134
 
    def test_clone_branch(self):
135
 
        """Copy the stores from one branch to another"""
136
 
        tree_a, tree_b = self.get_balanced_branch_pair()
137
 
        tree_b.commit("silly commit")
 
88
        br_a, br_b = self.get_unbalanced_branch_pair()
 
89
        br_a.push_stores(br_b)
 
90
        return br_a, br_b
 
91
 
 
92
    def test_push_stores(self):
 
93
        """Copy the stores from one branch to another"""
 
94
        br_a, br_b = self.get_unbalanced_branch_pair()
 
95
        # ensure the revision is missing.
 
96
        self.assertRaises(NoSuchRevision, br_b.get_revision, 
 
97
                          br_a.revision_history()[0])
 
98
        br_a.push_stores(br_b)
 
99
        # check that b now has all the data from a's first commit.
 
100
        rev = br_b.get_revision(br_a.revision_history()[0])
 
101
        tree = br_b.revision_tree(br_a.revision_history()[0])
 
102
        for file_id in tree:
 
103
            if tree.inventory[file_id].kind == "file":
 
104
                tree.get_file(file_id).read()
 
105
        return br_a, br_b
 
106
 
 
107
    def test_copy_branch(self):
 
108
        """Copy the stores from one branch to another"""
 
109
        br_a, br_b = self.get_balanced_branch_pair()
 
110
        commit(br_b, "silly commit")
138
111
        os.mkdir('c')
139
 
        # this fails to test that the history from a was not used.
140
 
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
141
 
        self.assertEqual(tree_a.branch.revision_history(),
142
 
                         dir_c.open_branch().revision_history())
 
112
        br_c = copy_branch(br_a, 'c', basis_branch=br_b)
 
113
        self.assertEqual(br_a.revision_history(), br_c.revision_history())
143
114
 
144
 
    def test_clone_partial(self):
 
115
    def test_copy_partial(self):
145
116
        """Copy only part of the history of a branch."""
146
 
        # TODO: RBC 20060208 test with a revision not on revision-history.
147
 
        #       what should that behaviour be ? Emailed the list.
148
 
        wt_a = self.make_branch_and_tree('a')
149
 
        self.build_tree(['a/one'])
150
 
        wt_a.add(['one'])
151
 
        wt_a.commit('commit one', rev_id='1')
152
 
        self.build_tree(['a/two'])
153
 
        wt_a.add(['two'])
154
 
        wt_a.commit('commit two', rev_id='2')
155
 
        repo_b = self.make_repository('b')
156
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
157
 
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
158
 
        self.assertEqual(br_b.last_revision(), '1')
159
 
 
160
 
    def test_sprout_partial(self):
161
 
        # test sprouting with a prefix of the revision-history.
162
 
        # also needs not-on-revision-history behaviour defined.
163
 
        wt_a = self.make_branch_and_tree('a')
164
 
        self.build_tree(['a/one'])
165
 
        wt_a.add(['one'])
166
 
        wt_a.commit('commit one', rev_id='1')
167
 
        self.build_tree(['a/two'])
168
 
        wt_a.add(['two'])
169
 
        wt_a.commit('commit two', rev_id='2')
170
 
        repo_b = self.make_repository('b')
171
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
172
 
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
173
 
        self.assertEqual(br_b.last_revision(), '1')
174
 
 
175
 
    def test_clone_branch_nickname(self):
176
 
        # test the nick name is preserved always
177
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
178
 
 
179
 
    def test_clone_branch_parent(self):
180
 
        # test the parent is preserved always
181
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
182
 
        
183
 
    def test_sprout_branch_nickname(self):
184
 
        # test the nick name is reset always
185
 
        raise TestSkipped('XXX branch sprouting is not yet tested..')
186
 
 
187
 
    def test_sprout_branch_parent(self):
188
 
        source = self.make_branch('source')
189
 
        target = source.bzrdir.sprout(self.get_url('target')).open_branch()
190
 
        self.assertEqual(source.bzrdir.root_transport.base, target.get_parent())
191
 
 
192
 
    def test_submit_branch(self):
193
 
        """Submit location can be queried and set"""
194
 
        branch = self.make_branch('branch')
195
 
        self.assertEqual(branch.get_submit_branch(), None)
196
 
        branch.set_submit_branch('sftp://example.com')
197
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
198
 
        branch.set_submit_branch('sftp://example.net')
199
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
 
117
        self.build_tree(['a/', 'a/one'])
 
118
        br_a = Branch.initialize('a')
 
119
        br_a.add(['one'])
 
120
        br_a.working_tree().commit('commit one', rev_id='u@d-1')
 
121
        self.build_tree(['a/two'])
 
122
        br_a.add(['two'])
 
123
        br_a.working_tree().commit('commit two', rev_id='u@d-2')
 
124
        br_b = copy_branch(br_a, 'b', revision='u@d-1')
 
125
        self.assertEqual(br_b.last_revision(), 'u@d-1')
 
126
        self.assertTrue(os.path.exists('b/one'))
 
127
        self.assertFalse(os.path.exists('b/two'))
200
128
        
201
129
    def test_record_initial_ghost_merge(self):
202
130
        """A pending merge with no revision present is still a merge."""
203
 
        wt = self.make_branch_and_tree('.')
204
 
        branch = wt.branch
205
 
        wt.add_pending_merge('non:existent@rev--ision--0--2')
206
 
        wt.commit('pretend to merge nonexistent-revision', rev_id='first')
207
 
        rev = branch.repository.get_revision(branch.last_revision())
 
131
        branch = Branch.initialize('.')
 
132
        branch.working_tree().add_pending_merge('non:existent@rev--ision--0--2')
 
133
        branch.working_tree().commit('pretend to merge nonexistent-revision', rev_id='first')
 
134
        rev = branch.get_revision(branch.last_revision())
208
135
        self.assertEqual(len(rev.parent_ids), 1)
209
136
        # parent_sha1s is not populated now, WTF. rbc 20051003
210
137
        self.assertEqual(len(rev.parent_sha1s), 0)
211
138
        self.assertEqual(rev.parent_ids[0], 'non:existent@rev--ision--0--2')
212
139
 
213
140
    def test_bad_revision(self):
214
 
        self.assertRaises(errors.InvalidRevisionId,
215
 
                          self.get_branch().repository.get_revision,
216
 
                          None)
 
141
        branch = Branch.initialize('.')
 
142
        self.assertRaises(errors.InvalidRevisionId, branch.get_revision, None)
217
143
 
218
144
# TODO 20051003 RBC:
219
145
# compare the gpg-to-sign info for a commit with a ghost and 
222
148
        
223
149
    def test_pending_merges(self):
224
150
        """Tracking pending-merged revisions."""
225
 
        wt = self.make_branch_and_tree('.')
226
 
        b = wt.branch
 
151
        b = Branch.initialize('.')
 
152
        wt = b.working_tree()
227
153
        self.assertEquals(wt.pending_merges(), [])
228
154
        wt.add_pending_merge('foo@azkhazan-123123-abcabc')
229
155
        self.assertEquals(wt.pending_merges(), ['foo@azkhazan-123123-abcabc'])
233
159
        self.assertEquals(wt.pending_merges(),
234
160
                          ['foo@azkhazan-123123-abcabc',
235
161
                           'wibble@fofof--20050401--1928390812'])
236
 
        wt.commit("commit from base with two merges")
237
 
        rev = b.repository.get_revision(b.revision_history()[0])
 
162
        b.working_tree().commit("commit from base with two merges")
 
163
        rev = b.get_revision(b.revision_history()[0])
238
164
        self.assertEquals(len(rev.parent_ids), 2)
239
165
        self.assertEquals(rev.parent_ids[0],
240
166
                          'foo@azkhazan-123123-abcabc')
244
170
        self.assertEquals(wt.pending_merges(), [])
245
171
 
246
172
    def test_sign_existing_revision(self):
247
 
        wt = self.make_branch_and_tree('.')
248
 
        branch = wt.branch
249
 
        wt.commit("base", allow_pointless=True, rev_id='A')
 
173
        branch = Branch.initialize('.')
 
174
        branch.working_tree().commit("base", allow_pointless=True, rev_id='A')
250
175
        from bzrlib.testament import Testament
251
 
        strategy = gpg.LoopbackGPGStrategy(None)
252
 
        branch.repository.sign_revision('A', strategy)
253
 
        self.assertEqual(Testament.from_revision(branch.repository, 
254
 
                         'A').as_short_text(),
255
 
                         branch.repository.get_signature_text('A'))
 
176
        branch.sign_revision('A', bzrlib.gpg.LoopbackGPGStrategy(None))
 
177
        self.assertEqual(Testament.from_revision(branch, 'A').as_short_text(),
 
178
                         branch.revision_store.get('A', 'sig').read())
256
179
 
257
180
    def test_store_signature(self):
258
 
        wt = self.make_branch_and_tree('.')
259
 
        branch = wt.branch
260
 
        branch.repository.store_revision_signature(
261
 
            gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
262
 
        self.assertRaises(errors.NoSuchRevision,
263
 
                          branch.repository.has_signature_for_revision_id,
264
 
                          'A')
265
 
        wt.commit("base", allow_pointless=True, rev_id='A')
266
 
        self.assertEqual('FOO', 
267
 
                         branch.repository.get_signature_text('A'))
268
 
 
269
 
    def test_branch_keeps_signatures(self):
270
 
        wt = self.make_branch_and_tree('source')
271
 
        wt.commit('A', allow_pointless=True, rev_id='A')
272
 
        wt.branch.repository.sign_revision('A',
273
 
            gpg.LoopbackGPGStrategy(None))
274
 
        #FIXME: clone should work to urls,
275
 
        # wt.clone should work to disks.
276
 
        self.build_tree(['target/'])
277
 
        d2 = wt.bzrdir.clone('target')
278
 
        self.assertEqual(wt.branch.repository.get_signature_text('A'),
279
 
                         d2.open_repository().get_signature_text('A'))
280
 
 
281
 
    def test_nicks(self):
282
 
        """Branch nicknames"""
283
 
        t = get_transport(self.get_url())
284
 
        t.mkdir('bzr.dev')
285
 
        branch = self.make_branch('bzr.dev')
286
 
        self.assertEqual(branch.nick, 'bzr.dev')
287
 
        t.move('bzr.dev', 'bzr.ab')
288
 
        branch = Branch.open(self.get_url('bzr.ab'))
289
 
        self.assertEqual(branch.nick, 'bzr.ab')
290
 
        branch.nick = "Aaron's branch"
291
 
        branch.nick = "Aaron's branch"
292
 
        self.failUnless(
293
 
            t.has(
294
 
                t.relpath(
295
 
                    branch.control_files.controlfilename("branch.conf")
296
 
                    )
297
 
                )
298
 
            )
299
 
        self.assertEqual(branch.nick, "Aaron's branch")
300
 
        t.move('bzr.ab', 'integration')
301
 
        branch = Branch.open(self.get_url('integration'))
302
 
        self.assertEqual(branch.nick, "Aaron's branch")
303
 
        branch.nick = u"\u1234"
304
 
        self.assertEqual(branch.nick, u"\u1234")
305
 
 
306
 
    def test_commit_nicks(self):
307
 
        """Nicknames are committed to the revision"""
308
 
        get_transport(self.get_url()).mkdir('bzr.dev')
309
 
        wt = self.make_branch_and_tree('bzr.dev')
310
 
        branch = wt.branch
311
 
        branch.nick = "My happy branch"
312
 
        wt.commit('My commit respect da nick.')
313
 
        committed = branch.repository.get_revision(branch.last_revision())
314
 
        self.assertEqual(committed.properties["branch-nick"], 
315
 
                         "My happy branch")
316
 
 
317
 
    def test_create_open_branch_uses_repository(self):
318
 
        try:
319
 
            repo = self.make_repository('.', shared=True)
320
 
        except errors.IncompatibleFormat:
321
 
            return
322
 
        repo.bzrdir.root_transport.mkdir('child')
323
 
        child_dir = self.bzrdir_format.initialize('child')
324
 
        try:
325
 
            child_branch = self.branch_format.initialize(child_dir)
326
 
        except errors.UninitializableFormat:
327
 
            # branch references are not default init'able.
328
 
            return
329
 
        self.assertEqual(repo.bzrdir.root_transport.base,
330
 
                         child_branch.repository.bzrdir.root_transport.base)
331
 
        child_branch = branch.Branch.open(self.get_url('child'))
332
 
        self.assertEqual(repo.bzrdir.root_transport.base,
333
 
                         child_branch.repository.bzrdir.root_transport.base)
334
 
 
335
 
    def test_format_description(self):
336
 
        tree = self.make_branch_and_tree('tree')
337
 
        text = tree.branch._format.get_format_description()
338
 
        self.failUnless(len(text))
339
 
 
340
 
    def test_check_branch_report_results(self):
341
 
        """Checking a branch produces results which can be printed"""
342
 
        branch = self.make_branch('.')
343
 
        result = branch.check()
344
 
        # reports results through logging
345
 
        result.report_results(verbose=True)
346
 
        result.report_results(verbose=False)
347
 
 
348
 
    def test_get_commit_builder(self):
349
 
        self.assertIsInstance(self.make_branch(".").get_commit_builder([]), 
350
 
            repository.CommitBuilder)
351
 
 
352
 
    def test_generate_revision_history(self):
353
 
        """Create a fake revision history easily."""
354
 
        tree = self.make_branch_and_tree('.')
355
 
        rev1 = tree.commit('foo')
356
 
        orig_history = tree.branch.revision_history()
357
 
        rev2 = tree.commit('bar', allow_pointless=True)
358
 
        tree.branch.generate_revision_history(rev1)
359
 
        self.assertEqual(orig_history, tree.branch.revision_history())
360
 
 
361
 
 
362
 
class ChrootedTests(TestCaseWithBranch):
363
 
    """A support class that provides readonly urls outside the local namespace.
364
 
 
365
 
    This is done by checking if self.transport_server is a MemoryServer. if it
366
 
    is then we are chrooted already, if it is not then an HttpServer is used
367
 
    for readonly urls.
368
 
    """
369
 
 
370
 
    def setUp(self):
371
 
        super(ChrootedTests, self).setUp()
372
 
        if not self.transport_server == MemoryServer:
373
 
            self.transport_readonly_server = HttpServer
 
181
        branch = Branch.initialize('.')
 
182
        branch.store_revision_signature(bzrlib.gpg.LoopbackGPGStrategy(None),
 
183
                                        'FOO', 'A')
 
184
        self.assertEqual('FOO', branch.revision_store.get('A', 'sig').read())
 
185
 
 
186
    def test__relcontrolfilename(self):
 
187
        branch = Branch.initialize('.')
 
188
        self.assertEqual('.bzr/%25', branch._rel_controlfilename('%'))
 
189
        
 
190
    def test__relcontrolfilename_empty(self):
 
191
        branch = Branch.initialize('.')
 
192
        self.assertEqual('.bzr', branch._rel_controlfilename(''))
 
193
 
 
194
 
 
195
class TestRemote(TestCaseWithWebserver):
374
196
 
375
197
    def test_open_containing(self):
376
198
        self.assertRaises(NotBranchError, Branch.open_containing,
377
 
                          self.get_readonly_url(''))
 
199
                          self.get_remote_url(''))
378
200
        self.assertRaises(NotBranchError, Branch.open_containing,
379
 
                          self.get_readonly_url('g/p/q'))
380
 
        branch = self.make_branch('.')
381
 
        branch, relpath = Branch.open_containing(self.get_readonly_url(''))
 
201
                          self.get_remote_url('g/p/q'))
 
202
        b = Branch.initialize('.')
 
203
        branch, relpath = Branch.open_containing(self.get_remote_url(''))
382
204
        self.assertEqual('', relpath)
383
 
        branch, relpath = Branch.open_containing(self.get_readonly_url('g/p/q'))
 
205
        branch, relpath = Branch.open_containing(self.get_remote_url('g/p/q'))
384
206
        self.assertEqual('g/p/q', relpath)
385
207
        
 
208
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
209
#         >>> from bzrlib.commit import commit
 
210
#         >>> bzrlib.trace.silent = True
 
211
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
212
#         >>> br1.add('foo')
 
213
#         >>> br1.add('bar')
 
214
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
215
#         >>> br2 = ScratchBranch()
 
216
#         >>> br2.update_revisions(br1)
 
217
#         Added 2 texts.
 
218
#         Added 1 inventories.
 
219
#         Added 1 revisions.
 
220
#         >>> br2.revision_history()
 
221
#         [u'REVISION-ID-1']
 
222
#         >>> br2.update_revisions(br1)
 
223
#         Added 0 revisions.
 
224
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
225
#         True
386
226
 
387
227
class InstrumentedTransaction(object):
388
228
 
447
287
        self.assertEqual(['lw', 'ul'], branch._calls)
448
288
 
449
289
 
450
 
class TestBranchTransaction(TestCaseWithBranch):
 
290
class TestBranchTransaction(TestCaseInTempDir):
451
291
 
452
292
    def setUp(self):
453
293
        super(TestBranchTransaction, self).setUp()
454
 
        self.branch = None
 
294
        self.branch = Branch.initialize('.')
455
295
        
456
296
    def test_default_get_transaction(self):
457
297
        """branch.get_transaction on a new branch should give a PassThrough."""
458
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
 
298
        self.failUnless(isinstance(self.branch.get_transaction(),
459
299
                                   transactions.PassThroughTransaction))
460
300
 
461
301
    def test__set_new_transaction(self):
462
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
302
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
463
303
 
464
304
    def test__set_over_existing_transaction_raises(self):
465
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
 
305
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
466
306
        self.assertRaises(errors.LockError,
467
 
                          self.get_branch()._set_transaction,
 
307
                          self.branch._set_transaction,
468
308
                          transactions.ReadOnlyTransaction())
469
309
 
470
310
    def test_finish_no_transaction_raises(self):
471
 
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
 
311
        self.assertRaises(errors.LockError, self.branch._finish_transaction)
472
312
 
473
313
    def test_finish_readonly_transaction_works(self):
474
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
475
 
        self.get_branch()._finish_transaction()
476
 
        self.assertEqual(None, self.get_branch().control_files._transaction)
 
314
        self.branch._set_transaction(transactions.ReadOnlyTransaction())
 
315
        self.branch._finish_transaction()
 
316
        self.assertEqual(None, self.branch._transaction)
477
317
 
478
318
    def test_unlock_calls_finish(self):
479
 
        self.get_branch().lock_read()
 
319
        self.branch.lock_read()
480
320
        transaction = InstrumentedTransaction()
481
 
        self.get_branch().control_files._transaction = transaction
482
 
        self.get_branch().unlock()
 
321
        self.branch._transaction = transaction
 
322
        self.branch.unlock()
483
323
        self.assertEqual(['finish'], transaction.calls)
484
324
 
485
325
    def test_lock_read_acquires_ro_transaction(self):
486
 
        self.get_branch().lock_read()
487
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
 
326
        self.branch.lock_read()
 
327
        self.failUnless(isinstance(self.branch.get_transaction(),
488
328
                                   transactions.ReadOnlyTransaction))
489
 
        self.get_branch().unlock()
 
329
        self.branch.unlock()
490
330
        
491
 
    def test_lock_write_acquires_write_transaction(self):
492
 
        self.get_branch().lock_write()
 
331
    def test_lock_write_acquires_passthrough_transaction(self):
 
332
        self.branch.lock_write()
493
333
        # cannot use get_transaction as its magic
494
 
        self.failUnless(isinstance(self.get_branch().control_files._transaction,
495
 
                                   transactions.WriteTransaction))
496
 
        self.get_branch().unlock()
497
 
 
498
 
 
499
 
class TestBranchPushLocations(TestCaseWithBranch):
500
 
 
 
334
        self.failUnless(isinstance(self.branch._transaction,
 
335
                                   transactions.PassThroughTransaction))
 
336
        self.branch.unlock()
 
337
 
 
338
 
 
339
class TestBranchPushLocations(TestCaseInTempDir):
 
340
 
 
341
    def setUp(self):
 
342
        super(TestBranchPushLocations, self).setUp()
 
343
        self.branch = Branch.initialize('.')
 
344
        
501
345
    def test_get_push_location_unset(self):
502
 
        self.assertEqual(None, self.get_branch().get_push_location())
 
346
        self.assertEqual(None, self.branch.get_push_location())
503
347
 
504
348
    def test_get_push_location_exact(self):
505
 
        from bzrlib.config import (locations_config_filename,
506
 
                                   ensure_config_dir_exists)
507
 
        ensure_config_dir_exists()
508
 
        fn = locations_config_filename()
509
 
        print >> open(fn, 'wt'), ("[%s]\n"
510
 
                                  "push_location=foo" %
511
 
                                  self.get_branch().base[:-1])
512
 
        self.assertEqual("foo", self.get_branch().get_push_location())
 
349
        self.build_tree(['.bazaar/'])
 
350
        print >> open('.bazaar/branches.conf', 'wt'), ("[%s]\n"
 
351
                                                       "push_location=foo" %
 
352
                                                       os.getcwdu())
 
353
        self.assertEqual("foo", self.branch.get_push_location())
513
354
 
514
355
    def test_set_push_location(self):
515
 
        from bzrlib.config import (locations_config_filename,
516
 
                                   ensure_config_dir_exists)
517
 
        ensure_config_dir_exists()
518
 
        fn = locations_config_filename()
519
 
        self.get_branch().set_push_location('foo')
 
356
        self.branch.set_push_location('foo')
520
357
        self.assertFileEqual("[%s]\n"
521
 
                             "push_location = foo" % self.get_branch().base[:-1],
522
 
                             fn)
 
358
                             "push_location = foo" % os.getcwdu(),
 
359
                             '.bazaar/branches.conf')
523
360
 
524
361
    # TODO RBC 20051029 test getting a push location from a branch in a 
525
362
    # recursive section - that is, it appends the branch name.
526
 
 
527
 
 
528
 
class TestFormat(TestCaseWithBranch):
529
 
    """Tests for the format itself."""
530
 
 
531
 
    def test_format_initialize_find_open(self):
532
 
        # loopback test to check the current format initializes to itself.
533
 
        if not self.branch_format.is_supported():
534
 
            # unsupported formats are not loopback testable
535
 
            # because the default open will not open them and
536
 
            # they may not be initializable.
537
 
            return
538
 
        # supported formats must be able to init and open
539
 
        t = get_transport(self.get_url())
540
 
        readonly_t = get_transport(self.get_readonly_url())
541
 
        made_branch = self.make_branch('.')
542
 
        self.failUnless(isinstance(made_branch, branch.Branch))
543
 
 
544
 
        # find it via bzrdir opening:
545
 
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
546
 
        direct_opened_branch = opened_control.open_branch()
547
 
        self.assertEqual(direct_opened_branch.__class__, made_branch.__class__)
548
 
        self.assertEqual(opened_control, direct_opened_branch.bzrdir)
549
 
        self.failUnless(isinstance(direct_opened_branch._format,
550
 
                        self.branch_format.__class__))
551
 
 
552
 
        # find it via Branch.open
553
 
        opened_branch = branch.Branch.open(readonly_t.base)
554
 
        self.failUnless(isinstance(opened_branch, made_branch.__class__))
555
 
        self.assertEqual(made_branch._format.__class__,
556
 
                         opened_branch._format.__class__)
557
 
        # if it has a unique id string, can we probe for it ?
558
 
        try:
559
 
            self.branch_format.get_format_string()
560
 
        except NotImplementedError:
561
 
            return
562
 
        self.assertEqual(self.branch_format,
563
 
                         branch.BranchFormat.find_format(opened_control))