~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

Join knits with the original gzipped data avoiding recompression.

Show diffs side-by-side

added added

removed removed

Lines of Context:
925
925
                pass
926
926
        return self._file
927
927
 
928
 
    def add_record(self, version_id, digest, lines):
929
 
        """Write new text record to disk.  Returns the position in the
930
 
        file where it was written."""
 
928
    def _record_to_data(self, version_id, digest, lines):
 
929
        """Convert version_id, digest, lines into a raw data block.
 
930
        
 
931
        :return: (len, a StringIO instance with the raw data ready to read.)
 
932
        """
931
933
        sio = StringIO()
932
934
        data_file = GzipFile(None, mode='wb', fileobj=sio)
933
935
        print >>data_file, "version %s %d %s" % (version_id, len(lines), digest)
934
936
        data_file.writelines(lines)
935
937
        print >>data_file, "end %s\n" % version_id
936
938
        data_file.close()
 
939
        length= sio.tell()
 
940
        sio.seek(0)
 
941
        return length, sio
937
942
 
 
943
    def add_raw_record(self, raw_data):
 
944
        """Append a prepared record to the data file."""
 
945
        start_pos = self._transport.append(self._filename, StringIO(raw_data))
 
946
        return start_pos, len(raw_data)
 
947
        
 
948
    def add_record(self, version_id, digest, lines):
 
949
        """Write new text record to disk.  Returns the position in the
 
950
        file where it was written."""
 
951
        size, sio = self._record_to_data(version_id, digest, lines)
938
952
        # cache
939
953
        self._records[version_id] = (digest, lines)
940
 
 
941
 
        content = sio.getvalue()
942
 
        sio.seek(0)
 
954
        # write to disk
943
955
        start_pos = self._transport.append(self._filename, sio)
944
 
        return start_pos, len(content)
945
 
 
946
 
    def _parse_record(self, version_id, data):
947
 
        df = GzipFile(mode='rb', fileobj=StringIO(data))
 
956
        return start_pos, size
 
957
 
 
958
    def _parse_record_header(self, version_id, raw_data):
 
959
        """Parse a record header for consistency.
 
960
 
 
961
        :return: the header and the decompressor stream.
 
962
                 as (stream, header_record)
 
963
        """
 
964
        df = GzipFile(mode='rb', fileobj=StringIO(raw_data))
948
965
        rec = df.readline().split()
949
966
        if len(rec) != 4:
950
 
            raise KnitCorrupt(self._filename, 'unexpected number of records')
 
967
            raise KnitCorrupt(self._filename, 'unexpected number of elements in record header')
951
968
        if rec[1] != version_id:
952
969
            raise KnitCorrupt(self._filename, 
953
970
                              'unexpected version, wanted %r, got %r' % (
954
971
                                version_id, rec[1]))
 
972
        return df, rec
 
973
 
 
974
    def _parse_record(self, version_id, data):
 
975
        df, rec = self._parse_record_header(version_id, data)
955
976
        lines = int(rec[2])
956
977
        record_contents = self._read_record_contents(df, lines)
957
978
        l = df.readline()
958
979
        if l != 'end %s\n' % version_id:
959
980
            raise KnitCorrupt(self._filename, 'unexpected version end line %r, wanted %r' 
960
981
                        % (l, version_id))
 
982
        df.close()
961
983
        return record_contents, rec[3]
962
984
 
963
985
    def _read_record_contents(self, df, record_lines):
967
989
            r.append(df.readline())
968
990
        return r
969
991
 
 
992
    def read_records_iter_raw(self, records):
 
993
        """Read text records from data file and yield raw data.
 
994
 
 
995
        This unpacks enough of the text record to validate the id is
 
996
        as expected but thats all.
 
997
 
 
998
        It will actively recompress currently cached records on the
 
999
        basis that that is cheaper than I/O activity.
 
1000
        """
 
1001
        needed_records = []
 
1002
        for version_id, pos, size in records:
 
1003
            if version_id not in self._records:
 
1004
                needed_records.append((version_id, pos, size))
 
1005
 
 
1006
        # setup an iterator of the external records:
 
1007
        # uses readv so nice and fast we hope.
 
1008
        if len(needed_records):
 
1009
            # grab the disk data needed.
 
1010
            raw_records = self._transport.readv(self._filename,
 
1011
                [(pos, size) for version_id, pos, size in needed_records])
 
1012
 
 
1013
        for version_id, pos, size in records:
 
1014
            if version_id in self._records:
 
1015
                # compress a new version
 
1016
                size, sio = self._record_to_data(version_id,
 
1017
                                                 self._records[version_id][0],
 
1018
                                                 self._records[version_id][1])
 
1019
                yield version_id, sio.getvalue()
 
1020
            else:
 
1021
                pos, data = raw_records.next()
 
1022
                # validate the header
 
1023
                df, rec = self._parse_record_header(version_id, data)
 
1024
                df.close()
 
1025
                yield version_id, data
 
1026
 
 
1027
 
970
1028
    def read_records_iter(self, records):
971
1029
        """Read text records from data file and yield result.
972
1030
 
1071
1129
            version_list = [i for i in full_list if (not self.target.has_version(i)
1072
1130
                            and i in needed_versions)]
1073
1131
    
1074
 
            records = []
 
1132
            # plan the join:
 
1133
            copy_queue = []
 
1134
            copy_queue_records = []
 
1135
            copy_set = set()
1075
1136
            for version_id in version_list:
1076
 
                data_pos, data_size = self.source._index.get_position(version_id)
1077
 
                records.append((version_id, data_pos, data_size))
1078
 
    
1079
 
            count = 0
1080
 
            for version_id, lines, digest \
1081
 
                    in self.source._data.read_records_iter(records):
1082
1137
                options = self.source._index.get_options(version_id)
1083
1138
                parents = self.source._index.get_parents_with_ghosts(version_id)
1084
 
                
 
1139
                # check that its will be a consistent copy:
1085
1140
                for parent in parents:
1086
 
                    # if source has the parent, we must hav grabbed it first.
1087
 
                    assert (self.target.has_version(parent) or not
1088
 
                            self.source.has_version(parent))
1089
 
    
 
1141
                    # if source has the parent, we must :
 
1142
                    # * already have it or
 
1143
                    # * have it scheduled already
 
1144
                    # otherwise we dont care
 
1145
                    assert (self.target.has_version(parent) or
 
1146
                            parent in copy_set or
 
1147
                            not self.source.has_version(parent))
 
1148
                data_pos, data_size = self.source._index.get_position(version_id)
 
1149
                copy_queue_records.append((version_id, data_pos, data_size))
 
1150
                copy_queue.append((version_id, options, parents))
 
1151
                copy_set.add(version_id)
 
1152
 
 
1153
            # data suck the join:
 
1154
            count = 0
 
1155
            total = len(version_list)
 
1156
            # we want the raw gzip for bulk copying, but the record validated
 
1157
            # just enough to be sure its the right one.
 
1158
            # TODO: consider writev or write combining to reduce 
 
1159
            # death of a thousand cuts feeling.
 
1160
            for (version_id, raw_data), \
 
1161
                (version_id2, options, parents) in \
 
1162
                izip(self.source._data.read_records_iter_raw(copy_queue_records),
 
1163
                     copy_queue):
 
1164
                assert version_id == version_id2, 'logic error, inconsistent results'
1090
1165
                count = count + 1
1091
 
                pb.update("Joining knit", count, len(version_list))
1092
 
    
1093
 
                pos, size = self.target._data.add_record(version_id, digest, lines)
 
1166
                pb.update("Joining knit", count, total)
 
1167
                pos, size = self.target._data.add_raw_record(raw_data)
1094
1168
                self.target._index.add_version(version_id, options, pos, size, parents)
1095
 
    
 
1169
 
1096
1170
            for version in mismatched_versions:
 
1171
                # FIXME RBC 20060309 is this needed?
1097
1172
                n1 = set(self.target.get_parents_with_ghosts(version))
1098
1173
                n2 = set(self.source.get_parents_with_ghosts(version))
1099
1174
                # write a combined record to our history preserving the current