~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/branch_implementations/test_branch.py

  • Committer: Robert Collins
  • Date: 2007-04-23 02:29:35 UTC
  • mfrom: (2441 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2442.
  • Revision ID: robertc@robertcollins.net-20070423022935-9hhongamvk6bfdso
Resolve conflicts with bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
    gpg,
27
27
    urlutils,
28
28
    transactions,
 
29
    remote,
29
30
    repository,
30
31
    )
31
32
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
48
49
from bzrlib.workingtree import WorkingTree
49
50
 
50
51
 
51
 
# TODO: Make a branch using basis branch, and check that it 
52
 
# doesn't request any files that could have been avoided, by 
53
 
# hooking into the Transport.
54
 
 
55
 
 
56
52
class TestCaseWithBranch(TestCaseWithBzrDir):
57
53
 
58
54
    def setUp(self):
118
114
 
119
115
    def test_fetch_revisions(self):
120
116
        """Test fetch-revision operation."""
121
 
        get_transport(self.get_url()).mkdir('b1')
122
 
        get_transport(self.get_url()).mkdir('b2')
123
117
        wt = self.make_branch_and_tree('b1')
124
118
        b1 = wt.branch
125
 
        b2 = self.make_branch('b2')
126
 
        file('b1/foo', 'w').write('hello')
 
119
        self.build_tree_contents([('b1/foo', 'hello')])
127
120
        wt.add(['foo'], ['foo-id'])
128
121
        wt.commit('lala!', rev_id='revision-1', allow_pointless=False)
129
122
 
130
 
        mutter('start fetch')
 
123
        b2 = self.make_branch('b2')
131
124
        self.assertEqual((1, []), b2.fetch(b1))
132
125
 
133
126
        rev = b2.repository.get_revision('revision-1')
152
145
 
153
146
    def get_unbalanced_tree_pair(self):
154
147
        """Return two branches, a and b, with one file in a."""
155
 
        get_transport(self.get_url()).mkdir('a')
156
148
        tree_a = self.make_branch_and_tree('a')
157
 
        file('a/b', 'wb').write('b')
 
149
        self.build_tree_contents([('a/b', 'b')])
158
150
        tree_a.add('b')
159
151
        tree_a.commit("silly commit", rev_id='A')
160
152
 
161
 
        get_transport(self.get_url()).mkdir('b')
162
153
        tree_b = self.make_branch_and_tree('b')
163
154
        return tree_a, tree_b
164
155
 
168
159
        tree_b.branch.repository.fetch(tree_a.branch.repository)
169
160
        return tree_a, tree_b
170
161
 
171
 
    def test_clone_branch(self):
172
 
        """Copy the stores from one branch to another"""
173
 
        tree_a, tree_b = self.get_balanced_branch_pair()
174
 
        tree_b.commit("silly commit")
175
 
        os.mkdir('c')
176
 
        # this fails to test that the history from a was not used.
177
 
        dir_c = tree_a.bzrdir.clone('c', basis=tree_b.bzrdir)
178
 
        self.assertEqual(tree_a.branch.revision_history(),
179
 
                         dir_c.open_branch().revision_history())
180
 
 
181
162
    def test_clone_partial(self):
182
163
        """Copy only part of the history of a branch."""
183
164
        # TODO: RBC 20060208 test with a revision not on revision-history.
184
165
        #       what should that behaviour be? Emailed the list.
 
166
        # First, make a branch with two commits.
185
167
        wt_a = self.make_branch_and_tree('a')
186
168
        self.build_tree(['a/one'])
187
169
        wt_a.add(['one'])
189
171
        self.build_tree(['a/two'])
190
172
        wt_a.add(['two'])
191
173
        wt_a.commit('commit two', rev_id='2')
 
174
        # Now make a copy of the repository.
192
175
        repo_b = self.make_repository('b')
193
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
194
 
        br_b = wt_a.bzrdir.open_branch().clone(repo_b.bzrdir, revision_id='1')
 
176
        wt_a.branch.repository.copy_content_into(repo_b)
 
177
        # wt_a might be a lightweight checkout, so get a hold of the actual
 
178
        # branch (because you can't do a partial clone of a lightweight
 
179
        # checkout).
 
180
        branch = wt_a.branch.bzrdir.open_branch()
 
181
        # Then make a branch where the new repository is, but specify a revision
 
182
        # ID.  The new branch's history will stop at the specified revision.
 
183
        br_b = branch.clone(repo_b.bzrdir, revision_id='1')
195
184
        self.assertEqual('1', br_b.last_revision())
196
185
 
197
186
    def test_sprout_partial(self):
205
194
        wt_a.add(['two'])
206
195
        wt_a.commit('commit two', rev_id='2')
207
196
        repo_b = self.make_repository('b')
208
 
        wt_a.bzrdir.open_repository().copy_content_into(repo_b)
209
 
        br_b = wt_a.bzrdir.open_branch().sprout(repo_b.bzrdir, revision_id='1')
 
197
        repo_a = wt_a.branch.repository
 
198
        repo_a.copy_content_into(repo_b)
 
199
        br_b = wt_a.branch.sprout(repo_b.bzrdir, revision_id='1')
210
200
        self.assertEqual('1', br_b.last_revision())
211
201
 
212
202
    def get_parented_branch(self):
240
230
        branch_d = branch_b.clone(repo_d.bzrdir)
241
231
        self.assertEqual(random_parent, branch_d.get_parent())
242
232
 
243
 
    def test_copy_content_incomplete(self):
244
 
        tree = self.make_branch_and_tree('commit_tree')
245
 
        self.build_tree(['foo'], transport=tree.bzrdir.root_transport)
246
 
        tree.add('foo')
247
 
        tree.commit('revision 1', rev_id='1')
248
 
        source = self.make_branch_and_tree('source')
249
 
        # this gives us an incomplete repository
250
 
        tree.bzrdir.open_repository().copy_content_into(
251
 
            source.branch.repository)
252
 
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
253
 
        tree.bzrdir.open_branch().copy_content_into(source.branch)
254
 
 
255
 
 
256
233
    def test_sprout_branch_nickname(self):
257
234
        # test that the nickname is always reset
258
235
        raise TestSkipped('XXX branch sprouting is not yet tested..')
271
248
        branch.set_submit_branch('sftp://example.net')
272
249
        self.assertEqual(branch.get_submit_branch(), 'sftp://example.net')
273
250
        
 
251
    def test_public_branch(self):
 
252
        """public location can be queried and set"""
 
253
        branch = self.make_branch('branch')
 
254
        self.assertEqual(branch.get_public_branch(), None)
 
255
        branch.set_public_branch('sftp://example.com')
 
256
        self.assertEqual(branch.get_public_branch(), 'sftp://example.com')
 
257
        branch.set_public_branch('sftp://example.net')
 
258
        self.assertEqual(branch.get_public_branch(), 'sftp://example.net')
 
259
        branch.set_public_branch(None)
 
260
        self.assertEqual(branch.get_public_branch(), None)
 
261
 
274
262
    def test_record_initial_ghost(self):
275
263
        """Branches should support having ghosts."""
276
264
        wt = self.make_branch_and_tree('.')
314
302
        from bzrlib.testament import Testament
315
303
        strategy = gpg.LoopbackGPGStrategy(None)
316
304
        branch.repository.sign_revision('A', strategy)
317
 
        self.assertEqual(Testament.from_revision(branch.repository, 
318
 
                         'A').as_short_text(),
 
305
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n' +
 
306
                         Testament.from_revision(branch.repository,
 
307
                         'A').as_short_text() +
 
308
                         '-----END PSEUDO-SIGNED CONTENT-----\n',
319
309
                         branch.repository.get_signature_text('A'))
320
310
 
321
311
    def test_store_signature(self):
327
317
                          branch.repository.has_signature_for_revision_id,
328
318
                          'A')
329
319
        wt.commit("base", allow_pointless=True, rev_id='A')
330
 
        self.assertEqual('FOO', 
 
320
        self.assertEqual('-----BEGIN PSEUDO-SIGNED CONTENT-----\n'
 
321
                         'FOO-----END PSEUDO-SIGNED CONTENT-----\n',
331
322
                         branch.repository.get_signature_text('A'))
332
323
 
333
324
    def test_branch_keeps_signatures(self):
334
325
        wt = self.make_branch_and_tree('source')
335
326
        wt.commit('A', allow_pointless=True, rev_id='A')
336
 
        wt.branch.repository.sign_revision('A',
337
 
            gpg.LoopbackGPGStrategy(None))
 
327
        repo = wt.branch.repository
 
328
        repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
338
329
        # FIXME: clone should work to URLs,
339
330
        # wt.clone should work to disks.
340
331
        self.build_tree(['target/'])
341
 
        d2 = wt.bzrdir.clone('target')
342
 
        self.assertEqual(wt.branch.repository.get_signature_text('A'),
 
332
        d2 = repo.bzrdir.clone(urlutils.local_path_to_url('target'))
 
333
        self.assertEqual(repo.get_signature_text('A'),
343
334
                         d2.open_repository().get_signature_text('A'))
344
335
 
345
336
    def test_nicks(self):
346
 
        """Branch nicknames"""
 
337
        """Test explicit and implicit branch nicknames.
 
338
        
 
339
        Nicknames are implicitly the name of the branch's directory, unless an
 
340
        explicit nickname is set.  That is, an explicit nickname always
 
341
        overrides the implicit one.
 
342
        """
347
343
        t = get_transport(self.get_url())
348
 
        t.mkdir('bzr.dev')
349
344
        branch = self.make_branch('bzr.dev')
 
345
        # The nick will be 'bzr.dev', because there is no explicit nick set.
350
346
        self.assertEqual(branch.nick, 'bzr.dev')
 
347
        # Move the branch to a different directory, 'bzr.ab'.  Now that branch
 
348
        # will report its nick as 'bzr.ab'.
351
349
        t.move('bzr.dev', 'bzr.ab')
352
350
        branch = Branch.open(self.get_url('bzr.ab'))
353
351
        self.assertEqual(branch.nick, 'bzr.ab')
354
 
        branch.nick = "Aaron's branch"
355
 
        branch.nick = "Aaron's branch"
356
 
        self.failUnless(
357
 
            t.has(
358
 
                t.relpath(
359
 
                    branch.control_files.controlfilename("branch.conf")
360
 
                    )
361
 
                )
362
 
            )
 
352
        # Set the branch nick explicitly.  This will ensure there's a branch
 
353
        # config file in the branch.
 
354
        branch.nick = "Aaron's branch"
 
355
        branch.nick = "Aaron's branch"
 
356
        if not isinstance(branch, remote.RemoteBranch):
 
357
            controlfilename = branch.control_files.controlfilename
 
358
            self.failUnless(t.has(t.relpath(controlfilename("branch.conf"))))
 
359
        # Because the nick has been set explicitly, the nick is now always
 
360
        # "Aaron's branch", regardless of directory name.
363
361
        self.assertEqual(branch.nick, "Aaron's branch")
364
362
        t.move('bzr.ab', 'integration')
365
363
        branch = Branch.open(self.get_url('integration'))
367
365
        branch.nick = u"\u1234"
368
366
        self.assertEqual(branch.nick, u"\u1234")
369
367
 
 
368
    def test_commit_nicks(self):
 
369
        """Nicknames are committed to the revision"""
 
370
        wt = self.make_branch_and_tree('bzr.dev')
 
371
        branch = wt.branch
 
372
        branch.nick = "My happy branch"
 
373
        wt.commit('My commit respect da nick.')
 
374
        committed = branch.repository.get_revision(branch.last_revision())
 
375
        self.assertEqual(committed.properties["branch-nick"],
 
376
                         "My happy branch")
 
377
 
370
378
    def test_create_open_branch_uses_repository(self):
371
379
        try:
372
380
            repo = self.make_repository('.', shared=True)
373
381
        except errors.IncompatibleFormat:
374
382
            return
375
 
        repo.bzrdir.root_transport.mkdir('child')
376
 
        child_dir = self.bzrdir_format.initialize('child')
 
383
        child_transport = repo.bzrdir.root_transport.clone('child')
 
384
        child_transport.mkdir('.')
 
385
        child_dir = self.bzrdir_format.initialize_on_transport(child_transport)
377
386
        try:
378
387
            child_branch = self.branch_format.initialize(child_dir)
379
388
        except errors.UninitializableFormat:
443
452
        """A lightweight checkout from a readonly branch should succeed."""
444
453
        tree_a = self.make_branch_and_tree('a')
445
454
        rev_id = tree_a.commit('put some content in the branch')
446
 
        source_branch = bzrlib.branch.Branch.open(
447
 
            'readonly+' + tree_a.bzrdir.root_transport.base)
 
455
        # open the branch via a readonly transport
 
456
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
448
457
        # sanity check that the test will be valid
449
458
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
450
459
            source_branch.lock_write)
455
464
        """A regular checkout from a readonly branch should succeed."""
456
465
        tree_a = self.make_branch_and_tree('a')
457
466
        rev_id = tree_a.commit('put some content in the branch')
458
 
        source_branch = bzrlib.branch.Branch.open(
459
 
            'readonly+' + tree_a.bzrdir.root_transport.base)
 
467
        # open the branch via a readonly transport
 
468
        source_branch = bzrlib.branch.Branch.open(self.get_readonly_url('a'))
460
469
        # sanity check that the test will be valid
461
470
        self.assertRaises((errors.LockError, errors.TransportNotPossible),
462
471
            source_branch.lock_write)
463
472
        checkout = source_branch.create_checkout('c')
464
473
        self.assertEqual(rev_id, checkout.last_revision())
465
474
 
 
475
    def test_set_revision_history(self):
 
476
        tree = self.make_branch_and_tree('a')
 
477
        tree.commit('a commit', rev_id='rev1')
 
478
        br = tree.branch
 
479
        br.set_revision_history(["rev1"])
 
480
        self.assertEquals(br.revision_history(), ["rev1"])
 
481
        br.set_revision_history([])
 
482
        self.assertEquals(br.revision_history(), [])
 
483
 
466
484
 
467
485
class ChrootedTests(TestCaseWithBranch):
468
486
    """A support class that provides readonly urls outside the local namespace.
474
492
 
475
493
    def setUp(self):
476
494
        super(ChrootedTests, self).setUp()
477
 
        if not self.transport_server == MemoryServer:
 
495
        if not self.vfs_transport_factory == MemoryServer:
478
496
            self.transport_readonly_server = HttpServer
479
497
 
480
498
    def test_open_containing(self):
552
570
        self.assertEqual(['lw', 'ul'], branch._calls)
553
571
 
554
572
 
555
 
class TestBranchTransaction(TestCaseWithBranch):
556
 
 
557
 
    def setUp(self):
558
 
        super(TestBranchTransaction, self).setUp()
559
 
        self.branch = None
560
 
        
561
 
    def test_default_get_transaction(self):
562
 
        """branch.get_transaction on a new branch should give a PassThrough."""
563
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
564
 
                                   transactions.PassThroughTransaction))
565
 
 
566
 
    def test__set_new_transaction(self):
567
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
568
 
 
569
 
    def test__set_over_existing_transaction_raises(self):
570
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
571
 
        self.assertRaises(errors.LockError,
572
 
                          self.get_branch()._set_transaction,
573
 
                          transactions.ReadOnlyTransaction())
574
 
 
575
 
    def test_finish_no_transaction_raises(self):
576
 
        self.assertRaises(errors.LockError, self.get_branch()._finish_transaction)
577
 
 
578
 
    def test_finish_readonly_transaction_works(self):
579
 
        self.get_branch()._set_transaction(transactions.ReadOnlyTransaction())
580
 
        self.get_branch()._finish_transaction()
581
 
        self.assertEqual(None, self.get_branch().control_files._transaction)
582
 
 
583
 
    def test_unlock_calls_finish(self):
584
 
        self.get_branch().lock_read()
585
 
        transaction = InstrumentedTransaction()
586
 
        self.get_branch().control_files._transaction = transaction
587
 
        self.get_branch().unlock()
588
 
        self.assertEqual(['finish'], transaction.calls)
589
 
 
590
 
    def test_lock_read_acquires_ro_transaction(self):
591
 
        self.get_branch().lock_read()
592
 
        self.failUnless(isinstance(self.get_branch().get_transaction(),
593
 
                                   transactions.ReadOnlyTransaction))
594
 
        self.get_branch().unlock()
595
 
        
596
 
    def test_lock_write_acquires_write_transaction(self):
597
 
        self.get_branch().lock_write()
598
 
        # cannot use get_transaction as it's magic
599
 
        self.failUnless(isinstance(self.get_branch().control_files._transaction,
600
 
                                   transactions.WriteTransaction))
601
 
        self.get_branch().unlock()
602
 
 
603
 
 
604
573
class TestBranchPushLocations(TestCaseWithBranch):
605
574
 
606
575
    def test_get_push_location_unset(self):
625
594
class TestFormat(TestCaseWithBranch):
626
595
    """Tests for the format itself."""
627
596
 
 
597
    def test_get_reference(self):
 
598
        """get_reference on all regular branches should return None."""
 
599
        if not self.branch_format.is_supported():
 
600
            # unsupported formats are not loopback testable
 
601
            # because the default open will not open them and
 
602
            # they may not be initializable.
 
603
            return
 
604
        made_branch = self.make_branch('.')
 
605
        self.assertEqual(None,
 
606
            made_branch._format.get_reference(made_branch.bzrdir))
 
607
 
628
608
    def test_format_initialize_find_open(self):
629
609
        # loopback test to check the current format initializes to itself.
630
610
        if not self.branch_format.is_supported():
657
637
        except NotImplementedError:
658
638
            return
659
639
        self.assertEqual(self.branch_format,
660
 
                         branch.BranchFormat.find_format(opened_control))
 
640
                         opened_control.find_branch_format())
661
641
 
662
642
 
663
643
class TestBound(TestCaseWithBranch):