135
138
self._content = ''.join(self._content_chunks)
136
139
self._content_chunks = None
137
140
if self._content is None:
138
# We join self._z_content_chunks here, because if we are
139
# decompressing, then it is *very* likely that we have a single
141
if self._z_content_chunks is None:
141
if self._z_content is None:
142
142
raise AssertionError('No content to decompress')
143
z_content = ''.join(self._z_content_chunks)
143
if self._z_content == '':
145
144
self._content = ''
146
145
elif self._compressor_name == 'lzma':
147
146
# We don't do partial lzma decomp yet
148
self._content = pylzma.decompress(z_content)
147
self._content = pylzma.decompress(self._z_content)
149
148
elif self._compressor_name == 'zlib':
150
149
# Start a zlib decompressor
151
if num_bytes * 4 > self._content_length * 3:
152
# If we are requesting more than 3/4ths of the content,
153
# just extract the whole thing in a single pass
154
num_bytes = self._content_length
155
self._content = zlib.decompress(z_content)
150
if num_bytes is None:
151
self._content = zlib.decompress(self._z_content)
157
153
self._z_content_decompressor = zlib.decompressobj()
158
154
# Seed the decompressor with the uncompressed bytes, so
159
155
# that the rest of the code is simplified
160
156
self._content = self._z_content_decompressor.decompress(
161
z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
162
if not self._z_content_decompressor.unconsumed_tail:
163
self._z_content_decompressor = None
157
self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
165
159
raise AssertionError('Unknown compressor: %r'
166
160
% self._compressor_name)
168
162
# 'unconsumed_tail'
170
164
# Do we have enough bytes already?
171
if len(self._content) >= num_bytes:
165
if num_bytes is not None and len(self._content) >= num_bytes:
167
if num_bytes is None and self._z_content_decompressor is None:
168
# We must have already decompressed everything
173
170
# If we got this far, and don't have a decompressor, something is wrong
174
171
if self._z_content_decompressor is None:
175
172
raise AssertionError(
176
173
'No decompressor to decompress %d bytes' % num_bytes)
177
174
remaining_decomp = self._z_content_decompressor.unconsumed_tail
178
if not remaining_decomp:
179
raise AssertionError('Nothing left to decompress')
180
needed_bytes = num_bytes - len(self._content)
181
# We always set max_size to 32kB over the minimum needed, so that
182
# zlib will give us as much as we really want.
183
# TODO: If this isn't good enough, we could make a loop here,
184
# that keeps expanding the request until we get enough
185
self._content += self._z_content_decompressor.decompress(
186
remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
187
if len(self._content) < num_bytes:
188
raise AssertionError('%d bytes wanted, only %d available'
189
% (num_bytes, len(self._content)))
190
if not self._z_content_decompressor.unconsumed_tail:
191
# The stream is finished
192
self._z_content_decompressor = None
175
if num_bytes is None:
177
# We don't know how much is left, but we'll decompress it all
178
self._content += self._z_content_decompressor.decompress(
180
# Note: There's what I consider a bug in zlib.decompressobj
181
# If you pass back in the entire unconsumed_tail, only
182
# this time you don't pass a max-size, it doesn't
183
# change the unconsumed_tail back to None/''.
184
# However, we know we are done with the whole stream
185
self._z_content_decompressor = None
186
# XXX: Why is this the only place in this routine we set this?
187
self._content_length = len(self._content)
189
if not remaining_decomp:
190
raise AssertionError('Nothing left to decompress')
191
needed_bytes = num_bytes - len(self._content)
192
# We always set max_size to 32kB over the minimum needed, so that
193
# zlib will give us as much as we really want.
194
# TODO: If this isn't good enough, we could make a loop here,
195
# that keeps expanding the request until we get enough
196
self._content += self._z_content_decompressor.decompress(
197
remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
198
if len(self._content) < num_bytes:
199
raise AssertionError('%d bytes wanted, only %d available'
200
% (num_bytes, len(self._content)))
201
if not self._z_content_decompressor.unconsumed_tail:
202
# The stream is finished
203
self._z_content_decompressor = None
194
205
def _parse_bytes(self, bytes, pos):
195
206
"""Read the various lengths from the header.
297
298
self._content_chunks = None
298
299
if self._content is None:
299
300
raise AssertionError('Nothing to compress')
300
z_content = pylzma.compress(self._content)
301
self._z_content_chunks = (z_content,)
302
self._z_content_length = len(z_content)
301
self._z_content = pylzma.compress(self._content)
302
self._z_content_length = len(self._z_content)
304
def _create_z_content_from_chunks(self, chunks):
304
def _create_z_content_from_chunks(self):
305
305
compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
306
# Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
307
# (measured peak is maybe 30MB over the above...)
308
compressed_chunks = map(compressor.compress, chunks)
306
compressed_chunks = map(compressor.compress, self._content_chunks)
309
307
compressed_chunks.append(compressor.flush())
310
# Ignore empty chunks
311
self._z_content_chunks = [c for c in compressed_chunks if c]
312
self._z_content_length = sum(map(len, self._z_content_chunks))
308
self._z_content = ''.join(compressed_chunks)
309
self._z_content_length = len(self._z_content)
314
311
def _create_z_content(self):
315
if self._z_content_chunks is not None:
312
if self._z_content is not None:
318
315
self._create_z_content_using_lzma()
320
317
if self._content_chunks is not None:
321
chunks = self._content_chunks
323
chunks = (self._content,)
324
self._create_z_content_from_chunks(chunks)
318
self._create_z_content_from_chunks()
320
self._z_content = zlib.compress(self._content)
321
self._z_content_length = len(self._z_content)
327
"""Create the byte stream as a series of 'chunks'"""
324
"""Encode the information into a byte stream."""
328
325
self._create_z_content()
330
327
header = self.GCB_LZ_HEADER
332
329
header = self.GCB_HEADER
333
chunks = ['%s%d\n%d\n'
334
% (header, self._z_content_length, self._content_length),
331
'%d\n%d\n' % (self._z_content_length, self._content_length),
336
chunks.extend(self._z_content_chunks)
337
total_len = sum(map(len, chunks))
338
return total_len, chunks
341
"""Encode the information into a byte stream."""
342
total_len, chunks = self.to_chunks()
343
334
return ''.join(chunks)
345
336
def _dump(self, include_text=False):
561
545
# time (self._block._content) is a little expensive.
562
546
self._block._ensure_content(self._last_byte)
564
def _check_rebuild_action(self):
548
def _check_rebuild_block(self):
565
549
"""Check to see if our block should be repacked."""
566
550
total_bytes_used = 0
567
551
last_byte_used = 0
568
552
for factory in self._factories:
569
553
total_bytes_used += factory._end - factory._start
570
if last_byte_used < factory._end:
571
last_byte_used = factory._end
572
# If we are using more than half of the bytes from the block, we have
573
# nothing else to check
554
last_byte_used = max(last_byte_used, factory._end)
555
# If we are using most of the bytes from the block, we have nothing
556
# else to check (currently more than 1/2)
574
557
if total_bytes_used * 2 >= self._block._content_length:
575
return None, last_byte_used, total_bytes_used
576
# We are using less than 50% of the content. Is the content we are
577
# using at the beginning of the block? If so, we can just trim the
578
# tail, rather than rebuilding from scratch.
559
# Can we just strip off the trailing bytes? If we are going to be
560
# transmitting more than 50% of the front of the content, go ahead
579
561
if total_bytes_used * 2 > last_byte_used:
580
return 'trim', last_byte_used, total_bytes_used
562
self._trim_block(last_byte_used)
582
565
# We are using a small amount of the data, and it isn't just packed
583
566
# nicely at the front, so rebuild the content.
590
573
# expanding many deltas into fulltexts, as well.
591
574
# If we build a cheap enough 'strip', then we could try a strip,
592
575
# if that expands the content, we then rebuild.
593
return 'rebuild', last_byte_used, total_bytes_used
595
def check_is_well_utilized(self):
596
"""Is the current block considered 'well utilized'?
598
This heuristic asks if the current block considers itself to be a fully
599
developed group, rather than just a loose collection of data.
601
if len(self._factories) == 1:
602
# A block of length 1 could be improved by combining with other
603
# groups - don't look deeper. Even larger than max size groups
604
# could compress well with adjacent versions of the same thing.
606
action, last_byte_used, total_bytes_used = self._check_rebuild_action()
607
block_size = self._block._content_length
608
if total_bytes_used < block_size * self._max_cut_fraction:
609
# This block wants to trim itself small enough that we want to
610
# consider it under-utilized.
612
# TODO: This code is meant to be the twin of _insert_record_stream's
613
# 'start_new_block' logic. It would probably be better to factor
614
# out that logic into a shared location, so that it stays
616
# We currently assume a block is properly utilized whenever it is >75%
617
# of the size of a 'full' block. In normal operation, a block is
618
# considered full when it hits 4MB of same-file content. So any block
619
# >3MB is 'full enough'.
620
# The only time this isn't true is when a given block has large-object
621
# content. (a single file >4MB, etc.)
622
# Under these circumstances, we allow a block to grow to
623
# 2 x largest_content. Which means that if a given block had a large
624
# object, it may actually be under-utilized. However, given that this
625
# is 'pack-on-the-fly' it is probably reasonable to not repack large
626
# content blobs on-the-fly. Note that because we return False for all
627
# 1-item blobs, we will repack them; we may wish to reevaluate our
628
# treatment of large object blobs in the future.
629
if block_size >= self._full_enough_block_size:
631
# If a block is <3MB, it still may be considered 'full' if it contains
632
# mixed content. The current rule is 2MB of mixed content is considered
633
# full. So check to see if this block contains mixed content, and
634
# set the threshold appropriately.
636
for factory in self._factories:
637
prefix = factory.key[:-1]
638
if common_prefix is None:
639
common_prefix = prefix
640
elif prefix != common_prefix:
641
# Mixed content, check the size appropriately
642
if block_size >= self._full_enough_mixed_block_size:
645
# The content failed both the mixed check and the single-content check
646
# so obviously it is not fully utilized
647
# TODO: there is one other constraint that isn't being checked
648
# namely, that the entries in the block are in the appropriate
649
# order. For example, you could insert the entries in exactly
650
# reverse groupcompress order, and we would think that is ok.
651
# (all the right objects are in one group, and it is fully
652
# utilized, etc.) For now, we assume that case is rare,
653
# especially since we should always fetch in 'groupcompress'
657
def _check_rebuild_block(self):
658
action, last_byte_used, total_bytes_used = self._check_rebuild_action()
662
self._trim_block(last_byte_used)
663
elif action == 'rebuild':
664
self._rebuild_block()
666
raise ValueError('unknown rebuild action: %r' % (action,))
576
self._rebuild_block()
668
578
def _wire_bytes(self):
669
579
"""Return a byte stream suitable for transmitting over the wire."""
703
613
z_header_bytes = zlib.compress(header_bytes)
705
615
z_header_bytes_len = len(z_header_bytes)
706
block_bytes_len, block_chunks = self._block.to_chunks()
616
block_bytes = self._block.to_bytes()
707
617
lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
709
619
lines.append(z_header_bytes)
710
lines.extend(block_chunks)
711
del z_header_bytes, block_chunks
712
# TODO: This is a point where we will double the memory consumption. To
713
# avoid this, we probably have to switch to a 'chunked' api
620
lines.append(block_bytes)
621
del z_header_bytes, block_bytes
714
622
return ''.join(lines)
717
625
def from_bytes(cls, bytes):
718
626
# TODO: This does extra string copying, probably better to do it a
719
# different way. At a minimum this creates 2 copies of the
721
628
(storage_kind, z_header_len, header_len,
722
629
block_len, rest) = bytes.split('\n', 4)
1060
975
versioned_files.stream.close()
1063
class _BatchingBlockFetcher(object):
1064
"""Fetch group compress blocks in batches.
1066
:ivar total_bytes: int of expected number of bytes needed to fetch the
1067
currently pending batch.
1070
def __init__(self, gcvf, locations):
1072
self.locations = locations
1074
self.batch_memos = {}
1075
self.memos_to_get = []
1076
self.total_bytes = 0
1077
self.last_read_memo = None
1080
def add_key(self, key):
1081
"""Add another key to fetch.
1083
:return: The estimated number of bytes needed to fetch the batch so
1086
self.keys.append(key)
1087
index_memo, _, _, _ = self.locations[key]
1088
read_memo = index_memo[0:3]
1089
# Three possibilities for this read_memo:
1090
# - it's already part of this batch; or
1091
# - it's not yet part of this batch, but is already cached; or
1092
# - it's not yet part of this batch and will need to be fetched.
1093
if read_memo in self.batch_memos:
1094
# This read memo is already in this batch.
1095
return self.total_bytes
1097
cached_block = self.gcvf._group_cache[read_memo]
1099
# This read memo is new to this batch, and the data isn't cached
1101
self.batch_memos[read_memo] = None
1102
self.memos_to_get.append(read_memo)
1103
byte_length = read_memo[2]
1104
self.total_bytes += byte_length
1106
# This read memo is new to this batch, but cached.
1107
# Keep a reference to the cached block in batch_memos because it's
1108
# certain that we'll use it when this batch is processed, but
1109
# there's a risk that it would fall out of _group_cache between now
1111
self.batch_memos[read_memo] = cached_block
1112
return self.total_bytes
1114
def _flush_manager(self):
1115
if self.manager is not None:
1116
for factory in self.manager.get_record_stream():
1119
self.last_read_memo = None
1121
def yield_factories(self, full_flush=False):
1122
"""Yield factories for keys added since the last yield. They will be
1123
returned in the order they were added via add_key.
1125
:param full_flush: by default, some results may not be returned in case
1126
they can be part of the next batch. If full_flush is True, then
1127
all results are returned.
1129
if self.manager is None and not self.keys:
1131
# Fetch all memos in this batch.
1132
blocks = self.gcvf._get_blocks(self.memos_to_get)
1133
# Turn blocks into factories and yield them.
1134
memos_to_get_stack = list(self.memos_to_get)
1135
memos_to_get_stack.reverse()
1136
for key in self.keys:
1137
index_memo, _, parents, _ = self.locations[key]
1138
read_memo = index_memo[:3]
1139
if self.last_read_memo != read_memo:
1140
# We are starting a new block. If we have a
1141
# manager, we have found everything that fits for
1142
# now, so yield records
1143
for factory in self._flush_manager():
1145
# Now start a new manager.
1146
if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
1147
# The next block from _get_blocks will be the block we
1149
block_read_memo, block = blocks.next()
1150
if block_read_memo != read_memo:
1151
raise AssertionError(
1152
"block_read_memo out of sync with read_memo"
1153
"(%r != %r)" % (block_read_memo, read_memo))
1154
self.batch_memos[read_memo] = block
1155
memos_to_get_stack.pop()
1157
block = self.batch_memos[read_memo]
1158
self.manager = _LazyGroupContentManager(block)
1159
self.last_read_memo = read_memo
1160
start, end = index_memo[3:5]
1161
self.manager.add_factory(key, parents, start, end)
1163
for factory in self._flush_manager():
1166
self.batch_memos.clear()
1167
del self.memos_to_get[:]
1168
self.total_bytes = 0
1171
978
class GroupCompressVersionedFiles(VersionedFiles):
1172
979
"""A group-compress based VersionedFiles implementation."""
1174
def __init__(self, index, access, delta=True, _unadded_refs=None):
981
def __init__(self, index, access, delta=True):
1175
982
"""Create a GroupCompressVersionedFiles object.
1177
984
:param index: The index object storing access and graph data.
1178
985
:param access: The access object storing raw data.
1179
986
:param delta: Whether to delta compress or just entropy compress.
1180
:param _unadded_refs: private parameter, don't use.
1182
988
self._index = index
1183
989
self._access = access
1184
990
self._delta = delta
1185
if _unadded_refs is None:
1187
self._unadded_refs = _unadded_refs
991
self._unadded_refs = {}
1188
992
self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1189
993
self._fallback_vfs = []
1191
def without_fallbacks(self):
1192
"""Return a clone of this object without any fallbacks configured."""
1193
return GroupCompressVersionedFiles(self._index, self._access,
1194
self._delta, _unadded_refs=dict(self._unadded_refs))
1196
995
def add_lines(self, key, parents, lines, parent_texts=None,
1197
996
left_matching_blocks=None, nostore_sha=None, random_id=False,
1198
997
check_content=True):
1276
1075
def get_annotator(self):
1277
1076
return annotate.Annotator(self)
1279
def check(self, progress_bar=None, keys=None):
1078
def check(self, progress_bar=None):
1280
1079
"""See VersionedFiles.check()."""
1283
for record in self.get_record_stream(keys, 'unordered', True):
1284
record.get_bytes_as('fulltext')
1286
return self.get_record_stream(keys, 'unordered', True)
1288
def clear_cache(self):
1289
"""See VersionedFiles.clear_cache()"""
1290
self._group_cache.clear()
1291
self._index._graph_index.clear_cache()
1292
self._index._int_cache.clear()
1081
for record in self.get_record_stream(keys, 'unordered', True):
1082
record.get_bytes_as('fulltext')
1294
1084
def _check_add(self, key, lines, random_id, check_content):
1295
1085
"""check that version_id and lines are safe to add."""
1354
1128
missing.difference_update(set(new_result))
1355
1129
return result, source_results
1357
def _get_blocks(self, read_memos):
1358
"""Get GroupCompressBlocks for the given read_memos.
1360
:returns: a series of (read_memo, block) pairs, in the order they were
1364
for read_memo in read_memos:
1366
block = self._group_cache[read_memo]
1370
cached[read_memo] = block
1372
not_cached_seen = set()
1373
for read_memo in read_memos:
1374
if read_memo in cached:
1375
# Don't fetch what we already have
1377
if read_memo in not_cached_seen:
1378
# Don't try to fetch the same data twice
1380
not_cached.append(read_memo)
1381
not_cached_seen.add(read_memo)
1382
raw_records = self._access.get_raw_records(not_cached)
1383
for read_memo in read_memos:
1385
yield read_memo, cached[read_memo]
1387
# Read the block, and cache it.
1388
zdata = raw_records.next()
1389
block = GroupCompressBlock.from_bytes(zdata)
1390
self._group_cache[read_memo] = block
1391
cached[read_memo] = block
1392
yield read_memo, block
1131
def _get_block(self, index_memo):
1132
read_memo = index_memo[0:3]
1135
block = self._group_cache[read_memo]
1138
zdata = self._access.get_raw_records([read_memo]).next()
1139
# decompress - whole thing - this is not a bug, as it
1140
# permits caching. We might want to store the partially
1141
# decompressed group and decompress object, so that recent
1142
# texts are not penalised by big groups.
1143
block = GroupCompressBlock.from_bytes(zdata)
1144
self._group_cache[read_memo] = block
1146
# print len(zdata), len(plain)
1147
# parse - requires split_lines, better to have byte offsets
1148
# here (but not by much - we only split the region for the
1149
# recipe, and we often want to end up with lines anyway.
1394
1152
def get_missing_compression_parent_keys(self):
1395
1153
"""Return the keys of missing compression parents.
1561
1319
unadded_keys, source_result)
1562
1320
for key in missing:
1563
1321
yield AbsentContentFactory(key)
1564
# Batch up as many keys as we can until either:
1565
# - we encounter an unadded ref, or
1566
# - we run out of keys, or
1567
# - the total bytes to retrieve for this batch > BATCH_SIZE
1568
batcher = _BatchingBlockFetcher(self, locations)
1323
last_read_memo = None
1324
# TODO: This works fairly well at batching up existing groups into a
1325
# streamable format, and possibly allowing for taking one big
1326
# group and splitting it when it isn't fully utilized.
1327
# However, it doesn't allow us to find under-utilized groups and
1328
# combine them into a bigger group on the fly.
1329
# (Consider the issue with how chk_map inserts texts
1330
# one-at-a-time.) This could be done at insert_record_stream()
1331
# time, but it probably would decrease the number of
1332
# bytes-on-the-wire for fetch.
1569
1333
for source, keys in source_keys:
1570
1334
if source is self:
1571
1335
for key in keys:
1572
1336
if key in self._unadded_refs:
1573
# Flush batch, then yield unadded ref from
1575
for factory in batcher.yield_factories(full_flush=True):
1337
if manager is not None:
1338
for factory in manager.get_record_stream():
1340
last_read_memo = manager = None
1577
1341
bytes, sha1 = self._compressor.extract(key)
1578
1342
parents = self._unadded_refs[key]
1579
1343
yield FulltextContentFactory(key, parents, sha1, bytes)
1581
if batcher.add_key(key) > BATCH_SIZE:
1582
# Ok, this batch is big enough. Yield some results.
1583
for factory in batcher.yield_factories():
1345
index_memo, _, parents, (method, _) = locations[key]
1346
read_memo = index_memo[0:3]
1347
if last_read_memo != read_memo:
1348
# We are starting a new block. If we have a
1349
# manager, we have found everything that fits for
1350
# now, so yield records
1351
if manager is not None:
1352
for factory in manager.get_record_stream():
1354
# Now start a new manager
1355
block = self._get_block(index_memo)
1356
manager = _LazyGroupContentManager(block)
1357
last_read_memo = read_memo
1358
start, end = index_memo[3:5]
1359
manager.add_factory(key, parents, start, end)
1586
for factory in batcher.yield_factories(full_flush=True):
1361
if manager is not None:
1362
for factory in manager.get_record_stream():
1364
last_read_memo = manager = None
1588
1365
for record in source.get_record_stream(keys, ordering,
1589
1366
include_delta_closure):
1591
for factory in batcher.yield_factories(full_flush=True):
1368
if manager is not None:
1369
for factory in manager.get_record_stream():
1594
1372
def get_sha1s(self, keys):
1595
1373
"""See VersionedFiles.get_sha1s()."""
1649
1427
self._unadded_refs = {}
1650
1428
keys_to_add = []
1652
bytes_len, chunks = self._compressor.flush().to_chunks()
1653
self._compressor = GroupCompressor()
1654
# Note: At this point we still have 1 copy of the fulltext (in
1655
# record and the var 'bytes'), and this generates 2 copies of
1656
# the compressed text (one for bytes, one in chunks)
1657
# TODO: Push 'chunks' down into the _access api, so that we don't
1658
# have to double compressed memory here
1659
# TODO: Figure out how to indicate that we would be happy to free
1660
# the fulltext content at this point. Note that sometimes we
1661
# will want it later (streaming CHK pages), but most of the
1662
# time we won't (everything else)
1663
bytes = ''.join(chunks)
1430
bytes = self._compressor.flush().to_bytes()
1665
1431
index, start, length = self._access.add_raw_records(
1666
1432
[(None, len(bytes))], bytes)[0]
1693
1459
if reuse_blocks:
1694
1460
# If the reuse_blocks flag is set, check to see if we can just
1695
1461
# copy a groupcompress block as-is.
1696
# We only check on the first record (groupcompress-block) not
1697
# on all of the (groupcompress-block-ref) entries.
1698
# The reuse_this_block flag is then kept for as long as
1699
if record.storage_kind == 'groupcompress-block':
1700
# Check to see if we really want to re-use this block
1701
insert_manager = record._manager
1702
reuse_this_block = insert_manager.check_is_well_utilized()
1704
reuse_this_block = False
1705
if reuse_this_block:
1706
# We still want to reuse this block
1707
1462
if record.storage_kind == 'groupcompress-block':
1708
1463
# Insert the raw block into the target repo
1709
1464
insert_manager = record._manager
1465
insert_manager._check_rebuild_block()
1710
1466
bytes = record._manager._block.to_bytes()
1711
1467
_, start, length = self._access.add_raw_records(
1712
1468
[(None, len(bytes))], bytes)[0]
1842
class _GCBuildDetails(object):
1843
"""A blob of data about the build details.
1845
This stores the minimal data, which then allows compatibility with the old
1846
api, without taking as much memory.
1849
__slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1850
'_delta_end', '_parents')
1853
compression_parent = None
1855
def __init__(self, parents, position_info):
1856
self._parents = parents
1857
(self._index, self._group_start, self._group_end, self._basis_end,
1858
self._delta_end) = position_info
1861
return '%s(%s, %s)' % (self.__class__.__name__,
1862
self.index_memo, self._parents)
1865
def index_memo(self):
1866
return (self._index, self._group_start, self._group_end,
1867
self._basis_end, self._delta_end)
1870
def record_details(self):
1871
return static_tuple.StaticTuple(self.method, None)
1873
def __getitem__(self, offset):
1874
"""Compatibility thunk to act like a tuple."""
1876
return self.index_memo
1878
return self.compression_parent # Always None
1880
return self._parents
1882
return self.record_details
1884
raise IndexError('offset out of range')
1890
1588
class _GCGraphIndex(object):
1891
1589
"""Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1893
1591
def __init__(self, graph_index, is_locked, parents=True,
1894
1592
add_callback=None, track_external_parent_refs=False,
1895
inconsistency_fatal=True, track_new_keys=False):
1593
inconsistency_fatal=True):
1896
1594
"""Construct a _GCGraphIndex on a graph_index.
1898
1596
:param graph_index: An implementation of bzrlib.index.GraphIndex.
2103
1787
"""Convert an index value to position details."""
2104
1788
bits = node[2].split(' ')
2105
1789
# It would be nice not to read the entire gzip.
2106
# start and stop are put into _int_cache because they are very common.
2107
# They define the 'group' that an entry is in, and many groups can have
2108
# thousands of objects.
2109
# Branching Launchpad, for example, saves ~600k integers, at 12 bytes
2110
# each, or about 7MB. Note that it might be even more when you consider
2111
# how PyInt is allocated in separate slabs. And you can't return a slab
2112
# to the OS if even 1 int on it is in use. Note though that Python uses
2113
# a LIFO when re-using PyInt slots, which might cause more
2115
1790
start = int(bits[0])
2116
start = self._int_cache.setdefault(start, start)
2117
1791
stop = int(bits[1])
2118
stop = self._int_cache.setdefault(stop, stop)
2119
1792
basis_end = int(bits[2])
2120
1793
delta_end = int(bits[3])
2121
# We can't use StaticTuple here, because node[0] is a BTreeGraphIndex
2123
return (node[0], start, stop, basis_end, delta_end)
1794
return node[0], start, stop, basis_end, delta_end
2125
1796
def scan_unvalidated_index(self, graph_index):
2126
1797
"""Inform this _GCGraphIndex that there is an unvalidated index.
2128
1799
This allows this _GCGraphIndex to keep track of any missing
2129
1800
compression parents we may want to have filled in to make those
2130
indices valid. It also allows _GCGraphIndex to track any new keys.
2132
1803
:param graph_index: A GraphIndex
2134
key_dependencies = self._key_dependencies
2135
if key_dependencies is None:
2137
for node in graph_index.iter_all_entries():
2138
# Add parent refs from graph_index (and discard parent refs
2139
# that the graph_index has).
2140
key_dependencies.add_references(node[1], node[3][0])
1805
if self._key_dependencies is not None:
1806
# Add parent refs from graph_index (and discard parent refs that
1807
# the graph_index has).
1808
add_refs = self._key_dependencies.add_references
1809
for node in graph_index.iter_all_entries():
1810
add_refs(node[1], node[3][0])
2143
1814
from bzrlib._groupcompress_py import (