1287
1300
# Variant of LowLevelKnitIndexTests whose get_knit_index points
# knit._load_data at a compiled `_load_data_c` before building a _KndxIndex.
#
# NOTE(review): the bare integers in this span look like line numbers from a
# two-column diff rendering, and statements from two revisions of this class
# appear to be interleaved (two `_test_needs_features` assignments, two
# imports of `_load_data_c`, two ways of installing it). Confirm which
# revision is intended before relying on this text.
class LowLevelKnitIndexTests_c(LowLevelKnitIndexTests):
1289
# Variant A: feature object named compiled_knit_feature.
_test_needs_features = [compiled_knit_feature]
1302
# Variant B: feature object named CompiledKnitFeature.
_test_needs_features = [CompiledKnitFeature]
1291
1304
# Build a _KndxIndex over `transport`, using a ConstantMapper for `name`;
# writes are allowed only when mode == 'w'.
def get_knit_index(self, transport, name, mode):
1292
1305
mapper = ConstantMapper(name)
1293
# Variant A: import from the _pyx module and install it via overrideAttr.
from bzrlib._knit_load_data_pyx import _load_data_c
1294
self.overrideAttr(knit, '_load_data', _load_data_c)
1306
# Variant B: remember the current loader, register a cleanup, then assign.
orig = knit._load_data
1308
# NOTE(review): `reset` is passed to addCleanup below but no `def reset():`
# line is visible here; presumably it preceded this assignment -- confirm.
knit._load_data = orig
1309
self.addCleanup(reset)
1310
from bzrlib._knit_load_data_c import _load_data_c
1311
knit._load_data = _load_data_c
1295
1312
allow_writes = lambda: mode == 'w'
1296
return _KndxIndex(transport, mapper, lambda:None,
1297
allow_writes, lambda:True)
1300
class Test_KnitAnnotator(TestCaseWithMemoryTransport):
1302
def make_annotator(self):
    """Return a ``knit._KnitAnnotator`` over a freshly built versioned file.

    The versioned file comes from ``knit.make_pack_factory(True, True, 1)``
    applied to this test's transport.
    """
    factory = knit.make_pack_factory(True, True, 1)
    vf = factory(self.get_transport())
    return knit._KnitAnnotator(vf)
1307
def test__expand_fulltext(self):
    """Expanding a fulltext record returns its text and fills both caches."""
    ann = self.make_annotator()
    rev_key = ('rev-id',)
    # Give rev_key one compression child before expanding it; the test then
    # expects its content object to be cached.
    ann._num_compression_children[rev_key] = 1
    res = ann._expand_record(rev_key, (('parent-id',),), None,
                             ['line1\n', 'line2\n'], ('fulltext', True))
    # The content object and text lines should be cached appropriately
    # (the returned text has no trailing newline on its last line, while the
    # content object's raw lines keep it).
    self.assertEqual(['line1\n', 'line2'], res)
    content_obj = ann._content_objects[rev_key]
    self.assertEqual(['line1\n', 'line2\n'], content_obj._lines)
    self.assertEqual(res, content_obj.text())
    self.assertEqual(res, ann._text_cache[rev_key])
1320
# Checks that expanding a line-delta whose compression parent has not been
# expanded yet returns None and queues the record under the parent's key in
# ann._pending_deltas.
def test__expand_delta_comp_parent_not_available(self):
1321
# Parent isn't available yet, so we return nothing, but queue up this
1322
# node for later processing
1323
ann = self.make_annotator()
1324
rev_key = ('rev-id',)
1325
parent_key = ('parent-id',)
1326
record = ['0,1,1\n', 'new-line\n']
1327
details = ('line-delta', False)
1328
# NOTE(review): the continuation of the call below is missing from this
# view (the embedded numbering jumps from 1328 to 1330), so its remaining
# arguments cannot be seen here.
res = ann._expand_record(rev_key, (parent_key,), parent_key,
1330
self.assertEqual(None, res)
1331
self.assertTrue(parent_key in ann._pending_deltas)
1332
pending = ann._pending_deltas[parent_key]
1333
# Exactly one queued entry, carrying the key, parents, record and details.
self.assertEqual(1, len(pending))
1334
self.assertEqual((rev_key, (parent_key,), record, details), pending[0])
1336
# Checks the bookkeeping in ann._num_compression_children: the parent's count
# starts at 2, is expected to be 1 after the first child is expanded, and the
# entry (plus the parent's cached content object) is expected to be gone
# after the second.
def test__expand_record_tracks_num_children(self):
1337
ann = self.make_annotator()
1338
rev_key = ('rev-id',)
1339
rev2_key = ('rev2-id',)
1340
parent_key = ('parent-id',)
1341
record = ['0,1,1\n', 'new-line\n']
1342
details = ('line-delta', False)
1343
ann._num_compression_children[parent_key] = 2
1344
ann._expand_record(parent_key, (), None, ['line1\n', 'line2\n'],
1345
('fulltext', False))
1346
# NOTE(review): the continuation of the call below is missing from this
# view (numbering jumps from 1346 to 1348).
res = ann._expand_record(rev_key, (parent_key,), parent_key,
1348
self.assertEqual({parent_key: 1}, ann._num_compression_children)
1349
# Expanding the second child should remove the content object, and the
1350
# num_compression_children entry
1351
# NOTE(review): the continuation of the call below is also missing
# (numbering jumps from 1351 to 1353).
res = ann._expand_record(rev2_key, (parent_key,), parent_key,
1353
self.assertFalse(parent_key in ann._content_objects)
1354
self.assertEqual({}, ann._num_compression_children)
1355
# We should not cache the content_objects for rev2 and rev, because
1356
# they do not have compression children of their own.
1357
self.assertEqual({}, ann._content_objects)
1359
def test__expand_delta_records_blocks(self):
    """Expanding line-deltas records matching blocks per (child, parent)."""
    ann = self.make_annotator()
    rev_key = ('rev-id',)
    parent_key = ('parent-id',)
    record = ['0,1,1\n', 'new-line\n']
    details = ('line-delta', True)
    # Two children will be expanded against the same parent fulltext.
    ann._num_compression_children[parent_key] = 2
    ann._expand_record(parent_key, (), None,
                       ['line1\n', 'line2\n', 'line3\n'],
                       ('fulltext', False))
    ann._expand_record(rev_key, (parent_key,), parent_key, record, details)
    self.assertEqual({(rev_key, parent_key): [(1, 1, 1), (3, 3, 0)]},
                     ann._matching_blocks)
    # Same delta, but with the second element of ``details`` False: the
    # expected first matching block is one line longer.
    rev2_key = ('rev2-id',)
    record = ['0,1,1\n', 'new-line\n']
    details = ('line-delta', False)
    ann._expand_record(rev2_key, (parent_key,), parent_key, record, details)
    self.assertEqual([(1, 1, 2), (3, 3, 0)],
                     ann._matching_blocks[(rev2_key, parent_key)])
1379
def test__get_parent_ann_uses_matching_blocks(self):
    """Pre-seeded matching blocks are returned and removed from the cache."""
    ann = self.make_annotator()
    rev_key = ('rev-id',)
    parent_key = ('parent-id',)
    parent_ann = [(parent_key,)]*3
    block_key = (rev_key, parent_key)
    ann._annotations_cache[parent_key] = parent_ann
    ann._matching_blocks[block_key] = [(0, 1, 1), (3, 3, 0)]
    # We should not try to access any parent_lines content, because we know
    # we already have the matching blocks
    par_ann, blocks = ann._get_parent_annotations_and_matches(rev_key,
        ['1\n', '2\n', '3\n'], parent_key)
    self.assertEqual(parent_ann, par_ann)
    self.assertEqual([(0, 1, 1), (3, 3, 0)], blocks)
    # The seeded entry is expected to be gone once it has been handed back.
    self.assertEqual({}, ann._matching_blocks)
1395
# Walks a two-parent record through the pending queues: it is first expanded
# before its compression parent p1, then p1 is supplied and
# _process_pending(p1_key) is expected to leave it waiting on p2 in
# ann._pending_annotation, and finally p2 is supplied and
# _process_pending(p2_key) is expected to return [rev_key] with both queues
# empty.
def test__process_pending(self):
1396
ann = self.make_annotator()
1397
rev_key = ('rev-id',)
1400
# NOTE(review): p1_key and p2_key are used below but their definitions are
# not visible here (the embedded numbering jumps from 1397 to 1400).
record = ['0,1,1\n', 'new-line\n']
1401
details = ('line-delta', False)
1402
p1_record = ['line1\n', 'line2\n']
1403
ann._num_compression_children[p1_key] = 1
1404
# NOTE(review): the continuation of the call below is missing from this
# view (numbering jumps from 1404 to 1406).
res = ann._expand_record(rev_key, (p1_key,p2_key), p1_key,
1406
self.assertEqual(None, res)
1407
# self.assertTrue(p1_key in ann._pending_deltas)
1408
self.assertEqual({}, ann._pending_annotation)
1409
# Now insert p1, and we should be able to expand the delta
1410
res = ann._expand_record(p1_key, (), None, p1_record,
1411
('fulltext', False))
1412
self.assertEqual(p1_record, res)
1413
ann._annotations_cache[p1_key] = [(p1_key,)]*2
1414
res = ann._process_pending(p1_key)
1415
self.assertEqual([], res)
1416
self.assertFalse(p1_key in ann._pending_deltas)
1417
self.assertTrue(p2_key in ann._pending_annotation)
1418
self.assertEqual({p2_key: [(rev_key, (p1_key, p2_key))]},
1419
ann._pending_annotation)
1420
# Now fill in parent 2, and pending annotation should be satisfied
1421
res = ann._expand_record(p2_key, (), None, [], ('fulltext', False))
1422
ann._annotations_cache[p2_key] = []
1423
res = ann._process_pending(p2_key)
1424
self.assertEqual([rev_key], res)
1425
self.assertEqual({}, ann._pending_annotation)
1426
self.assertEqual({}, ann._pending_deltas)
1428
def test_record_delta_removes_basis(self):
    """Set up a parent fulltext with two registered compression children.

    TODO(review): this test makes no assertions, so it only checks that the
    setup does not raise. The count is also registered after the parent has
    already been expanded, unlike the other tests in this class -- confirm
    the intended order and add the missing checks.
    """
    parent_key = ('parent-id',)
    ann = self.make_annotator()
    ann._expand_record(parent_key, (), None,
                       ['line1\n', 'line2\n'], ('fulltext', False))
    # Use the tuple key, matching the key the record was expanded under and
    # the way every other test in this class indexes this dict (the original
    # used the bare string 'parent-id').
    ann._num_compression_children[parent_key] = 2
1434
# Adds three revisions (rev-2 and rev-3 both have rev-1 as parent), registers
# a "special" text with rev-2 and rev-3 as its parents via
# ann.add_special_text, then checks the result of ann.annotate(spec_key).
#
# NOTE(review): several source lines are missing from this view (the embedded
# numbering skips 1436, 1443-1444, 1446-1447, 1449-1451 and 1456-1458): the
# definition of `vf`, the tails of both multi-line add_lines calls, the rest
# of `spec_text`, and the rest of the expected annotations list.
def test_annotate_special_text(self):
1435
ann = self.make_annotator()
1437
rev1_key = ('rev-1',)
1438
rev2_key = ('rev-2',)
1439
rev3_key = ('rev-3',)
1440
spec_key = ('special:',)
1441
vf.add_lines(rev1_key, [], ['initial content\n'])
1442
vf.add_lines(rev2_key, [rev1_key], ['initial content\n',
1445
vf.add_lines(rev3_key, [rev1_key], ['initial content\n',
1448
spec_text = ('initial content\n'
1452
ann.add_special_text(spec_key, [rev2_key, rev3_key], spec_text)
1453
anns, lines = ann.annotate(spec_key)
1454
self.assertEqual([(rev1_key,),
1455
(rev2_key, rev3_key),
1459
# The annotated lines, joined, must reproduce the special text.
self.assertEqualDiff(spec_text, ''.join(lines))
1313
# NOTE(review): this single-line return repeats the _KndxIndex(...) call that
# get_knit_index in LowLevelKnitIndexTests_c makes across two lines; it
# looks like the displaced tail of the other revision of that method rather
# than part of the preceding test class -- confirm against the real file.
return _KndxIndex(transport, mapper, lambda:None, allow_writes, lambda:True)
1462
1316
class KnitTests(TestCaseWithTransport):