    # Group Compress Block v1 Zlib
    GCB_HEADER = 'gcb1z\n'
    # Group Compress Block v1 Lzma
    GCB_LZ_HEADER = 'gcb1l\n'
    GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)
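    # Layout note (derived from _parse_bytes and to_chunks below): a
    # serialised block is the 6-byte magic above, then the compressed length
    # and the uncompressed length as decimal ASCII each followed by '\n',
    # then the compressed content itself.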

    def __init__(self):
        # map by key? or just order in file?
        self._compressor_name = None
        self._z_content_chunks = None
        self._z_content_decompressor = None
        self._z_content_length = None
        self._content_length = None
        self._content = None
        self._content_chunks = None

    def _parse_header(self):
        """Parse the meta-info from the stream."""

    def __len__(self):
        # This is the maximum number of bytes this object will reference if
        # everything is decompressed. However, if we decompress less than
        # everything... (this would cause some problems for LRUSizeCache)
        return self._content_length + self._z_content_length

    def _ensure_content(self, num_bytes=None):
        """Make sure that content has been expanded enough.

        :param num_bytes: Ensure that we have extracted at least num_bytes of
            content. If None, consume everything
        """
        if self._content_length is None:
            raise AssertionError('self._content_length should never be None')
        if num_bytes is None:
            num_bytes = self._content_length
        elif (self._content_length is not None
              and num_bytes > self._content_length):
            raise AssertionError(
                'requested num_bytes (%d) > content length (%d)'
                % (num_bytes, self._content_length))
        # Expand the content if required
        if self._content is None:
            if self._content_chunks is not None:
                self._content = ''.join(self._content_chunks)
                self._content_chunks = None
        if self._content is None:
            # We join self._z_content_chunks here, because if we are
            # decompressing, then it is *very* likely that we have a single
            # chunk
            if self._z_content_chunks is None:
                raise AssertionError('No content to decompress')
            z_content = ''.join(self._z_content_chunks)
            if z_content == '':
                self._content = ''
            elif self._compressor_name == 'lzma':
                # We don't do partial lzma decomp yet
                self._content = pylzma.decompress(z_content)
            elif self._compressor_name == 'zlib':
                # Start a zlib decompressor
                if num_bytes * 4 > self._content_length * 3:
                    # If we are requesting more than 3/4ths of the content,
                    # just extract the whole thing in a single pass
                    num_bytes = self._content_length
                    self._content = zlib.decompress(z_content)
                else:
                    self._z_content_decompressor = zlib.decompressobj()
                    # Seed the decompressor with the uncompressed bytes, so
                    # that the rest of the code is simplified
                    self._content = self._z_content_decompressor.decompress(
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                    if not self._z_content_decompressor.unconsumed_tail:
                        self._z_content_decompressor = None
            else:
                raise AssertionError('Unknown compressor: %r'
                                     % self._compressor_name)
        # Any bytes remaining to be decompressed will be in the decompressors
        # 'unconsumed_tail'.

        # Do we have enough bytes already?
        if len(self._content) >= num_bytes:
            return
        # If we got this far, and don't have a decompressor, something is wrong
        if self._z_content_decompressor is None:
            raise AssertionError(
                'No decompressor to decompress %d bytes' % num_bytes)
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
        if not remaining_decomp:
            raise AssertionError('Nothing left to decompress')
        needed_bytes = num_bytes - len(self._content)
        # We always set max_size to 32kB over the minimum needed, so that
        # zlib will give us as much as we really want.
        # TODO: If this isn't good enough, we could make a loop here,
        #       that keeps expanding the request until we get enough
        self._content += self._z_content_decompressor.decompress(
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
        if len(self._content) < num_bytes:
            raise AssertionError('%d bytes wanted, only %d available'
                                 % (num_bytes, len(self._content)))
        if not self._z_content_decompressor.unconsumed_tail:
            # The stream is finished
            self._z_content_decompressor = None
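    # Note on partial extraction: a call like _ensure_content(4096) only asks
    # the zlib decompressor for 4096 + _ZLIB_DECOMP_WINDOW bytes of output;
    # later calls feed the decompressor's unconsumed_tail until enough plain
    # content is available, so small extracts avoid expanding the whole block.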

    def _parse_bytes(self, bytes, pos):
        """Read the various lengths from the header.

        This also populates the various 'compressed' buffers.

        :return: The position in bytes just after the last newline
        """
        # At present, we have 2 integers for the compressed and uncompressed
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
        # checking too far, cap the search to 14 bytes.
        pos2 = bytes.index('\n', pos, pos + 14)
        self._z_content_length = int(bytes[pos:pos2])
        pos = pos2 + 1
        pos2 = bytes.index('\n', pos, pos + 14)
        self._content_length = int(bytes[pos:pos2])
        pos = pos2 + 1
        if len(bytes) != (pos + self._z_content_length):
            # XXX: Define some GCCorrupt error ?
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                 (len(bytes), pos, self._z_content_length))
        self._z_content_chunks = (bytes[pos:],)
        return pos
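    # Illustrative example of the header parsed above: a zlib block whose
    # compressed payload is 10 bytes and whose expanded content is 45 bytes
    # begins 'gcb1z\n10\n45\n' followed by the 10 compressed bytes.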

    @property
    def _z_content(self):
        """Return z_content_chunks as a simple string.

        Meant only to be used by the test suite.
        """
        if self._z_content_chunks is not None:
            return ''.join(self._z_content_chunks)
        return None

    @classmethod
    def from_bytes(cls, bytes):
        out = cls()
        if bytes[:6] not in cls.GCB_KNOWN_HEADERS:
            raise ValueError('bytes did not start with any of %r'
                             % (cls.GCB_KNOWN_HEADERS,))
        # XXX: why not testing the whole header ?
        if bytes[4] == 'z':
            out._compressor_name = 'zlib'
        elif bytes[4] == 'l':
            out._compressor_name = 'lzma'
        else:
            raise ValueError('unknown compressor: %r' % (bytes,))
        out._parse_bytes(bytes, 6)
        return out

    def extract(self, key, start, end, sha1=None):
        """Extract the text for a specific key.

        :param key: The label used for this content
        :param sha1: TODO (should we validate only when sha1 is supplied?)
        :return: The bytes for the content
        """
        if start == end == 0:
            return ''
        self._ensure_content(end)
        # The bytes are 'f' or 'd' for the type, then a variable-length
        # base128 integer for the content size, then the actual content
        # We know that the variable-length integer won't be longer than 5
        # bytes (it takes 5 bytes to encode 2^32)
        c = self._content[start]
        if c == 'f':
            type = 'fulltext'
        else:
            if c != 'd':
                raise ValueError('Unknown content control code: %s'
                                 % (c,))
            type = 'delta'
        content_len, len_len = decode_base128_int(
            self._content[start + 1:start + 6])
        content_start = start + 1 + len_len
        if end != content_start + content_len:
            raise ValueError('end != len according to field header'
                ' %s != %s' % (end, content_start + content_len))
        if c == 'f':
            bytes = self._content[content_start:end]
        else:
            bytes = apply_delta_to_source(self._content, content_start, end)
        return bytes
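    # Record encoding sketch (assuming the little-endian base-128 varint used
    # by decode_base128_int): a 130-byte fulltext is stored as
    #   'f' + '\x82\x01' + <130 bytes of text>,
    # while a delta record replaces 'f' with 'd' and stores delta
    # instructions instead of the raw text.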

    def set_chunked_content(self, content_chunks, length):
        """Set the content of this block to the given chunks."""
        # If we have lots of short lines, it may be more efficient to join
        # the content ahead of time. If the content is <10MiB, we don't really
        # care about the extra memory consumption, so we can just pack it and
        # be done. However, timing showed 18s => 17.9s for repacking 1k revs of
        # mysql, which is below the noise margin
        self._content_length = length
        self._content_chunks = content_chunks
        self._content = None
        self._z_content_chunks = None

    def set_content(self, content):
        """Set the content of this block."""
        self._content_length = len(content)
        self._content = content
        self._z_content_chunks = None

    def _create_z_content_using_lzma(self):
        if self._content_chunks is not None:
            self._content = ''.join(self._content_chunks)
            self._content_chunks = None
        if self._content is None:
            raise AssertionError('Nothing to compress')
        z_content = pylzma.compress(self._content)
        self._z_content_chunks = (z_content,)
        self._z_content_length = len(z_content)

    def _create_z_content_from_chunks(self, chunks):
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
        # (measured peak is maybe 30MB over the above...)
        compressed_chunks = map(compressor.compress, chunks)
        compressed_chunks.append(compressor.flush())
        # Ignore empty chunks
        self._z_content_chunks = [c for c in compressed_chunks if c]
        self._z_content_length = sum(map(len, self._z_content_chunks))

    def _create_z_content(self):
        if self._z_content_chunks is not None:
            return
        if _USE_LZMA:
            self._create_z_content_using_lzma()
            return
        if self._content_chunks is not None:
            chunks = self._content_chunks
        else:
            chunks = (self._content,)
        self._create_z_content_from_chunks(chunks)

    def to_chunks(self):
        """Create the byte stream as a series of 'chunks'"""
        self._create_z_content()
        if _USE_LZMA:
            header = self.GCB_LZ_HEADER
        else:
            header = self.GCB_HEADER
        chunks = ['%s%d\n%d\n'
                  % (header, self._z_content_length, self._content_length),
                 ]
        chunks.extend(self._z_content_chunks)
        total_len = sum(map(len, chunks))
        return total_len, chunks

    def to_bytes(self):
        """Encode the information into a byte stream."""
        total_len, chunks = self.to_chunks()
        return ''.join(chunks)
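    # Design note: to_chunks() is the primary serialiser and returns the
    # total length plus a list of chunks so callers (e.g. _wire_bytes below)
    # can stream the block without first joining it; to_bytes() is just a
    # convenience wrapper that joins those chunks.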

    def _dump(self, include_text=False):
        """Take this block, and spit out a human-readable structure.

        :param include_text: Inserts also include text bits, choose whether you
            want this displayed in the dump or not.
        :return: A dump of the given block. The layout is something like:
            [('f', length), ('d', delta_length, text_length, [delta_info])]
            delta_info := [('i', num_bytes, text), ('c', offset, num_bytes),
            ...]
        """
        self._ensure_content()
        result = []
        pos = 0
        while pos < self._content_length:
            kind = self._content[pos]
            pos += 1
            if kind not in ('f', 'd'):
                raise ValueError('invalid kind character: %r' % (kind,))
            content_len, len_len = decode_base128_int(
                self._content[pos:pos + 5])
            pos += len_len
            if content_len + pos > self._content_length:
                raise ValueError('invalid content_len %d for record @ pos %d'
                                 % (content_len, pos - len_len - 1))
            if kind == 'f': # Fulltext
                if include_text:
                    text = self._content[pos:pos+content_len]
                    result.append(('f', content_len, text))
                else:
                    result.append(('f', content_len))
            elif kind == 'd': # Delta
                delta_content = self._content[pos:pos+content_len]
                delta_info = []
                # The first entry in a delta is the decompressed length
                decomp_len, delta_pos = decode_base128_int(delta_content)
                result.append(('d', content_len, decomp_len, delta_info))
                measured_len = 0
                while delta_pos < content_len:
                    c = ord(delta_content[delta_pos])
                    delta_pos += 1
                    if c & 0x80: # Copy
                        (offset, length,
                         delta_pos) = decode_copy_instruction(delta_content, c,
                                                              delta_pos)
                        if include_text:
                            text = self._content[offset:offset+length]
                            delta_info.append(('c', offset, length, text))
                        else:
                            delta_info.append(('c', offset, length))
                        measured_len += length
                    else: # Insert
                        if include_text:
                            txt = delta_content[delta_pos:delta_pos+c]
                        else:
                            txt = ''
                        delta_info.append(('i', c, txt))
                        measured_len += c
                        delta_pos += c
                if delta_pos != content_len:
                    raise ValueError('Delta consumed a bad number of bytes:'
                                     ' %d != %d' % (delta_pos, content_len))
                if measured_len != decomp_len:
                    raise ValueError('Delta claimed fulltext was %d bytes, but'
                                     ' extraction resulted in %d bytes'
                                     % (decomp_len, measured_len))
            pos += content_len
        return result
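    # Illustrative _dump() result (shape only): a block holding one fulltext
    # and one delta might dump as
    #   [('f', 1234), ('d', 458, 2047, [('c', 0, 1024), ('i', 4, '')])]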


class _LazyGroupCompressFactory(object):
    """Yield content from a GroupCompressBlock on demand."""

    def __init__(self, key, parents, manager, start, end, first):
        """Create a _LazyGroupCompressFactory

        :param key: The key of just this record
        :param parents: The parents of this key (possibly None)
        :param gc_block: A GroupCompressBlock object
        :param start: Offset of the first byte for this record in the
            data of the gc_block
        :param end: Offset of the byte just after the end of this record
            (ie, bytes = content[start:end])
        :param first: Is this the first Factory for the given block?
        """
        self.key = key
        self.parents = parents
        self.sha1 = None
        # Note: This attribute coupled with Manager._factories creates a
        #       reference cycle. Perhaps we would rather use a weakref(), or
        #       find an appropriate time to release the ref. After the first
        #       get_bytes_as call? After Manager.get_record_stream() returns
        #       the object?
        self._manager = manager
        self._bytes = None
        if first:
            self.storage_kind = 'groupcompress-block'
        else:
            self.storage_kind = 'groupcompress-block-ref'
        self._first = first
        self._start = start
        self._end = end

    def __repr__(self):
        return '%s(%s, first=%s)' % (self.__class__.__name__,
            self.key, self._first)

    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            if self._first:
                # wire bytes, something...
                return self._manager._wire_bytes()
            else:
                return ''
        if storage_kind in ('fulltext', 'chunked'):
            if self._bytes is None:
                # Grab and cache the raw bytes for this entry
                # and break the ref-cycle with _manager since we don't need it
                # anymore
                self._manager._prepare_for_extract()
                block = self._manager._block
                self._bytes = block.extract(self.key, self._start, self._end)
                # There are code paths that first extract as fulltext, and then
                # extract as storage_kind (smart fetch). So we don't break the
                # refcycle here, but instead in manager.get_record_stream()
            if storage_kind == 'fulltext':
                return self._bytes
            else:
                return [self._bytes]
        raise errors.UnavailableRepresentation(self.key, storage_kind,
            self.storage_kind)


class _LazyGroupContentManager(object):
    """This manages a group of _LazyGroupCompressFactory objects."""

    _max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
                             # current size, and still be considered
                             # reusable
    _full_block_size = 4*1024*1024
    _full_mixed_block_size = 2*1024*1024
    _full_enough_block_size = 3*1024*1024 # size at which we won't repack
    _full_enough_mixed_block_size = 2*768*1024 # 1.5MB

    def __init__(self, block):
        self._block = block
        # We need to preserve the ordering
        self._factories = []
        self._last_byte = 0

    def add_factory(self, key, parents, start, end):
        if not self._factories:
            first = True
        else:
            first = False
        # Note that this creates a reference cycle....
        factory = _LazyGroupCompressFactory(key, parents, self,
            start, end, first=first)
        # max() works here, but as a function call, doing a compare seems to be
        # significantly faster, timeit says 250ms for max() and 100ms for the
        # comparison
        if end > self._last_byte:
            self._last_byte = end
        self._factories.append(factory)

    def get_record_stream(self):
        """Get a record for all keys added so far."""
        for factory in self._factories:
            yield factory
            # Break the ref-cycle
            factory._bytes = None
            factory._manager = None
        # TODO: Consider setting self._factories = None after the above loop,
        #       as it will break the reference cycle

    def _trim_block(self, last_byte):
        """Create a new GroupCompressBlock, with just some of the content."""
        # None of the factories need to be adjusted, because the content is
        # located in an identical place. Just that some of the unreferenced
        # trailing bytes are stripped
        trace.mutter('stripping trailing bytes from groupcompress block'
                     ' %d => %d', self._block._content_length, last_byte)
        new_block = GroupCompressBlock()
        self._block._ensure_content(last_byte)
        new_block.set_content(self._block._content[:last_byte])
        self._block = new_block

    def _rebuild_block(self):
        """Create a new GroupCompressBlock with only the referenced texts."""
        compressor = GroupCompressor()
        tstart = time.time()
        old_length = self._block._content_length
        for factory in self._factories:
            bytes = factory.get_bytes_as('fulltext')
            (found_sha1, start_point, end_point,
             type) = compressor.compress(factory.key, bytes, factory.sha1)
            # Now update this factory with the new offsets, etc
            factory.sha1 = found_sha1
            factory._start = start_point
            factory._end = end_point
        self._last_byte = end_point
        new_block = compressor.flush()
        # TODO: Should we check that new_block really *is* smaller than the old
        #       block? It seems hard to come up with a method that it would
        #       expand, since we do full compression again. Perhaps based on a
        #       request that ends up poorly ordered?
        delta = time.time() - tstart
        self._block = new_block
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
                     ' %d bytes => %d bytes', delta, old_length,
                     self._block._content_length)

    def _prepare_for_extract(self):
        """A _LazyGroupCompressFactory is about to extract to fulltext."""
        # We expect that if one child is going to fulltext, all will be. This
        # helps prevent all of them from extracting a small amount at a time.
        # Which in itself isn't terribly expensive, but resizing 2MB 32kB at a
        # time (self._block._content) is a little expensive.
        self._block._ensure_content(self._last_byte)

    def _check_rebuild_action(self):
        """Check to see if our block should be repacked."""
        total_bytes_used = 0
        last_byte_used = 0
        for factory in self._factories:
            total_bytes_used += factory._end - factory._start
            if last_byte_used < factory._end:
                last_byte_used = factory._end
        # If we are using more than half of the bytes from the block, we have
        # nothing else to check
        if total_bytes_used * 2 >= self._block._content_length:
            return None, last_byte_used, total_bytes_used
        # We are using less than 50% of the content. Is the content we are
        # using at the beginning of the block? If so, we can just trim the
        # tail, rather than rebuilding from scratch.
        if total_bytes_used * 2 > last_byte_used:
            return 'trim', last_byte_used, total_bytes_used

        # We are using a small amount of the data, and it isn't just packed
        # nicely at the front, so rebuild the content.
        # Note: This would be *nicer* as a strip-data-from-group, rather than
        #       building it up again from scratch
        #       It might be reasonable to consider the fulltext sizes for
        #       different bits when deciding this, too. As you may have a small
        #       fulltext, and a trivial delta, and you are just trading around
        #       for another fulltext. If we do a simple 'prune' you may end up
        #       expanding many deltas into fulltexts, as well.
        #       If we build a cheap enough 'strip', then we could try a strip,
        #       if that expands the content, we then rebuild.
        return 'rebuild', last_byte_used, total_bytes_used
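    # Worked example of the heuristic above: for a 4MB block where the live
    # factories reference 1MB of content ending at byte 1.5MB, 1MB*2 < 4MB so
    # the block is under-used, and 1MB*2 > 1.5MB so 'trim' is chosen; if the
    # same 1MB ended at byte 3MB, 'rebuild' would be chosen instead.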

    def check_is_well_utilized(self):
        """Is the current block considered 'well utilized'?

        This heuristic asks if the current block considers itself to be a fully
        developed group, rather than just a loose collection of data.
        """
        if len(self._factories) == 1:
            # A block of length 1 could be improved by combining with other
            # groups - don't look deeper. Even larger than max size groups
            # could compress well with adjacent versions of the same thing.
            return False
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        block_size = self._block._content_length
        if total_bytes_used < block_size * self._max_cut_fraction:
            # This block wants to trim itself small enough that we want to
            # consider it under-utilized.
            return False
        # TODO: This code is meant to be the twin of _insert_record_stream's
        #       'start_new_block' logic. It would probably be better to factor
        #       out that logic into a shared location, so that it stays
        #       in sync.
        # We currently assume a block is properly utilized whenever it is >75%
        # of the size of a 'full' block. In normal operation, a block is
        # considered full when it hits 4MB of same-file content. So any block
        # >3MB is 'full enough'.
        # The only time this isn't true is when a given block has large-object
        # content. (a single file >4MB, etc.)
        # Under these circumstances, we allow a block to grow to
        # 2 x largest_content. Which means that if a given block had a large
        # object, it may actually be under-utilized. However, given that this
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
        # content blobs on-the-fly. Note that because we return False for all
        # 1-item blobs, we will repack them; we may wish to reevaluate our
        # treatment of large object blobs in the future.
        if block_size >= self._full_enough_block_size:
            return True
        # If a block is <3MB, it still may be considered 'full' if it contains
        # mixed content. The current rule is 2MB of mixed content is considered
        # full. So check to see if this block contains mixed content, and
        # set the threshold appropriately.
        common_prefix = None
        for factory in self._factories:
            prefix = factory.key[:-1]
            if common_prefix is None:
                common_prefix = prefix
            elif prefix != common_prefix:
                # Mixed content, check the size appropriately
                if block_size >= self._full_enough_mixed_block_size:
                    return True
                break
        # The content failed both the mixed check and the single-content check
        # so obviously it is not fully utilized
        # TODO: there is one other constraint that isn't being checked
        #       namely, that the entries in the block are in the appropriate
        #       order. For example, you could insert the entries in exactly
        #       reverse groupcompress order, and we would think that is ok.
        #       (all the right objects are in one group, and it is fully
        #       utilized, etc.) For now, we assume that case is rare,
        #       especially since we should always fetch in 'groupcompress'
        #       order.
        return False

    def _check_rebuild_block(self):
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        if action is None:
            return
        if action == 'trim':
            self._trim_block(last_byte_used)
        elif action == 'rebuild':
            self._rebuild_block()
        else:
            raise ValueError('unknown rebuild action: %r' % (action,))

    def _wire_bytes(self):
        """Return a byte stream suitable for transmitting over the wire."""
        self._check_rebuild_block()
        # The outer block starts with:
        #   'groupcompress-block\n'
        #   <length of compressed key info>\n
        #   <length of uncompressed info>\n
        #   <length of gc block>\n
        #   <header bytes>
        #   <gc-block>
        lines = ['groupcompress-block\n']
        # The minimal info we need is the key, the start offset, and the
        # parents. The length and type are encoded in the record itself.
        # However, passing in the other bits makes it easier. The list of
        # keys, and the start offset, the length
        # 1 line key
        # 1 line with parents, '' for ()
        # 1 line for start offset
        # 1 line for end byte
        header_lines = []
        for factory in self._factories:
            key_bytes = '\x00'.join(factory.key)
            parents = factory.parents
            if parents is None:
                parent_bytes = 'None:'
            else:
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
            record_header = '%s\n%s\n%d\n%d\n' % (
                key_bytes, parent_bytes, factory._start, factory._end)
            header_lines.append(record_header)
            # TODO: Can we break the refcycle at this point and set
            #       factory._manager = None?
        header_bytes = ''.join(header_lines)
        header_bytes_len = len(header_bytes)
        z_header_bytes = zlib.compress(header_bytes)
        z_header_bytes_len = len(z_header_bytes)
        block_bytes_len, block_chunks = self._block.to_chunks()
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
                                       block_bytes_len))
        lines.append(z_header_bytes)
        lines.extend(block_chunks)
        del z_header_bytes, block_chunks
        # TODO: This is a point where we will double the memory consumption. To
        #       avoid this, we probably have to switch to a 'chunked' api
        return ''.join(lines)
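    # Wire format sketch (as assembled above): 'groupcompress-block\n', three
    # decimal lengths each ending in '\n' (compressed header, uncompressed
    # header, gc block), the zlib-compressed header (four lines per factory:
    # key, parents, start, end), and finally the serialised GroupCompressBlock.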

    @classmethod
    def from_bytes(cls, bytes):
        # TODO: This does extra string copying, probably better to do it a
        #       different way. At a minimum this creates 2 copies of the
        #       compressed content
        (storage_kind, z_header_len, header_len,
         block_len, rest) = bytes.split('\n', 4)
        if storage_kind != 'groupcompress-block':
            raise ValueError('Unknown storage kind: %s' % (storage_kind,))
        z_header_len = int(z_header_len)
        if len(rest) < z_header_len:
            raise ValueError('Compressed header len shorter than all bytes')
        z_header = rest[:z_header_len]
        header_len = int(header_len)
        header = zlib.decompress(z_header)
        if len(header) != header_len:
            raise ValueError('invalid length for decompressed bytes')
        block_len = int(block_len)
        if len(rest) != z_header_len + block_len:
            raise ValueError('Invalid length for block')
        block_bytes = rest[z_header_len:]
        # So now we have a valid GCB, we just need to parse the factories that
        # were sent to us
        header_lines = header.split('\n')
        last = header_lines.pop()
        if last != '':
            raise ValueError('header lines did not end with a trailing'
                             ' newline')
        if len(header_lines) % 4 != 0:
            raise ValueError('The header was not an even multiple of 4 lines')
        block = GroupCompressBlock.from_bytes(block_bytes)
        result = cls(block)
        for start in xrange(0, len(header_lines), 4):
            key = tuple(header_lines[start].split('\x00'))
            parents_line = header_lines[start+1]
            if parents_line == 'None:':
                parents = None
            else:
                parents = tuple([tuple(segment.split('\x00'))
                                 for segment in parents_line.split('\t')
                                  if segment != ''])
            start_offset = int(header_lines[start+2])
            end_offset = int(header_lines[start+3])
            result.add_factory(key, parents, start_offset, end_offset)
        return result


def network_block_to_records(storage_kind, bytes, line_end):
    if storage_kind != 'groupcompress-block':
        raise ValueError('Unknown storage kind: %s' % (storage_kind,))
    manager = _LazyGroupContentManager.from_bytes(bytes)
    return manager.get_record_stream()


class _CommonGroupCompressor(object):

    def __init__(self):
        """Create a GroupCompressor."""
        self.chunks = []
        self._last = None
        self.endpoint = 0
        self.input_bytes = 0
        self.labels_deltas = {}
        self._delta_index = None # Set by the children
        self._block = GroupCompressBlock()

    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
        """Compress lines with label key.

        :param key: A key tuple. It is stored in the output
            for identification of the text during decompression.
        :param bytes: The bytes to be compressed
        :param expected_sha: If non-None, the sha the lines are believed to
            have. During compression the sha is calculated; a mismatch will
            cause an error.
        :param nostore_sha: If the computed sha1 sum matches, we will raise
            ExistingContent rather than adding the text.
        :param soft: Do a 'soft' compression. This means that we require larger
            ranges to match to be considered for a copy command.

        :return: The sha1 of lines, the start and end offsets in the delta, and
            the type ('fulltext' or 'delta').

        :seealso VersionedFiles.add_lines:
        """
        if not bytes: # empty, like a dir entry, etc
            if nostore_sha == _null_sha1:
                raise errors.ExistingContent()
            return _null_sha1, 0, 0, 'fulltext'
        # we assume someone knew what they were doing when they passed it in
        if expected_sha is not None:
            sha1 = expected_sha
        else:
            sha1 = osutils.sha_string(bytes)
        if nostore_sha is not None:
            if sha1 == nostore_sha:
                raise errors.ExistingContent()
        if key[-1] is None:
            key = key[:-1] + ('sha1:' + sha1,)

        start, end, type = self._compress(key, bytes, len(bytes) / 2, soft)
        return sha1, start, end, type
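    # Illustrative call (hypothetical key and text, not from the source),
    # showing the shape of the return value:
    #   sha1, start, end, kind = compressor.compress(
    #       ('file-id', 'rev-id'), text_bytes, None)
    #   # kind is 'fulltext' or 'delta'; start:end locate the record in the
    #   # group that flush() will wrap into a GroupCompressBlock.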

    def _compress(self, key, bytes, max_delta_size, soft=False):
        """Compress lines with label key.

        :param key: A key tuple. It is stored in the output for identification
            of the text during decompression.

        :param bytes: The bytes to be compressed

        :param max_delta_size: The size above which we issue a fulltext instead
            of a delta.

        :param soft: Do a 'soft' compression. This means that we require larger
            ranges to match to be considered for a copy command.

        :return: The sha1 of lines, the start and end offsets in the delta, and
            the type ('fulltext' or 'delta').
        """
        raise NotImplementedError(self._compress)

    def extract(self, key):
        """Extract a key previously added to the compressor.

        :param key: The key to extract.
        :return: An iterable over bytes and the sha1.
        """
        (start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
        delta_chunks = self.chunks[start_chunk:end_chunk]
        stored_bytes = ''.join(delta_chunks)
        if stored_bytes[0] == 'f':
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
            data_len = fulltext_len + 1 + offset
            if data_len != len(stored_bytes):
                raise ValueError('Index claimed fulltext len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            bytes = stored_bytes[offset + 1:]
        else:
            # XXX: This is inefficient at best
            source = ''.join(self.chunks[:start_chunk])
            if stored_bytes[0] != 'd':
                raise ValueError('Unknown content kind, bytes claim %s'
                                 % (stored_bytes[0],))
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
            data_len = delta_len + 1 + offset
            if data_len != len(stored_bytes):
                raise ValueError('Index claimed delta len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            bytes = apply_delta(source, stored_bytes[offset + 1:])
        bytes_sha1 = osutils.sha_string(bytes)
        return bytes, bytes_sha1

    def flush(self):
        """Finish this group, creating a formatted stream.

        After calling this, the compressor should no longer be used
        """
        self._block.set_chunked_content(self.chunks, self.endpoint)
        self.chunks = None
        self._delta_index = None
        return self._block

    def pop_last(self):
        """Call this if you want to 'revoke' the last compression.

        After this, the data structures will be rolled back, but you cannot do
        more compression.
        """
        self._delta_index = None
        del self.chunks[self._last[0]:]
        self.endpoint = self._last[1]
        self._last = None

    def ratio(self):
        """Return the overall compression ratio."""
        return float(self.input_bytes) / float(self.endpoint)
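    # Typical lifecycle of a compressor (shape only): call compress() for a
    # series of texts, pop_last() to back out the most recent one if it made
    # the group too large, then flush() once to obtain the finished
    # GroupCompressBlock; after flush() the compressor must not be reused.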


class PythonGroupCompressor(_CommonGroupCompressor):

    def __init__(self):
        """Create a GroupCompressor.

        Used only if the pyrex version is not available.
        """
        super(PythonGroupCompressor, self).__init__()
        self._delta_index = LinesDeltaIndex([])
        # The actual content is managed by LinesDeltaIndex
        self.chunks = self._delta_index.lines

    def _compress(self, key, bytes, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
        input_len = len(bytes)
        new_lines = osutils.split_lines(bytes)
        out_lines, index_lines = self._delta_index.make_delta(
            new_lines, bytes_length=input_len, soft=soft)
        delta_length = sum(map(len, out_lines))
        if delta_length > max_delta_size:
            # The delta is longer than the fulltext, insert a fulltext
            type = 'fulltext'
            out_lines = ['f', encode_base128_int(input_len)]
            out_lines.extend(new_lines)
            index_lines = [False, False]
            index_lines.extend([True] * len(new_lines))
        else:
            # this is a worthy delta, output it
            type = 'delta'
            out_lines[0] = 'd'
            # Update the delta_length to include those two encoded integers
            out_lines[1] = encode_base128_int(delta_length)
        start = self.endpoint
        chunk_start = len(self.chunks)
        self._last = (chunk_start, self.endpoint)
        self._delta_index.extend_lines(out_lines, index_lines)
        self.endpoint = self._delta_index.endpoint
        self.input_bytes += input_len
        chunk_end = len(self.chunks)
        self.labels_deltas[key] = (start, chunk_start,
                                   self.endpoint, chunk_end)
        return start, self.endpoint, type


class PyrexGroupCompressor(_CommonGroupCompressor):
    """Produce a serialised group of compressed texts.

    It contains code very similar to SequenceMatcher because of having a similar
    task. However some key differences apply:
     - there is no junk, we want a minimal edit not a human readable diff.
     - we don't filter very common lines (because we don't know where a good
       range will start, and after the first text we want to be emitting
       minimal edits only.
     - we chain the left side, not the right side
     - we incrementally update the adjacency matrix as new lines are provided.
     - we look for matches in all of the left side, so the routine which does
       the analogous task of find_longest_match does not need to filter on the
       left side.
    """

    def __init__(self):
        super(PyrexGroupCompressor, self).__init__()
        self._delta_index = DeltaIndex()

    def _compress(self, key, bytes, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
        input_len = len(bytes)
        # By having action/label/sha1/len, we can parse the group if the index
        # was ever destroyed, we have the key in 'label', we know the final

        # keys might be a generator
        orig_keys = list(keys)
        keys = set(orig_keys)
        if (not self._index.has_graph
            and ordering in ('topological', 'groupcompress')):
            # Cannot topological order when no graph has been stored.
            # but we allow 'as-requested' or 'unordered'
            ordering = 'unordered'

        remaining_keys = keys
        while True:
            try:
                keys = set(remaining_keys)
                for content_factory in self._get_remaining_record_stream(keys,
                        orig_keys, ordering, include_delta_closure):
                    remaining_keys.discard(content_factory.key)
                    yield content_factory
                return
            except errors.RetryWithNewPacks, e:
                self._access.reload_or_raise(e)

    def _find_from_fallback(self, missing):
        """Find whatever keys you can from the fallbacks.

        :param missing: A set of missing keys. This set will be mutated as keys
            are found from a fallback_vfs
        :return: (parent_map, key_to_source_map, source_results)
            parent_map  the overall key => parent_keys
            key_to_source_map  a dict from {key: source}
            source_results  a list of (source: keys)
        """
        parent_map = {}
        key_to_source_map = {}
        source_results = []
        for source in self._fallback_vfs:
            if not missing:
                break
            source_parents = source.get_parent_map(missing)
            parent_map.update(source_parents)
            source_parents = list(source_parents)
            source_results.append((source, source_parents))
            key_to_source_map.update((key, source) for key in source_parents)
            missing.difference_update(source_parents)
        return parent_map, key_to_source_map, source_results

    def _get_ordered_source_keys(self, ordering, parent_map, key_to_source_map):
        """Get the (source, [keys]) list.

        The returned objects should be in the order defined by 'ordering',
        which can weave between different sources.

        :param ordering: Must be one of 'topological' or 'groupcompress'
        :return: List of [(source, [keys])] tuples, such that all keys are in
            the defined order, regardless of source.
        """
        if ordering == 'topological':
            present_keys = topo_sort(parent_map)
        else:
            # ordering == 'groupcompress'
            # XXX: This only optimizes for the target ordering. We may need
            #      to balance that with the time it takes to extract
            #      ordering, by somehow grouping based on
            #      locations[key][0:3]
            present_keys = sort_gc_optimal(parent_map)
        # Now group by source:
        source_keys = []
        current_source = None
        for key in present_keys:
            source = key_to_source_map.get(key, self)
            if source is not current_source:
                source_keys.append((source, []))
                current_source = source
            source_keys[-1][1].append(key)
        return source_keys

    def _get_as_requested_source_keys(self, orig_keys, locations, unadded_keys,
                                      key_to_source_map):
        source_keys = []
        current_source = None
        for key in orig_keys:
            if key in locations or key in unadded_keys:
                source = self
            elif key in key_to_source_map:
                source = key_to_source_map[key]
            else:
                continue
            if source is not current_source:
                source_keys.append((source, []))
                current_source = source
            source_keys[-1][1].append(key)
        return source_keys

    def _get_io_ordered_source_keys(self, locations, unadded_keys,
                                    source_result):
        def get_group(key):
            # This is the group the bytes are stored in, followed by the
            # location in the group
            return locations[key][0]
        present_keys = sorted(locations.iterkeys(), key=get_group)
        # We don't have an ordering for keys in the in-memory object, but
        # lets process the in-memory ones first.
        present_keys = list(unadded_keys) + present_keys
        # Now grab all of the ones from other sources
        source_keys = [(self, present_keys)]
        source_keys.extend(source_result)
        return source_keys

    def _get_remaining_record_stream(self, keys, orig_keys, ordering,
                                     include_delta_closure):
        """Get a stream of records for keys.

        :param keys: The keys to include.
        :param ordering: one of 'unordered', 'topological', 'groupcompress' or
            'as-requested'
        :param include_delta_closure: If True then the closure across any
            compression parents will be included (in the opaque data).
        :return: An iterator of ContentFactory objects, each of which is only
            valid until the iterator is advanced.
        """
        # Cheap: iterate
        locations = self._index.get_build_details(keys)
        unadded_keys = set(self._unadded_refs).intersection(keys)
        missing = keys.difference(locations)
        missing.difference_update(unadded_keys)
        (fallback_parent_map, key_to_source_map,
         source_result) = self._find_from_fallback(missing)
        if ordering in ('topological', 'groupcompress'):
            # would be better to not globally sort initially but instead
            # start with one key, recurse to its oldest parent, then grab
            # everything in the same group, etc.
            parent_map = dict((key, details[2]) for key, details in
                locations.iteritems())
            for key in unadded_keys:
                parent_map[key] = self._unadded_refs[key]
            parent_map.update(fallback_parent_map)
            source_keys = self._get_ordered_source_keys(ordering, parent_map,
                                                        key_to_source_map)
        elif ordering == 'as-requested':
            source_keys = self._get_as_requested_source_keys(orig_keys,
                locations, unadded_keys, key_to_source_map)
        else:
            # We want to yield the keys in a semi-optimal (read-wise) ordering.
            # Otherwise we thrash the _group_cache and destroy performance
            source_keys = self._get_io_ordered_source_keys(locations,
                unadded_keys, source_result)
        for key in missing:
            yield AbsentContentFactory(key)
        # Batch up as many keys as we can until either:
        #  - we encounter an unadded ref, or
        #  - we run out of keys, or
        #  - the total bytes to retrieve for this batch > BATCH_SIZE
        batcher = _BatchingBlockFetcher(self, locations)
        for source, keys in source_keys:
            if source is self:
                for key in keys:
                    if key in self._unadded_refs:
                        # Flush batch, then yield unadded ref from
                        # self._compressor.
                        for factory in batcher.yield_factories(full_flush=True):
                            yield factory
                        bytes, sha1 = self._compressor.extract(key)
                        parents = self._unadded_refs[key]
                        yield FulltextContentFactory(key, parents, sha1, bytes)
                        continue
                    if batcher.add_key(key) > BATCH_SIZE:
                        # Ok, this batch is big enough. Yield some results.
                        for factory in batcher.yield_factories():
                            yield factory
            else:
                for factory in batcher.yield_factories(full_flush=True):
                    yield factory
                for record in source.get_record_stream(keys, ordering,
                                                       include_delta_closure):
                    yield record
        for factory in batcher.yield_factories(full_flush=True):
            yield factory

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""

        # This will go up to fulltexts for gc to gc fetching, which isn't
        # ideal
        self._compressor = GroupCompressor()
        self._unadded_refs = {}
        keys_to_add = []

        def flush():
            bytes_len, chunks = self._compressor.flush().to_chunks()
            self._compressor = GroupCompressor()
            # Note: At this point we still have 1 copy of the fulltext (in
            #       record and the var 'bytes'), and this generates 2 copies of
            #       the compressed text (one for bytes, one in chunks)
            # TODO: Push 'chunks' down into the _access api, so that we don't
            #       have to double compressed memory here
            # TODO: Figure out how to indicate that we would be happy to free
            #       the fulltext content at this point. Note that sometimes we
            #       will want it later (streaming CHK pages), but most of the
            #       time we won't (everything else)
            bytes = ''.join(chunks)
            del chunks
            index, start, length = self._access.add_raw_records(
                [(None, len(bytes))], bytes)[0]
            nodes = []
            for key, reads, refs in keys_to_add:
                nodes.append((key, "%d %d %s" % (start, length, reads), refs))
            self._index.add_records(nodes, random_id=random_id)
            self._unadded_refs = {}
            del keys_to_add[:]

        last_prefix = None
        max_fulltext_len = 0
        max_fulltext_prefix = None
        insert_manager = None
        block_start = None
        block_length = None
        # XXX: TODO: remove this, it is just for safety checking for now
        inserted_keys = set()
        reuse_this_block = reuse_blocks
        for record in stream:
            # Raise an error when a record is missing.
            if record.storage_kind == 'absent':
                raise errors.RevisionNotPresent(record.key, self)
            if random_id:
                if record.key in inserted_keys:
                    trace.note('Insert claimed random_id=True,'
                               ' but then inserted %r two times', record.key)
                    continue
                inserted_keys.add(record.key)
            if reuse_blocks:
                # If the reuse_blocks flag is set, check to see if we can just
                # copy a groupcompress block as-is.
                # We only check on the first record (groupcompress-block) not
                # on all of the (groupcompress-block-ref) entries.
                # The reuse_this_block flag is then kept for as long as
                if record.storage_kind == 'groupcompress-block':
                    # Check to see if we really want to re-use this block
                    insert_manager = record._manager
                    reuse_this_block = insert_manager.check_is_well_utilized()
            else:
                reuse_this_block = False
            if reuse_this_block:
                # We still want to reuse this block
                if record.storage_kind == 'groupcompress-block':
                    # Insert the raw block into the target repo
                    insert_manager = record._manager
                    bytes = record._manager._block.to_bytes()
                    _, start, length = self._access.add_raw_records(
                        [(None, len(bytes))], bytes)[0]
                    block_start = start
                    block_length = length
                if record.storage_kind in ('groupcompress-block',
                                           'groupcompress-block-ref'):
                    if insert_manager is None:
                        raise AssertionError('No insert_manager set')
                    if insert_manager is not record._manager:
                        raise AssertionError('insert_manager does not match'
                            ' the current record, we cannot be positive'
                            ' that the appropriate content was inserted.'
                            )
                    value = "%d %d %d %d" % (block_start, block_length,
                                             record._start, record._end)
                    nodes = [(record.key, value, (record.parents,))]
                    # TODO: Consider buffering up many nodes to be added, not
                    #       sure how much overhead this has, but we're seeing
                    #       ~23s / 120s in add_records calls
                    self._index.add_records(nodes, random_id=random_id)
                    continue
            try:
                bytes = record.get_bytes_as('fulltext')
            except errors.UnavailableRepresentation:
                adapter_key = record.storage_kind, 'fulltext'
                adapter = get_adapter(adapter_key)
                bytes = adapter.get_bytes(record)
            if len(record.key) > 1:
                prefix = record.key[0]
                soft = (prefix == last_prefix)
            else:
                prefix = None
                soft = False
            if max_fulltext_len < len(bytes):
                max_fulltext_len = len(bytes)
                max_fulltext_prefix = prefix
            (found_sha1, start_point, end_point,
             type) = self._compressor.compress(record.key,
                                               bytes, record.sha1, soft=soft,
                                               nostore_sha=nostore_sha)
            # delta_ratio = float(len(bytes)) / (end_point - start_point)
            # Check if we want to continue to include that text
            if (prefix == max_fulltext_prefix
                and end_point < 2 * max_fulltext_len):
                # As long as we are on the same file_id, we will fill at least
                # 2 * max_fulltext_len
                start_new_block = False
            elif end_point > 4*1024*1024:
                start_new_block = True
            elif (prefix is not None and prefix != last_prefix
                  and end_point > 2*1024*1024):
                start_new_block = True
            else:
                start_new_block = False
            last_prefix = prefix
            if start_new_block:
                self._compressor.pop_last()
                flush()
                max_fulltext_len = len(bytes)
                (found_sha1, start_point, end_point,
                 type) = self._compressor.compress(record.key, bytes,
                                                   record.sha1)
            if record.key[-1] is None:
                key = record.key[:-1] + ('sha1:' + found_sha1,)
            else:
                key = record.key
            self._unadded_refs[key] = record.parents
            yield found_sha1
            as_st = static_tuple.StaticTuple.from_sequence
            if record.parents is not None:
                parents = as_st([as_st(p) for p in record.parents])
            else:
                parents = None
            refs = static_tuple.StaticTuple(parents)
            keys_to_add.append((key, '%d %d' % (start_point, end_point), refs))
        if len(keys_to_add):
            flush()
        self._compressor = None

    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
        """Iterate over the lines in the versioned files from keys.