~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testbranch.py

  • Committer: Martin Pool
  • Date: 2005-09-13 23:08:19 UTC
  • Revision ID: mbp@sourcefrog.net-20050913230819-6ceae96050d32faa
ignore .bzr-shelf

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
2
 
#
 
1
# (C) 2005 Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Tests for branch implementations - tests a branch format."""
18
 
 
19
 
import os
20
 
import sys
21
 
 
22
 
from bzrlib import (
23
 
    branch,
24
 
    bzrdir,
25
 
    errors,
26
 
    gpg,
27
 
    urlutils,
28
 
    transactions,
29
 
    remote,
30
 
    repository,
31
 
    )
32
 
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
33
 
from bzrlib.delta import TreeDelta
34
 
from bzrlib.errors import (FileExists,
35
 
                           NoSuchRevision,
36
 
                           NoSuchFile,
37
 
                           UninitializableFormat,
38
 
                           NotBranchError,
39
 
                           )
40
 
from bzrlib.osutils import getcwd
41
 
import bzrlib.revision
42
 
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
43
 
from bzrlib.tests.branch_implementations import TestCaseWithBranch
44
 
from bzrlib.tests.HttpServer import HttpServer
45
 
from bzrlib.trace import mutter
46
 
from bzrlib.transport import get_transport
47
 
from bzrlib.transport.memory import MemoryServer
48
 
from bzrlib.upgrade import upgrade
49
 
from bzrlib.workingtree import WorkingTree
50
 
from bzrlib.symbol_versioning import (
51
 
    zero_ninetyone,
52
 
    )
53
 
 
54
 
 
55
 
class TestBranch(TestCaseWithBranch):
56
 
 
57
 
    def test_create_tree_with_merge(self):
58
 
        tree = self.create_tree_with_merge()
59
 
        ancestry_graph = tree.branch.repository.get_revision_graph('rev-3')
60
 
        self.assertEqual({'rev-1':(),
61
 
                          'rev-2':('rev-1', ),
62
 
                          'rev-1.1.1':('rev-1', ),
63
 
                          'rev-3':('rev-2', 'rev-1.1.1', ),
64
 
                         }, ancestry_graph)
65
 
 
66
 
    def test_revision_ids_are_utf8(self):
67
 
        wt = self.make_branch_and_tree('tree')
68
 
        wt.commit('f', rev_id='rev1')
69
 
        wt.commit('f', rev_id='rev2')
70
 
        wt.commit('f', rev_id='rev3')
71
 
 
72
 
        br = self.get_branch()
73
 
        br.fetch(wt.branch)
74
 
        br.set_revision_history(['rev1', 'rev2', 'rev3'])
75
 
        rh = br.revision_history()
76
 
        self.assertEqual(['rev1', 'rev2', 'rev3'], rh)
77
 
        for revision_id in rh:
78
 
            self.assertIsInstance(revision_id, str)
79
 
        last = br.last_revision()
80
 
        self.assertEqual('rev3', last)
81
 
        self.assertIsInstance(last, str)
82
 
        revno, last = br.last_revision_info()
83
 
        self.assertEqual(3, revno)
84
 
        self.assertEqual('rev3', last)
85
 
        self.assertIsInstance(last, str)
86
 
 
87
 
    def test_fetch_revisions(self):
88
 
        """Test fetch-revision operation."""
89
 
        wt = self.make_branch_and_tree('b1')
90
 
        b1 = wt.branch
91
 
        self.build_tree_contents([('b1/foo', 'hello')])
92
 
        wt.add(['foo'], ['foo-id'])
93
 
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
94
 
 
95
 
        b2 = self.make_branch('b2')
96
 
        self.assertEqual((1, []), b2.fetch(b1))
97
 
 
98
 
        rev = b2.repository.get_revision('revision-1')
99
 
        tree = b2.repository.revision_tree('revision-1')
100
 
        self.assertEqual(tree.get_file_text('foo-id'), 'hello')
101
 
 
102
 
    def test_get_revision_delta(self):
103
 
        tree_a = self.make_branch_and_tree('a')
104
 
        self.build_tree(['a/foo'])
105
 
        tree_a.add('foo', 'file1')
106
 
        tree_a.commit('rev1', rev_id='rev1')
107
 
        self.build_tree(['a/vla'])
108
 
        tree_a.add('vla', 'file2')
109
 
        tree_a.commit('rev2', rev_id='rev2')
110
 
 
111
 
        delta = tree_a.branch.get_revision_delta(1)
112
 
        self.assertIsInstance(delta, TreeDelta)
113
 
        self.assertEqual([('foo', 'file1', 'file')], delta.added)
114
 
        delta = tree_a.branch.get_revision_delta(2)
115
 
        self.assertIsInstance(delta, TreeDelta)
116
 
        self.assertEqual([('vla', 'file2', 'file')], delta.added)
117
 
 
118
 
    def get_unbalanced_tree_pair(self):
119
 
        """Return two branches, a and b, with one file in a."""
120
 
        tree_a = self.make_branch_and_tree('a')
121
 
        self.build_tree_contents([('a/b', 'b')])
122
 
        tree_a.add('b')
123
 
        tree_a.commit("silly commit", rev_id='A')
124
 
 
125
 
        tree_b = self.make_branch_and_tree('b')
126
 
        return tree_a, tree_b
127
 
 
128
 
    def get_balanced_branch_pair(self):
129
 
        """Returns br_a, br_b as with one commit in a, and b has a's stores."""
130
 
        tree_a, tree_b = self.get_unbalanced_tree_pair()
131
 
        tree_b.branch.repository.fetch(tree_a.branch.repository)
132
 
        return tree_a, tree_b
133
 
 
134
 
    def test_clone_partial(self):
135
 
        """Copy only part of the history of a branch."""
136
 
        # TODO: RBC 20060208 test with a revision not on revision-history.
137
 
        #       what should that behaviour be ? Emailed the list.
138
 
        # First, make a branch with two commits.
139
 
        wt_a = self.make_branch_and_tree('a')
140
 
        self.build_tree(['a/one'])
141
 
        wt_a.add(['one'])
142
 
        wt_a.commit('commit one', rev_id='1')
143
 
        self.build_tree(['a/two'])
144
 
        wt_a.add(['two'])
145
 
        wt_a.commit('commit two', rev_id='2')
146
 
        # Now make a copy of the repository.
147
 
        repo_b = self.make_repository('b')
148
 
        wt_a.branch.repository.copy_content_into(repo_b)
149
 
        # wt_a might be a lightweight checkout, so get a hold of the actual
150
 
        # branch (because you can't do a partial clone of a lightweight
151
 
        # checkout).
152
 
        branch = wt_a.branch.bzrdir.open_branch()
153
 
        # Then make a branch where the new repository is, but specify a revision
154
 
        # ID.  The new branch's history will stop at the specified revision.
155
 
        br_b = branch.clone(repo_b.bzrdir, revision_id='1')
156
 
        self.assertEqual('1', br_b.last_revision())
157
 
 
158
 
    def get_parented_branch(self):
159
 
        wt_a = self.make_branch_and_tree('a')
160
 
        self.build_tree(['a/one'])
161
 
        wt_a.add(['one'])
162
 
        wt_a.commit('commit one', rev_id='1')
163
 
 
164
 
        branch_b = wt_a.bzrdir.sprout('b', revision_id='1').open_branch()
165
 
        self.assertEqual(wt_a.branch.base, branch_b.get_parent())
166
 
        return branch_b
167
 
 
168
 
    def test_clone_branch_nickname(self):
169
 
        # test the nick name is preserved always
170
 
        raise TestSkipped('XXX branch cloning is not yet tested..')
171
 
 
172
 
    def test_clone_branch_parent(self):
173
 
        # test the parent is preserved always
174
 
        branch_b = self.get_parented_branch()
175
 
        repo_c = self.make_repository('c')
176
 
        branch_b.repository.copy_content_into(repo_c)
177
 
        branch_c = branch_b.clone(repo_c.bzrdir)
178
 
        self.assertNotEqual(None, branch_c.get_parent())
179
 
        self.assertEqual(branch_b.get_parent(), branch_c.get_parent())
180
 
 
181
 
        # We can also set a specific parent, and it should be honored
182
 
        random_parent = 'http://bazaar-vcs.org/path/to/branch'
183
 
        branch_b.set_parent(random_parent)
184
 
        repo_d = self.make_repository('d')
185
 
        branch_b.repository.copy_content_into(repo_d)
186
 
        branch_d = branch_b.clone(repo_d.bzrdir)
187
 
        self.assertEqual(random_parent, branch_d.get_parent())
188
 
 
189
 
    def test_submit_branch(self):
190
 
        """Submit location can be queried and set"""
191
 
        branch = self.make_branch('branch')
192
 
        self.assertEqual(branch.get_submit_branch(), None)
193
 
        branch.set_submit_branch('sftp://example.com')
194
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.com')
195
 
        branch.set_submit_branch('sftp://example.net')
196
 
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
197
 
        
198
 
    def test_public_branch(self):
199
 
        """public location can be queried and set"""
200
 
        branch = self.make_branch('branch')
201
 
        self.assertEqual(branch.get_public_branch(), None)
202
 
        branch.set_public_branch('sftp://example.com')
203
 
        self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
204
 
        branch.set_public_branch('sftp://example.net')
205
 
        self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
206
 
        branch.set_public_branch(None)
207
 
        self.assertEqual(branch.get_public_branch(), None)
208
 
 
209
 
    def test_record_initial_ghost(self):
210
 
        """Branches should support having ghosts."""
211
 
        wt = self.make_branch_and_tree('.')
212
 
        wt.set_parent_ids(['non:existent@rev--ision--0--2'],
213
 
            allow_leftmost_as_ghost=True)
214
 
        rev_id = wt.commit('commit against a ghost first parent.')
215
 
        rev = wt.branch.repository.get_revision(rev_id)
216
 
        self.assertEqual(rev.parent_ids, ['non:existent@rev--ision--0--2'])
217
 
        # parent_sha1s is not populated now, WTF. rbc 20051003
218
 
        self.assertEqual(len(rev.parent_sha1s), 0)
219
 
 
220
 
    def test_record_two_ghosts(self):
221
 
        """Recording with all ghosts works."""
222
 
        wt = self.make_branch_and_tree('.')
223
 
        wt.set_parent_ids([
224
 
                'foo@azkhazan-123123-abcabc',
225
 
                'wibble@fofof--20050401--1928390812',
226
 
            ],
227
 
            allow_leftmost_as_ghost=True)
228
 
        rev_id = wt.commit("commit from ghost base with one merge")
229
 
        # the revision should have been committed with two parents
230
 
        rev = wt.branch.repository.get_revision(rev_id)
231
 
        self.assertEqual(['foo@azkhazan-123123-abcabc',
232
 
            'wibble@fofof--20050401--1928390812'],
233
 
            rev.parent_ids)
234
 
 
235
 
    def test_bad_revision(self):
236
 
        self.assertRaises(errors.InvalidRevisionId,
237
 
                          self.get_branch().repository.get_revision,
238
 
                          None)
239
 
 
240
 
# TODO 20051003 RBC:
241
 
# compare the gpg-to-sign info for a commit with a ghost and 
242
 
#     an identical tree without a ghost
243
 
# fetch missing should rewrite the TOC of weaves to list newly available parents.
244
 
        
245
 
    def test_sign_existing_revision(self):
246
 
        wt = self.make_branch_and_tree('.')
247
 
        branch = wt.branch
248
 
        wt.commit("base", allow_pointless=True, rev_id='A')
249
 
        from bzrlib.testament import Testament
250
 
        strategy = gpg.LoopbackGPGStrategy(None)
251
 
        branch.repository.lock_write()
252
 
        branch.repository.start_write_group()
253
 
        branch.repository.sign_revision('A', strategy)
254
 
        branch.repository.commit_write_group()
255
 
        branch.repository.unlock()
256
 
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
257
 
                         Testament.from_revision(branch.repository,
258
 
                         'A').as_short_text() +
259
 
                         '-----END PSEUDO-SIGNED CONTENT-----\n',
260
 
                         branch.repository.get_signature_text('A'))
261
 
 
262
 
    def test_store_signature(self):
263
 
        wt = self.make_branch_and_tree('.')
264
 
        branch = wt.branch
265
 
        branch.repository.store_revision_signature(
266
 
            gpg.LoopbackGPGStrategy(None), 'FOO', 'A')
267
 
        self.assertRaises(errors.NoSuchRevision,
268
 
                          branch.repository.has_signature_for_revision_id,
269
 
                          'A')
270
 
        wt.commit("base", allow_pointless=True, rev_id='A')
271
 
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
272
 
                         'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
273
 
                         branch.repository.get_signature_text('A'))
274
 
 
275
 
    def test_branch_keeps_signatures(self):
276
 
        wt = self.make_branch_and_tree('source')
277
 
        wt.commit('A', allow_pointless=True, rev_id='A')
278
 
        repo = wt.branch.repository
279
 
        repo.lock_write()
280
 
        repo.start_write_group()
281
 
        repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
282
 
        repo.commit_write_group()
283
 
        repo.unlock()
284
 
        #FIXME: clone should work to urls,
285
 
        # wt.clone should work to disks.
286
 
        self.build_tree(['target/'])
287
 
        d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
288
 
        self.assertEqual(repo.get_signature_text('A'),
289
 
                         d2.open_repository().get_signature_text('A'))
290
 
 
291
 
    def test_nicks(self):
292
 
        """Test explicit and implicit branch nicknames.
293
 
        
294
 
        Nicknames are implicitly the name of the branch's directory, unless an
295
 
        explicit nickname is set.  That is, an explicit nickname always
296
 
        overrides the implicit one.
297
 
        """
298
 
        t = get_transport(self.get_url())
299
 
        branch = self.make_branch('bzr.dev')
300
 
        # The nick will be 'bzr.dev', because there is no explicit nick set.
301
 
        self.assertEqual(branch.nick, 'bzr.dev')
302
 
        # Move the branch to a different directory, 'bzr.ab'.  Now that branch
303
 
        # will report its nick as 'bzr.ab'.
304
 
        t.move('bzr.dev', 'bzr.ab')
305
 
        branch = Branch.open(self.get_url('bzr.ab'))
306
 
        self.assertEqual(branch.nick, 'bzr.ab')
307
 
        # Set the branch nick explicitly.  This will ensure there's a branch
308
 
        # config file in the branch.
309
 
        branch.nick = "Aaron's branch"
310
 
        branch.nick = "Aaron's branch"
311
 
        if not isinstance(branch, remote.RemoteBranch):
312
 
            controlfilename = branch.control_files.controlfilename
313
 
            self.failUnless(t.has(t.relpath(controlfilename("branch.conf"))))
314
 
        # Because the nick has been set explicitly, the nick is now always
315
 
        # "Aaron's branch", regardless of directory name.
316
 
        self.assertEqual(branch.nick, "Aaron's branch")
317
 
        t.move('bzr.ab', 'integration')
318
 
        branch = Branch.open(self.get_url('integration'))
319
 
        self.assertEqual(branch.nick, "Aaron's branch")
320
 
        branch.nick = u"\u1234"
321
 
        self.assertEqual(branch.nick, u"\u1234")
322
 
 
323
 
    def test_commit_nicks(self):
324
 
        """Nicknames are committed to the revision"""
325
 
        wt = self.make_branch_and_tree('bzr.dev')
326
 
        branch = wt.branch
327
 
        branch.nick = "My happy branch"
328
 
        wt.commit('My commit respect da nick.')
329
 
        committed = branch.repository.get_revision(branch.last_revision())
330
 
        self.assertEqual(committed.properties["branch-nick"],
331
 
                         "My happy branch")
332
 
 
333
 
    def test_create_open_branch_uses_repository(self):
334
 
        try:
335
 
            repo = self.make_repository('.', shared=True)
336
 
        except errors.IncompatibleFormat:
337
 
            return
338
 
        child_transport = repo.bzrdir.root_transport.clone('child')
339
 
        child_transport.mkdir('.')
340
 
        child_dir = self.bzrdir_format.initialize_on_transport(child_transport)
341
 
        try:
342
 
            child_branch = self.branch_format.initialize(child_dir)
343
 
        except errors.UninitializableFormat:
344
 
            # branch references are not default init'able.
345
 
            return
346
 
        self.assertEqual(repo.bzrdir.root_transport.base,
347
 
                         child_branch.repository.bzrdir.root_transport.base)
348
 
        child_branch = branch.Branch.open(self.get_url('child'))
349
 
        self.assertEqual(repo.bzrdir.root_transport.base,
350
 
                         child_branch.repository.bzrdir.root_transport.base)
351
 
 
352
 
    def test_format_description(self):
353
 
        tree = self.make_branch_and_tree('tree')
354
 
        text = tree.branch._format.get_format_description()
355
 
        self.failUnless(len(text))
356
 
 
357
 
    def test_check_branch_report_results(self):
358
 
        """Checking a branch produces results which can be printed"""
359
 
        branch = self.make_branch('.')
360
 
        result = branch.check()
361
 
        # reports results through logging
362
 
        result.report_results(verbose=True)
363
 
        result.report_results(verbose=False)
364
 
 
365
 
    def test_get_commit_builder(self):
366
 
        branch = self.make_branch(".")
367
 
        branch.lock_write()
368
 
        builder = branch.get_commit_builder([])
369
 
        self.assertIsInstance(builder, repository.CommitBuilder)
370
 
        branch.repository.commit_write_group()
371
 
        branch.unlock()
372
 
 
373
 
    def test_generate_revision_history(self):
374
 
        """Create a fake revision history easily."""
375
 
        tree = self.make_branch_and_tree('.')
376
 
        rev1 = tree.commit('foo')
377
 
        orig_history = tree.branch.revision_history()
378
 
        rev2 = tree.commit('bar', allow_pointless=True)
379
 
        tree.branch.generate_revision_history(rev1)
380
 
        self.assertEqual(orig_history, tree.branch.revision_history())
381
 
 
382
 
    def test_generate_revision_history_NULL_REVISION(self):
383
 
        tree = self.make_branch_and_tree('.')
384
 
        rev1 = tree.commit('foo')
385
 
        tree.branch.generate_revision_history(bzrlib.revision.NULL_REVISION)
386
 
        self.assertEqual([], tree.branch.revision_history())
387
 
 
388
 
    def test_create_checkout(self):
389
 
        tree_a = self.make_branch_and_tree('a')
390
 
        branch_a = tree_a.branch
391
 
        checkout_b = branch_a.create_checkout('b')
392
 
        self.assertEqual('null:', checkout_b.last_revision())
393
 
        checkout_b.commit('rev1', rev_id='rev1')
394
 
        self.assertEqual('rev1', branch_a.last_revision())
395
 
        self.assertNotEqual(checkout_b.branch.base, branch_a.base)
396
 
 
397
 
        checkout_c = branch_a.create_checkout('c', lightweight=True)
398
 
        self.assertEqual('rev1', checkout_c.last_revision())
399
 
        checkout_c.commit('rev2', rev_id='rev2')
400
 
        self.assertEqual('rev2', branch_a.last_revision())
401
 
        self.assertEqual(checkout_c.branch.base, branch_a.base)
402
 
 
403
 
        os.mkdir('d')
404
 
        checkout_d = branch_a.create_checkout('d', lightweight=True)
405
 
        self.assertEqual('rev2', checkout_d.last_revision())
406
 
        os.mkdir('e')
407
 
        checkout_e = branch_a.create_checkout('e')
408
 
        self.assertEqual('rev2', checkout_e.last_revision())
409
 
 
410
 
    def test_create_anonymous_lightweight_checkout(self):
411
 
        """A lightweight checkout from a readonly branch should succeed."""
412
 
        tree_a = self.make_branch_and_tree('a')
413
 
        rev_id = tree_a.commit('put some content in the branch')
414
 
        # open the branch via a readonly transport
415
 
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
416
 
        # sanity check that the test will be valid
417
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
418
 
            source_branch.lock_write)
419
 
        checkout = source_branch.create_checkout('c', lightweight=True)
420
 
        self.assertEqual(rev_id, checkout.last_revision())
421
 
 
422
 
    def test_create_anonymous_heavyweight_checkout(self):
423
 
        """A regular checkout from a readonly branch should succeed."""
424
 
        tree_a = self.make_branch_and_tree('a')
425
 
        rev_id = tree_a.commit('put some content in the branch')
426
 
        # open the branch via a readonly transport
427
 
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
428
 
        # sanity check that the test will be valid
429
 
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
430
 
            source_branch.lock_write)
431
 
        checkout = source_branch.create_checkout('c')
432
 
        self.assertEqual(rev_id, checkout.last_revision())
433
 
 
434
 
    def test_set_revision_history(self):
435
 
        tree = self.make_branch_and_tree('a')
436
 
        tree.commit('a commit', rev_id='rev1')
437
 
        br = tree.branch
438
 
        br.set_revision_history(["rev1"])
439
 
        self.assertEquals(br.revision_history(), ["rev1"])
440
 
        br.set_revision_history([])
441
 
        self.assertEquals(br.revision_history(), [])
442
 
 
443
 
 
444
 
class ChrootedTests(TestCaseWithBranch):
    """A support class that provides readonly urls outside the local namespace.

    setUp compares self.vfs_transport_factory with MemoryServer. When they
    are equal nothing is changed; otherwise HttpServer is installed as the
    server used for readonly urls.
    """

    def setUp(self):
        super(ChrootedTests, self).setUp()
        if self.vfs_transport_factory == MemoryServer:
            # NOTE(review): presumably already chrooted in this case - confirm
            return
        self.transport_readonly_server = HttpServer

    def test_open_containing(self):
        """open_containing fails without a branch, then finds it from below."""
        paths = ('', 'g/p/q')
        for path in paths:
            self.assertRaises(NotBranchError, Branch.open_containing,
                              self.get_readonly_url(path))
        self.make_branch('.')
        for path in paths:
            found = Branch.open_containing(self.get_readonly_url(path))
            # The second item is the part of the url below the branch.
            _, relpath = found
            self.assertEqual(path, relpath)
467
 
        
468
 
 
469
 
class InstrumentedTransaction(object):
    """Transaction stand-in that records the calls made on it in ``calls``."""

    def __init__(self):
        # Names of the methods invoked so far, in call order.
        self.calls = list()

    def finish(self):
        """Record that finish() was called."""
        self.calls += ['finish']
476
 
 
477
 
 
478
 
class TestDecorator(object):
    """Lockable stand-in that logs lock calls for the decorator tests.

    Each lock_read/lock_write/unlock call appends 'lr', 'lw' or 'ul' to
    ``_calls``. The decorated methods either return a constant or raise
    RuntimeError.
    """

    def __init__(self):
        self._calls = list()

    def lock_read(self):
        self._calls += ['lr']

    def lock_write(self):
        self._calls += ['lw']

    def unlock(self):
        self._calls += ['ul']

    @needs_read_lock
    def do_with_read(self):
        """Return 1 from inside the read-lock decorator."""
        return 1

    @needs_read_lock
    def except_with_read(self):
        """Raise RuntimeError from inside the read-lock decorator."""
        raise RuntimeError()

    @needs_write_lock
    def do_with_write(self):
        """Return 2 from inside the write-lock decorator."""
        return 2

    @needs_write_lock
    def except_with_write(self):
        """Raise RuntimeError from inside the write-lock decorator."""
        raise RuntimeError()
507
 
 
508
 
 
509
 
class TestDecorators(TestCase):
    """The lock decorators lock before and unlock after the wrapped call."""

    def test_needs_read_lock(self):
        decorated = TestDecorator()
        result = decorated.do_with_read()
        self.assertEqual(1, result)
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_excepts_in_read_lock(self):
        decorated = TestDecorator()
        self.assertRaises(RuntimeError, decorated.except_with_read)
        # The unlock is still recorded although the method raised.
        self.assertEqual(['lr', 'ul'], decorated._calls)

    def test_needs_write_lock(self):
        decorated = TestDecorator()
        result = decorated.do_with_write()
        self.assertEqual(2, result)
        self.assertEqual(['lw', 'ul'], decorated._calls)

    def test_excepts_in_write_lock(self):
        decorated = TestDecorator()
        self.assertRaises(RuntimeError, decorated.except_with_write)
        # The unlock is still recorded although the method raised.
        self.assertEqual(['lw', 'ul'], decorated._calls)
530
 
 
531
 
 
532
 
class TestBranchPushLocations(TestCaseWithBranch):
    """Tests for reading and setting a branch's push location."""

    def test_get_push_location_unset(self):
        """A branch with nothing configured reports no push location."""
        self.assertEqual(None, self.get_branch().get_push_location())

    def test_get_push_location_exact(self):
        """A push_location in a locations-config section is picked up.

        The section name is the branch base without its last character.
        """
        from bzrlib.config import (locations_config_filename,
                                   ensure_config_dir_exists)
        ensure_config_dir_exists()
        fn = locations_config_filename()
        # The trailing newline matches what the former print statement wrote.
        config_text = ("[%s]\n"
                       "push_location=foo\n" % self.get_branch().base[:-1])
        # Close the file explicitly so the content is flushed before it is
        # read back, instead of relying on garbage collection.
        config_file = open(fn, 'wt')
        try:
            config_file.write(config_text)
        finally:
            config_file.close()
        self.assertEqual("foo", self.get_branch().get_push_location())

    def test_set_push_location(self):
        """A push location that was set is the one read back."""
        branch = self.get_branch()
        branch.set_push_location('foo')
        self.assertEqual('foo', branch.get_push_location())
551
 
 
552
 
 
553
 
class TestFormat(TestCaseWithBranch):
    """Tests for the format itself."""

    def test_get_reference(self):
        """get_reference on all regular branches should return None."""
        # unsupported formats are not loopback testable
        # because the default open will not open them and
        # they may not be initializable.
        if self.branch_format.is_supported():
            made = self.make_branch('.')
            reference = made._format.get_reference(made.bzrdir)
            self.assertEqual(None, reference)

    def test_format_initialize_find_open(self):
        """A supported format initializes to itself and can be re-opened."""
        # loopback test to check the current format initializes to itself.
        if not self.branch_format.is_supported():
            # unsupported formats are not loopback testable
            # because the default open will not open them and
            # they may not be initializable.
            return
        # supported formats must be able to init and open
        # The result of this first call is unused; it is kept from the
        # original test.
        get_transport(self.get_url())
        readonly_transport = get_transport(self.get_readonly_url())
        made = self.make_branch('.')
        self.failUnless(isinstance(made, branch.Branch))

        # find it via bzrdir opening:
        control = bzrdir.BzrDir.open(readonly_transport.base)
        via_control = control.open_branch()
        self.assertEqual(via_control.__class__, made.__class__)
        self.assertEqual(control, via_control.bzrdir)
        format_class = self.branch_format.__class__
        self.failUnless(isinstance(via_control._format, format_class))

        # find it via Branch.open
        via_open = branch.Branch.open(readonly_transport.base)
        self.failUnless(isinstance(via_open, made.__class__))
        self.assertEqual(made._format.__class__, via_open._format.__class__)
        # if it has a unique id string, can we probe for it ?
        try:
            self.branch_format.get_format_string()
        except NotImplementedError:
            return
        self.assertEqual(self.branch_format, control.find_branch_format())
600
 
 
601
 
 
602
 
class TestBound(TestCaseWithBranch):
    """Tests for binding a branch to another branch and unbinding it."""

    def test_bind_unbind(self):
        """unbind() is true for a bound branch and false when repeated.

        Skipped when bind() raises UpgradeRequired.
        """
        branch = self.make_branch('1')
        branch2 = self.make_branch('2')
        try:
            branch.bind(branch2)
        except errors.UpgradeRequired:
            raise TestSkipped('Format does not support binding')
        self.assertTrue(branch.unbind())
        self.assertFalse(branch.unbind())
        self.assertIs(None, branch.get_bound_location())

    def test_old_bound_location(self):
        """The old bound location is only reported after an unbind.

        Skipped when get_old_bound_location() raises UpgradeRequired.
        """
        branch = self.make_branch('branch1')
        try:
            self.assertIs(None, branch.get_old_bound_location())
        except errors.UpgradeRequired:
            raise TestSkipped('Format does not store old bound locations')
        branch2 = self.make_branch('branch2')
        branch.bind(branch2)
        self.assertIs(None, branch.get_old_bound_location())
        branch.unbind()
        # Raw string: '\/' is not a valid string escape. The pattern text is
        # unchanged and still requires the location to end in '/branch2/'.
        self.assertContainsRe(branch.get_old_bound_location(), r'\/branch2\/$')
626
 
 
627
 
 
628
 
class TestStrict(TestCaseWithBranch):
    """Tests for branches set to append revisions only."""

    def test_strict_history(self):
        """Pulls into an append-only branch fail in the two asserted cases.

        Skipped when set_append_revisions_only raises UpgradeRequired.
        """
        strict_tree = self.make_branch_and_tree('tree1')
        try:
            strict_tree.branch.set_append_revisions_only(True)
        except errors.UpgradeRequired:
            raise TestSkipped('Format does not support strict history')
        strict_tree.commit('empty commit')
        second_tree = strict_tree.bzrdir.sprout('tree2').open_workingtree()
        second_tree.commit('empty commit 2')
        strict_tree.pull(second_tree.branch)
        # Commit on both sides, then try to pull.
        strict_tree.commit('empty commit 3')
        second_tree.commit('empty commit 4')
        self.assertRaises(errors.DivergedBranches,
                          strict_tree.pull, second_tree.branch)
        # Pulling after the other side merged and committed is refused too.
        second_tree.merge_from_branch(strict_tree.branch)
        second_tree.commit('empty commit 5')
        self.assertRaises(errors.AppendRevisionsOnlyViolation,
                          strict_tree.pull, second_tree.branch)
        third_tree = strict_tree.bzrdir.sprout('tree3').open_workingtree()
        third_tree.merge_from_branch(second_tree.branch)
        third_tree.commit('empty commit 6')
        second_tree.pull(third_tree.branch)
 
17
from bzrlib.selftest import TestCaseInTempDir
 
18
 
 
19
 
 
20
class TestAppendRevisions(TestCaseInTempDir):
    """Test appending more than one revision"""

    def test_append_revisions(self):
        """append_revision accepts one id or several ids in one call."""
        from bzrlib.branch import Branch
        br = Branch(".", init=True)
        expected_history = []
        for batch in (("rev1",), ("rev2", "rev3")):
            br.append_revision(*batch)
            expected_history.extend(batch)
            self.assertEquals(br.revision_history(), expected_history)
 
29
 
 
30
 
 
31
# TODO: rewrite this as a regular unittest, without relying on the displayed output        
 
32
#         >>> from bzrlib.commit import commit
 
33
#         >>> bzrlib.trace.silent = True
 
34
#         >>> br1 = ScratchBranch(files=['foo', 'bar'])
 
35
#         >>> br1.add('foo')
 
36
#         >>> br1.add('bar')
 
37
#         >>> commit(br1, "lala!", rev_id="REVISION-ID-1", verbose=False)
 
38
#         >>> br2 = ScratchBranch()
 
39
#         >>> br2.update_revisions(br1)
 
40
#         Added 2 texts.
 
41
#         Added 1 inventories.
 
42
#         Added 1 revisions.
 
43
#         >>> br2.revision_history()
 
44
#         [u'REVISION-ID-1']
 
45
#         >>> br2.update_revisions(br1)
 
46
#         Added 0 revisions.
 
47
#         >>> br1.text_store.total_size() == br2.text_store.total_size()
 
48
#         True