                    value = intern(value)
                info_dict[key] = value

    def _ensure_content(self, num_bytes=None):
        """Make sure that content has been expanded enough.

        :param num_bytes: Ensure that we have extracted at least num_bytes of
            content. If None, consume everything
        """
        # TODO: If we re-use the same content block at different times during
        #       get_record_stream(), it is possible that the first pass will
        #       get inserted, triggering an extract/_ensure_content() which
        #       will get rid of _z_content. And then the next use of the block
        #       will try to access _z_content (to send it over the wire), and
        #       fail because it is already extracted. Consider never releasing
        #       _z_content because of this.
        if num_bytes is None:
            num_bytes = self._content_length
        if self._content_length is not None:
            assert num_bytes <= self._content_length
        if self._content is None:
            assert self._z_content is not None
            if self._z_content == '':
                self._content = ''
            elif self._compressor_name == 'lzma':
                # We don't do partial lzma decomp yet
                self._content = pylzma.decompress(self._z_content)
            else:
                # Start a zlib decompressor
                assert self._compressor_name == 'zlib'
                if num_bytes is None:
                    self._content = zlib.decompress(self._z_content)
                else:
                    self._z_content_decompressor = zlib.decompressobj()
                    # Seed the decompressor with the uncompressed bytes, so
                    # that the rest of the code is simplified
                    self._content = self._z_content_decompressor.decompress(
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                # Any bytes remaining to be decompressed will be in the
                # decompressor's 'unconsumed_tail'
        # Do we have enough bytes already?
        if num_bytes is not None and len(self._content) >= num_bytes:
            return
        if num_bytes is None and self._z_content_decompressor is None:
            # We must have already decompressed everything
            return
        # If we got this far, and don't have a decompressor, something is wrong
        assert self._z_content_decompressor is not None
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
        if num_bytes is None:
            if remaining_decomp:
                # We don't know how much is left, but we'll decompress it all
                self._content += self._z_content_decompressor.decompress(
                    remaining_decomp)
                # Note: There is what I consider a bug in zlib.decompressobj
                #       If you pass back in the entire unconsumed_tail, only
                #       this time you don't pass a max-size, it doesn't
                #       change the unconsumed_tail back to None/''.
                #       However, we know we are done with the whole stream
            self._z_content_decompressor = None
            self._content_length = len(self._content)
        else:
            # If we have nothing left to decomp, we ran out of decomp bytes
            assert remaining_decomp
            needed_bytes = num_bytes - len(self._content)
            # We always set max_size to 32kB over the minimum needed, so that
            # zlib will give us as much as we really want.
            # TODO: If this isn't good enough, we could make a loop here,
            #       that keeps expanding the request until we get enough
            self._content += self._z_content_decompressor.decompress(
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
            assert len(self._content) >= num_bytes
            if not self._z_content_decompressor.unconsumed_tail:
                # The stream is finished
                self._z_content_decompressor = None
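
    # Illustrative sketch, not part of this module: _ensure_content() relies
    # on zlib.decompressobj()'s max_length argument and its 'unconsumed_tail'
    # attribute to inflate only as much as is needed.  Roughly:
    #
    #   import zlib
    #   d = zlib.decompressobj()
    #   compressed = zlib.compress('x' * 100000)
    #   first = d.decompress(compressed, 1000)    # at most 1000 bytes back
    #   rest = d.decompress(d.unconsumed_tail)    # resume from the leftovers
    #
    # The method above does the same thing, padding the requested size by
    # _ZLIB_DECOMP_WINDOW so zlib hands back a little more than the minimum.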

    def _parse_bytes(self, bytes):
        """Read the various lengths from the header.

        This also populates the various 'compressed' buffers.

        :return: The position in bytes just after the last newline
        """
        # At present, there are 4 lengths to be read, we have 2 integers for
        # the length of the compressed and uncompressed header, and 2 integers
        # for the compressed and uncompressed content
        # 14 bytes can represent > 1TB, so to avoid checking too far, cap the
        # search to 14 bytes.
        pos = bytes.index('\n', 6, 20)
        self._z_header_length = int(bytes[6:pos])
        pos += 1
        pos2 = bytes.index('\n', pos, pos + 14)
        self._header_length = int(bytes[pos:pos2])
        end_of_z_lengths = pos2
        pos2 += 1
        # Older versions don't have the content lengths, if we want to preserve
        # backwards compatibility, we could try/except over these, and allow
        # them to be missing
        try:
            pos = bytes.index('\n', pos2, pos2 + 14)
            self._z_content_length = int(bytes[pos2:pos])
            pos += 1
            pos2 = bytes.index('\n', pos, pos + 14)
            self._content_length = int(bytes[pos:pos2])
            pos = pos2 + 1
            assert len(bytes) == (pos + self._z_header_length +
                                  self._z_content_length)
            pos2 = pos + self._z_header_length
            self._z_header = bytes[pos:pos2]
            self._z_content = bytes[pos2:]
            assert len(self._z_content) == self._z_content_length
        except ValueError:
            # This is the older form, which did not encode its content length
            pos = end_of_z_lengths + 1
            pos2 = pos + self._z_header_length
            self._z_header = bytes[pos:pos2]
            self._z_content = bytes[pos2:]
            self._z_content_length = len(self._z_content)
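
    # Worked example (illustrative, not part of the module): the serialised
    # block that _parse_bytes() reads is a 6-byte label (GCB_HEADER or
    # GCB_LZ_HEADER) followed by four newline-terminated decimal lengths,
    # then the two opaque byte sections:
    #
    #   <label><z_header_len>\n<header_len>\n<z_content_len>\n<content_len>\n
    #   <z_header bytes><z_content bytes>
    #
    # If the compressed header were 10 bytes (20 uncompressed) and the
    # compressed content 100 bytes (300 uncompressed), the lengths section
    # would read '10\n20\n100\n300\n', and the assertion above checks that
    # exactly 10 + 100 bytes follow the final newline.  Older blocks stop
    # after the first two lengths; that is what the fallback branch handles.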

    @classmethod
    def from_bytes(cls, bytes):
        out = cls()
        if bytes[:6] not in (cls.GCB_HEADER, cls.GCB_LZ_HEADER):
            raise ValueError('bytes did not start with %r' % (cls.GCB_HEADER,))
        if bytes[4] == 'z':
            out._compressor_name = 'zlib'
        elif bytes[4] == 'l':
            out._compressor_name = 'lzma'
        else:
            raise ValueError('unknown compressor: %r' % (bytes,))
        out._parse_bytes(bytes)
        return out

    def extract(self, key, start, end, sha1=None):
        """Extract the text for a specific key.

        :param key: The label used for this content
        :param sha1: TODO (should we validate only when sha1 is supplied?)
        :return: The bytes for the content
        """
        # Make sure we have enough bytes for this record
        # TODO: if we didn't want to track the end of this entry, we could
        #       _ensure_content(start+enough_bytes_for_type_and_length), and
        #       then decode the entry length, and
        #       _ensure_content(start+1+length)
        #       It is 2 calls to _ensure_content(), but we always buffer a bit
        #       extra anyway, and it means 1 less offset stored in the index,
        #       and transmitted over the wire
        if end is None:
            # it takes 5 bytes to encode 2^32, so we need 1 byte to hold the
            # 'f' or 'd' declaration, and then 5 more for the record length.
            self._ensure_content(start + 6)
        else:
            self._ensure_content(end)
        # The bytes are 'f' or 'd' for the type, then a variable-length
        # base128 integer for the content size, then the actual content
        # We know that the variable-length integer won't be longer than 5
        # bytes (it takes 5 bytes to encode 2^32)
        c = self._content[start]
        if c == 'f':
            type = 'fulltext'
        else:
            if c != 'd':
                raise ValueError('Unknown content control code: %s'
                                 % (c,))
            type = 'delta'
        content_len, len_len = decode_base128_int(
            self._content[start + 1:start + 6])
        content_start = start + 1 + len_len
        if end is None:
            end = content_start + content_len
            self._ensure_content(end)
        else:
            if end != content_start + content_len:
                raise ValueError('end != len according to field header'
                    ' %s != %s' % (end, content_start + content_len))
        entry = GroupCompressBlockEntry(key, type, sha1=None,
            start=start, length=end-start)
        content = self._content[content_start:end]
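
    # Illustrative sketch, not part of this module: each record in the
    # uncompressed content starts with one byte, 'f' (fulltext) or 'd'
    # (delta), then the content length as a base-128 varint (7 bits per
    # byte, high bit set on every byte but the last), then the content.
    # Assuming the low-order group comes first, a 300-byte fulltext record
    # would begin:
    #
    #   'f' + '\xac\x02' + <300 bytes of text>
    #
    # since 300 = 0x12c: low 7 bits 0x2c with the continuation bit -> 0xac,
    # then 0x02.  decode_base128_int() returns (value, bytes_consumed),
    # which is why content_start is start + 1 + len_len above.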

        #       label in the header is duplicated in the text.
        #       For chk pages and real bytes, I would guess this is not
        #       the case.
        if self._z_content is not None:
            content_len = self._content_length
            z_content_len = self._z_content_length
            z_content_bytes = self._z_content
        else:
            assert self._content is not None
            content_len = self._content_length
            z_content_bytes = compress(self._content)
            self._z_content = z_content_bytes
            z_content_len = len(z_content_bytes)
            self._z_content_length = z_content_len
        if _USE_LZMA:
            header = self.GCB_LZ_HEADER
        else:
            header = self.GCB_HEADER
        chunks = [header,
                  '%d\n%d\n%d\n%d\n' % (z_header_len, info_len,
                                        z_content_len, content_len),
                 ]
        chunks.append(z_header_bytes)
        chunks.append(z_content_bytes)
        return ''.join(chunks)
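
# Illustrative usage sketch (not part of this module).  The method names are
# the ones defined above; the setup around them is assumed:
#
#   block = GroupCompressBlock()
#   block.set_content(content_bytes)        # uncompressed group content
#   wire = block.to_bytes()                 # label + lengths + compressed data
#   copy = GroupCompressBlock.from_bytes(wire)
#   # from_bytes() only splits out the lengths; the content is only inflated
#   # when extract()/_ensure_content() first needs it.
#   entry, text = copy.extract(key, start, end)
#
# 'content_bytes', 'key', 'start' and 'end' are placeholders here; in practice
# they come from a GroupCompressor and its index entries.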


class _LazyGroupCompressFactory(object):
    """Yield content from a GroupCompressBlock on demand."""

    def __init__(self, key, parents, manager, start, end, first):
        """Create a _LazyGroupCompressFactory

        :param key: The key of just this record
        :param parents: The parents of this key (possibly None)
        :param manager: The _LazyGroupContentManager that owns the block
        :param start: Offset of the first byte for this record in the
            uncompressed content
        :param end: Offset of the byte just after the end of this record
            (ie, bytes = content[start:end])
        :param first: Is this the first Factory for the given block?
        """
        self.key = key
        self.parents = parents
        self.sha1 = None
        # Note: This attribute coupled with Manager._factories creates a
        #       reference cycle. Perhaps we would rather use a weakref(), or
        #       find an appropriate time to release the ref. After the first
        #       get_bytes_as call? After Manager.get_record_stream() returns
        #       the object?
        self._manager = manager
        self.storage_kind = 'groupcompress-block'
        if not first:
            self.storage_kind = 'groupcompress-block-ref'
        self._first = first
        self._start = start
        self._end = end

    def __repr__(self):
        return '%s(%s, first=%s)' % (self.__class__.__name__,
            self.key, self._first)

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            if self._first:
                # wire bytes, something...
                return self._manager._wire_bytes()
            else:
                return ''
        if storage_kind in ('fulltext', 'chunked'):
            self._manager._prepare_for_extract()
            block = self._manager._block
            _, bytes = block.extract(self.key, self._start, self._end)
            if storage_kind == 'fulltext':
                return bytes
            else:
                return [bytes]
        raise errors.UnavailableRepresentation(self.key, storage_kind,
            self.storage_kind)


class _LazyGroupContentManager(object):
    """This manages a group of _LazyGroupCompressFactory objects."""

    def __init__(self, block):
        self._block = block
        # We need to preserve the ordering
        self._factories = []
        self._last_byte = 0

    def add_factory(self, key, parents, start, end):
        if not self._factories:
            first = True
        else:
            first = False
        # Note that this creates a reference cycle....
        factory = _LazyGroupCompressFactory(key, parents, self,
            start, end, first=first)
        self._last_byte = max(end, self._last_byte)
        self._factories.append(factory)

    def get_record_stream(self):
        """Get a record for all keys added so far."""
        for factory in self._factories:
            yield factory
        # TODO: Consider setting self._factories = None after the above loop,
        #       as it will break the reference cycle
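
    # Illustrative usage sketch (not part of this module): the manager just
    # buffers one factory per record and hands them back in insertion order.
    #
    #   manager = _LazyGroupContentManager(block)
    #   manager.add_factory(('file-id', 'rev-a'), (), 0, 120)
    #   manager.add_factory(('file-id', 'rev-b'), (('file-id', 'rev-a'),),
    #                       120, 400)
    #   for record in manager.get_record_stream():
    #       text = record.get_bytes_as('fulltext')
    #
    # The keys, parents and offsets above are made up; in the versioned-files
    # get_record_stream() code further down they come from the index entries.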

    def _trim_block(self, last_byte):
        """Create a new GroupCompressBlock, with just some of the content."""
        # None of the factories need to be adjusted, because the content is
        # located in an identical place. Just that some of the unreferenced
        # trailing bytes are stripped
        trace.mutter('stripping trailing bytes from groupcompress block'
                     ' %d => %d', self._block._content_length, last_byte)
        new_block = GroupCompressBlock()
        self._block._ensure_content(last_byte)
        new_block.set_content(self._block._content[:last_byte])
        self._block = new_block

    def _rebuild_block(self):
        """Create a new GroupCompressBlock with only the referenced texts."""
        compressor = GroupCompressor()
        tstart = time.time()
        old_length = self._block._content_length
        cur_endpoint = 0
        for factory in self._factories:
            bytes = factory.get_bytes_as('fulltext')
            (found_sha1, end_point, type,
             length) = compressor.compress(factory.key, bytes, factory.sha1)
            # Now update this factory with the new offsets, etc
            factory.sha1 = found_sha1
            factory._start = cur_endpoint
            factory._end = end_point
            cur_endpoint = end_point
        self._last_byte = cur_endpoint
        new_block = compressor.flush()
        # TODO: Should we check that new_block really *is* smaller than the old
        #       block? It seems hard to come up with a case where it would
        #       expand, since we do full compression again. Perhaps based on a
        #       request that ends up poorly ordered?
        delta = time.time() - tstart
        self._block = new_block
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
                     ' %d bytes => %d bytes', delta, old_length,
                     self._block._content_length)

    def _prepare_for_extract(self):
        """A _LazyGroupCompressFactory is about to extract to fulltext."""
        # We expect that if one child is going to fulltext, all will be. This
        # helps prevent all of them from extracting a small amount at a time.
        # Which in itself isn't terribly expensive, but resizing 2MB 32kB at a
        # time (self._block._content) is a little expensive.
        self._block._ensure_content(self._last_byte)

    def _check_rebuild_block(self):
        """Check to see if our block should be repacked."""
        total_bytes_used = 0
        last_byte_used = 0
        for factory in self._factories:
            total_bytes_used += factory._end - factory._start
            last_byte_used = max(last_byte_used, factory._end)
        # If we are using most of the bytes from the block, we have nothing
        # else to check (currently more than 1/2)
        if total_bytes_used * 2 >= self._block._content_length:
            return
        # Can we just strip off the trailing bytes? If we are going to be
        # transmitting more than 50% of the front of the content, go ahead
        if total_bytes_used * 2 > last_byte_used:
            self._trim_block(last_byte_used)
            return

        # We are using a small amount of the data, and it isn't just packed
        # nicely at the front, so rebuild the content.
        # Note: This would be *nicer* as a strip-data-from-group, rather than
        #       building it up again from scratch
        #       It might be reasonable to consider the fulltext sizes for
        #       different bits when deciding this, too. As you may have a small
        #       fulltext, and a trivial delta, and you are just trading around
        #       for another fulltext. If we do a simple 'prune' you may end up
        #       expanding many deltas into fulltexts, as well.
        #       If we build a cheap enough 'strip', then we could try a strip,
        #       if that expands the content, we then rebuild.
        self._rebuild_block()
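
    # Worked example of the heuristic above (numbers made up): suppose the
    # block holds 4,000,000 bytes of content and the requested factories cover
    # 200,000 bytes in total, the last of them ending at offset 300,000.
    #   200,000 * 2 = 400,000 <  4,000,000  -> the block is mostly unused
    #   200,000 * 2 = 400,000 >  300,000    -> the used bytes sit at the front
    # so _trim_block(300000) is enough.  If the same 200,000 bytes ended at
    # offset 3,500,000 instead, the second test would fail and the block would
    # be recompressed from scratch by _rebuild_block().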

    def _wire_bytes(self):
        """Return a byte stream suitable for transmitting over the wire."""
        self._check_rebuild_block()
        # The outer block starts with:
        #   'groupcompress-block\n'
        #   <length of compressed key info>\n
        #   <length of uncompressed info>\n
        #   <length of gc block>\n
        #   <compressed key info>
        #   <gc block>
        lines = ['groupcompress-block\n']
        # The minimal info we need is the key, the start offset, and the
        # parents. The length and type are encoded in the record itself.
        # However, passing in the other bits makes it easier.  The list of
        # keys, and the start offset, the length
        # 1 line key
        # 1 line with parents, '' for ()
        # 1 line for start offset
        # 1 line for end byte
        header_lines = []
        for factory in self._factories:
            key_bytes = '\x00'.join(factory.key)
            parents = factory.parents
            if parents is None:
                parent_bytes = 'None:'
            else:
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
            record_header = '%s\n%s\n%d\n%d\n' % (
                key_bytes, parent_bytes, factory._start, factory._end)
            header_lines.append(record_header)
        header_bytes = ''.join(header_lines)
        header_bytes_len = len(header_bytes)
        z_header_bytes = zlib.compress(header_bytes)
        z_header_bytes_len = len(z_header_bytes)
        block_bytes = self._block.to_bytes()
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
                                       len(block_bytes)))
        lines.append(z_header_bytes)
        lines.append(block_bytes)
        del z_header_bytes, block_bytes
        return ''.join(lines)
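
    # Illustrative layout of the wire bytes built above (informal, derived
    # from the code rather than from a spec):
    #
    #   'groupcompress-block\n'
    #   <len(z_header_bytes)>\n<len(header_bytes)>\n<len(block_bytes)>\n
    #   <z_header_bytes>      zlib-compressed; 4 lines per record:
    #                         key, parents, start offset, end offset
    #   <block_bytes>         the GroupCompressBlock.to_bytes() output
    #
    # from_bytes() below reverses exactly these steps.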

    @classmethod
    def from_bytes(cls, bytes):
        # TODO: This does extra string copying, probably better to do it a
        #       different way
        (storage_kind, z_header_len, header_len,
         block_len, rest) = bytes.split('\n', 4)
        if storage_kind != 'groupcompress-block':
            raise ValueError('Unknown storage kind: %s' % (storage_kind,))
        z_header_len = int(z_header_len)
        if len(rest) < z_header_len:
            raise ValueError('Compressed header len shorter than all bytes')
        z_header = rest[:z_header_len]
        header_len = int(header_len)
        header = zlib.decompress(z_header)
        if len(header) != header_len:
            raise ValueError('invalid length for decompressed bytes')
        block_len = int(block_len)
        if len(rest) != z_header_len + block_len:
            raise ValueError('Invalid length for block')
        block_bytes = rest[z_header_len:]
        # So now we have a valid GCB, we just need to parse the factories that
        # were sent to us
        header_lines = header.split('\n')
        last = header_lines.pop()
        if last != '':
            raise ValueError('header lines did not end with a trailing'
                             ' newline')
        if len(header_lines) % 4 != 0:
            raise ValueError('The header was not an even multiple of 4 lines')
        block = GroupCompressBlock.from_bytes(block_bytes)
        result = cls(block)
        for start in xrange(0, len(header_lines), 4):
            key = tuple(header_lines[start].split('\x00'))
            parents_line = header_lines[start+1]
            if parents_line == 'None:':
                parents = None
            else:
                parents = tuple([tuple(segment.split('\x00'))
                                 for segment in parents_line.split('\t')
                                  if segment])
            start_offset = int(header_lines[start+2])
            end_offset = int(header_lines[start+3])
            result.add_factory(key, parents, start_offset, end_offset)
        return result


def network_block_to_records(storage_kind, bytes, line_end):
    if storage_kind != 'groupcompress-block':
        raise ValueError('Unknown storage kind: %s' % (storage_kind,))
    manager = _LazyGroupContentManager.from_bytes(bytes)
    return manager.get_record_stream()
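
# Illustrative round trip (not part of this module): given a populated
# _LazyGroupContentManager 'manager', a sender serialises it and a receiver
# turns the bytes back into content factories:
#
#   wire = manager._wire_bytes()      # starts with 'groupcompress-block\n'
#   for record in network_block_to_records('groupcompress-block', wire, None):
#       text = record.get_bytes_as('fulltext')
#
# The 'line_end' argument is unused by this implementation; it is assumed here
# to exist only so the signature matches the other network record parsers.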


class GroupCompressor(object):
    """Produce a serialised group of compressed texts.

            unadded_keys, source_result)
        for key in missing:
            yield AbsentContentFactory(key)
        manager = None
        # TODO: This works fairly well at batching up existing groups into a
        #       streamable format, and possibly allowing for taking one big
        #       group and splitting it when it isn't fully utilized.
        #       However, it doesn't allow us to find under-utilized groups and
        #       combine them into a bigger group on the fly.
        #       (Consider the issue with how chk_map inserts texts
        #       one-at-a-time.) This could be done at insert_record_stream()
        #       time, but it probably would decrease the number of
        #       bytes-on-the-wire for fetch.
        for source, keys in source_keys:
            if source is self:
                for key in keys:
                    if key in self._unadded_refs:
                        if manager is not None:
                            # Yield everything buffered so far
                            for factory in manager.get_record_stream():
                                yield factory
                            manager = None
                        bytes, sha1 = self._compressor.extract(key)
                        parents = self._unadded_refs[key]
                        yield FulltextContentFactory(key, parents, sha1, bytes)
                    else:
                        index_memo, _, parents, (method, _) = locations[key]
                        block = self._get_block(index_memo)
                        start, end = index_memo[3:5]
                        if manager is None:
                            manager = _LazyGroupContentManager(block)
                        elif manager._block is not block:
                            # Flush and create a new manager
                            for factory in manager.get_record_stream():
                                yield factory
                            manager = _LazyGroupContentManager(block)
                        manager.add_factory(key, parents, start, end)
            else:
                if manager is not None:
                    # Yield everything buffered so far
                    for factory in manager.get_record_stream():
                        yield factory
                    manager = None
                for record in source.get_record_stream(keys, ordering,
                                                       include_delta_closure):
                    yield record
        if manager is not None:
            # Yield everything buffered so far
            for factory in manager.get_record_stream():
                yield factory

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""