        self.manager = None

    def add_key(self, key):
        """Add another key to fetch."""
        self.keys.append(key)
        index_memo, _, _, _ = self.locations[key]
        read_memo = index_memo[0:3]
        if read_memo not in self.gcvf._group_cache:
            # This block isn't cached, so fetching it adds its size to the
            # bytes this batch must retrieve.
            start, end = index_memo[3:5]
            self.total_bytes += end - start
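
    # Note: total_bytes is only an estimate of the data this batch still has
    # to fetch.  Blocks already held in gcvf._group_cache add nothing, and
    # the caller (get_record_stream below) compares the running total against
    # a rough 64KiB threshold (2**16) to decide when to flush.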

    def empty_manager(self):
        """Yield any factories still pending in the current manager.

        This is a generator: nothing is flushed until it is iterated.
        """
        if self.manager is not None:
            for factory in self.manager.get_record_stream():
                yield factory
            self.manager = None

    def yield_factories(self, full_flush=False):
        """Yield record factories for the keys added so far.

        :param full_flush: if False, records that might fit in the next
            batch are held back; if True, everything is yielded.
        """
        if self.manager is None and not self.keys:
            return
        # First pass: figure out which blocks to fetch.  Each time the read
        # memo changes we have reached a new block.
        memos_to_get = []
        keys_to_get = []
        last_read_memo = self.last_read_memo
        for key in self.keys:
            index_memo = self.locations[key][0]
            read_memo = index_memo[:3]
            if last_read_memo != read_memo:
                memos_to_get.append(read_memo)
                keys_to_get.append(key)
                last_read_memo = read_memo
        blocks = self.gcvf._get_blocks(memos_to_get)
        # Map each block to the first key that needs it.
        block_map = {}
        for block, key in zip(blocks, keys_to_get):
            block_map[key] = block
        # Second pass: walk the keys again, turning blocks into managers and
        # factories.
        last_read_memo = self.last_read_memo
        for key in self.keys:
            index_memo, _, parents, _ = self.locations[key]
            read_memo = index_memo[:3]
            if last_read_memo != read_memo:
                # We are starting a new block.  If we have a manager, we
                # have found everything that fits for now, so yield records.
                for factory in self.empty_manager():
                    yield factory
                # Now start a new manager.
                block = block_map[key]
                self.manager = _LazyGroupContentManager(block)
                last_read_memo = read_memo
            start, end = index_memo[3:5]
            self.manager.add_factory(key, parents, start, end)
        if full_flush:
            for factory in self.empty_manager():
                yield factory
            last_read_memo = None
        self.last_read_memo = last_read_memo
        del self.keys[:]
        self.total_bytes = 0
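
    # A minimal sketch of the intended calling pattern (it mirrors
    # get_record_stream() below; 'gcvf' and 'locations' stand in for the
    # GroupCompressVersionedFiles instance and its key->index-data map):
    #
    #   batcher = _BatchingBlockFetcher(gcvf, locations)
    #   for key in keys:
    #       batcher.add_key(key)
    #       if batcher.total_bytes > 2**16:
    #           # Batch is big enough; stream what we have so far.
    #           for factory in batcher.yield_factories():
    #               yield factory
    #   # Drain everything left over at the end.
    #   for factory in batcher.yield_factories(full_flush=True):
    #       yield factory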

            unadded_keys, source_result)
        for key in missing:
            yield AbsentContentFactory(key)
        # TODO: This works fairly well at batching up existing groups into a
        #       streamable format, and possibly allowing for taking one big
        #       group and splitting it when it isn't fully utilized.
        #       However, it doesn't allow us to find under-utilized groups and
        #       combine them into a bigger group on the fly.
        #       (Consider the issue with how chk_map inserts texts
        #       one-at-a-time.) This could be done at insert_record_stream()
        #       time, but it probably would decrease the number of
        #       bytes-on-the-wire for fetch.
        # Batch up as many keys as we can until either:
        # - we encounter an unadded ref, or
        # - we run out of keys, or
        # - the total bytes to retrieve for this batch > 64k
        batcher = _BatchingBlockFetcher(self, locations)
        for source, keys in source_keys:
            if source is self:
                for key in keys:
                    if key in self._unadded_refs:
                        # Flush the batch, then yield the unadded ref from
                        # self._compressor.
                        for factory in batcher.yield_factories(full_flush=True):
                            yield factory
                        bytes, sha1 = self._compressor.extract(key)
                        parents = self._unadded_refs[key]
                        yield FulltextContentFactory(key, parents, sha1, bytes)
                        continue
                    batcher.add_key(key)
                    if batcher.total_bytes > 2**16:
                        # Ok! Our batch is full. Let's do it.
                        for factory in batcher.yield_factories():
                            yield factory
            else:
                # Records from a different source: flush everything batched
                # so far before streaming them through.
                for factory in batcher.yield_factories(full_flush=True):
                    yield factory
                batcher.last_read_memo = None
                for record in source.get_record_stream(keys, ordering,
                                                       include_delta_closure):
                    yield record
        for factory in batcher.yield_factories(full_flush=True):
            yield factory

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""