# groupcompress, a bzr plugin providing new compression logic.
#
# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""Core compression logic for compressing streams of related files."""
import time
import zlib

try:
    import pylzma
except ImportError:
    pylzma = None

from itertools import izip
from cStringIO import StringIO

from bzrlib import (
    errors,
    graph as _mod_graph,
    knit,
    osutils,
    pack,
    trace,
    )
from bzrlib.graph import Graph
from bzrlib.knit import _DirectPackAccess
from bzrlib.osutils import (
    sha_string,
    sha_strings,
    split_lines,
    )
from bzrlib.btree_index import BTreeBuilder
from bzrlib.lru_cache import LRUSizeCache
from bzrlib.plugins.groupcompress import equivalence_table
from bzrlib.tsort import topo_sort
from bzrlib.versionedfile import (
    AbsentContentFactory,
    ChunkedContentFactory,
    FulltextContentFactory,
    )
def parse(line_list):
    result = []
    lines = iter(line_list)
    next = lines.next
    label_line = lines.next()
    sha1_line = lines.next()
    if (not label_line.startswith('label: ') or
        not sha1_line.startswith('sha1: ')):
        raise AssertionError("bad text record %r" % lines)
    label = tuple(label_line[7:-1].split('\x00'))
    sha1 = sha1_line[6:-1]
    for header in lines:
        op = header[0]
        numbers = [int(n) for n in header[2:].split(',')]
        if op == 'c':
            result.append((op, numbers[0], numbers[1], None))
        else:
            contents = [next() for i in xrange(numbers[0])]
            result.append((op, None, numbers[0], contents))
    return label, sha1, result
def apply_delta(basis, delta):
    """Apply a delta to a basis text and return the resulting lines."""
    lines = []
    # eq ranges occur where gaps occur
    # start, end refer to offsets in basis
    for op, start, count, delta_lines in delta:
        if op == 'c':
            lines.append(basis[start:start+count])
        else:
            lines.extend(delta_lines)
    trim_encoding_newline(lines)
    return lines


def trim_encoding_newline(lines):
    """Trim the newline that compression appended for trailing-\\n support."""
    lines[-1] = lines[-1][:-1]
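
# A small illustrative sketch (not part of the original module): build the
# kind of delta tuples that parse() produces and replay them with
# apply_delta(). The basis text and offsets are made-up placeholder values.
def _example_apply_delta():
    basis = 'hello world\n'
    delta = [
        ('c', 0, 6, None),            # copy 'hello ' from the basis
        ('i', None, 1, ['there\n']),  # insert one new line
    ]
    return apply_delta(basis, delta)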
class GroupCompressor(object):
    """Produce a serialised group of compressed texts.

    It contains code very similar to SequenceMatcher because it has a similar
    task. However some key differences apply:
     - there is no junk, we want a minimal edit not a human readable diff.
     - we don't filter very common lines (because we don't know where a good
       range will start, and after the first text we want to be emitting
       minimal edits only.
     - we chain the left side, not the right side
     - we incrementally update the adjacency matrix as new lines are provided.
     - we look for matches in all of the left side, so the routine which does
       the analogous task of find_longest_match does not need to filter on the
       left side.
    """

    _equivalence_table_class = equivalence_table.EquivalenceTable

    def __init__(self, delta=True):
        """Create a GroupCompressor.

        :param delta: If False, do not compress records.
        """
        self._delta = delta
        self.line_offsets = []
        self.endpoint = 0
        self.input_bytes = 0
        self.line_locations = self._equivalence_table_class([])
        self.lines = self.line_locations.lines
        self.labels_deltas = {}
# Minimum number of uncompressed bytes to try to fetch at once when retrieving
# groupcompress blocks.
BATCH_SIZE = 2**16

_USE_LZMA = False and (pylzma is not None)

# osutils.sha_string('')
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
def sort_gc_optimal(parent_map):
    """Sort and group the keys in parent_map into groupcompress order.

    groupcompress is defined (currently) as reverse-topological order, grouped
    by the key prefix.

    :return: A sorted-list of keys
    """
    # groupcompress ordering is approximately reverse topological,
    # properly grouped by file-id.
    per_prefix_map = {}
    for key, value in parent_map.iteritems():
        if isinstance(key, str) or len(key) == 1:
            prefix = ''
        else:
            prefix = key[0]
        try:
            per_prefix_map[prefix][key] = value
        except KeyError:
            per_prefix_map[prefix] = {key: value}

    present_keys = []
    for prefix in sorted(per_prefix_map):
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
    return present_keys
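
# A small usage sketch (assumed example keys, not from the original source):
# gc-optimal ordering groups keys by their prefix (file-id) and walks each
# group in reverse topological order, so newer texts come before the texts
# they are based on.
def _example_gc_optimal_order():
    parent_map = {
        ('file-a', 'rev-1'): (),
        ('file-a', 'rev-2'): (('file-a', 'rev-1'),),
        ('file-b', 'rev-1'): (),
    }
    # Expected shape of the result:
    #   [('file-a', 'rev-2'), ('file-a', 'rev-1'), ('file-b', 'rev-1')]
    return sort_gc_optimal(parent_map)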
# The max zlib window size is 32kB, so if we set the decompressor's 'max_size'
# output to the requested bytes + 32kB, then we should guarantee
# num_bytes coming out.
_ZLIB_DECOMP_WINDOW = 32*1024
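
# A minimal sketch of the partial-decompression trick used by
# GroupCompressBlock._ensure_content below (standalone, not original code):
# asking the decompressor for num_bytes plus one full zlib window guarantees
# at least num_bytes of plain content from a single call.
def _example_partial_decompress(z_content, num_bytes):
    decompressor = zlib.decompressobj()
    content = decompressor.decompress(z_content,
                                      num_bytes + _ZLIB_DECOMP_WINDOW)
    # Any compressed input that was not needed yet remains available in
    # decompressor.unconsumed_tail for later calls.
    return content, decompressor.unconsumed_tail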
class GroupCompressBlock(object):
    """An object which maintains the internal structure of the compressed data.

    This tracks the meta info (start of text, length, type, etc.)
    """

    # Group Compress Block v1 Zlib
    GCB_HEADER = 'gcb1z\n'
    # Group Compress Block v1 Lzma
    GCB_LZ_HEADER = 'gcb1l\n'
    GCB_KNOWN_HEADERS = (GCB_HEADER, GCB_LZ_HEADER)

    def __init__(self):
        # map by key? or just order in file?
        self._compressor_name = None
        self._z_content_chunks = None
        self._z_content_decompressor = None
        self._z_content_length = None
        self._content_length = None
        self._content = None
        self._content_chunks = None

    def __len__(self):
        # This is the maximum number of bytes this object will reference if
        # everything is decompressed. However, if we decompress less than
        # everything... (this would cause some problems for LRUSizeCache)
        return self._content_length + self._z_content_length
    def _ensure_content(self, num_bytes=None):
        """Make sure that content has been expanded enough.

        :param num_bytes: Ensure that we have extracted at least num_bytes of
            content. If None, consume everything
        """
        if self._content_length is None:
            raise AssertionError('self._content_length should never be None')
        if num_bytes is None:
            num_bytes = self._content_length
        elif (self._content_length is not None
              and num_bytes > self._content_length):
            raise AssertionError(
                'requested num_bytes (%d) > content length (%d)'
                % (num_bytes, self._content_length))
        # Expand the content if required
        if self._content is None:
            if self._content_chunks is not None:
                self._content = ''.join(self._content_chunks)
                self._content_chunks = None
        if self._content is None:
            # We join self._z_content_chunks here, because if we are
            # decompressing, then it is *very* likely that we have a single
            # chunk
            if self._z_content_chunks is None:
                raise AssertionError('No content to decompress')
            z_content = ''.join(self._z_content_chunks)
            if z_content == '':
                self._content = ''
            elif self._compressor_name == 'lzma':
                # We don't do partial lzma decomp yet
                self._content = pylzma.decompress(z_content)
            elif self._compressor_name == 'zlib':
                # Start a zlib decompressor
                if num_bytes * 4 > self._content_length * 3:
                    # If we are requesting more than 3/4ths of the content,
                    # just extract the whole thing in a single pass
                    num_bytes = self._content_length
                    self._content = zlib.decompress(z_content)
                else:
                    self._z_content_decompressor = zlib.decompressobj()
                    # Seed the decompressor with the uncompressed bytes, so
                    # that the rest of the code is simplified
                    self._content = self._z_content_decompressor.decompress(
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                    if not self._z_content_decompressor.unconsumed_tail:
                        self._z_content_decompressor = None
            else:
                raise AssertionError('Unknown compressor: %r'
                                     % self._compressor_name)
        # Any bytes remaining to be decompressed will be in the decompressor's
        # 'unconsumed_tail'.
        # Do we have enough bytes already?
        if len(self._content) >= num_bytes:
            return
        # If we got this far, and don't have a decompressor, something is wrong
        if self._z_content_decompressor is None:
            raise AssertionError(
                'No decompressor to decompress %d bytes' % num_bytes)
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
        if not remaining_decomp:
            raise AssertionError('Nothing left to decompress')
        needed_bytes = num_bytes - len(self._content)
        # We always set max_size to 32kB over the minimum needed, so that
        # zlib will give us as much as we really want.
        # TODO: If this isn't good enough, we could make a loop here,
        #       that keeps expanding the request until we get enough
        self._content += self._z_content_decompressor.decompress(
            remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
        if len(self._content) < num_bytes:
            raise AssertionError('%d bytes wanted, only %d available'
                                 % (num_bytes, len(self._content)))
        if not self._z_content_decompressor.unconsumed_tail:
            # The stream is finished
            self._z_content_decompressor = None
    def _parse_bytes(self, bytes, pos):
        """Read the various lengths from the header.

        This also populates the various 'compressed' buffers.

        :return: The position in bytes just after the last newline
        """
        # At present, we have 2 integers for the compressed and uncompressed
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
        # checking too far, cap the search to 14 bytes.
        pos2 = bytes.index('\n', pos, pos + 14)
        self._z_content_length = int(bytes[pos:pos2])
        pos = pos2 + 1
        pos2 = bytes.index('\n', pos, pos + 14)
        self._content_length = int(bytes[pos:pos2])
        pos = pos2 + 1
        if len(bytes) != (pos + self._z_content_length):
            # XXX: Define some GCCorrupt error ?
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
                                 (len(bytes), pos, self._z_content_length))
        self._z_content_chunks = (bytes[pos:],)
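
    # For reference (descriptive comment added; derived from _parse_bytes()
    # above and to_chunks() below), the serialised block layout is:
    #
    #   'gcb1z\n'              (or 'gcb1l\n' for lzma)
    #   '<z_content_length>\n'
    #   '<content_length>\n'
    #   <z_content bytes>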
    @property
    def _z_content(self):
        """Return z_content_chunks as a simple string.

        Meant only to be used by the test suite.
        """
        if self._z_content_chunks is not None:
            return ''.join(self._z_content_chunks)
        return None
    @classmethod
    def from_bytes(cls, bytes):
        out = cls()
        if bytes[:6] not in cls.GCB_KNOWN_HEADERS:
            raise ValueError('bytes did not start with any of %r'
                             % (cls.GCB_KNOWN_HEADERS,))
        # XXX: why not testing the whole header ?
        if bytes[4] == 'z':
            out._compressor_name = 'zlib'
        elif bytes[4] == 'l':
            out._compressor_name = 'lzma'
        else:
            raise ValueError('unknown compressor: %r' % (bytes,))
        out._parse_bytes(bytes, 6)
        return out
    def extract(self, key, start, end, sha1=None):
        """Extract the text for a specific key.

        :param key: The label used for this content
        :param sha1: TODO (should we validate only when sha1 is supplied?)
        :return: The bytes for the content
        """
        if start == end == 0:
            return ''
        self._ensure_content(end)
        # The bytes are 'f' or 'd' for the type, then a variable-length
        # base128 integer for the content size, then the actual content
        # We know that the variable-length integer won't be longer than 5
        # bytes (it takes 5 bytes to encode 2^32)
        c = self._content[start]
        if c == 'f':
            type = 'fulltext'
        else:
            if c != 'd':
                raise ValueError('Unknown content control code: %s'
                                 % (c,))
            type = 'delta'
        content_len, len_len = decode_base128_int(
                            self._content[start + 1:start + 6])
        content_start = start + 1 + len_len
        if end != content_start + content_len:
            raise ValueError('end != len according to field header'
                ' %s != %s' % (end, content_start + content_len))
        if c == 'f':
            bytes = self._content[content_start:end]
        elif c == 'd':
            bytes = apply_delta_to_source(self._content, content_start, end)
        return bytes
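
    # Note on the base128 length above (descriptive comment added; assuming
    # the usual 7-bit little-endian varint used by encode_base128_int and
    # decode_base128_int): each byte carries 7 bits of the value, least
    # significant group first, with the high bit set on every byte except the
    # last. For example, 300 (0b100101100) encodes as '\xac\x02'; decoding
    # reads 0x2c from the first byte plus 0x02 << 7 from the second.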
    def set_chunked_content(self, content_chunks, length):
        """Set the content of this block to the given chunks."""
        # If we have lots of short lines, it may be more efficient to join
        # the content ahead of time. If the content is <10MiB, we don't really
        # care about the extra memory consumption, so we can just pack it and
        # be done. However, timing showed 18s => 17.9s for repacking 1k revs of
        # mysql, which is below the noise margin
        self._content_length = length
        self._content_chunks = content_chunks
        self._content = None
        self._z_content_chunks = None

    def set_content(self, content):
        """Set the content of this block."""
        self._content_length = len(content)
        self._content = content
        self._z_content_chunks = None
    def _create_z_content_using_lzma(self):
        if self._content_chunks is not None:
            self._content = ''.join(self._content_chunks)
            self._content_chunks = None
        if self._content is None:
            raise AssertionError('Nothing to compress')
        z_content = pylzma.compress(self._content)
        self._z_content_chunks = (z_content,)
        self._z_content_length = len(z_content)
    def _create_z_content_from_chunks(self, chunks):
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
        # Peak memory at this point is 1 fulltext, 1 compressed text,
        # + zlib overhead (measured peak is maybe 30MB over the above...)
        compressed_chunks = map(compressor.compress, chunks)
        compressed_chunks.append(compressor.flush())
        # Ignore empty chunks
        self._z_content_chunks = [c for c in compressed_chunks if c]
        self._z_content_length = sum(map(len, self._z_content_chunks))
    def _create_z_content(self):
        if self._z_content_chunks is not None:
            return
        if _USE_LZMA:
            self._create_z_content_using_lzma()
            return
        if self._content_chunks is not None:
            chunks = self._content_chunks
        else:
            chunks = (self._content,)
        self._create_z_content_from_chunks(chunks)
    def to_chunks(self):
        """Create the byte stream as a series of 'chunks'"""
        self._create_z_content()
        if _USE_LZMA:
            header = self.GCB_LZ_HEADER
        else:
            header = self.GCB_HEADER
        chunks = ['%s%d\n%d\n'
                  % (header, self._z_content_length, self._content_length),
                 ]
        chunks.extend(self._z_content_chunks)
        total_len = sum(map(len, chunks))
        return total_len, chunks

    def to_bytes(self):
        """Encode the information into a byte stream."""
        total_len, chunks = self.to_chunks()
        return ''.join(chunks)
    def _dump(self, include_text=False):
        """Take this block, and spit out a human-readable structure.

        :param include_text: Inserts also include text bits, choose whether
            you want this displayed in the dump or not.
        :return: A dump of the given block. The layout is something like:
            [('f', length), ('d', delta_length, text_length, [delta_info])]
            delta_info := [('i', num_bytes, text), ('c', offset, num_bytes),
            ...]
        """
        self._ensure_content()
        result = []
        pos = 0
        while pos < self._content_length:
            kind = self._content[pos]
            pos += 1
            if kind not in ('f', 'd'):
                raise ValueError('invalid kind character: %r' % (kind,))
            content_len, len_len = decode_base128_int(
                                self._content[pos:pos + 5])
            pos += len_len
            if content_len + pos > self._content_length:
                raise ValueError('invalid content_len %d for record @ pos %d'
                                 % (content_len, pos - len_len - 1))
            if kind == 'f': # Fulltext
                if include_text:
                    text = self._content[pos:pos+content_len]
                    result.append(('f', content_len, text))
                else:
                    result.append(('f', content_len))
            elif kind == 'd': # Delta
                delta_content = self._content[pos:pos+content_len]
                delta_info = []
                # The first entry in a delta is the decompressed length
                decomp_len, delta_pos = decode_base128_int(delta_content)
                result.append(('d', content_len, decomp_len, delta_info))
                measured_len = 0
                while delta_pos < content_len:
                    c = ord(delta_content[delta_pos])
                    delta_pos += 1
                    if c & 0x80: # Copy
                        (offset, length,
                         delta_pos) = decode_copy_instruction(delta_content, c,
                                                              delta_pos)
                        if include_text:
                            text = self._content[offset:offset+length]
                            delta_info.append(('c', offset, length, text))
                        else:
                            delta_info.append(('c', offset, length))
                        measured_len += length
                    else: # Insert
                        if include_text:
                            txt = delta_content[delta_pos:delta_pos+c]
                        else:
                            txt = ''
                        delta_info.append(('i', c, txt))
                        measured_len += c
                        delta_pos += c
                if delta_pos != content_len:
                    raise ValueError('Delta consumed a bad number of bytes:'
                                     ' %d != %d' % (delta_pos, content_len))
                if measured_len != decomp_len:
                    raise ValueError('Delta claimed fulltext was %d bytes, but'
                                     ' extraction resulted in %d bytes'
                                     % (decomp_len, measured_len))
            pos += content_len
        return result
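
# A small usage sketch (assumed, not from the original source): round-trip a
# block through its serialised form using the methods above.
def _example_block_roundtrip(text):
    block = GroupCompressBlock()
    block.set_content(text)
    data = block.to_bytes()        # 'gcb1z\n<z_len>\n<len>\n<zlib bytes>'
    copy = GroupCompressBlock.from_bytes(data)
    copy._ensure_content()
    return copy._content == text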
class _LazyGroupCompressFactory(object):
    """Yield content from a GroupCompressBlock on demand."""

    def __init__(self, key, parents, manager, start, end, first):
        """Create a _LazyGroupCompressFactory

        :param key: The key of just this record
        :param parents: The parents of this key (possibly None)
        :param gc_block: A GroupCompressBlock object
        :param start: Offset of the first byte for this record in the
            uncompressed content
        :param end: Offset of the byte just after the end of this record
            (ie, bytes = content[start:end])
        :param first: Is this the first Factory for the given block?
        """
        self.key = key
        self.parents = parents
        self.sha1 = None
        # Note: This attribute coupled with Manager._factories creates a
        #       reference cycle. Perhaps we would rather use a weakref(), or
        #       find an appropriate time to release the ref. After the first
        #       get_bytes_as call? After Manager.get_record_stream() returns
        #       the object?
        self._manager = manager
        self._bytes = None
        self.storage_kind = 'groupcompress-block'
        if not first:
            self.storage_kind = 'groupcompress-block-ref'
        self._first = first
        self._start = start
        self._end = end

    def __repr__(self):
        return '%s(%s, first=%s)' % (self.__class__.__name__,
            self.key, self._first)
    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            if self._first:
                # wire bytes, something...
                return self._manager._wire_bytes()
            else:
                return ''
        if storage_kind in ('fulltext', 'chunked'):
            if self._bytes is None:
                # Grab and cache the raw bytes for this entry
                # and break the ref-cycle with _manager since we don't need it
                # anymore
                self._manager._prepare_for_extract()
                block = self._manager._block
                self._bytes = block.extract(self.key, self._start, self._end)
                # There are code paths that first extract as fulltext, and then
                # extract as storage_kind (smart fetch). So we don't break the
                # refcycle here, but instead in manager.get_record_stream()
            if storage_kind == 'fulltext':
                return self._bytes
            else:
                return [self._bytes]
        raise errors.UnavailableRepresentation(self.key, storage_kind,
            self.storage_kind)
class _LazyGroupContentManager(object):
477
"""This manages a group of _LazyGroupCompressFactory objects."""
479
_max_cut_fraction = 0.75 # We allow a block to be trimmed to 75% of
480
# current size, and still be considered
482
_full_block_size = 4*1024*1024
483
_full_mixed_block_size = 2*1024*1024
484
_full_enough_block_size = 3*1024*1024 # size at which we won't repack
485
_full_enough_mixed_block_size = 2*768*1024 # 1.5MB
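
    # Worked example of the thresholds above (illustrative): a 3.5MB block of
    # same-file content is >= _full_enough_block_size (3MB), so it is left
    # alone, while a 1.2MB block holding several different file-ids is below
    # _full_enough_mixed_block_size (1.5MB) and may be repacked on the fly.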
    def __init__(self, block):
        self._block = block
        # We need to preserve the ordering
        self._factories = []
        self._last_byte = 0

    def add_factory(self, key, parents, start, end):
        if not self._factories:
            first = True
        else:
            first = False
        # Note that this creates a reference cycle....
        factory = _LazyGroupCompressFactory(key, parents, self,
            start, end, first=first)
        # max() works here, but as a function call, doing a compare seems to be
        # significantly faster, timeit says 250ms for max() and 100ms for the
        # comparison
        if end > self._last_byte:
            self._last_byte = end
        self._factories.append(factory)
    def get_record_stream(self):
        """Get a record for all keys added so far."""
        for factory in self._factories:
            yield factory
            # Break the ref-cycle
            factory._bytes = None
            factory._manager = None
        # TODO: Consider setting self._factories = None after the above loop,
        #       as it will break the reference cycle
    def _trim_block(self, last_byte):
        """Create a new GroupCompressBlock, with just some of the content."""
        # None of the factories need to be adjusted, because the content is
        # located in an identical place. Just that some of the unreferenced
        # trailing bytes are stripped
        trace.mutter('stripping trailing bytes from groupcompress block'
                     ' %d => %d', self._block._content_length, last_byte)
        new_block = GroupCompressBlock()
        self._block._ensure_content(last_byte)
        new_block.set_content(self._block._content[:last_byte])
        self._block = new_block
    def _rebuild_block(self):
        """Create a new GroupCompressBlock with only the referenced texts."""
        compressor = GroupCompressor()
        tstart = time.time()
        old_length = self._block._content_length
        for factory in self._factories:
            bytes = factory.get_bytes_as('fulltext')
            (found_sha1, start_point, end_point,
             type) = compressor.compress(factory.key, bytes, factory.sha1)
            # Now update this factory with the new offsets, etc
            factory.sha1 = found_sha1
            factory._start = start_point
            factory._end = end_point
        self._last_byte = end_point
        new_block = compressor.flush()
        # TODO: Should we check that new_block really *is* smaller than the old
        #       block? It seems hard to come up with a method that it would
        #       expand, since we do full compression again. Perhaps based on a
        #       request that ends up poorly ordered?
        delta = time.time() - tstart
        self._block = new_block
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
                     ' %d bytes => %d bytes', delta, old_length,
                     self._block._content_length)
    def _prepare_for_extract(self):
        """A _LazyGroupCompressFactory is about to extract to fulltext."""
        # We expect that if one child is going to fulltext, all will be. This
        # helps prevent all of them from extracting a small amount at a time.
        # Which in itself isn't terribly expensive, but resizing 2MB 32kB at a
        # time (self._block._content) is a little expensive.
        self._block._ensure_content(self._last_byte)
    def _check_rebuild_action(self):
        """Check to see if our block should be repacked."""
        total_bytes_used = 0
        last_byte_used = 0
        for factory in self._factories:
            total_bytes_used += factory._end - factory._start
            if last_byte_used < factory._end:
                last_byte_used = factory._end
        # If we are using more than half of the bytes from the block, we have
        # nothing else to check
        if total_bytes_used * 2 >= self._block._content_length:
            return None, last_byte_used, total_bytes_used
        # We are using less than 50% of the content. Is the content we are
        # using at the beginning of the block? If so, we can just trim the
        # tail, rather than rebuilding from scratch.
        if total_bytes_used * 2 > last_byte_used:
            return 'trim', last_byte_used, total_bytes_used
        # We are using a small amount of the data, and it isn't just packed
        # nicely at the front, so rebuild the content.
        # Note: This would be *nicer* as a strip-data-from-group, rather than
        #       building it up again from scratch
        #       It might be reasonable to consider the fulltext sizes for
        #       different bits when deciding this, too. As you may have a small
        #       fulltext, and a trivial delta, and you are just trading around
        #       for another fulltext. If we do a simple 'prune' you may end up
        #       expanding many deltas into fulltexts, as well.
        #       If we build a cheap enough 'strip', then we could try a strip,
        #       if that expands the content, we then rebuild.
        return 'rebuild', last_byte_used, total_bytes_used
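
    # Worked example (illustrative): for a 1000-byte block whose factories
    # reference 300 bytes in total, 2*300 < 1000 so the block is a candidate
    # for shrinking; if the last referenced byte is 450, 2*300 > 450 and we
    # only 'trim' the tail, but if the last referenced byte is 900 we
    # 'rebuild' the block from the used texts.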
    def check_is_well_utilized(self):
        """Is the current block considered 'well utilized'?

        This heuristic asks if the current block considers itself to be a fully
        developed group, rather than just a loose collection of data.
        """
        if len(self._factories) == 1:
            # A block of length 1 could be improved by combining with other
            # groups - don't look deeper. Even larger than max size groups
            # could compress well with adjacent versions of the same thing.
            return False
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        block_size = self._block._content_length
        if total_bytes_used < block_size * self._max_cut_fraction:
            # This block wants to trim itself small enough that we want to
            # consider it under-utilized.
            return False
        # TODO: This code is meant to be the twin of _insert_record_stream's
        #       'start_new_block' logic. It would probably be better to factor
        #       out that logic into a shared location, so that it stays
        #       together.
        # We currently assume a block is properly utilized whenever it is >75%
        # of the size of a 'full' block. In normal operation, a block is
        # considered full when it hits 4MB of same-file content. So any block
        # >3MB is 'full enough'.
        # The only time this isn't true is when a given block has large-object
        # content. (a single file >4MB, etc.)
        # Under these circumstances, we allow a block to grow to
        # 2 x largest_content. Which means that if a given block had a large
        # object, it may actually be under-utilized. However, given that this
        # is 'pack-on-the-fly' it is probably reasonable to not repack large
        # content blobs on-the-fly. Note that because we return False for all
        # 1-item blobs, we will repack them; we may wish to reevaluate our
        # treatment of large object blobs in the future.
        if block_size >= self._full_enough_block_size:
            return True
        # If a block is <3MB, it still may be considered 'full' if it contains
        # mixed content. The current rule is 2MB of mixed content is considered
        # full. So check to see if this block contains mixed content, and
        # set the threshold appropriately.
        common_prefix = None
        for factory in self._factories:
            prefix = factory.key[:-1]
            if common_prefix is None:
                common_prefix = prefix
            elif prefix != common_prefix:
                # Mixed content, check the size appropriately
                if block_size >= self._full_enough_mixed_block_size:
                    return True
                break
        # The content failed both the mixed check and the single-content check
        # so obviously it is not fully utilized
        # TODO: there is one other constraint that isn't being checked
        #       namely, that the entries in the block are in the appropriate
        #       order. For example, you could insert the entries in exactly
        #       reverse groupcompress order, and we would think that is ok.
        #       (all the right objects are in one group, and it is fully
        #       utilized, etc.) For now, we assume that case is rare,
        #       especially since we should always fetch in 'groupcompress'
        #       order.
        return False
    def _check_rebuild_block(self):
        action, last_byte_used, total_bytes_used = self._check_rebuild_action()
        if action is None:
            return
        if action == 'trim':
            self._trim_block(last_byte_used)
        elif action == 'rebuild':
            self._rebuild_block()
        else:
            raise ValueError('unknown rebuild action: %r' % (action,))
    def _wire_bytes(self):
        """Return a byte stream suitable for transmitting over the wire."""
        self._check_rebuild_block()
        # The outer block starts with:
        #   'groupcompress-block\n'
        #   <length of compressed key info>\n
        #   <length of uncompressed info>\n
        #   <length of gc block>\n
        lines = ['groupcompress-block\n']
        # The minimal info we need is the key, the start offset, and the
        # parents. The length and type are encoded in the record itself.
        # However, passing in the other bits makes it easier. The list of
        # keys, and the start offset, the length
        # 1 line key
        # 1 line with parents, '' for ()
        # 1 line for start offset
        # 1 line for end byte
        header_lines = []
        for factory in self._factories:
            key_bytes = '\x00'.join(factory.key)
            parents = factory.parents
            if parents is None:
                parent_bytes = 'None:'
            else:
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
            record_header = '%s\n%s\n%d\n%d\n' % (
                key_bytes, parent_bytes, factory._start, factory._end)
            header_lines.append(record_header)
            # TODO: Can we break the refcycle at this point and set
            #       factory._manager = None?
        header_bytes = ''.join(header_lines)
        header_bytes_len = len(header_bytes)
        z_header_bytes = zlib.compress(header_bytes)
        z_header_bytes_len = len(z_header_bytes)
        block_bytes_len, block_chunks = self._block.to_chunks()
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
                                       block_bytes_len))
        lines.append(z_header_bytes)
        lines.extend(block_chunks)
        del z_header_bytes, block_chunks
        # TODO: This is a point where we will double the memory consumption. To
        #       avoid this, we probably have to switch to a 'chunked' api
        return ''.join(lines)
    @classmethod
    def from_bytes(cls, bytes):
        # TODO: This does extra string copying, probably better to do it a
        #       different way. At a minimum this creates 2 copies of the
        #       compressed content
        (storage_kind, z_header_len, header_len,
         block_len, rest) = bytes.split('\n', 4)
        if storage_kind != 'groupcompress-block':
            raise ValueError('Unknown storage kind: %s' % (storage_kind,))
        z_header_len = int(z_header_len)
        if len(rest) < z_header_len:
            raise ValueError('Compressed header len shorter than all bytes')
        z_header = rest[:z_header_len]
        header_len = int(header_len)
        header = zlib.decompress(z_header)
        if len(header) != header_len:
            raise ValueError('invalid length for decompressed bytes')
        block_len = int(block_len)
        if len(rest) != z_header_len + block_len:
            raise ValueError('Invalid length for block')
        block_bytes = rest[z_header_len:]
        # So now we have a valid GCB, we just need to parse the factories that
        # were sent to us
        header_lines = header.split('\n')
        last = header_lines.pop()
        if last != '':
            raise ValueError('header lines did not end with a trailing'
                             ' newline')
        if len(header_lines) % 4 != 0:
            raise ValueError('The header was not an even multiple of 4 lines')
        block = GroupCompressBlock.from_bytes(block_bytes)
        result = cls(block)
        for start in xrange(0, len(header_lines), 4):
            key = tuple(header_lines[start].split('\x00'))
            parents_line = header_lines[start+1]
            if parents_line == 'None:':
                parents = None
            else:
                parents = tuple([tuple(segment.split('\x00'))
                                 for segment in parents_line.split('\t')
                                 if segment])
            start_offset = int(header_lines[start+2])
            end_offset = int(header_lines[start+3])
            result.add_factory(key, parents, start_offset, end_offset)
        return result
def network_block_to_records(storage_kind, bytes, line_end):
    if storage_kind != 'groupcompress-block':
        raise ValueError('Unknown storage kind: %s' % (storage_kind,))
    manager = _LazyGroupContentManager.from_bytes(bytes)
    return manager.get_record_stream()
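
# A small usage sketch (assumed key and offset values): serialise a lazy
# manager for the wire and turn the bytes back into a record stream, as a
# network client would.
def _example_network_roundtrip(block, key, start, end):
    manager = _LazyGroupContentManager(block)
    manager.add_factory(key, (), start, end)
    wire_bytes = manager._wire_bytes()
    return list(network_block_to_records('groupcompress-block', wire_bytes,
                                         None))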
class _CommonGroupCompressor(object):

    def __init__(self):
        """Create a GroupCompressor."""
        self.chunks = []
        self._last = None
        self.endpoint = 0
        self.input_bytes = 0
        self.labels_deltas = {}
        self._delta_index = None # Set by the children
        self._block = GroupCompressBlock()
    def get_matching_blocks(self, lines):
        """Return the ranges in lines which match self.lines.

        :param lines: lines to compress
        :return: A list of (old_start, new_start, length) tuples which reflect
            a region in self.lines that is present in lines. The last element
            of the list is always (old_len, new_len, 0) to provide an end point
            for generating instructions from the matching blocks list.
        """
        result = []
        pos = 0
        max_pos = len(lines)
        locations = None
        line_locations = self.line_locations
        line_locations.set_right_lines(lines)
        # We either copy a range (while there are reusable lines) or we
        # insert new lines. To find reusable lines we traverse
        result_append = result.append
        while pos < max_pos:
            block, pos, locations = _get_longest_match(line_locations, pos,
                                                       max_pos, locations)
            if block is not None:
                result_append(block)
        result_append((len(self.lines), len(lines), 0))
        return result
    def compress(self, key, lines, expected_sha):
        """Compress lines with label key.

        :param key: A key tuple. It is stored in the output
            for identification of the text during decompression. If the last
            element is 'None' it is replaced with the sha1 of the text -
            e.g. sha1:xxxxxxx.
        :param lines: The lines to be compressed. Must be split
            on \n, with the \n preserved.'
        :param expected_sha: If non-None, the sha the lines are believed to
            have. During compression the sha is calculated; a mismatch will
            cause an error.
        :return: The sha1 of lines, and the number of bytes accumulated in
            the group output so far.
        :seealso VersionedFiles.add_lines:
        """
        sha1 = sha_strings(lines)
        if key[-1] is None:
            key = key[:-1] + ('sha1:' + sha1,)
        label = '\x00'.join(key)
        # setup good encoding for trailing \n support.
        if not lines or lines[-1].endswith('\n'):
            lines.append('\n')
        else:
            lines[-1] = lines[-1] + '\n'
        new_lines = []
        new_lines.append('label: %s\n' % label)
        new_lines.append('sha1: %s\n' % sha1)
        index_lines = [False, False]
        flush_range = self.flush_range
        current_pos = 0
        blocks = self.get_matching_blocks(lines)
        # We either copy a range (while there are reusable lines) or we
        # insert new lines. To find reusable lines we traverse
        for old_start, new_start, range_len in blocks:
            if new_start != current_pos:
                # non-matching region
                flush_range(current_pos, None, new_start - current_pos,
                    lines, new_lines, index_lines)
            current_pos = new_start + range_len
            if not range_len:
                continue
            flush_range(new_start, old_start, range_len, lines,
                new_lines, index_lines)
        delta_start = (self.endpoint, len(self.lines))
        self.output_lines(new_lines, index_lines)
        trim_encoding_newline(lines)
        self.input_bytes += sum(map(len, lines))
        delta_end = (self.endpoint, len(self.lines))
        self.labels_deltas[key] = (delta_start, delta_end)
        return sha1, self.endpoint

    def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
        """Compress lines with label key.

        :param key: A key tuple. It is stored in the output
            for identification of the text during decompression. If the last
            element is 'None' it is replaced with the sha1 of the text -
            e.g. sha1:xxxxxxx.
        :param bytes: The bytes to be compressed
        :param expected_sha: If non-None, the sha the lines are believed to
            have. During compression the sha is calculated; a mismatch will
            cause an error.
        :param nostore_sha: If the computed sha1 sum matches, we will raise
            ExistingContent rather than adding the text.
        :param soft: Do a 'soft' compression. This means that we require larger
            ranges to match to be considered for a copy command.

        :return: The sha1 of lines, the start and end offsets in the delta, and
            the type ('fulltext' or 'delta').

        :seealso VersionedFiles.add_lines:
        """
        if not bytes: # empty, like a dir entry, etc
            if nostore_sha == _null_sha1:
                raise errors.ExistingContent()
            return _null_sha1, 0, 0, 'fulltext'
        # we assume someone knew what they were doing when they passed it in
        if expected_sha is not None:
            sha1 = expected_sha
        else:
            sha1 = osutils.sha_string(bytes)
        if nostore_sha is not None:
            if sha1 == nostore_sha:
                raise errors.ExistingContent()
        if key[-1] is None:
            key = key[:-1] + ('sha1:' + sha1,)

        start, end, type = self._compress(key, bytes, len(bytes) / 2, soft)
        return sha1, start, end, type
    def _compress(self, key, bytes, max_delta_size, soft=False):
        """Compress lines with label key.

        :param key: A key tuple. It is stored in the output for identification
            of the text during decompression.

        :param bytes: The bytes to be compressed

        :param max_delta_size: The size above which we issue a fulltext instead
            of a delta.

        :param soft: Do a 'soft' compression. This means that we require larger
            ranges to match to be considered for a copy command.

        :return: The sha1 of lines, the start and end offsets in the delta, and
            the type ('fulltext' or 'delta').
        """
        raise NotImplementedError(self._compress)
    def extract(self, key):
        """Extract a key previously added to the compressor.

        :param key: The key to extract.
        :return: An iterable over bytes and the sha1.
        """
        delta_details = self.labels_deltas[key]
        delta_lines = self.lines[delta_details[0][1]:delta_details[1][1]]
        label, sha1, delta = parse(delta_lines)
        if label != key:
            raise AssertionError("wrong key: %r, wanted %r" % (label, key))
        # Perhaps we want to keep the line offsets too in memory at least?
        lines = apply_delta(''.join(self.lines), delta)
        sha1 = sha_strings(lines)
        return lines, sha1
    def flush_range(self, range_start, copy_start, range_len, lines, new_lines, index_lines):
        insert_instruction = "i,%d\n" % range_len
        if copy_start is not None:
            # range stops, flush and start a new copy range
            stop_byte = self.line_offsets[copy_start + range_len - 1]
            if copy_start == 0:
                start_byte = 0
            else:
                start_byte = self.line_offsets[copy_start - 1]
            bytes = stop_byte - start_byte
            copy_control_instruction = "c,%d,%d\n" % (start_byte, bytes)
            if (bytes + len(insert_instruction) >
                len(copy_control_instruction)):
                new_lines.append(copy_control_instruction)
                index_lines.append(False)
                return
        # not copying, or inserting is shorter than copying, so insert.
        new_lines.append(insert_instruction)
        new_lines.extend(lines[range_start:range_start+range_len])
        index_lines.append(False)
        index_lines.extend([copy_start is None]*range_len)

    def output_lines(self, new_lines, index_lines):
        """Output some lines.

        :param new_lines: The lines to output.
        :param index_lines: A boolean flag for each line - when True, index
            that line.
        """
        # indexed_newlines = [idx for idx, val in enumerate(index_lines)
        #                     if val and new_lines[idx] == '\n']
        # if indexed_newlines:
        #     import pdb; pdb.set_trace()
        endpoint = self.endpoint
        self.line_locations.extend_lines(new_lines, index_lines)
        for line in new_lines:
            endpoint += len(line)
            self.line_offsets.append(endpoint)
        self.endpoint = endpoint
    def extract(self, key):
        """Extract a key previously added to the compressor."""
        (start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
        delta_chunks = self.chunks[start_chunk:end_chunk]
        stored_bytes = ''.join(delta_chunks)
        if stored_bytes[0] == 'f':
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
            data_len = fulltext_len + 1 + offset
            if data_len != len(stored_bytes):
                raise ValueError('Index claimed fulltext len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            bytes = stored_bytes[offset + 1:]
        else:
            # XXX: This is inefficient at best
            source = ''.join(self.chunks[:start_chunk])
            if stored_bytes[0] != 'd':
                raise ValueError('Unknown content kind, bytes claim %s'
                                 % (stored_bytes[0],))
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
            data_len = delta_len + 1 + offset
            if data_len != len(stored_bytes):
                raise ValueError('Index claimed delta len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            bytes = apply_delta(source, stored_bytes[offset + 1:])
        bytes_sha1 = osutils.sha_string(bytes)
        return bytes, bytes_sha1
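
    # Usage sketch (illustrative, using assumed keys): a concrete subclass is
    # typically driven as
    #   compressor = PythonGroupCompressor()
    #   sha1, start, end, kind = compressor.compress(('key',), 'text\n', None)
    #   bytes, sha = compressor.extract(('key',))
    #   block = compressor.flush()
    # where start/end are the offsets later passed to
    # GroupCompressBlock.extract().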
    def flush(self):
        """Finish this group, creating a formatted stream.

        After calling this, the compressor should no longer be used
        """
        self._block.set_chunked_content(self.chunks, self.endpoint)
        self.chunks = None
        self._delta_index = None
        return self._block

    def pop_last(self):
        """Call this if you want to 'revoke' the last compression.

        After this, the data structures will be rolled back, but you cannot do
        more compression.
        """
        self._delta_index = None
        del self.chunks[self._last[0]:]
        self.endpoint = self._last[1]
        self._last = None
    def ratio(self):
        """Return the overall compression ratio."""
        return float(self.input_bytes) / float(self.endpoint)
class PythonGroupCompressor(_CommonGroupCompressor):

    def __init__(self):
        """Create a GroupCompressor.

        Used only if the pyrex version is not available.
        """
        super(PythonGroupCompressor, self).__init__()
        self._delta_index = LinesDeltaIndex([])
        # The actual content is managed by LinesDeltaIndex
        self.chunks = self._delta_index.lines

    def _compress(self, key, bytes, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
        input_len = len(bytes)
        new_lines = osutils.split_lines(bytes)
        out_lines, index_lines = self._delta_index.make_delta(
            new_lines, bytes_length=input_len, soft=soft)
        delta_length = sum(map(len, out_lines))
        if delta_length > max_delta_size:
            # The delta is longer than the fulltext, insert a fulltext
            type = 'fulltext'
            out_lines = ['f', encode_base128_int(input_len)]
            out_lines.extend(new_lines)
            index_lines = [False, False]
            index_lines.extend([True] * len(new_lines))
        else:
            # this is a worthy delta, output it
            type = 'delta'
            out_lines[0] = 'd'
            # Update the delta_length to include those two encoded integers
            out_lines[1] = encode_base128_int(delta_length)
        start = self.endpoint
        chunk_start = len(self.chunks)
        self._last = (chunk_start, self.endpoint)
        self._delta_index.extend_lines(out_lines, index_lines)
        self.endpoint = self._delta_index.endpoint
        self.input_bytes += input_len
        chunk_end = len(self.chunks)
        self.labels_deltas[key] = (start, chunk_start,
                                   self.endpoint, chunk_end)
        return start, self.endpoint, type
class PyrexGroupCompressor(_CommonGroupCompressor):
    """Produce a serialised group of compressed texts.

    It contains code very similar to SequenceMatcher because it has a similar
    task. However some key differences apply:
     - there is no junk, we want a minimal edit not a human readable diff.
     - we don't filter very common lines (because we don't know where a good
       range will start, and after the first text we want to be emitting
       minimal edits only.
     - we chain the left side, not the right side
     - we incrementally update the adjacency matrix as new lines are provided.
     - we look for matches in all of the left side, so the routine which does
       the analogous task of find_longest_match does not need to filter on the
       left side.
    """

    def __init__(self):
        super(PyrexGroupCompressor, self).__init__()
        self._delta_index = DeltaIndex()
    def _compress(self, key, bytes, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
        input_len = len(bytes)
        # By having action/label/sha1/len, we can parse the group if the index
        # was ever destroyed, we have the key in 'label', we know the final
        # bytes are valid from sha1, and we know where to find the end of this
        # record because of 'len'. (the delta record itself will store the
        # total length for the expanded record)
        # 'len: %d\n' costs approximately 1% increase in total data
        # Having the labels at all costs us 9-10% increase, 38% increase for
        # inventory pages, and 5.8% increase for text pages
        # new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
        if self._delta_index._source_offset != self.endpoint:
            raise AssertionError('_source_offset != endpoint'
                ' somehow the DeltaIndex got out of sync with'
                ' the output lines')
        delta = self._delta_index.make_delta(bytes, max_delta_size)
        if delta is None:
            type = 'fulltext'
            enc_length = encode_base128_int(len(bytes))
            len_mini_header = 1 + len(enc_length)
            self._delta_index.add_source(bytes, len_mini_header)
            new_chunks = ['f', enc_length, bytes]
        else:
            type = 'delta'
            enc_length = encode_base128_int(len(delta))
            len_mini_header = 1 + len(enc_length)
            new_chunks = ['d', enc_length, delta]
            self._delta_index.add_delta_source(delta, len_mini_header)
        start = self.endpoint
        chunk_start = len(self.chunks)
        # Now output these bytes
        self._output_chunks(new_chunks)
        self.input_bytes += input_len
        chunk_end = len(self.chunks)
        self.labels_deltas[key] = (start, chunk_start,
                                   self.endpoint, chunk_end)
        if not self._delta_index._source_offset == self.endpoint:
            raise AssertionError('the delta index is out of sync'
                                 ' with the output lines %s != %s'
                                 % (self._delta_index._source_offset,
                                    self.endpoint))
        return start, self.endpoint, type
    def _output_chunks(self, new_chunks):
        """Output some chunks.

        :param new_chunks: The chunks to output.
        """
        self._last = (len(self.chunks), self.endpoint)
        endpoint = self.endpoint
        self.chunks.extend(new_chunks)
        endpoint += sum(map(len, new_chunks))
        self.endpoint = endpoint
def make_pack_factory(graph, delta, keylength, inconsistency_fatal=True):
    """Create a factory for creating a pack based groupcompress.

    This is only functional enough to run interface tests, it doesn't try to
    provide a full pack environment.

    :param graph: Store a graph.
    :param delta: Delta compress contents.
    :param keylength: How long should keys be.
    """
    def factory(transport):
        parents = graph or delta
        ref_length = 0
        if graph:
            ref_length = 1
        graph_index = BTreeBuilder(reference_lists=ref_length,
            key_elements=keylength)
        stream = transport.open_write_stream('newpack')
        writer = pack.ContainerWriter(stream.write)
        writer.begin()
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
            add_callback=graph_index.add_nodes,
            inconsistency_fatal=inconsistency_fatal)
        access = knit._DirectPackAccess({})
        access.set_writer(writer, graph_index, (transport, 'newpack'))
        result = GroupCompressVersionedFiles(index, access, delta)
        result.stream = stream