~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_repository.py

  • Committer: Jelmer Vernooij
  • Date: 2011-02-19 15:23:08 UTC
  • mto: (5582.12.2 weave-plugin)
  • mto: This revision was merged to the branch mainline in revision 5718.
  • Revision ID: jelmer@samba.org-20110219152308-5shhc4rj0ez4oa12
move xml4 to weave plugin.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2011 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
428
428
        self.assertGetsDefaultInterRepository(dummy_a, dummy_b)
429
429
 
430
430
 
431
 
class TestRepositoryFormat1(knitrepo.RepositoryFormatKnit1):
432
 
 
433
 
    def get_format_string(self):
434
 
        return "Test Format 1"
435
 
 
436
 
 
437
 
class TestRepositoryFormat2(knitrepo.RepositoryFormatKnit1):
438
 
 
439
 
    def get_format_string(self):
440
 
        return "Test Format 2"
441
 
 
442
 
 
443
431
class TestRepositoryConverter(TestCaseWithTransport):
444
432
 
445
433
    def test_convert_empty(self):
446
 
        source_format = TestRepositoryFormat1()
447
 
        target_format = TestRepositoryFormat2()
448
 
        repository.format_registry.register(source_format)
449
 
        self.addCleanup(repository.format_registry.remove,
450
 
            source_format)
451
 
        repository.format_registry.register(target_format)
452
 
        self.addCleanup(repository.format_registry.remove,
453
 
            target_format)
454
 
        t = self.get_transport()
 
434
        from bzrlib.plugins.weave_fmt.repository import RepositoryFormat7
 
435
        t = transport.get_transport(".")
455
436
        t.mkdir('repository')
456
437
        repo_dir = bzrdir.BzrDirMetaFormat1().initialize('repository')
457
 
        repo = TestRepositoryFormat1().initialize(repo_dir)
 
438
        repo = RepositoryFormat7().initialize(repo_dir)
 
439
        target_format = knitrepo.RepositoryFormatKnit1()
458
440
        converter = repository.CopyConverter(target_format)
459
441
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
460
442
        try:
465
447
        self.assertTrue(isinstance(target_format, repo._format.__class__))
466
448
 
467
449
 
 
450
class TestMisc(TestCase):
 
451
 
 
452
    def test_unescape_xml(self):
 
453
        """We get some kind of error when malformed entities are passed"""
 
454
        self.assertRaises(KeyError, repository._unescape_xml, 'foo&bar;')
 
455
 
 
456
 
468
457
class TestRepositoryFormatKnit3(TestCaseWithTransport):
469
458
 
470
459
    def test_attribute__fetch_order(self):
1474
1463
        self.assertTrue(new_pack.signature_index._optimize_for_size)
1475
1464
 
1476
1465
 
1477
 
class TestGCCHKPacker(TestCaseWithTransport):
1478
 
 
1479
 
    def make_abc_branch(self):
1480
 
        builder = self.make_branch_builder('source')
1481
 
        builder.start_series()
1482
 
        builder.build_snapshot('A', None, [
1483
 
            ('add', ('', 'root-id', 'directory', None)),
1484
 
            ('add', ('file', 'file-id', 'file', 'content\n')),
1485
 
            ])
1486
 
        builder.build_snapshot('B', ['A'], [
1487
 
            ('add', ('dir', 'dir-id', 'directory', None))])
1488
 
        builder.build_snapshot('C', ['B'], [
1489
 
            ('modify', ('file-id', 'new content\n'))])
1490
 
        builder.finish_series()
1491
 
        return builder.get_branch()
1492
 
 
1493
 
    def make_branch_with_disjoint_inventory_and_revision(self):
1494
 
        """a repo with separate packs for a revisions Revision and Inventory.
1495
 
 
1496
 
        There will be one pack file that holds the Revision content, and one
1497
 
        for the Inventory content.
1498
 
 
1499
 
        :return: (repository,
1500
 
                  pack_name_with_rev_A_Revision,
1501
 
                  pack_name_with_rev_A_Inventory,
1502
 
                  pack_name_with_rev_C_content)
1503
 
        """
1504
 
        b_source = self.make_abc_branch()
1505
 
        b_base = b_source.bzrdir.sprout('base', revision_id='A').open_branch()
1506
 
        b_stacked = b_base.bzrdir.sprout('stacked', stacked=True).open_branch()
1507
 
        b_stacked.lock_write()
1508
 
        self.addCleanup(b_stacked.unlock)
1509
 
        b_stacked.fetch(b_source, 'B')
1510
 
        # Now re-open the stacked repo directly (no fallbacks) so that we can
1511
 
        # fill in the A rev.
1512
 
        repo_not_stacked = b_stacked.bzrdir.open_repository()
1513
 
        repo_not_stacked.lock_write()
1514
 
        self.addCleanup(repo_not_stacked.unlock)
1515
 
        # Now we should have a pack file with A's inventory, but not its
1516
 
        # Revision
1517
 
        self.assertEqual([('A',), ('B',)],
1518
 
                         sorted(repo_not_stacked.inventories.keys()))
1519
 
        self.assertEqual([('B',)],
1520
 
                         sorted(repo_not_stacked.revisions.keys()))
1521
 
        stacked_pack_names = repo_not_stacked._pack_collection.names()
1522
 
        # We have a couple names here, figure out which has A's inventory
1523
 
        for name in stacked_pack_names:
1524
 
            pack = repo_not_stacked._pack_collection.get_pack_by_name(name)
1525
 
            keys = [n[1] for n in pack.inventory_index.iter_all_entries()]
1526
 
            if ('A',) in keys:
1527
 
                inv_a_pack_name = name
1528
 
                break
1529
 
        else:
1530
 
            self.fail('Could not find pack containing A\'s inventory')
1531
 
        repo_not_stacked.fetch(b_source.repository, 'A')
1532
 
        self.assertEqual([('A',), ('B',)],
1533
 
                         sorted(repo_not_stacked.revisions.keys()))
1534
 
        new_pack_names = set(repo_not_stacked._pack_collection.names())
1535
 
        rev_a_pack_names = new_pack_names.difference(stacked_pack_names)
1536
 
        self.assertEqual(1, len(rev_a_pack_names))
1537
 
        rev_a_pack_name = list(rev_a_pack_names)[0]
1538
 
        # Now fetch 'C', so we have a couple pack files to join
1539
 
        repo_not_stacked.fetch(b_source.repository, 'C')
1540
 
        rev_c_pack_names = set(repo_not_stacked._pack_collection.names())
1541
 
        rev_c_pack_names = rev_c_pack_names.difference(new_pack_names)
1542
 
        self.assertEqual(1, len(rev_c_pack_names))
1543
 
        rev_c_pack_name = list(rev_c_pack_names)[0]
1544
 
        return (repo_not_stacked, rev_a_pack_name, inv_a_pack_name,
1545
 
                rev_c_pack_name)
1546
 
 
1547
 
    def test_pack_with_distant_inventories(self):
1548
 
        # See https://bugs.launchpad.net/bzr/+bug/437003
1549
 
        # When repacking, it is possible to have an inventory in a different
1550
 
        # pack file than the associated revision. An autopack can then come
1551
 
        # along, and miss that inventory, and complain.
1552
 
        (repo, rev_a_pack_name, inv_a_pack_name, rev_c_pack_name
1553
 
         ) = self.make_branch_with_disjoint_inventory_and_revision()
1554
 
        a_pack = repo._pack_collection.get_pack_by_name(rev_a_pack_name)
1555
 
        c_pack = repo._pack_collection.get_pack_by_name(rev_c_pack_name)
1556
 
        packer = groupcompress_repo.GCCHKPacker(repo._pack_collection,
1557
 
                    [a_pack, c_pack], '.test-pack')
1558
 
        # This would raise ValueError in bug #437003, but should not raise an
1559
 
        # error once fixed.
1560
 
        packer.pack()
1561
 
 
1562
 
    def test_pack_with_missing_inventory(self):
1563
 
        # Similar to test_pack_with_missing_inventory, but this time, we force
1564
 
        # the A inventory to actually be gone from the repository.
1565
 
        (repo, rev_a_pack_name, inv_a_pack_name, rev_c_pack_name
1566
 
         ) = self.make_branch_with_disjoint_inventory_and_revision()
1567
 
        inv_a_pack = repo._pack_collection.get_pack_by_name(inv_a_pack_name)
1568
 
        repo._pack_collection._remove_pack_from_memory(inv_a_pack)
1569
 
        packer = groupcompress_repo.GCCHKPacker(repo._pack_collection,
1570
 
            repo._pack_collection.all_packs(), '.test-pack')
1571
 
        e = self.assertRaises(ValueError, packer.pack)
1572
 
        packer.new_pack.abort()
1573
 
        self.assertContainsRe(str(e),
1574
 
            r"We are missing inventories for revisions: .*'A'")
1575
 
 
1576
 
 
1577
1466
class TestCrossFormatPacks(TestCaseWithTransport):
1578
1467
 
1579
1468
    def log_pack(self, hint=None):