974
976
versioned_files.stream.close()
979
class _BatchingBlockFetcher(object):
980
"""Fetch group compress blocks in batches.
982
:ivar total_bytes: int of expected number of bytes needed to fetch the
983
currently pending batch.
986
def __init__(self, gcvf, locations):
988
self.locations = locations
990
self.batch_memos = {}
991
self.memos_to_get = []
993
self.last_read_memo = None
996
def add_key(self, key):
997
"""Add another key to fetch.
999
:return: The estimated number of bytes needed to fetch the batch so
1002
self.keys.append(key)
1003
index_memo, _, _, _ = self.locations[key]
1004
read_memo = index_memo[0:3]
1005
# Three possibilities for this read_memo:
1006
# - it's already part of this batch; or
1007
# - it's not yet part of this batch, but is already cached; or
1008
# - it's not yet part of this batch and will need to be fetched.
1009
if read_memo in self.batch_memos:
1010
# This read memo is already in this batch.
1011
return self.total_bytes
1013
cached_block = self.gcvf._group_cache[read_memo]
1015
# This read memo is new to this batch, and the data isn't cached
1017
self.batch_memos[read_memo] = None
1018
self.memos_to_get.append(read_memo)
1019
byte_length = read_memo[2]
1020
self.total_bytes += byte_length
1022
# This read memo is new to this batch, but cached.
1023
# Keep a reference to the cached block in batch_memos because it's
1024
# certain that we'll use it when this batch is processed, but
1025
# there's a risk that it would fall out of _group_cache between now
1027
self.batch_memos[read_memo] = cached_block
1028
return self.total_bytes
1030
def _flush_manager(self):
1031
if self.manager is not None:
1032
for factory in self.manager.get_record_stream():
1035
self.last_read_memo = None
1037
def yield_factories(self, full_flush=False):
1038
"""Yield factories for keys added since the last yield. They will be
1039
returned in the order they were added via add_key.
1041
:param full_flush: by default, some results may not be returned in case
1042
they can be part of the next batch. If full_flush is True, then
1043
all results are returned.
1045
if self.manager is None and not self.keys:
1047
# Fetch all memos in this batch.
1048
blocks = self.gcvf._get_blocks(self.memos_to_get)
1049
# Turn blocks into factories and yield them.
1050
memos_to_get_stack = list(self.memos_to_get)
1051
memos_to_get_stack.reverse()
1052
for key in self.keys:
1053
index_memo, _, parents, _ = self.locations[key]
1054
read_memo = index_memo[:3]
1055
if self.last_read_memo != read_memo:
1056
# We are starting a new block. If we have a
1057
# manager, we have found everything that fits for
1058
# now, so yield records
1059
for factory in self._flush_manager():
1061
# Now start a new manager.
1062
if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1063
# The next block from _get_blocks will be the block we
1065
block_read_memo, block = blocks.next()
1066
if block_read_memo != read_memo:
1067
raise AssertionError(
1068
"block_read_memo out of sync with read_memo"
1069
"(%r != %r)" % (block_read_memo, read_memo))
1070
self.batch_memos[read_memo] = block
1071
memos_to_get_stack.pop()
1073
block = self.batch_memos[read_memo]
1074
self.manager = _LazyGroupContentManager(block)
1075
self.last_read_memo = read_memo
1076
start, end = index_memo[3:5]
1077
self.manager.add_factory(key, parents, start, end)
1079
for factory in self._flush_manager():
1082
self.batch_memos.clear()
1083
del self.memos_to_get[:]
1084
self.total_bytes = 0
977
1087
class GroupCompressVersionedFiles(VersionedFiles):
978
1088
"""A group-compress based VersionedFiles implementation."""
1137
1247
missing.difference_update(set(new_result))
1138
1248
return result, source_results
1140
def _get_block(self, index_memo):
1141
read_memo = index_memo[0:3]
1144
block = self._group_cache[read_memo]
1147
zdata = self._access.get_raw_records([read_memo]).next()
1148
# decompress - whole thing - this is not a bug, as it
1149
# permits caching. We might want to store the partially
1150
# decompressed group and decompress object, so that recent
1151
# texts are not penalised by big groups.
1152
block = GroupCompressBlock.from_bytes(zdata)
1153
self._group_cache[read_memo] = block
1155
# print len(zdata), len(plain)
1156
# parse - requires split_lines, better to have byte offsets
1157
# here (but not by much - we only split the region for the
1158
# recipe, and we often want to end up with lines anyway).
1250
def _get_blocks(self, read_memos):
1251
"""Get GroupCompressBlocks for the given read_memos.
1253
:returns: a series of (read_memo, block) pairs, in the order they were
1257
for read_memo in read_memos:
1259
block = self._group_cache[read_memo]
1263
cached[read_memo] = block
1265
not_cached_seen = set()
1266
for read_memo in read_memos:
1267
if read_memo in cached:
1268
# Don't fetch what we already have
1270
if read_memo in not_cached_seen:
1271
# Don't try to fetch the same data twice
1273
not_cached.append(read_memo)
1274
not_cached_seen.add(read_memo)
1275
raw_records = self._access.get_raw_records(not_cached)
1276
for read_memo in read_memos:
1278
yield read_memo, cached[read_memo]
1280
# Read the block, and cache it.
1281
zdata = raw_records.next()
1282
block = GroupCompressBlock.from_bytes(zdata)
1283
self._group_cache[read_memo] = block
1284
cached[read_memo] = block
1285
yield read_memo, block
1161
1287
def get_missing_compression_parent_keys(self):
1162
1288
"""Return the keys of missing compression parents.
1328
1454
unadded_keys, source_result)
1329
1455
for key in missing:
1330
1456
yield AbsentContentFactory(key)
1332
last_read_memo = None
1333
# TODO: This works fairly well at batching up existing groups into a
1334
# streamable format, and possibly allowing for taking one big
1335
# group and splitting it when it isn't fully utilized.
1336
# However, it doesn't allow us to find under-utilized groups and
1337
# combine them into a bigger group on the fly.
1338
# (Consider the issue with how chk_map inserts texts
1339
# one-at-a-time.) This could be done at insert_record_stream()
1340
# time, but it probably would decrease the number of
1341
# bytes-on-the-wire for fetch.
1457
# Batch up as many keys as we can until either:
1458
# - we encounter an unadded ref, or
1459
# - we run out of keys, or
1460
# - the total bytes to retrieve for this batch > BATCH_SIZE
1461
batcher = _BatchingBlockFetcher(self, locations)
1342
1462
for source, keys in source_keys:
1343
1463
if source is self:
1344
1464
for key in keys:
1345
1465
if key in self._unadded_refs:
1346
if manager is not None:
1347
for factory in manager.get_record_stream():
1349
last_read_memo = manager = None
1466
# Flush batch, then yield unadded ref from
1468
for factory in batcher.yield_factories(full_flush=True):
1350
1470
bytes, sha1 = self._compressor.extract(key)
1351
1471
parents = self._unadded_refs[key]
1352
1472
yield FulltextContentFactory(key, parents, sha1, bytes)
1354
index_memo, _, parents, (method, _) = locations[key]
1355
read_memo = index_memo[0:3]
1356
if last_read_memo != read_memo:
1357
# We are starting a new block. If we have a
1358
# manager, we have found everything that fits for
1359
# now, so yield records
1360
if manager is not None:
1361
for factory in manager.get_record_stream():
1363
# Now start a new manager
1364
block = self._get_block(index_memo)
1365
manager = _LazyGroupContentManager(block)
1366
last_read_memo = read_memo
1367
start, end = index_memo[3:5]
1368
manager.add_factory(key, parents, start, end)
1474
if batcher.add_key(key) > BATCH_SIZE:
1475
# Ok, this batch is big enough. Yield some results.
1476
for factory in batcher.yield_factories():
1370
if manager is not None:
1371
for factory in manager.get_record_stream():
1373
last_read_memo = manager = None
1479
for factory in batcher.yield_factories(full_flush=True):
1374
1481
for record in source.get_record_stream(keys, ordering,
1375
1482
include_delta_closure):
1377
if manager is not None:
1378
for factory in manager.get_record_stream():
1484
for factory in batcher.yield_factories(full_flush=True):
1381
1487
def get_sha1s(self, keys):
1382
1488
"""See VersionedFiles.get_sha1s()."""