1436
1436
errors.KnitDataStreamIncompatible,
1437
1437
target.insert_data_stream, data_stream)
1439
def test_insert_data_stream_buffering_limit(self):
1440
"""insert_data_stream will batch the incoming records up to a certain
1443
This isn't testing correctness in the way other tests in this file do,
1444
just a performance/resource-use characteristic.
1446
target = self.make_test_knit(name='target')
1447
# Instrument target. We want to log the size of writes, and not
1448
# actually perform the insert because we aren't using real data.
1449
add_raw_records_calls = []
1450
def fake_add_raw_records(records, bytes):
1451
add_raw_records_calls.append(len(bytes))
1452
target._add_raw_records = fake_add_raw_records
1455
target.get_format_signature(),
1456
[('v1', [], 30, []), ('v2', [], 30, []), ('v3', [], 30, [])],
1457
StringIO('x' * 90).read
1460
# Insert 3 records of size 30, when bufsize is 64. No individual write
1461
# should exceed 64, so in this case we expect [60, 30] (i.e. the first
1462
# two records will be read and written in one go).
1463
target.insert_data_stream(data_stream, buffer_size=64)
1464
self.assertEqual([60, 30], add_raw_records_calls)
1466
def test_insert_data_stream_buffering_large_records(self):
1467
"""insert_data_stream's batching copes with records larger than the
1470
target = self.make_test_knit(name='target')
1471
# Instrument target. We want to log the size of writes, and not
1472
# actually perform the insert because we aren't using real data.
1473
add_raw_records_calls = []
1474
def fake_add_raw_records(records, bytes):
1475
add_raw_records_calls.append(len(bytes))
1476
target._add_raw_records = fake_add_raw_records
1479
target.get_format_signature(),
1480
[('v1', [], 100, []), ('v2', [], 100, [])],
1481
StringIO('x' * 200).read
1484
# Insert 2 records of size 100, when the buffer_size is much smaller than
1485
# that. Note that _add_raw_records is never called with no records,
1486
# i.e. if the buffer is empty, then flushing it does not trigger an
1488
target.insert_data_stream(data_stream, buffer_size=20)
1489
self.assertEqual([100, 100], add_raw_records_calls)
1491
def test_insert_data_stream_buffering_flushed_by_known_record(self):
1492
"""insert_data_stream flushes its buffers (if any) when it needs to do
1493
consistency checks on a record from the stream.
1495
target = self.make_test_knit(name='target')
1496
# Insert a real record.
1497
target.add_lines('v1', [], split_lines(TEXT_1))
1498
# Now instrument target. We want to log the size of writes, and not
1499
# actually perform the insert because we aren't using real data.
1500
add_raw_records_calls = []
1501
def fake_add_raw_records(records, bytes):
1502
add_raw_records_calls.append(len(bytes))
1503
target._add_raw_records = fake_add_raw_records
1505
# Create a file with a superficially valid knit header, gzip it.
1507
gzip_file = gzip.GzipFile(mode='wb', fileobj=sio)
1508
gzip_file.write('xx v1 yy %s\n' % target.get_sha1('v1'))
1512
target.get_format_signature(),
1513
[('v1', [], len(sio.getvalue()), [])],
1517
# Nothing is written; the buffer had nothing to flush.
1518
target.insert_data_stream(data_stream)
1519
self.assertEqual([], add_raw_records_calls)
1439
# def test_insert_data_stream_buffering_limit(self):
1440
# """insert_data_stream will batch the incoming records up to a certain
1443
# This isn't testing correctness in the way other tests in this file do,
1444
# just a performance/resource-use characteristic.
1446
# target = self.make_test_knit(name='target')
1447
# # Instrument target. We want to log the size of writes, and not
1448
# # actually perform the insert because we aren't using real data.
1449
# add_raw_records_calls = []
1450
# def fake_add_raw_records(records, bytes):
1451
# add_raw_records_calls.append(len(bytes))
1452
# target._add_raw_records = fake_add_raw_records
1455
# target.get_format_signature(),
1456
# [('v1', [], 30, []), ('v2', [], 30, []), ('v3', [], 30, [])],
1457
# StringIO('x' * 90).read
1460
# # Insert 3 records of size 30, when bufsize is 64. No individual write
1461
# # should exceed 64, so in this case we expect [60, 30] (i.e. the first
1462
# # two records will be read and written in one go).
1463
# target.insert_data_stream(data_stream, buffer_size=64)
1464
# self.assertEqual([60, 30], add_raw_records_calls)
1466
# def test_insert_data_stream_buffering_large_records(self):
1467
# """insert_data_stream's batching copes with records larger than the
1470
# target = self.make_test_knit(name='target')
1471
# # Instrument target. We want to log the size of writes, and not
1472
# # actually perform the insert because we aren't using real data.
1473
# add_raw_records_calls = []
1474
# def fake_add_raw_records(records, bytes):
1475
# add_raw_records_calls.append(len(bytes))
1476
# target._add_raw_records = fake_add_raw_records
1479
# target.get_format_signature(),
1480
# [('v1', [], 100, []), ('v2', [], 100, [])],
1481
# StringIO('x' * 200).read
1484
# # Insert 2 records of size 100, when the buffer_size is much smaller than
1485
# # that. Note that _add_raw_records is never called with no records,
1486
# # i.e. if the buffer is empty, then flushing it does not trigger an
1488
# target.insert_data_stream(data_stream, buffer_size=20)
1489
# self.assertEqual([100, 100], add_raw_records_calls)
1491
# def test_insert_data_stream_buffering_flushed_by_known_record(self):
1492
# """insert_data_stream flushes its buffers (if any) when it needs to do
1493
# consistency checks on a record from the stream.
1495
# target = self.make_test_knit(name='target')
1496
# # Insert a real record.
1497
# target.add_lines('v1', [], split_lines(TEXT_1))
1498
# # Now instrument target. We want to log the size of writes, and not
1499
# # actually perform the insert because we aren't using real data.
1500
# add_raw_records_calls = []
1501
# def fake_add_raw_records(records, bytes):
1502
# add_raw_records_calls.append(len(bytes))
1503
# target._add_raw_records = fake_add_raw_records
1505
# # Create a file with a superficially valid knit header, gzip it.
1507
# gzip_file = gzip.GzipFile(mode='wb', fileobj=sio)
1508
# gzip_file.write('xx v1 yy %s\n' % target.get_sha1('v1'))
1512
# target.get_format_signature(),
1513
# [('v1', [], len(sio.getvalue()), [])],
1517
# # Nothing is written; the buffer had nothing to flush.
1518
# target.insert_data_stream(data_stream)
1519
# self.assertEqual([], add_raw_records_calls)
1521
1521
# * test that a stream of "already present version, then new version"
1522
1522
# inserts correctly.