1313
1322
return _KndxIndex(transport, mapper, lambda:None, allow_writes, lambda:True)
1325
class Test_KnitAnnotator(TestCaseWithMemoryTransport):
1327
def make_annotator(self):
    """Build a ``knit._KnitAnnotator`` for use by the tests in this class.

    The annotator wraps a versioned-file object produced by the factory
    from ``knit.make_pack_factory(True, True, 1)`` on this test's
    transport.
    """
    pack_factory = knit.make_pack_factory(True, True, 1)
    versioned_files = pack_factory(self.get_transport())
    return knit._KnitAnnotator(versioned_files)
1332
def test__expand_fulltext(self):
    """A fulltext record expands to its text and is cached for reuse."""
    annotator = self.make_annotator()
    key = ('rev-id',)
    # Register one compression child for this key; the test then expects a
    # content object to be kept for it.
    annotator._num_compression_children[key] = 1
    expanded = annotator._expand_record(
        key, (('parent-id',),), None,
        ['line1\n', 'line2\n'], ('fulltext', True))
    # The content object and text lines should be cached appropriately.
    # The returned text has no newline on its last line, while the content
    # object's raw _lines keep it (presumably the True flag in the details
    # marks a missing final newline -- confirm against knit).
    self.assertEqual(['line1\n', 'line2'], expanded)
    cached_content = annotator._content_objects[key]
    self.assertEqual(['line1\n', 'line2\n'], cached_content._lines)
    self.assertEqual(expanded, cached_content.text())
    self.assertEqual(expanded, annotator._text_cache[key])
1345
def test__expand_delta_comp_parent_not_available(self):
    """A delta whose compression parent is missing is queued, not expanded."""
    # Parent isn't available yet, so we return nothing, but queue up this
    # node for later processing
    ann = self.make_annotator()
    rev_key = ('rev-id',)
    parent_key = ('parent-id',)
    record = ['0,1,1\n', 'new-line\n']
    details = ('line-delta', False)
    # The record and its details are passed through unchanged; the final
    # assertion below checks that exactly these objects were queued.
    res = ann._expand_record(rev_key, (parent_key,), parent_key,
                             record, details)
    self.assertEqual(None, res)
    self.assertTrue(parent_key in ann._pending_deltas)
    pending = ann._pending_deltas[parent_key]
    self.assertEqual(1, len(pending))
    self.assertEqual((rev_key, (parent_key,), record, details), pending[0])
1361
def test__expand_record_tracks_num_children(self):
    """Expanding deltas counts down the parent's compression children.

    The parent starts with two registered compression children. After
    the first child is expanded the count is 1; after the second, both
    the count entry and the parent's cached content object are gone.
    """
    ann = self.make_annotator()
    rev_key = ('rev-id',)
    rev2_key = ('rev2-id',)
    parent_key = ('parent-id',)
    record = ['0,1,1\n', 'new-line\n']
    details = ('line-delta', False)
    ann._num_compression_children[parent_key] = 2
    ann._expand_record(parent_key, (), None, ['line1\n', 'line2\n'],
                       ('fulltext', False))
    ann._expand_record(rev_key, (parent_key,), parent_key,
                       record, details)
    self.assertEqual({parent_key: 1}, ann._num_compression_children)
    # Expanding the second child should remove the content object, and the
    # num_compression_children entry
    ann._expand_record(rev2_key, (parent_key,), parent_key,
                       record, details)
    self.assertFalse(parent_key in ann._content_objects)
    self.assertEqual({}, ann._num_compression_children)
    # We should not cache the content_objects for rev2 and rev, because
    # they do not have compression children of their own.
    self.assertEqual({}, ann._content_objects)
1384
def test__expand_delta_records_blocks(self):
    """Expanding a line-delta records matching blocks against its parent."""
    annotator = self.make_annotator()
    parent_key = ('parent-id',)
    first_child = ('rev-id',)
    second_child = ('rev2-id',)

    # The parent is a three-line fulltext with two compression children.
    annotator._num_compression_children[parent_key] = 2
    annotator._expand_record(
        parent_key, (), None,
        ['line1\n', 'line2\n', 'line3\n'],
        ('fulltext', False))

    # First child: same delta lines, details flag True.
    annotator._expand_record(
        first_child, (parent_key,), parent_key,
        ['0,1,1\n', 'new-line\n'], ('line-delta', True))
    self.assertEqual(
        {(first_child, parent_key): [(1, 1, 1), (3, 3, 0)]},
        annotator._matching_blocks)

    # Second child: same delta lines, details flag False; the first
    # matching block is expected to be one line longer.
    annotator._expand_record(
        second_child, (parent_key,), parent_key,
        ['0,1,1\n', 'new-line\n'], ('line-delta', False))
    self.assertEqual(
        [(1, 1, 2), (3, 3, 0)],
        annotator._matching_blocks[(second_child, parent_key)])
1404
def test__get_parent_ann_uses_matching_blocks(self):
    """Pre-recorded matching blocks are returned and then discarded."""
    annotator = self.make_annotator()
    child_key = ('rev-id',)
    parent_key = ('parent-id',)
    cached_annotations = [(parent_key,)] * 3
    annotator._annotations_cache[parent_key] = cached_annotations
    annotator._matching_blocks[(child_key, parent_key)] = [
        (0, 1, 1), (3, 3, 0)]
    # We should not try to access any parent_lines content, because we know
    # we already have the matching blocks
    result = annotator._get_parent_annotations_and_matches(
        child_key, ['1\n', '2\n', '3\n'], parent_key)
    par_ann, blocks = result
    self.assertEqual(cached_annotations, par_ann)
    self.assertEqual([(0, 1, 1), (3, 3, 0)], blocks)
    # The entry is expected to be gone from the cache after the lookup.
    self.assertEqual({}, annotator._matching_blocks)
1420
def test__process_pending(self):
    """Walk a two-parent delta through the pending queues.

    The delta for ``rev_key`` is compressed against ``p1_key`` and also
    has ``p2_key`` as a parent. It cannot be expanded until p1 arrives,
    and is only reported as ready once p2's annotations are present too.
    """
    ann = self.make_annotator()
    rev_key = ('rev-id',)
    # Two distinct parent keys; the exact ids are arbitrary test data.
    p1_key = ('p1-id',)
    p2_key = ('p2-id',)
    record = ['0,1,1\n', 'new-line\n']
    details = ('line-delta', False)
    p1_record = ['line1\n', 'line2\n']
    ann._num_compression_children[p1_key] = 1
    res = ann._expand_record(rev_key, (p1_key, p2_key), p1_key,
                             record, details)
    self.assertEqual(None, res)
    # NOTE(review): the next assertion was already disabled in the
    # original; confirm whether it can be re-enabled.
    # self.assertTrue(p1_key in ann._pending_deltas)
    self.assertEqual({}, ann._pending_annotation)
    # Now insert p1, and we should be able to expand the delta
    res = ann._expand_record(p1_key, (), None, p1_record,
                             ('fulltext', False))
    self.assertEqual(p1_record, res)
    ann._annotations_cache[p1_key] = [(p1_key,)]*2
    res = ann._process_pending(p1_key)
    self.assertEqual([], res)
    self.assertFalse(p1_key in ann._pending_deltas)
    self.assertTrue(p2_key in ann._pending_annotation)
    self.assertEqual({p2_key: [(rev_key, (p1_key, p2_key))]},
                     ann._pending_annotation)
    # Now fill in parent 2, and pending annotation should be satisfied
    ann._expand_record(p2_key, (), None, [], ('fulltext', False))
    ann._annotations_cache[p2_key] = []
    res = ann._process_pending(p2_key)
    self.assertEqual([rev_key], res)
    self.assertEqual({}, ann._pending_annotation)
    self.assertEqual({}, ann._pending_deltas)
1453
def test_record_delta_removes_basis(self):
    """Expand a fulltext basis, then register two compression children.

    NOTE(review): this test makes no assertions, so it only checks that
    the calls below do not raise.
    """
    annotator = self.make_annotator()
    basis_lines = ['line1\n', 'line2\n']
    annotator._expand_record(
        ('parent-id',), (), None, basis_lines, ('fulltext', False))
    # NOTE(review): the key here is the bare string 'parent-id', while the
    # record above (and every other test in this class) uses the tuple
    # ('parent-id',) -- confirm which one was intended.
    annotator._num_compression_children['parent-id'] = 2
1459
def test_annotate_special_text(self):
1460
ann = self.make_annotator()
1462
rev1_key = ('rev-1',)
1463
rev2_key = ('rev-2',)
1464
rev3_key = ('rev-3',)
1465
spec_key = ('special:',)
1466
vf.add_lines(rev1_key, [], ['initial content\n'])
1467
vf.add_lines(rev2_key, [rev1_key], ['initial content\n',
1470
vf.add_lines(rev3_key, [rev1_key], ['initial content\n',
1473
spec_text = ('initial content\n'
1477
ann.add_special_text(spec_key, [rev2_key, rev3_key], spec_text)
1478
anns, lines = ann.annotate(spec_key)
1479
self.assertEqual([(rev1_key,),
1480
(rev2_key, rev3_key),
1484
self.assertEqualDiff(spec_text, ''.join(lines))
1316
1487
class KnitTests(TestCaseWithTransport):
1317
1488
"""Class containing knit test helper routines."""