~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Aaron Bentley
  • Date: 2008-04-24 04:58:42 UTC
  • mfrom: (3377 +trunk)
  • mto: This revision was merged to the branch mainline in revision 3380.
  • Revision ID: aaron@aaronbentley.com-20080424045842-0cajl9v6s4u52kaw
Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
37
37
from bzrlib.index import *
38
38
from bzrlib.knit import (
39
39
    AnnotatedKnitContent,
 
40
    DATA_SUFFIX,
 
41
    INDEX_SUFFIX,
40
42
    KnitContent,
41
43
    KnitGraphIndex,
42
44
    KnitVersionedFile,
45
47
    _KnitAccess,
46
48
    _KnitData,
47
49
    _KnitIndex,
 
50
    make_file_knit,
48
51
    _PackAccess,
49
52
    PlainKnitContent,
50
53
    _StreamAccess,
53
56
    KnitSequenceMatcher,
54
57
    )
55
58
from bzrlib.osutils import split_lines
 
59
from bzrlib.symbol_versioning import one_four
56
60
from bzrlib.tests import (
57
61
    Feature,
58
62
    TestCase,
158
162
        self.assertEqual(content.annotate(),
159
163
            [("bogus", "text1"), ("bogus", "text2")])
160
164
 
161
 
    def test_annotate_iter(self):
162
 
        content = self._make_content([])
163
 
        it = content.annotate_iter()
164
 
        self.assertRaises(StopIteration, it.next)
165
 
 
166
 
        content = self._make_content([("bogus", "text1"), ("bogus", "text2")])
167
 
        it = content.annotate_iter()
168
 
        self.assertEqual(it.next(), ("bogus", "text1"))
169
 
        self.assertEqual(it.next(), ("bogus", "text2"))
170
 
        self.assertRaises(StopIteration, it.next)
171
 
 
172
165
    def test_line_delta(self):
173
166
        content1 = self._make_content([("", "a"), ("", "b")])
174
167
        content2 = self._make_content([("", "a"), ("", "a"), ("", "c")])
196
189
        self.assertEqual(content.annotate(),
197
190
            [("origin1", "text1"), ("origin2", "text2")])
198
191
 
199
 
    def test_annotate_iter(self):
200
 
        content = self._make_content([])
201
 
        it = content.annotate_iter()
202
 
        self.assertRaises(StopIteration, it.next)
203
 
 
204
 
        content = self._make_content([("origin1", "text1"), ("origin2", "text2")])
205
 
        it = content.annotate_iter()
206
 
        self.assertEqual(it.next(), ("origin1", "text1"))
207
 
        self.assertEqual(it.next(), ("origin2", "text2"))
208
 
        self.assertRaises(StopIteration, it.next)
209
 
 
210
192
    def test_line_delta(self):
211
193
        content1 = self._make_content([("", "a"), ("", "b")])
212
194
        content2 = self._make_content([("", "a"), ("", "a"), ("", "c")])
492
474
        self.addCleanup(reset)
493
475
        from bzrlib._knit_load_data_py import _load_data_py
494
476
        knit._load_data = _load_data_py
495
 
        return _KnitIndex(*args, **kwargs)
 
477
        return _KnitIndex(get_scope=lambda:None, *args, **kwargs)
496
478
 
497
479
    def test_no_such_file(self):
498
480
        transport = MockTransport()
677
659
        self.assertRaises(RevisionNotPresent,
678
660
            index.get_ancestry_with_ghosts, ["e"])
679
661
 
680
 
    def test_iter_parents(self):
681
 
        transport = MockTransport()
682
 
        index = self.get_knit_index(transport, "filename", "w", create=True)
683
 
        # no parents
684
 
        index.add_version('r0', ['option'], (None, 0, 1), [])
685
 
        # 1 parent
686
 
        index.add_version('r1', ['option'], (None, 0, 1), ['r0'])
687
 
        # 2 parents
688
 
        index.add_version('r2', ['option'], (None, 0, 1), ['r1', 'r0'])
689
 
        # XXX TODO a ghost
690
 
        # cases: each sample data individually:
691
 
        self.assertEqual(set([('r0', ())]),
692
 
            set(index.iter_parents(['r0'])))
693
 
        self.assertEqual(set([('r1', ('r0', ))]),
694
 
            set(index.iter_parents(['r1'])))
695
 
        self.assertEqual(set([('r2', ('r1', 'r0'))]),
696
 
            set(index.iter_parents(['r2'])))
697
 
        # no nodes returned for a missing node
698
 
        self.assertEqual(set(),
699
 
            set(index.iter_parents(['missing'])))
700
 
        # 1 node returned with missing nodes skipped
701
 
        self.assertEqual(set([('r1', ('r0', ))]),
702
 
            set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
703
 
        # 2 nodes returned
704
 
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
705
 
            set(index.iter_parents(['r0', 'r1'])))
706
 
        # 2 nodes returned, missing skipped
707
 
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
708
 
            set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
709
 
 
710
662
    def test_num_versions(self):
711
663
        transport = MockTransport([
712
664
            _KnitIndex.HEADER
1075
1027
        self.addCleanup(reset)
1076
1028
        from bzrlib._knit_load_data_c import _load_data_c
1077
1029
        knit._load_data = _load_data_c
1078
 
        return _KnitIndex(*args, **kwargs)
1079
 
 
 
1030
        return _KnitIndex(get_scope=lambda:None, *args, **kwargs)
1080
1031
 
1081
1032
 
1082
1033
class KnitTests(TestCaseWithTransport):
1083
1034
    """Class containing knit test helper routines."""
1084
1035
 
1085
1036
    def make_test_knit(self, annotate=False, delay_create=False, index=None,
1086
 
                       name='test'):
 
1037
                       name='test', delta=True, access_mode='w'):
1087
1038
        if not annotate:
1088
1039
            factory = KnitPlainFactory()
1089
1040
        else:
1090
1041
            factory = None
1091
 
        return KnitVersionedFile(name, get_transport('.'), access_mode='w',
1092
 
                                 factory=factory, create=True,
1093
 
                                 delay_create=delay_create, index=index)
 
1042
        if index is None:
 
1043
            index = _KnitIndex(get_transport('.'), name + INDEX_SUFFIX,
 
1044
                access_mode, create=True, file_mode=None,
 
1045
                create_parent_dir=False, delay_create=delay_create,
 
1046
                dir_mode=None, get_scope=lambda:None)
 
1047
        access = _KnitAccess(get_transport('.'), name + DATA_SUFFIX, None,
 
1048
            None, delay_create, False)
 
1049
        return KnitVersionedFile(name, get_transport('.'), factory=factory,
 
1050
            create=True, delay_create=delay_create, index=index,
 
1051
            access_method=access, delta=delta)
1094
1052
 
1095
1053
    def assertRecordContentEqual(self, knit, version_id, candidate_content):
1096
1054
        """Assert that some raw record content matches the raw record content
1115
1073
    def test_make_explicit_index(self):
1116
1074
        """We can supply an index to use."""
1117
1075
        knit = KnitVersionedFile('test', get_transport('.'),
1118
 
            index='strangelove')
 
1076
            index='strangelove', access_method="a")
1119
1077
        self.assertEqual(knit._index, 'strangelove')
1120
1078
 
1121
1079
    def test_knit_add(self):
1162
1120
        k = self.make_test_knit()
1163
1121
        k.add_lines('text-1', [], split_lines(TEXT_1))
1164
1122
        del k
1165
 
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
1123
        k2 = make_file_knit('test', get_transport('.'), access_mode='r',
 
1124
            factory=KnitPlainFactory(), create=True)
1166
1125
        self.assertTrue(k2.has_version('text-1'))
1167
1126
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
1168
1127
 
1190
1149
    def test_incomplete(self):
1191
1150
        """Test if texts without a ending line-end can be inserted and
1192
1151
        extracted."""
1193
 
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
1152
        k = make_file_knit('test', get_transport('.'), delta=False, create=True)
1194
1153
        k.add_lines('text-1', [], ['a\n',    'b'  ])
1195
1154
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
1196
1155
        # reopening ensures maximum room for confusion
1197
 
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
1156
        k = make_file_knit('test', get_transport('.'), delta=False, create=True)
1198
1157
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
1199
1158
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
1200
1159
 
1222
1181
 
1223
1182
    def test_add_delta(self):
1224
1183
        """Store in knit with parents"""
1225
 
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
1226
 
            delta=True, create=True)
 
1184
        k = self.make_test_knit(annotate=False)
1227
1185
        self.add_stock_one_and_one_a(k)
1228
 
        k.clear_cache()
1229
1186
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
1230
1187
 
1231
1188
    def test_add_delta_knit_graph_index(self):
1233
1190
        index = InMemoryGraphIndex(2)
1234
1191
        knit_index = KnitGraphIndex(index, add_callback=index.add_nodes,
1235
1192
            deltas=True)
1236
 
        k = KnitVersionedFile('test', get_transport('.'),
1237
 
            delta=True, create=True, index=knit_index)
 
1193
        k = self.make_test_knit(annotate=True, index=knit_index)
1238
1194
        self.add_stock_one_and_one_a(k)
1239
 
        k.clear_cache()
1240
1195
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
1241
1196
        # check the index had the right data added.
1242
1197
        self.assertEqual(set([
1248
1203
 
1249
1204
    def test_annotate(self):
1250
1205
        """Annotations"""
1251
 
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
1252
 
            delta=True, create=True)
 
1206
        k = self.make_test_knit(annotate=True, name='knit')
1253
1207
        self.insert_and_test_small_annotate(k)
1254
1208
 
1255
1209
    def insert_and_test_small_annotate(self, k):
1263
1217
 
1264
1218
    def test_annotate_fulltext(self):
1265
1219
        """Annotations"""
1266
 
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
1267
 
            delta=False, create=True)
 
1220
        k = self.make_test_knit(annotate=True, name='knit', delta=False)
1268
1221
        self.insert_and_test_small_annotate(k)
1269
1222
 
1270
1223
    def test_annotate_merge_1(self):
1341
1294
        self.assertEquals(origins[2], ('text-1', 'c\n'))
1342
1295
 
1343
1296
    def _test_join_with_factories(self, k1_factory, k2_factory):
1344
 
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=k1_factory, create=True)
 
1297
        k1 = make_file_knit('test1', get_transport('.'), factory=k1_factory, create=True)
1345
1298
        k1.add_lines('text-a', [], ['a1\n', 'a2\n', 'a3\n'])
1346
1299
        k1.add_lines('text-b', ['text-a'], ['a1\n', 'b2\n', 'a3\n'])
1347
1300
        k1.add_lines('text-c', [], ['c1\n', 'c2\n', 'c3\n'])
1348
1301
        k1.add_lines('text-d', ['text-c'], ['c1\n', 'd2\n', 'd3\n'])
1349
1302
        k1.add_lines('text-m', ['text-b', 'text-d'], ['a1\n', 'b2\n', 'd3\n'])
1350
 
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=k2_factory, create=True)
 
1303
        k2 = make_file_knit('test2', get_transport('.'), factory=k2_factory, create=True)
1351
1304
        count = k2.join(k1, version_ids=['text-m'])
1352
1305
        self.assertEquals(count, 5)
1353
1306
        self.assertTrue(k2.has_version('text-a'))
1374
1327
        self._test_join_with_factories(KnitPlainFactory(), None)
1375
1328
 
1376
1329
    def test_reannotate(self):
1377
 
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
1330
        k1 = make_file_knit('knit1', get_transport('.'),
1378
1331
                               factory=KnitAnnotateFactory(), create=True)
1379
1332
        # 0
1380
1333
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
1381
1334
        # 1
1382
1335
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
1383
1336
 
1384
 
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
1337
        k2 = make_file_knit('test2', get_transport('.'),
1385
1338
                               factory=KnitAnnotateFactory(), create=True)
1386
1339
        k2.join(k1, version_ids=['text-b'])
1387
1340
 
1404
1357
 
1405
1358
    def test_get_line_delta_texts(self):
1406
1359
        """Make sure we can call get_texts on text with reused line deltas"""
1407
 
        k1 = KnitVersionedFile('test1', get_transport('.'), 
1408
 
                               factory=KnitPlainFactory(), create=True)
 
1360
        k1 = make_file_knit('test1', get_transport('.'),
 
1361
            factory=KnitPlainFactory(), create=True)
1409
1362
        for t in range(3):
1410
1363
            if t == 0:
1411
1364
                parents = []
1416
1369
        
1417
1370
    def test_iter_lines_reads_in_order(self):
1418
1371
        instrumented_t = get_transport('trace+memory:///')
1419
 
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
1372
        k1 = make_file_knit('id', instrumented_t, create=True, delta=True)
1420
1373
        self.assertEqual([('get', 'id.kndx',)], instrumented_t._activity)
1421
1374
        # add texts with no required ordering
1422
1375
        k1.add_lines('base', [], ['text\n'])
1423
1376
        k1.add_lines('base2', [], ['text2\n'])
1424
 
        k1.clear_cache()
1425
1377
        # clear the logged activity, but preserve the list instance in case of
1426
1378
        # clones pointing at it.
1427
1379
        del instrumented_t._activity[:]
1452
1404
            "\nrevid2 line-delta 84 82 0 :",
1453
1405
            'test.kndx')
1454
1406
        # we should be able to load this file again
1455
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
1407
        knit = make_file_knit('test', get_transport('.'), access_mode='r')
1456
1408
        self.assertEqual(['revid', 'revid2'], knit.versions())
1457
1409
        # write a short write to the file and ensure that its ignored
1458
1410
        indexfile = file('test.kndx', 'ab')
1459
1411
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
1460
1412
        indexfile.close()
1461
1413
        # we should be able to load this file again
1462
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
 
1414
        knit = make_file_knit('test', get_transport('.'), access_mode='w')
1463
1415
        self.assertEqual(['revid', 'revid2'], knit.versions())
1464
1416
        # and add a revision with the same id the failed write had
1465
1417
        knit.add_lines('revid3', ['revid2'], ['a\n'])
1466
1418
        # and when reading it revid3 should now appear.
1467
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
1419
        knit = make_file_knit('test', get_transport('.'), access_mode='r')
1468
1420
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
1469
1421
        self.assertEqual({'revid3':('revid2',)}, knit.get_parent_map(['revid3']))
1470
1422
 
1485
1437
        """create_parent_dir can create knits in nonexistant dirs"""
1486
1438
        # Has no effect if we don't set 'delay_create'
1487
1439
        trans = get_transport('.')
1488
 
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
 
1440
        self.assertRaises(NoSuchFile, make_file_knit, 'dir/test',
1489
1441
                          trans, access_mode='w', factory=None,
1490
1442
                          create=True, create_parent_dir=True)
1491
1443
        # Nothing should have changed yet
1492
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
1444
        knit = make_file_knit('dir/test', trans, access_mode='w',
1493
1445
                                 factory=None, create=True,
1494
1446
                                 create_parent_dir=True,
1495
1447
                                 delay_create=True)
1510
1462
        if not trans._can_roundtrip_unix_modebits():
1511
1463
            # Can't roundtrip, so no need to run this test
1512
1464
            return
1513
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1514
 
                                 factory=None, create=True,
1515
 
                                 create_parent_dir=True,
1516
 
                                 delay_create=True,
1517
 
                                 file_mode=0600,
1518
 
                                 dir_mode=0700)
 
1465
        knit = make_file_knit('dir/test', trans, access_mode='w', factory=None,
 
1466
            create=True, create_parent_dir=True, delay_create=True,
 
1467
            file_mode=0600, dir_mode=0700)
1519
1468
        knit.add_lines('revid', [], ['a\n'])
1520
1469
        self.assertTransportMode(trans, 'dir', 0700)
1521
1470
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
1526
1475
        if not trans._can_roundtrip_unix_modebits():
1527
1476
            # Can't roundtrip, so no need to run this test
1528
1477
            return
1529
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1530
 
                                 factory=None, create=True,
1531
 
                                 create_parent_dir=True,
1532
 
                                 delay_create=True,
1533
 
                                 file_mode=0660,
1534
 
                                 dir_mode=0770)
 
1478
        knit = make_file_knit('dir/test', trans, access_mode='w', factory=None,
 
1479
            create=True, create_parent_dir=True, delay_create=True,
 
1480
            file_mode=0660, dir_mode=0770)
1535
1481
        knit.add_lines('revid', [], ['a\n'])
1536
1482
        self.assertTransportMode(trans, 'dir', 0770)
1537
1483
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
1542
1488
        if not trans._can_roundtrip_unix_modebits():
1543
1489
            # Can't roundtrip, so no need to run this test
1544
1490
            return
1545
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
1546
 
                                 factory=None, create=True,
1547
 
                                 create_parent_dir=True,
1548
 
                                 delay_create=True,
1549
 
                                 file_mode=0666,
1550
 
                                 dir_mode=0777)
 
1491
        knit = make_file_knit('dir/test', trans, access_mode='w', factory=None,
 
1492
            create=True, create_parent_dir=True, delay_create=True,
 
1493
            file_mode=0666, dir_mode=0777)
1551
1494
        knit.add_lines('revid', [], ['a\n'])
1552
1495
        self.assertTransportMode(trans, 'dir', 0777)
1553
1496
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
1924
1867
            errors.KnitDataStreamUnknown,
1925
1868
            target.insert_data_stream, data_stream)
1926
1869
 
 
1870
    def test_insert_data_stream_bug_208418(self):
 
1871
        """You can insert a stream with an incompatible format, even when:
 
1872
          * the stream has a line-delta record,
 
1873
          * whose parent is in the target, also stored as a line-delta
 
1874
 
 
1875
        See <https://launchpad.net/bugs/208418>.
 
1876
        """
 
1877
        base_lines = split_lines(TEXT_1)
 
1878
        # Make the target
 
1879
        target = self.make_test_knit(name='target', annotate=True)
 
1880
        target.add_lines('version-1', [], base_lines)
 
1881
        target.add_lines('version-2', ['version-1'], base_lines + ['a\n'])
 
1882
        # The second record should be a delta.
 
1883
        self.assertEqual('line-delta', target._index.get_method('version-2'))
 
1884
        
 
1885
        # Make a source, with a different format, but the same data
 
1886
        source = self.make_test_knit(name='source', annotate=False)
 
1887
        source.add_lines('version-1', [], base_lines)
 
1888
        source.add_lines('version-2', ['version-1'], base_lines + ['a\n'])
 
1889
        # Now add another record, which should be stored as a delta against
 
1890
        # version-2.
 
1891
        source.add_lines('version-3', ['version-2'], base_lines + ['b\n'])
 
1892
        self.assertEqual('line-delta', source._index.get_method('version-3'))
 
1893
 
 
1894
        # Make a stream of the new version
 
1895
        data_stream = source.get_data_stream(['version-3'])
 
1896
        # And insert into the target
 
1897
        target.insert_data_stream(data_stream)
 
1898
        # No errors should have been raised.
 
1899
 
 
1900
 
1927
1901
    #  * test that a stream of "already present version, then new version"
1928
1902
    #    inserts correctly.
1929
1903
 
2056
2030
    def test_weave_to_knit_matches(self):
2057
2031
        # check that the WeaveToKnit is_compatible function
2058
2032
        # registers True for a Weave to a Knit.
2059
 
        w = Weave()
 
2033
        w = Weave(get_scope=lambda:None)
2060
2034
        k = self.make_test_knit()
2061
2035
        self.failUnless(WeaveToKnit.is_compatible(w, k))
2062
2036
        self.failIf(WeaveToKnit.is_compatible(k, w))
2064
2038
        self.failIf(WeaveToKnit.is_compatible(k, k))
2065
2039
 
2066
2040
 
2067
 
class TestKnitCaching(KnitTests):
2068
 
    
2069
 
    def create_knit(self):
2070
 
        k = self.make_test_knit(True)
2071
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
2072
 
        k.add_lines('text-2', [], split_lines(TEXT_2))
2073
 
        return k
2074
 
 
2075
 
    def test_no_caching(self):
2076
 
        k = self.create_knit()
2077
 
        # Nothing should be cached without setting 'enable_cache'
2078
 
        self.assertEqual({}, k._data._cache)
2079
 
 
2080
 
    def test_cache_data_read_raw(self):
2081
 
        k = self.create_knit()
2082
 
 
2083
 
        # Now cache and read
2084
 
        k.enable_cache()
2085
 
 
2086
 
        def read_one_raw(version):
2087
 
            pos_map = k._get_components_positions([version])
2088
 
            method, index_memo, next = pos_map[version]
2089
 
            lst = list(k._data.read_records_iter_raw([(version, index_memo)]))
2090
 
            self.assertEqual(1, len(lst))
2091
 
            return lst[0]
2092
 
 
2093
 
        val = read_one_raw('text-1')
2094
 
        self.assertEqual({'text-1':val[1]}, k._data._cache)
2095
 
 
2096
 
        k.clear_cache()
2097
 
        # After clear, new reads are not cached
2098
 
        self.assertEqual({}, k._data._cache)
2099
 
 
2100
 
        val2 = read_one_raw('text-1')
2101
 
        self.assertEqual(val, val2)
2102
 
        self.assertEqual({}, k._data._cache)
2103
 
 
2104
 
    def test_cache_data_read(self):
2105
 
        k = self.create_knit()
2106
 
 
2107
 
        def read_one(version):
2108
 
            pos_map = k._get_components_positions([version])
2109
 
            method, index_memo, next = pos_map[version]
2110
 
            lst = list(k._data.read_records_iter([(version, index_memo)]))
2111
 
            self.assertEqual(1, len(lst))
2112
 
            return lst[0]
2113
 
 
2114
 
        # Now cache and read
2115
 
        k.enable_cache()
2116
 
 
2117
 
        val = read_one('text-2')
2118
 
        self.assertEqual(['text-2'], k._data._cache.keys())
2119
 
        self.assertEqual('text-2', val[0])
2120
 
        content, digest = k._data._parse_record('text-2',
2121
 
                                                k._data._cache['text-2'])
2122
 
        self.assertEqual(content, val[1])
2123
 
        self.assertEqual(digest, val[2])
2124
 
 
2125
 
        k.clear_cache()
2126
 
        self.assertEqual({}, k._data._cache)
2127
 
 
2128
 
        val2 = read_one('text-2')
2129
 
        self.assertEqual(val, val2)
2130
 
        self.assertEqual({}, k._data._cache)
2131
 
 
2132
 
    def test_cache_read(self):
2133
 
        k = self.create_knit()
2134
 
        k.enable_cache()
2135
 
 
2136
 
        text = k.get_text('text-1')
2137
 
        self.assertEqual(TEXT_1, text)
2138
 
        self.assertEqual(['text-1'], k._data._cache.keys())
2139
 
 
2140
 
        k.clear_cache()
2141
 
        self.assertEqual({}, k._data._cache)
2142
 
 
2143
 
        text = k.get_text('text-1')
2144
 
        self.assertEqual(TEXT_1, text)
2145
 
        self.assertEqual({}, k._data._cache)
2146
 
 
2147
 
 
2148
2041
class TestKnitIndex(KnitTests):
2149
2042
 
2150
2043
    def test_add_versions_dictionary_compresses(self):
2485
2378
             ('tip', 'no-eol,line-delta', (None, 0, 100), ['parent'])])
2486
2379
        self.assertEqual([], self.caught_entries)
2487
2380
 
2488
 
    def test_iter_parents(self):
2489
 
        index1 = self.make_g_index('1', 1, [
2490
 
        # no parents
2491
 
            (('r0', ), 'N0 100', ([], )),
2492
 
        # 1 parent
2493
 
            (('r1', ), '', ([('r0', )], ))])
2494
 
        index2 = self.make_g_index('2', 1, [
2495
 
        # 2 parents
2496
 
            (('r2', ), 'N0 100', ([('r1', ), ('r0', )], )),
2497
 
            ])
2498
 
        combined_index = CombinedGraphIndex([index1, index2])
2499
 
        index = KnitGraphIndex(combined_index)
2500
 
        # XXX TODO a ghost
2501
 
        # cases: each sample data individually:
2502
 
        self.assertEqual(set([('r0', ())]),
2503
 
            set(index.iter_parents(['r0'])))
2504
 
        self.assertEqual(set([('r1', ('r0', ))]),
2505
 
            set(index.iter_parents(['r1'])))
2506
 
        self.assertEqual(set([('r2', ('r1', 'r0'))]),
2507
 
            set(index.iter_parents(['r2'])))
2508
 
        # no nodes returned for a missing node
2509
 
        self.assertEqual(set(),
2510
 
            set(index.iter_parents(['missing'])))
2511
 
        # 1 node returned with missing nodes skipped
2512
 
        self.assertEqual(set([('r1', ('r0', ))]),
2513
 
            set(index.iter_parents(['ghost1', 'r1', 'ghost'])))
2514
 
        # 2 nodes returned
2515
 
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
2516
 
            set(index.iter_parents(['r0', 'r1'])))
2517
 
        # 2 nodes returned, missing skipped
2518
 
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
2519
 
            set(index.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
2520
 
 
2521
 
 
2522
2381
class TestNoParentsGraphIndexKnit(KnitTests):
2523
2382
    """Tests for knits using KnitGraphIndex with no parents."""
2524
2383
 
2736
2595
             ('tip', 'no-eol,line-delta', (None, 0, 100), [])])
2737
2596
        self.assertEqual([], self.caught_entries)
2738
2597
 
2739
 
    def test_iter_parents(self):
2740
 
        index = self.two_graph_index()
2741
 
        self.assertEqual(set([
2742
 
            ('tip', ()), ('tail', ()), ('parent', ()), ('separate', ())
2743
 
            ]),
2744
 
            set(index.iter_parents(['tip', 'tail', 'ghost', 'parent', 'separate'])))
2745
 
        self.assertEqual(set([('tip', ())]),
2746
 
            set(index.iter_parents(['tip'])))
2747
 
        self.assertEqual(set(),
2748
 
            set(index.iter_parents([])))
2749
 
 
2750
 
 
2751
2598
class TestPackKnits(KnitTests):
2752
2599
    """Tests that use a _PackAccess and KnitGraphIndex."""
2753
2600
 
2802
2649
            set(result),
2803
2650
            set(index.get_ancestry(ancestry_versions, False)))
2804
2651
 
2805
 
    def assertIterParents(self, knit, versions, parent_versions, result):
2806
 
        """Check the result of an iter_parents call on knit."""
2807
 
        index = self.get_index(knit, knit.get_data_stream(versions))
2808
 
        self.assertEqual(result, index.iter_parents(parent_versions))
2809
 
 
2810
2652
    def assertGetMethod(self, knit, versions, version, result):
2811
2653
        index = self.get_index(knit, knit.get_data_stream(versions))
2812
2654
        self.assertEqual(result, index.get_method(version))
2877
2719
        # we thunk across.
2878
2720
        self.assertGetMethod(knit, ['c'], 'b', 'fulltext')
2879
2721
 
2880
 
    def test_iter_parents(self):
2881
 
        knit = self.make_knit_with_4_versions_2_dags()
2882
 
        self.assertIterParents(knit, ['a'], ['a'], [('a', ())])
2883
 
        self.assertIterParents(knit, ['a', 'b'], ['a', 'b'],
2884
 
            [('a', ()), ('b', ())])
2885
 
        self.assertIterParents(knit, ['a', 'b', 'c'], ['a', 'b', 'c'],
2886
 
            [('a', ()), ('b', ()), ('c', ('b', 'a'))])
2887
 
        self.assertIterParents(knit, ['a', 'b', 'c', 'd'],
2888
 
            ['a', 'b', 'c', 'd'],
2889
 
            [('a', ()), ('b', ()), ('c', ('b', 'a')), ('d', ('e', 'f'))])
2890
 
        self.assertIterParents(knit, ['c'], ['a', 'b', 'c'],
2891
 
            [('c', ('b', 'a'))])
2892
 
 
2893
2722
    def test_get_options(self):
2894
2723
        knit = self.make_knit_with_4_versions_2_dags()
2895
2724
        self.assertGetOptions(knit, 'a', ['no-eol', 'fulltext'])
2981
2810
            knit.get_data_stream([]))
2982
2811
        self.assertRaises(errors.KnitCorrupt,
2983
2812
            list, access.get_raw_records([(True, "A", None, None)]))
 
2813
 
 
2814
 
 
2815
class TestFormatSignatures(KnitTests):
 
2816
 
 
2817
    def test_knit_format_signatures(self):
 
2818
        """Different formats of knit have different signature strings."""
 
2819
        knit = self.make_test_knit(name='a', annotate=True)
 
2820
        self.assertEqual('knit-annotated', knit.get_format_signature())
 
2821
        knit = self.make_test_knit(name='p', annotate=False)
 
2822
        self.assertEqual('knit-plain', knit.get_format_signature())