985
985
self.locations = locations
987
self.batch_memos = {}
988
self.memos_to_get = []
987
989
self.total_bytes = 0
988
990
self.last_read_memo = None
989
991
self.manager = None
991
993
def add_key(self, key):
992
"""Add another to key to fetch."""
994
"""Add another to key to fetch.
996
:return: The estimated number of bytes needed to fetch the batch so
993
999
self.keys.append(key)
994
1000
index_memo, _, _, _ = self.locations[key]
995
1001
read_memo = index_memo[0:3]
996
# This looks a bit dangerous, but it's ok: we're assuming that memos in
997
# _group_cache now will still be there when yield_factories is called
998
# (and that uncached memos don't become cached). This ought to be
999
# true. But if it isn't that's ok, yield_factories will still work.
1000
# The only negative effect is that the estimated 'total_bytes' value
1001
# here will be wrong, so we might fetch bigger/smaller batches than
1003
if read_memo not in self.gcvf._group_cache:
1002
# Three possibilities for this read_memo:
1003
# - it's already part of this batch; or
1004
# - it's not yet part of this batch, but is already cached; or
1005
# - it's not yet part of this batch and will need to be fetched.
1006
if read_memo in self.batch_memos:
1007
# This read memo is already in this batch.
1010
cached_block = self.gcvf._group_cache[read_memo]
1012
# This read memo is new to this batch, and the data isn't cached
1014
self.batch_memos[read_memo] = None
1015
self.memos_to_get.append(read_memo)
1004
1016
byte_length = read_memo[2]
1005
1017
self.total_bytes += byte_length
1019
# This read memo is new to this batch, but cached.
1020
# Keep a reference to the cached block in batch_memos because it's
1021
# certain that we'll use it when this batch is processed, but
1022
# there's a risk that it would fall out of _group_cache between now
1024
self.batch_memos[read_memo] = cached_block
1025
return self.total_bytes
1007
1027
def _flush_manager(self):
1008
1028
if self.manager is not None:
1022
1042
if self.manager is None and not self.keys:
1024
# First, determine the list of memos to get.
1026
last_read_memo = self.last_read_memo
1027
for key in self.keys:
1028
index_memo = self.locations[key][0]
1029
read_memo = index_memo[:3]
1030
if last_read_memo != read_memo:
1031
memos_to_get.append(read_memo)
1032
last_read_memo = read_memo
1033
# Second, we fetch all those memos in one batch.
1034
blocks = self.gcvf._get_blocks(memos_to_get)
1035
# Finally, we turn blocks into factories and yield them.
1044
# Fetch all memos in this batch.
1045
blocks = self.gcvf._get_blocks(self.memos_to_get)
1046
# Turn blocks into factories and yield them.
1047
memos_to_get_stack = list(self.memos_to_get)
1048
memos_to_get_stack.reverse()
1036
1049
for key in self.keys:
1037
1050
index_memo, _, parents, _ = self.locations[key]
1038
1051
read_memo = index_memo[:3]
1042
1055
# now, so yield records
1043
1056
for factory in self._flush_manager():
1045
# Now start a new manager. The next block from _get_blocks
1046
# will be the block we need.
1047
block = blocks.next()
1058
# Now start a new manager.
1059
if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1060
# The next block from _get_blocks will be the block we
1062
block_read_memo, block = blocks.next()
1063
if block_read_memo != read_memo:
1064
raise AssertionError(
1065
"block_read_memo out of sync with read_memo")
1066
self.batch_memos[read_memo] = block
1067
memos_to_get_stack.pop()
1069
block = self.batch_memos[read_memo]
1048
1070
self.manager = _LazyGroupContentManager(block)
1049
1071
self.last_read_memo = read_memo
1050
1072
start, end = index_memo[3:5]
1222
1247
def _get_blocks(self, read_memos):
1223
1248
"""Get GroupCompressBlocks for the given read_memos.
1225
Blocks are returned in the order specified in read_memos.
1250
:returns: a series of (read_memo, block) pairs, in the order they were
1228
1254
for read_memo in read_memos:
1246
1272
raw_records = self._access.get_raw_records(not_cached)
1247
1273
for read_memo in read_memos:
1249
yield cached[read_memo]
1275
yield read_memos, cached[read_memo]
1250
1276
except KeyError:
1251
1277
# read the group
1252
1278
zdata = raw_records.next()
1257
1283
block = GroupCompressBlock.from_bytes(zdata)
1258
1284
self._group_cache[read_memo] = block
1259
1285
cached[read_memo] = block
1286
yield read_memo, block
1262
1288
def get_missing_compression_parent_keys(self):
1263
1289
"""Return the keys of missing compression parents.
1441
1467
if key in self._unadded_refs:
1442
1468
# Flush batch, then yield unadded ref from
1443
1469
# self._compressor.
1444
for _ in batcher.yield_factories(full_flush=True):
1470
for factory in batcher.yield_factories(full_flush=True):
1446
1472
bytes, sha1 = self._compressor.extract(key)
1447
1473
parents = self._unadded_refs[key]
1448
1474
yield FulltextContentFactory(key, parents, sha1, bytes)
1450
batcher.add_key(key)
1451
if batcher.total_bytes > BATCH_SIZE:
1476
if batcher.add_key(key) > BATCH_SIZE:
1452
1477
# Ok, this batch is big enough. Yield some results.
1453
for _ in batcher.yield_factories():
1478
for factory in batcher.yield_factories():
1456
for _ in batcher.yield_factories(full_flush=True):
1481
for factory in batcher.yield_factories(full_flush=True):
1458
1483
for record in source.get_record_stream(keys, ordering,
1459
1484
include_delta_closure):
1461
for _ in batcher.yield_factories(full_flush=True):
1486
for factory in batcher.yield_factories(full_flush=True):
1464
1489
def get_sha1s(self, keys):
1465
1490
"""See VersionedFiles.get_sha1s()."""