1301
1295
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
    """Re-run the LowLevelKnitIndexTests suite with the compiled loader.

    Only get_knit_index() is overridden here; everything else comes from
    LowLevelKnitIndexTests.
    """

    # Declares the feature this class depends on: the compiled knit loader.
    _test_needs_features = [compiled_knit_feature]

    def get_knit_index(self, transport, name, mode):
        """Build a _KndxIndex with knit._load_data swapped for _load_data_c.

        :param transport: Passed straight through to _KndxIndex.
        :param name: Wrapped in a ConstantMapper and used as the index mapper.
        :param mode: The index reports itself writable only when this is 'w'.
        :return: The newly constructed _KndxIndex.
        """
        mapper = ConstantMapper(name)
        # Imported inside the method, so the extension module is only loaded
        # when a test actually asks for an index.
        from bzrlib._knit_load_data_pyx import _load_data_c
        # Single call replacing the older "save original / addCleanup(reset) /
        # assign" sequence; presumably overrideAttr restores the attribute when
        # the test finishes -- confirm against the TestCase implementation.
        self.overrideAttr(knit, '_load_data', _load_data_c)
        allow_writes = lambda: mode == 'w'
        return _KndxIndex(transport, mapper, lambda:None,
                          allow_writes, lambda:True)
1308
class Test_KnitAnnotator(TestCaseWithMemoryTransport):
    """Exercise the private bookkeeping of knit._KnitAnnotator.

    Most tests drive _expand_record() / _process_pending() by hand and then
    inspect the annotator's internal dicts (_content_objects, _text_cache,
    _pending_deltas, _pending_annotation, _matching_blocks,
    _num_compression_children, _annotations_cache) directly.
    """

    def make_annotator(self):
        """Return a knit._KnitAnnotator over a fresh versioned-file store.

        The store is built by knit.make_pack_factory(True, True, 1) on this
        test's transport.
        """
        factory = knit.make_pack_factory(True, True, 1)
        vf = factory(self.get_transport())
        return knit._KnitAnnotator(vf)

    def test__expand_fulltext(self):
        ann = self.make_annotator()
        rev_key = ('rev-id',)
        ann._num_compression_children[rev_key] = 1
        res = ann._expand_record(rev_key, (('parent-id',),), None,
                                 ['line1\n', 'line2\n'], ('fulltext', True))
        # The content object and text lines should be cached appropriately.
        # The expanded text has no final newline while the content object's
        # raw _lines keep it; presumably the True in the details tuple is a
        # "no trailing eol" flag -- confirm against _expand_record.
        self.assertEqual(['line1\n', 'line2'], res)
        content_obj = ann._content_objects[rev_key]
        self.assertEqual(['line1\n', 'line2\n'], content_obj._lines)
        self.assertEqual(res, content_obj.text())
        self.assertEqual(res, ann._text_cache[rev_key])

    def test__expand_delta_comp_parent_not_available(self):
        # Parent isn't available yet, so we return nothing, but queue up this
        # node for later processing
        ann = self.make_annotator()
        rev_key = ('rev-id',)
        parent_key = ('parent-id',)
        record = ['0,1,1\n', 'new-line\n']
        details = ('line-delta', False)
        res = ann._expand_record(rev_key, (parent_key,), parent_key,
                                 record, details)
        self.assertEqual(None, res)
        self.assertTrue(parent_key in ann._pending_deltas)
        pending = ann._pending_deltas[parent_key]
        self.assertEqual(1, len(pending))
        self.assertEqual((rev_key, (parent_key,), record, details), pending[0])

    def test__expand_record_tracks_num_children(self):
        # The parent starts with two compression children; each child that is
        # expanded against it should decrement the counter.
        ann = self.make_annotator()
        rev_key = ('rev-id',)
        rev2_key = ('rev2-id',)
        parent_key = ('parent-id',)
        record = ['0,1,1\n', 'new-line\n']
        details = ('line-delta', False)
        ann._num_compression_children[parent_key] = 2
        ann._expand_record(parent_key, (), None, ['line1\n', 'line2\n'],
                           ('fulltext', False))
        ann._expand_record(rev_key, (parent_key,), parent_key,
                           record, details)
        self.assertEqual({parent_key: 1}, ann._num_compression_children)
        # Expanding the second child should remove the content object, and the
        # num_compression_children entry
        ann._expand_record(rev2_key, (parent_key,), parent_key,
                           record, details)
        self.assertFalse(parent_key in ann._content_objects)
        self.assertEqual({}, ann._num_compression_children)
        # We should not cache the content_objects for rev2 and rev, because
        # they do not have compression children of their own.
        self.assertEqual({}, ann._content_objects)

    def test__expand_delta_records_blocks(self):
        # Expanding a line-delta should leave the matching blocks between the
        # child and its compression parent in ann._matching_blocks.
        ann = self.make_annotator()
        rev_key = ('rev-id',)
        parent_key = ('parent-id',)
        record = ['0,1,1\n', 'new-line\n']
        details = ('line-delta', True)
        ann._num_compression_children[parent_key] = 2
        ann._expand_record(parent_key, (), None,
                           ['line1\n', 'line2\n', 'line3\n'],
                           ('fulltext', False))
        ann._expand_record(rev_key, (parent_key,), parent_key, record, details)
        self.assertEqual({(rev_key, parent_key): [(1, 1, 1), (3, 3, 0)]},
                         ann._matching_blocks)
        # Same delta, but with the second details element False: the first
        # matching block is expected to be one line longer.
        rev2_key = ('rev2-id',)
        record = ['0,1,1\n', 'new-line\n']
        details = ('line-delta', False)
        ann._expand_record(rev2_key, (parent_key,), parent_key, record, details)
        self.assertEqual([(1, 1, 2), (3, 3, 0)],
                         ann._matching_blocks[(rev2_key, parent_key)])

    def test__get_parent_ann_uses_matching_blocks(self):
        ann = self.make_annotator()
        rev_key = ('rev-id',)
        parent_key = ('parent-id',)
        parent_ann = [(parent_key,)]*3
        block_key = (rev_key, parent_key)
        ann._annotations_cache[parent_key] = parent_ann
        ann._matching_blocks[block_key] = [(0, 1, 1), (3, 3, 0)]
        # We should not try to access any parent_lines content, because we know
        # we already have the matching blocks
        par_ann, blocks = ann._get_parent_annotations_and_matches(
            rev_key, ['1\n', '2\n', '3\n'], parent_key)
        self.assertEqual(parent_ann, par_ann)
        self.assertEqual([(0, 1, 1), (3, 3, 0)], blocks)
        # The cached blocks are expected to be consumed by the lookup.
        self.assertEqual({}, ann._matching_blocks)

    def test__process_pending(self):
        ann = self.make_annotator()
        rev_key = ('rev-id',)
        # NOTE(review): the two parent-key bindings were absent from the
        # mangled source; these literal ids are reconstructed. The test only
        # uses them as dict keys, so any two distinct keys should do.
        p1_key = ('p1-id',)
        p2_key = ('p2-id',)
        record = ['0,1,1\n', 'new-line\n']
        details = ('line-delta', False)
        p1_record = ['line1\n', 'line2\n']
        ann._num_compression_children[p1_key] = 1
        res = ann._expand_record(rev_key, (p1_key,p2_key), p1_key,
                                 record, details)
        self.assertEqual(None, res)
        # self.assertTrue(p1_key in ann._pending_deltas)
        self.assertEqual({}, ann._pending_annotation)
        # Now insert p1, and we should be able to expand the delta
        res = ann._expand_record(p1_key, (), None, p1_record,
                                 ('fulltext', False))
        self.assertEqual(p1_record, res)
        ann._annotations_cache[p1_key] = [(p1_key,)]*2
        res = ann._process_pending(p1_key)
        self.assertEqual([], res)
        self.assertFalse(p1_key in ann._pending_deltas)
        self.assertTrue(p2_key in ann._pending_annotation)
        self.assertEqual({p2_key: [(rev_key, (p1_key, p2_key))]},
                         ann._pending_annotation)
        # Now fill in parent 2, and pending annotation should be satisfied
        ann._expand_record(p2_key, (), None, [], ('fulltext', False))
        ann._annotations_cache[p2_key] = []
        res = ann._process_pending(p2_key)
        self.assertEqual([rev_key], res)
        self.assertEqual({}, ann._pending_annotation)
        self.assertEqual({}, ann._pending_deltas)

    def test_record_delta_removes_basis(self):
        # NOTE(review): this test makes no assertions, and the counter is
        # keyed by the bare string 'parent-id' rather than the tuple key
        # ('parent-id',) used for the record itself -- looks unfinished.
        ann = self.make_annotator()
        ann._expand_record(('parent-id',), (), None,
                           ['line1\n', 'line2\n'], ('fulltext', False))
        ann._num_compression_children['parent-id'] = 2

    def test_annotate_special_text(self):
        # A "special" text whose parents are rev-2 and rev-3 should attribute
        # each line to whichever stored revision(s) introduced it.
        ann = self.make_annotator()
        # NOTE(review): the binding of vf was missing from the mangled source;
        # this assumes the annotator keeps its versioned file as _vf -- confirm.
        vf = ann._vf
        rev1_key = ('rev-1',)
        rev2_key = ('rev-2',)
        rev3_key = ('rev-3',)
        spec_key = ('special:',)
        # NOTE(review): only the first line of each multi-line literal below
        # survived; the remaining strings and the last two expected
        # annotations are reconstructed to be mutually consistent -- verify
        # against the upstream test.
        vf.add_lines(rev1_key, [], ['initial content\n'])
        vf.add_lines(rev2_key, [rev1_key], ['initial content\n',
                                            'common content\n',
                                            'content in 2\n'])
        vf.add_lines(rev3_key, [rev1_key], ['initial content\n',
                                            'common content\n',
                                            'content in 3\n'])
        spec_text = ('initial content\n'
                     'common content\n'
                     'content in 2\n'
                     'content in 3\n')
        ann.add_special_text(spec_key, [rev2_key, rev3_key], spec_text)
        anns, lines = ann.annotate(spec_key)
        self.assertEqual([(rev1_key,),
                          (rev2_key, rev3_key),
                          (rev2_key,),
                          (rev3_key,),
                         ], anns)
        self.assertEqualDiff(spec_text, ''.join(lines))
1317
1470
class KnitTests(TestCaseWithTransport):