~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Aaron Bentley
  • Date: 2007-08-31 19:38:52 UTC
  • mto: This revision was merged to the branch mainline in revision 2777.
  • Revision ID: abentley@panoramicfeedback.com-20070831193852-6ip22sbw058yib5u
Clean up docs, test matching blocks for reannotate

Show diffs side-by-side

added added

removed removed

Lines of Context:
57
57
    )
58
58
from bzrlib.transport import TransportLogger, get_transport
59
59
from bzrlib.transport.memory import MemoryTransport
60
 
from bzrlib.util import bencode
61
60
from bzrlib.weave import Weave
62
61
 
63
62
 
987
986
class KnitTests(TestCaseWithTransport):
988
987
    """Class containing knit test helper routines."""
989
988
 
990
 
    def make_test_knit(self, annotate=False, delay_create=False, index=None,
991
 
                       name='test'):
 
989
    def make_test_knit(self, annotate=False, delay_create=False, index=None):
992
990
        if not annotate:
993
991
            factory = KnitPlainFactory()
994
992
        else:
995
993
            factory = None
996
 
        return KnitVersionedFile(name, get_transport('.'), access_mode='w',
 
994
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
997
995
                                 factory=factory, create=True,
998
996
                                 delay_create=delay_create, index=index)
999
997
 
1000
 
    def assertRecordContentEqual(self, knit, version_id, candidate_content):
1001
 
        """Assert that some raw record content matches the raw record content
1002
 
        for a particular version_id in the given knit.
1003
 
        """
1004
 
        index_memo = knit._index.get_position(version_id)
1005
 
        record = (version_id, index_memo)
1006
 
        [(_, expected_content)] = list(knit._data.read_records_iter_raw([record]))
1007
 
        self.assertEqual(expected_content, candidate_content)
1008
 
 
1009
998
 
1010
999
class BasicKnitTests(KnitTests):
1011
1000
 
1469
1458
        for plan_line, expected_line in zip(plan, AB_MERGE):
1470
1459
            self.assertEqual(plan_line, expected_line)
1471
1460
 
1472
 
    def test_get_stream_empty(self):
1473
 
        """Get a data stream for an empty knit file."""
1474
 
        k1 = self.make_test_knit()
1475
 
        format, data_list, reader_callable = k1.get_data_stream([])
1476
 
        self.assertEqual('knit-plain', format)
1477
 
        self.assertEqual([], data_list)
1478
 
        content = reader_callable(None)
1479
 
        self.assertEqual('', content)
1480
 
        self.assertIsInstance(content, str)
1481
 
 
1482
 
    def test_get_stream_one_version(self):
1483
 
        """Get a data stream for a single record out of a knit containing just
1484
 
        one record.
1485
 
        """
1486
 
        k1 = self.make_test_knit()
1487
 
        test_data = [
1488
 
            ('text-a', [], TEXT_1),
1489
 
            ]
1490
 
        expected_data_list = [
1491
 
            # version, options, length, parents
1492
 
            ('text-a', ['fulltext'], 122, []),
1493
 
           ]
1494
 
        for version_id, parents, lines in test_data:
1495
 
            k1.add_lines(version_id, parents, split_lines(lines))
1496
 
 
1497
 
        format, data_list, reader_callable = k1.get_data_stream(['text-a'])
1498
 
        self.assertEqual('knit-plain', format)
1499
 
        self.assertEqual(expected_data_list, data_list)
1500
 
        # There's only one record in the knit, so the content should be the
1501
 
        # entire knit data file's contents.
1502
 
        self.assertEqual(k1.transport.get_bytes(k1._data._access._filename),
1503
 
                         reader_callable(None))
1504
 
        
1505
 
    def test_get_stream_get_one_version_of_many(self):
1506
 
        """Get a data stream for just one version out of a knit containing many
1507
 
        versions.
1508
 
        """
1509
 
        k1 = self.make_test_knit()
1510
 
        # Insert the same data as test_knit_join, as they seem to cover a range
1511
 
        # of cases (no parents, one parent, multiple parents).
1512
 
        test_data = [
1513
 
            ('text-a', [], TEXT_1),
1514
 
            ('text-b', ['text-a'], TEXT_1),
1515
 
            ('text-c', [], TEXT_1),
1516
 
            ('text-d', ['text-c'], TEXT_1),
1517
 
            ('text-m', ['text-b', 'text-d'], TEXT_1),
1518
 
            ]
1519
 
        expected_data_list = [
1520
 
            # version, options, length, parents
1521
 
            ('text-m', ['line-delta'], 84, ['text-b', 'text-d']),
1522
 
            ]
1523
 
        for version_id, parents, lines in test_data:
1524
 
            k1.add_lines(version_id, parents, split_lines(lines))
1525
 
 
1526
 
        format, data_list, reader_callable = k1.get_data_stream(['text-m'])
1527
 
        self.assertEqual('knit-plain', format)
1528
 
        self.assertEqual(expected_data_list, data_list)
1529
 
        self.assertRecordContentEqual(k1, 'text-m', reader_callable(None))
1530
 
        
1531
 
    def test_get_stream_ghost_parent(self):
1532
 
        """Get a data stream for a version with a ghost parent."""
1533
 
        k1 = self.make_test_knit()
1534
 
        # Test data
1535
 
        k1.add_lines('text-a', [], split_lines(TEXT_1))
1536
 
        k1.add_lines_with_ghosts('text-b', ['text-a', 'text-ghost'],
1537
 
                                 split_lines(TEXT_1))
1538
 
        # Expected data
1539
 
        expected_data_list = [
1540
 
            # version, options, length, parents
1541
 
            ('text-b', ['line-delta'], 84, ['text-a', 'text-ghost']),
1542
 
            ]
1543
 
        
1544
 
        format, data_list, reader_callable = k1.get_data_stream(['text-b'])
1545
 
        self.assertEqual('knit-plain', format)
1546
 
        self.assertEqual(expected_data_list, data_list)
1547
 
        self.assertRecordContentEqual(k1, 'text-b', reader_callable(None))
1548
 
    
1549
 
    def test_get_stream_get_multiple_records(self):
1550
 
        """Get a stream for multiple records of a knit."""
1551
 
        k1 = self.make_test_knit()
1552
 
        # Insert the same data as test_knit_join, as they seem to cover a range
1553
 
        # of cases (no parents, one parent, multiple parents).
1554
 
        test_data = [
1555
 
            ('text-a', [], TEXT_1),
1556
 
            ('text-b', ['text-a'], TEXT_1),
1557
 
            ('text-c', [], TEXT_1),
1558
 
            ('text-d', ['text-c'], TEXT_1),
1559
 
            ('text-m', ['text-b', 'text-d'], TEXT_1),
1560
 
            ]
1561
 
        expected_data_list = [
1562
 
            # version, options, length, parents
1563
 
            ('text-b', ['line-delta'], 84, ['text-a']),
1564
 
            ('text-d', ['line-delta'], 84, ['text-c']),
1565
 
            ]
1566
 
        for version_id, parents, lines in test_data:
1567
 
            k1.add_lines(version_id, parents, split_lines(lines))
1568
 
 
1569
 
        # Note that even though we request the revision IDs in a particular
1570
 
        # order, the data stream may return them in any order it likes.  In this
1571
 
        # case, they'll be in the order they were inserted into the knit.
1572
 
        format, data_list, reader_callable = k1.get_data_stream(
1573
 
            ['text-d', 'text-b'])
1574
 
        self.assertEqual('knit-plain', format)
1575
 
        self.assertEqual(expected_data_list, data_list)
1576
 
        self.assertRecordContentEqual(k1, 'text-b', reader_callable(84))
1577
 
        self.assertRecordContentEqual(k1, 'text-d', reader_callable(84))
1578
 
        self.assertEqual('', reader_callable(None),
1579
 
                         "There should be no more bytes left to read.")
1580
 
 
1581
 
    def test_get_stream_all(self):
1582
 
        """Get a data stream for all the records in a knit.
1583
 
 
1584
 
        This exercises fulltext records, line-delta records, records with
1585
 
        various numbers of parents, and reading multiple records out of the
1586
 
        callable.  These cases ought to all be exercised individually by the
1587
 
        other test_get_stream_* tests; this test is basically just paranoia.
1588
 
        """
1589
 
        k1 = self.make_test_knit()
1590
 
        # Insert the same data as test_knit_join, as they seem to cover a range
1591
 
        # of cases (no parents, one parent, multiple parents).
1592
 
        test_data = [
1593
 
            ('text-a', [], TEXT_1),
1594
 
            ('text-b', ['text-a'], TEXT_1),
1595
 
            ('text-c', [], TEXT_1),
1596
 
            ('text-d', ['text-c'], TEXT_1),
1597
 
            ('text-m', ['text-b', 'text-d'], TEXT_1),
1598
 
           ]
1599
 
        expected_data_list = [
1600
 
            # version, options, length, parents
1601
 
            ('text-a', ['fulltext'], 122, []),
1602
 
            ('text-b', ['line-delta'], 84, ['text-a']),
1603
 
            ('text-c', ['fulltext'], 121, []),
1604
 
            ('text-d', ['line-delta'], 84, ['text-c']),
1605
 
            ('text-m', ['line-delta'], 84, ['text-b', 'text-d']),
1606
 
            ]
1607
 
        for version_id, parents, lines in test_data:
1608
 
            k1.add_lines(version_id, parents, split_lines(lines))
1609
 
 
1610
 
        format, data_list, reader_callable = k1.get_data_stream(
1611
 
            ['text-a', 'text-b', 'text-c', 'text-d', 'text-m'])
1612
 
        self.assertEqual('knit-plain', format)
1613
 
        self.assertEqual(expected_data_list, data_list)
1614
 
        for version_id, options, length, parents in expected_data_list:
1615
 
            bytes = reader_callable(length)
1616
 
            self.assertRecordContentEqual(k1, version_id, bytes)
1617
 
 
1618
 
    def assertKnitFilesEqual(self, knit1, knit2):
1619
 
        """Assert that the contents of the index and data files of two knits are
1620
 
        equal.
1621
 
        """
1622
 
        self.assertEqual(
1623
 
            knit1.transport.get_bytes(knit1._data._access._filename),
1624
 
            knit2.transport.get_bytes(knit2._data._access._filename))
1625
 
        self.assertEqual(
1626
 
            knit1.transport.get_bytes(knit1._index._filename),
1627
 
            knit2.transport.get_bytes(knit2._index._filename))
1628
 
 
1629
 
    def test_insert_data_stream_empty(self):
1630
 
        """Inserting a data stream with no records should not put any data into
1631
 
        the knit.
1632
 
        """
1633
 
        k1 = self.make_test_knit()
1634
 
        k1.insert_data_stream(
1635
 
            (k1.get_format_signature(), [], lambda ignored: ''))
1636
 
        self.assertEqual('', k1.transport.get_bytes(k1._data._access._filename),
1637
 
                         "The .knit should be completely empty.")
1638
 
        self.assertEqual(k1._index.HEADER,
1639
 
                         k1.transport.get_bytes(k1._index._filename),
1640
 
                         "The .kndx should have nothing apart from the header.")
1641
 
 
1642
 
    def test_insert_data_stream_one_record(self):
1643
 
        """Inserting a data stream with one record from a knit with one record
1644
 
        results in byte-identical files.
1645
 
        """
1646
 
        source = self.make_test_knit(name='source')
1647
 
        source.add_lines('text-a', [], split_lines(TEXT_1))
1648
 
        data_stream = source.get_data_stream(['text-a'])
1649
 
        
1650
 
        target = self.make_test_knit(name='target')
1651
 
        target.insert_data_stream(data_stream)
1652
 
        
1653
 
        self.assertKnitFilesEqual(source, target)
1654
 
 
1655
 
    def test_insert_data_stream_records_already_present(self):
1656
 
        """Insert a data stream where some records are alreday present in the
1657
 
        target, and some not.  Only the new records are inserted.
1658
 
        """
1659
 
        source = self.make_test_knit(name='source')
1660
 
        target = self.make_test_knit(name='target')
1661
 
        # Insert 'text-a' into both source and target
1662
 
        source.add_lines('text-a', [], split_lines(TEXT_1))
1663
 
        target.insert_data_stream(source.get_data_stream(['text-a']))
1664
 
        # Insert 'text-b' into just the source.
1665
 
        source.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
1666
 
        # Get a data stream of both text-a and text-b, and insert it.
1667
 
        data_stream = source.get_data_stream(['text-a', 'text-b'])
1668
 
        target.insert_data_stream(data_stream)
1669
 
        # The source and target will now be identical.  This means the text-a
1670
 
        # record was not added a second time.
1671
 
        self.assertKnitFilesEqual(source, target)
1672
 
 
1673
 
    def test_insert_data_stream_multiple_records(self):
1674
 
        """Inserting a data stream of all records from a knit with multiple
1675
 
        records results in byte-identical files.
1676
 
        """
1677
 
        source = self.make_test_knit(name='source')
1678
 
        source.add_lines('text-a', [], split_lines(TEXT_1))
1679
 
        source.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
1680
 
        source.add_lines('text-c', [], split_lines(TEXT_1))
1681
 
        data_stream = source.get_data_stream(['text-a', 'text-b', 'text-c'])
1682
 
        
1683
 
        target = self.make_test_knit(name='target')
1684
 
        target.insert_data_stream(data_stream)
1685
 
        
1686
 
        self.assertKnitFilesEqual(source, target)
1687
 
 
1688
 
    def test_insert_data_stream_ghost_parent(self):
1689
 
        """Insert a data stream with a record that has a ghost parent."""
1690
 
        # Make a knit with a record, text-a, that has a ghost parent.
1691
 
        source = self.make_test_knit(name='source')
1692
 
        source.add_lines_with_ghosts('text-a', ['text-ghost'],
1693
 
                                     split_lines(TEXT_1))
1694
 
        data_stream = source.get_data_stream(['text-a'])
1695
 
 
1696
 
        target = self.make_test_knit(name='target')
1697
 
        target.insert_data_stream(data_stream)
1698
 
 
1699
 
        self.assertKnitFilesEqual(source, target)
1700
 
 
1701
 
        # The target knit object is in a consistent state, i.e. the record we
1702
 
        # just added is immediately visible.
1703
 
        self.assertTrue(target.has_version('text-a'))
1704
 
        self.assertTrue(target.has_ghost('text-ghost'))
1705
 
        self.assertEqual(split_lines(TEXT_1), target.get_lines('text-a'))
1706
 
 
1707
 
    def test_insert_data_stream_inconsistent_version_lines(self):
1708
 
        """Inserting a data stream which has different content for a version_id
1709
 
        than already exists in the knit will raise KnitCorrupt.
1710
 
        """
1711
 
        source = self.make_test_knit(name='source')
1712
 
        target = self.make_test_knit(name='target')
1713
 
        # Insert a different 'text-a' into both source and target
1714
 
        source.add_lines('text-a', [], split_lines(TEXT_1))
1715
 
        target.add_lines('text-a', [], split_lines(TEXT_2))
1716
 
        # Insert a data stream with conflicting content into the target
1717
 
        data_stream = source.get_data_stream(['text-a'])
1718
 
        self.assertRaises(
1719
 
            errors.KnitCorrupt, target.insert_data_stream, data_stream)
1720
 
 
1721
 
    def test_insert_data_stream_inconsistent_version_parents(self):
1722
 
        """Inserting a data stream which has different parents for a version_id
1723
 
        than already exists in the knit will raise KnitCorrupt.
1724
 
        """
1725
 
        source = self.make_test_knit(name='source')
1726
 
        target = self.make_test_knit(name='target')
1727
 
        # Insert a different 'text-a' into both source and target.  They differ
1728
 
        # only by the parents list, the content is the same.
1729
 
        source.add_lines_with_ghosts('text-a', [], split_lines(TEXT_1))
1730
 
        target.add_lines_with_ghosts('text-a', ['a-ghost'], split_lines(TEXT_1))
1731
 
        # Insert a data stream with conflicting content into the target
1732
 
        data_stream = source.get_data_stream(['text-a'])
1733
 
        self.assertRaises(
1734
 
            errors.KnitCorrupt, target.insert_data_stream, data_stream)
1735
 
 
1736
 
    def test_insert_data_stream_incompatible_format(self):
1737
 
        """A data stream in a different format to the target knit cannot be
1738
 
        inserted.
1739
 
 
1740
 
        It will raise KnitDataStreamIncompatible.
1741
 
        """
1742
 
        data_stream = ('fake-format-signature', [], lambda _: '')
1743
 
        target = self.make_test_knit(name='target')
1744
 
        self.assertRaises(
1745
 
            errors.KnitDataStreamIncompatible,
1746
 
            target.insert_data_stream, data_stream)
1747
 
 
1748
 
    #  * test that a stream of "already present version, then new version"
1749
 
    #    inserts correctly.
1750
1461
 
1751
1462
TEXT_1 = """\
1752
1463
Banana cup cakes: