        :return: An iterable over bytes and the sha1.
        """
        delta_details = self.labels_deltas[key]
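        # labels_deltas maps key -> ((start_byte, start_chunk),
        # (end_byte, end_chunk)); the chunk indices slice out exactly the
        # chunks that were written for this record.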
        delta_chunks = self.chunks[delta_details[0][1]:delta_details[1][1]]
        stored_bytes = ''.join(delta_chunks)
        # TODO: Fix this, we shouldn't really be peeking here
        entry = self._block._entries[key]
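        # Stored record layout, as validated below: a one-byte kind marker
        # ('f' fulltext, 'd' delta), the payload length as a base128 varint
        # (7 bits per byte, high bit set on continuation bytes), then the
        # payload itself. For example, a 138-byte fulltext is stored as
        # 'f' + '\x8a\x01' + payload, and decode_base128_int returns
        # (138, 2): the decoded value and the number of bytes consumed.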
        if entry.type == 'fulltext':
            if stored_bytes[0] != 'f':
                raise ValueError('Index claimed fulltext, but stored bytes'
                                 ' indicate %s' % (stored_bytes[0],))
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
            if fulltext_len + 1 + offset != len(stored_bytes):
                raise ValueError('Index claimed fulltext len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes),
                                    fulltext_len + 1 + offset))
            bytes = stored_bytes[offset + 1:]
        else:
            if entry.type != 'delta':
                raise ValueError('Unknown entry type: %s' % (entry.type,))
            # XXX: This is inefficient at best
            source = ''.join(self.chunks)
            if stored_bytes[0] != 'd':
                raise ValueError('Entry type claims delta, bytes claim %s'
                                 % (stored_bytes[0],))
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
            if delta_len + 1 + offset != len(stored_bytes):
                raise ValueError('Index claimed delta len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes),
                                    delta_len + 1 + offset))
            bytes = apply_delta(source, stored_bytes[offset + 1:])
        bytes_sha1 = osutils.sha_string(bytes)
        if entry.sha1 != bytes_sha1:
            raise ValueError('Recorded sha1 != measured %s != %s'
                             % (entry.sha1, bytes_sha1))
        return bytes, entry.sha1

        :param delta: If False, do not compress records.
        """
        super(PythonGroupCompressor, self).__init__()
        self._delta_index = LinesDeltaIndex([])
        # The actual content is managed by LinesDeltaIndex
        self.chunks = self._delta_index.lines
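        # Note that self.chunks is the same list object as the index's
        # internal lines, so content added through LinesDeltaIndex is
        # immediately visible here as well.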

    def _compress(self, key, bytes, sha1, max_delta_size, soft=False):
        """see _CommonGroupCompressor._compress"""
        bytes_length = len(bytes)
        new_lines = osutils.split_lines(bytes)
        out_lines, index_lines = self._delta_index.make_delta(new_lines,
            bytes_length=bytes_length, soft=soft)
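        # out_lines is the encoded record body; index_lines flags, for each
        # output line, whether it should be indexed for matching later texts.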
        delta_length = sum(map(len, out_lines))
        if delta_length > max_delta_size:
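            # (Reconstructed sketch: the original branch bodies are elided
            # here. Inferred from extract()'s format checks: a delta larger
            # than the fulltext is not worth storing, so fall back to a
            # fulltext record of 'f' + base128 length + raw lines; otherwise
            # stamp the 'd' delta header. Both paths must define `type` and
            # `out_length` for the code below.)
            type = 'fulltext'
            out_lines = ['f', encode_base128_int(bytes_length)]
            out_lines.extend(new_lines)
            # Don't index the two header lines, only the content lines.
            index_lines = [False, False]
            index_lines.extend([True] * len(new_lines))
            out_length = len(out_lines[1]) + bytes_length + 1
        else:
            type = 'delta'
            out_lines[0] = 'd'
            out_lines[1] = encode_base128_int(delta_length)
            out_length = len(out_lines[1]) + 1 + delta_length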
        self._block.add_entry(key, type=type, sha1=sha1,
                              start=self.endpoint, length=out_length)
        start = self.endpoint # Before insertion
        delta_start = (start, len(self._delta_index.lines))
        self._delta_index.extend_lines(out_lines, index_lines)
        self.endpoint = self._delta_index.endpoint
        self.input_bytes += bytes_length
        delta_end = (self.endpoint, len(self._delta_index.lines))
        self.labels_deltas[key] = (delta_start, delta_end)
        return sha1, start, self.endpoint, type, out_length


class PyrexGroupCompressor(_CommonGroupCompressor):
    """Produce a serialised group of compressed texts.

        self._block.add_entry(key, type=type, sha1=sha1,
                              start=self.endpoint, length=length)
        start = self.endpoint # Before insertion
        delta_start = (self.endpoint, len(self.chunks))
        self._output_chunks(new_chunks)
        self.input_bytes += input_len
        delta_end = (self.endpoint, len(self.chunks))
        self.labels_deltas[key] = (delta_start, delta_end)
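        # The delta index tracks how many source bytes it has consumed;
        # after adding this record it must agree with our own endpoint.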
        if not self._delta_index._source_offset == self.endpoint:
            raise AssertionError('the delta index is out of sync'
                                 ' %s != %s'
                                 % (self._delta_index._source_offset, self.endpoint))
        return sha1, start, self.endpoint, type, length

    def _output_chunks(self, new_chunks):
        """Output some chunks.

        :param new_chunks: The chunks to output.
        """
        self._last = (len(self.chunks), self.endpoint)
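        # Remember (chunk count, endpoint) from just before this write; this
        # is the rollback point used when the last compression is revoked
        # (see pop_last()).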
        endpoint = self.endpoint
        self.chunks.extend(new_chunks)
        endpoint += sum(map(len, new_chunks))
        self.endpoint = endpoint

                start_new_block = True
            else:
                start_new_block = False
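            # When the heuristics above ask for a new block, the record that
            # was just compressed is revoked (pop_last) so it can be added
            # to the fresh block instead.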
            last_prefix = prefix
            if start_new_block:
                self._compressor.pop_last()