~bzr-pqm/bzr/bzr.dev


Viewing changes to bzrlib/groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2009-03-27 21:47:08 UTC
  • mto: (3735.39.2 clean)
  • mto: This revision was merged to the branch mainline in revision 4280.
  • Revision ID: john@arbash-meinel.com-20090327214708-sy13r2m4cu0qn72k

Change the attribute from 'lines' to 'chunks' to make it more obvious that
entries aren't guaranteed to end with '\n'. Also, move more of the code
duplicated between the two compressor implementations into the common base
class.

@@ -610,11 +610,12 @@
 
     def __init__(self):
         """Create a GroupCompressor."""
-        self.lines = []
+        self.chunks = []
         self._last = None
         self.endpoint = 0
         self.input_bytes = 0
         self.labels_deltas = {}
+        self._delta_index = None # Set by the children
         self._block = GroupCompressBlock()
 
     def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
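
The rename is more than cosmetic: "lines" suggests '\n'-terminated strings,
but the entries stored here are arbitrary byte strings (headers, fulltext
bodies, delta instructions). A minimal sketch of the invariant the new name
communicates, with made-up data:

    # Illustrative only: chunks need not end with '\n'; joining them must
    # reproduce the exact byte stream.
    chunks = ['label: foo', 'd\x05', 'tail\n']
    content = ''.join(chunks)
    assert len(content) == sum(map(len, chunks))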
@@ -688,7 +689,7 @@
         :return: An iterable over bytes and the sha1.
         """
         delta_details = self.labels_deltas[key]
-        delta_chunks = self.lines[delta_details[0][1]:delta_details[1][1]]
+        delta_chunks = self.chunks[delta_details[0][1]:delta_details[1][1]]
         stored_bytes = ''.join(delta_chunks)
         # TODO: Fix this, we shouldn't really be peeking here
         entry = self._block._entries[key]
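
For context, labels_deltas[key] holds a pair of (byte offset, chunk index)
tuples recorded when the record was inserted, which is why extract() can
slice self.chunks directly. A hedged sketch of that bookkeeping, with
hypothetical values:

    # key -> ((start_byte, start_chunk), (end_byte, end_chunk))
    chunks = ['f\x06', 'hello\n']
    labels_deltas = {('key1',): ((0, 0), (8, 2))}
    start, end = labels_deltas[('key1',)]
    delta_chunks = chunks[start[1]:end[1]]  # what extract() slices out
    stored_bytes = ''.join(delta_chunks)    # 'f\x06hello\n'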
@@ -707,7 +708,7 @@
             if entry.type != 'delta':
                 raise ValueError('Unknown entry type: %s' % (entry.type,))
             # XXX: This is inefficient at best
-            source = ''.join(self.lines)
+            source = ''.join(self.chunks)
             if stored_bytes[0] != 'd':
                 raise ValueError('Entry type claims delta, bytes claim %s'
                                  % (stored_bytes[0],))
@@ -724,6 +725,17 @@
                              % (entry.sha1, bytes_sha1))
         return bytes, entry.sha1
 
+    def flush(self):
+        """Finish this group, creating a formatted stream.
+
+        After calling this, the compressor should no longer be used
+        """
+        content = ''.join(self.chunks)
+        self.chunks = None
+        self._delta_index = None
+        self._block.set_content(content)
+        return self._block
+
     def pop_last(self):
         """Call this if you want to 'revoke' the last compression.
 
@@ -731,7 +743,7 @@
         more compression.
         """
         self._delta_index = None
-        del self.lines[self._last[0]:]
+        del self.chunks[self._last[0]:]
         self.endpoint = self._last[1]
         self._last = None
 
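
With flush() and pop_last() now on the base class, both implementations share
one lifecycle. A hedged usage sketch (keys and text are made up, and it
assumes passing expected_sha=None lets the compressor compute the sha itself):

    compressor = PyrexGroupCompressor()
    compressor.compress(('file-id', 'rev-a'), 'some text\n', None)
    compressor.compress(('file-id', 'rev-b'), 'more text\n', None)
    compressor.pop_last()       # revoke the last compress(), if unwanted
    block = compressor.flush()  # get the GroupCompressBlock; the compressor
                                # must not be used after this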
@@ -748,15 +760,15 @@
         :param delta: If False, do not compress records.
         """
         super(PythonGroupCompressor, self).__init__()
-        self.line_locations = LinesDeltaIndex([])
-        self.lines = self.line_locations.lines
-        self._present_prefixes = set()
+        self._delta_index = LinesDeltaIndex([])
+        # The actual content is managed by LinesDeltaIndex
+        self.chunks = self._delta_index.lines
 
     def _compress(self, key, bytes, sha1, max_delta_size, soft=False):
         """see _CommonGroupCompressor._compress"""
         bytes_length = len(bytes)
         new_lines = osutils.split_lines(bytes)
-        out_lines, index_lines = self.line_locations.make_delta(new_lines,
+        out_lines, index_lines = self._delta_index.make_delta(new_lines,
             bytes_length=bytes_length, soft=soft)
         delta_length = sum(map(len, out_lines))
         if delta_length > max_delta_size:
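
Note the aliasing set up in __init__ above: self.chunks is the very list
owned by the LinesDeltaIndex, so content appended through the index is
immediately visible to the base class without copying. A tiny sketch of the
mechanism:

    index_lines = []         # the list LinesDeltaIndex owns
    chunks = index_lines     # the name _CommonGroupCompressor uses
    index_lines.append('new text\n')
    assert chunks is index_lines and chunks == ['new text\n']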
@@ -777,35 +789,14 @@
         self._block.add_entry(key, type=type, sha1=sha1,
                               start=self.endpoint, length=out_length)
         start = self.endpoint # Before insertion
-        delta_start = (self.endpoint, len(self.lines))
-        self.output_lines(out_lines, index_lines)
+        delta_start = (start, len(self._delta_index.lines))
+        self._delta_index.extend_lines(out_lines, index_lines)
+        self.endpoint = self._delta_index.endpoint
         self.input_bytes += bytes_length
-        delta_end = (self.endpoint, len(self.lines))
+        delta_end = (self.endpoint, len(self._delta_index.lines))
         self.labels_deltas[key] = (delta_start, delta_end)
         return sha1, start, self.endpoint, type, out_length
 
-    def flush(self):
-        self._block.set_content(''.join(self.lines))
-        return self._block
-
-    def output_lines(self, new_lines, index_lines):
-        """Output some lines.
-
-        :param new_lines: The lines to output.
-        :param index_lines: A boolean flag for each line - when True, index
-            that line.
-        """
-        # indexed_newlines = [idx for idx, val in enumerate(index_lines)
-        #                          if val and new_lines[idx] == '\n']
-        # if indexed_newlines:
-        #     import pdb; pdb.set_trace()
-        self._last = (len(self.lines), self.endpoint)
-        endpoint = self.endpoint
-        self.line_locations.extend_lines(new_lines, index_lines)
-        for line in new_lines:
-            endpoint += len(line)
-        self.endpoint = endpoint
-
 
 class PyrexGroupCompressor(_CommonGroupCompressor):
     """Produce a serialised group of compressed texts.
@@ -825,7 +816,6 @@
 
     def __init__(self):
         super(PyrexGroupCompressor, self).__init__()
-        self.num_keys = 0
         self._delta_index = DeltaIndex()
 
     def _compress(self, key, bytes, sha1, max_delta_size, soft=False):
@@ -862,11 +852,10 @@
         self._block.add_entry(key, type=type, sha1=sha1,
                               start=self.endpoint, length=length)
         start = self.endpoint # Before insertion
-        delta_start = (self.endpoint, len(self.lines))
-        self.num_keys += 1
-        self.output_chunks(new_chunks)
+        delta_start = (self.endpoint, len(self.chunks))
+        self._output_chunks(new_chunks)
         self.input_bytes += input_len
-        delta_end = (self.endpoint, len(self.lines))
+        delta_end = (self.endpoint, len(self.chunks))
         self.labels_deltas[key] = (delta_start, delta_end)
         if not self._delta_index._source_offset == self.endpoint:
             raise AssertionError('the delta index is out of sync'
@@ -874,64 +863,14 @@
                 % (self._delta_index._source_offset, self.endpoint))
         return sha1, start, self.endpoint, type, length
 
-    def extract(self, key):
-        """Extract a key previously added to the compressor.
-
-        :param key: The key to extract.
-        :return: An iterable over bytes and the sha1.
-        """
-        delta_details = self.labels_deltas[key]
-        delta_chunks = self.lines[delta_details[0][1]:delta_details[1][1]]
-        stored_bytes = ''.join(delta_chunks)
-        # TODO: Fix this, we shouldn't really be peeking here
-        entry = self._block._entries[key]
-        if entry.type == 'fulltext':
-            if stored_bytes[0] != 'f':
-                raise ValueError('Index claimed fulltext, but stored bytes'
-                                 ' indicate %s' % (stored_bytes[0],))
-            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
-            if fulltext_len + 1 + offset != len(stored_bytes):
-                raise ValueError('Index claimed fulltext len, but stored bytes'
-                                 ' claim %s != %s'
-                                 % (len(stored_bytes),
-                                    fulltext_len + 1 + offset))
-            bytes = stored_bytes[offset + 1:]
-        else:
-            if entry.type != 'delta':
-                raise ValueError('Unknown entry type: %s' % (entry.type,))
-            # XXX: This is inefficient at best
-            source = ''.join(self.lines)
-            if stored_bytes[0] != 'd':
-                raise ValueError('Entry type claims delta, bytes claim %s'
-                                 % (stored_bytes[0],))
-            delta_len, offset = decode_base128_int(stored_bytes[1:10])
-            if delta_len + 1 + offset != len(stored_bytes):
-                raise ValueError('Index claimed delta len, but stored bytes'
-                                 ' claim %s != %s'
-                                 % (len(stored_bytes),
-                                    delta_len + 1 + offset))
-            bytes = apply_delta(source, stored_bytes[offset + 1:])
-        bytes_sha1 = osutils.sha_string(bytes)
-        if entry.sha1 != bytes_sha1:
-            raise ValueError('Recorded sha1 != measured %s != %s'
-                             % (entry.sha1, bytes_sha1))
-        return bytes, entry.sha1
-
-    def flush(self):
-        """Finish this group, creating a formatted stream."""
-        content = ''.join(self.lines)
-        self.lines = None
-        self._block.set_content(content)
-        return self._block
-
-    def output_chunks(self, new_chunks):
+    def _output_chunks(self, new_chunks):
         """Output some chunks.
 
         :param new_chunks: The chunks to output.
         """
-        self._last = (len(self.lines), self.endpoint)
+        self._last = (len(self.chunks), self.endpoint)
         endpoint = self.endpoint
-        self.lines.extend(new_chunks)
+        self.chunks.extend(new_chunks)
         endpoint += sum(map(len, new_chunks))
         self.endpoint = endpoint
 
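
The removed extract() documents the record layout this module uses: one
marker byte ('f' for fulltext, 'd' for delta), a base128 varint carrying the
content length, then the content itself. A hedged re-implementation of the
decoding, for illustration only (the real decode_base128_int is imported at
the bottom of this file; the usual 7-bit little-endian varint is assumed):

    def decode_base128_int(data):
        """Return (value, num_bytes_consumed)."""
        value = 0
        shift = 0
        offset = 0
        while True:
            byte = ord(data[offset])
            offset += 1
            value |= (byte & 0x7F) << shift
            if not byte & 0x80:     # high bit clear: last byte
                return value, offset
            shift += 7

    stored_bytes = 'f' + '\x06' + 'hello\n'  # marker + varint(6) + content
    length, offset = decode_base128_int(stored_bytes[1:10])
    assert (length, offset) == (6, 1)
    assert length + 1 + offset == len(stored_bytes)  # extract()'s sanity check
    assert stored_bytes[offset + 1:] == 'hello\n'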
@@ -1504,32 +1443,6 @@
                 start_new_block = True
             else:
                 start_new_block = False
-            # if type == 'fulltext':
-            #     # If this is the first text, we don't do anything
-            #     if self._compressor.num_keys > 1:
-            #         if prefix is not None and prefix != last_prefix:
-            #             # We just inserted a fulltext for a different prefix
-            #             # (aka file-id).
-            #             if end_point > 512 * 1024:
-            #                 start_new_block = True
-            #             # TODO: Consider packing several small texts together
-            #             #       maybe only flush if end_point > some threshold
-            #             # if end_point > 512 * 1024 or len(bytes) <
-            #             #     start_new_block = true
-            #         else:
-            #             # We just added a fulltext, part of the same file-id
-            #             if (end_point > 2*1024*1024
-            #                 and end_point > 5*max_fulltext_len):
-            #                 start_new_block = True
-            #     last_fulltext_len = len(bytes)
-            # else:
-            #     delta_ratio = float(len(bytes)) / length
-            #     if delta_ratio < 3: # Not much compression
-            #         if end_point > 1*1024*1024:
-            #             start_new_block = True
-            #     elif delta_ratio < 10: # 10:1 compression
-            #         if end_point > 4*1024*1024:
-            #             start_new_block = True
             last_prefix = prefix
             if start_new_block:
                 self._compressor.pop_last()
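
The deleted comments were an experiment in capping block size based on how
well texts were compressing; only the start_new_block flag computed above
survives. For reference, the core quantity the dead code weighed (made-up
numbers):

    bytes_len, delta_len = 40 * 1024, 2 * 1024
    delta_ratio = float(bytes_len) / delta_len  # 20.0, i.e. 20:1
    # the sketch capped blocks at 1MB for ratios below 3, 4MB below 10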
@@ -1783,7 +1696,6 @@
     apply_delta,
     encode_base128_int,
     decode_base128_int,
-    encode_copy_instruction,
     LinesDeltaIndex,
     )
 try: