~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

Merge knit utf8 and reannotate-less support.

Show diffs side-by-side

added added

removed removed

Lines of Context:
154
154
    annotated = True
155
155
 
156
156
    def parse_fulltext(self, content, version):
 
157
        """Convert fulltext to internal representation
 
158
 
 
159
        fulltext content is of the format
 
160
        revid(utf8) plaintext\n
 
161
        internal representation is of the format:
 
162
        (revid, plaintext)
 
163
        """
157
164
        lines = []
158
165
        for line in content:
159
166
            origin, text = line.split(' ', 1)
160
 
            lines.append((int(origin), text))
 
167
            lines.append((origin.decode('utf-8'), text))
161
168
        return KnitContent(lines)
162
169
 
163
170
    def parse_line_delta_iter(self, lines):
 
171
        """Convert a line based delta into internal representation.
 
172
 
 
173
        line delta is in the form of:
 
174
        intstart intend intcount
 
175
        1..count lines:
 
176
        revid(utf8) newline\n
 
177
        internal represnetation is
 
178
        (start, end, count, [1..count tuples (revid, newline)])
 
179
        """
164
180
        while lines:
165
181
            header = lines.pop(0)
166
182
            start, end, c = [int(n) for n in header.split(',')]
167
183
            contents = []
168
184
            for i in range(c):
169
185
                origin, text = lines.pop(0).split(' ', 1)
170
 
                contents.append((int(origin), text))
 
186
                contents.append((origin.decode('utf-8'), text))
171
187
            yield start, end, c, contents
172
188
 
173
189
    def parse_line_delta(self, lines, version):
174
190
        return list(self.parse_line_delta_iter(lines))
175
191
 
176
192
    def lower_fulltext(self, content):
177
 
        return ['%d %s' % (o, t) for o, t in content._lines]
 
193
        """convert a fulltext content record into a serializable form.
 
194
 
 
195
        see parse_fulltext which this inverts.
 
196
        """
 
197
        return ['%s %s' % (o.encode('utf-8'), t) for o, t in content._lines]
178
198
 
179
199
    def lower_line_delta(self, delta):
 
200
        """convert a delta into a serializable form.
 
201
 
 
202
        See parse_line_delta_iter which this inverts.
 
203
        """
180
204
        out = []
181
205
        for start, end, c, lines in delta:
182
206
            out.append('%d,%d,%d\n' % (start, end, c))
183
207
            for origin, text in lines:
184
 
                out.append('%d %s' % (origin, text))
 
208
                out.append('%s %s' % (origin.encode('utf-8'), text))
185
209
        return out
186
210
 
187
211
 
191
215
    annotated = False
192
216
 
193
217
    def parse_fulltext(self, content, version):
 
218
        """This parses an unannotated fulltext.
 
219
 
 
220
        Note that this is not a noop - the internal representation
 
221
        has (versionid, line) - its just a constant versionid.
 
222
        """
194
223
        return self.make(content, version)
195
224
 
196
225
    def parse_line_delta_iter(self, lines, version):
465
494
 
466
495
        Any versions not present will be converted into ghosts.
467
496
        """
468
 
        ghostless_parents = []
 
497
        present_parents = []
469
498
        ghosts = []
470
499
        for parent in parents:
471
500
            if not self.has_version(parent):
472
501
                ghosts.append(parent)
473
502
            else:
474
 
                ghostless_parents.append(parent)
 
503
                present_parents.append(parent)
475
504
 
476
 
        if delta and not len(ghostless_parents):
 
505
        if delta and not len(present_parents):
477
506
            delta = False
478
507
 
479
508
        digest = sha_strings(lines)
483
512
                options.append('no-eol')
484
513
                lines[-1] = lines[-1] + '\n'
485
514
 
486
 
        lines = self.factory.make(lines, len(self._index))
487
 
        if self.factory.annotated and len(ghostless_parents) > 0:
 
515
        lines = self.factory.make(lines, version_id)
 
516
        if self.factory.annotated and len(present_parents) > 0:
488
517
            # Merge annotations from parent texts if so is needed.
489
 
            self._merge_annotations(lines, ghostless_parents)
 
518
            self._merge_annotations(lines, present_parents)
490
519
 
491
 
        if len(ghostless_parents) and delta:
 
520
        if len(present_parents) and delta:
492
521
            # To speed the extract of texts the delta chain is limited
493
522
            # to a fixed number of deltas.  This should minimize both
494
523
            # I/O and the time spend applying deltas.
495
524
            count = 0
496
 
            delta_parents = ghostless_parents
 
525
            delta_parents = present_parents
497
526
            while count < 25:
498
527
                parent = delta_parents[0]
499
528
                method = self._index.get_method(parent)
506
535
 
507
536
        if delta:
508
537
            options.append('line-delta')
509
 
            content = self._get_content(ghostless_parents[0])
 
538
            content = self._get_content(present_parents[0])
510
539
            delta_hunks = content.line_delta(lines)
511
540
            store_lines = self.factory.lower_line_delta(delta_hunks)
512
541
        else:
587
616
        """See VersionedFile.annotate_iter."""
588
617
        content = self._get_content(version_id)
589
618
        for origin, text in content.annotate_iter():
590
 
            yield self._index.idx_to_name(origin), text
 
619
            yield origin, text
591
620
 
592
621
    def get_parents(self, version_id):
593
622
        """See VersionedFile.get_parents."""
617
646
        self._check_versions_present(versions)
618
647
        return self._index.get_ancestry_with_ghosts(versions)
619
648
 
620
 
    def _reannotate_line_delta(self, other, lines, new_version_id,
621
 
                               new_version_idx):
622
 
        """Re-annotate line-delta and return new delta."""
623
 
        new_delta = []
624
 
        for start, end, count, contents \
625
 
                in self.factory.parse_line_delta_iter(lines):
626
 
            new_lines = []
627
 
            for origin, line in contents:
628
 
                old_version_id = other._index.idx_to_name(origin)
629
 
                if old_version_id == new_version_id:
630
 
                    idx = new_version_idx
631
 
                else:
632
 
                    idx = self._index.lookup(old_version_id)
633
 
                new_lines.append((idx, line))
634
 
            new_delta.append((start, end, count, new_lines))
635
 
 
636
 
        return self.factory.lower_line_delta(new_delta)
637
 
 
638
 
    def _reannotate_fulltext(self, other, lines, new_version_id,
639
 
                             new_version_idx):
640
 
        """Re-annotate fulltext and return new version."""
641
 
        content = self.factory.parse_fulltext(lines, new_version_idx)
642
 
        new_lines = []
643
 
        for origin, line in content.annotate_iter():
644
 
            old_version_id = other._index.idx_to_name(origin)
645
 
            if old_version_id == new_version_id:
646
 
                idx = new_version_idx
647
 
            else:
648
 
                idx = self._index.lookup(old_version_id)
649
 
            new_lines.append((idx, line))
650
 
 
651
 
        return self.factory.lower_fulltext(KnitContent(new_lines))
652
 
 
653
649
    #@deprecated_method(zero_eight)
654
650
    def walk(self, version_ids):
655
651
        """See VersionedFile.walk."""
857
853
        """Add a version record to the index."""
858
854
        self._cache_version(version_id, options, pos, size, parents)
859
855
 
860
 
        content = "%s %s %s %s %s\n" % (version_id,
 
856
        content = "%s %s %s %s %s\n" % (version_id.encode('utf-8'),
861
857
                                        ','.join(options),
862
858
                                        pos,
863
859
                                        size,
864
860
                                        self._version_list_to_index(parents))
 
861
        assert isinstance(content, str), 'content must be utf-8 encoded'
865
862
        self._transport.append(self._filename, StringIO(content))
866
863
 
867
864
    def has_version(self, version_id):
929
926
                pass
930
927
        return self._file
931
928
 
932
 
    def add_record(self, version_id, digest, lines):
933
 
        """Write new text record to disk.  Returns the position in the
934
 
        file where it was written."""
 
929
    def _record_to_data(self, version_id, digest, lines):
 
930
        """Convert version_id, digest, lines into a raw data block.
 
931
        
 
932
        :return: (len, a StringIO instance with the raw data ready to read.)
 
933
        """
935
934
        sio = StringIO()
936
935
        data_file = GzipFile(None, mode='wb', fileobj=sio)
937
 
        print >>data_file, "version %s %d %s" % (version_id, len(lines), digest)
 
936
        print >>data_file, "version %s %d %s" % (version_id.encode('utf-8'), len(lines), digest)
938
937
        data_file.writelines(lines)
939
 
        print >>data_file, "end %s\n" % version_id
 
938
        print >>data_file, "end %s\n" % version_id.encode('utf-8')
940
939
        data_file.close()
 
940
        length= sio.tell()
 
941
        sio.seek(0)
 
942
        return length, sio
941
943
 
 
944
    def add_raw_record(self, raw_data):
 
945
        """Append a prepared record to the data file."""
 
946
        assert isinstance(raw_data, str), 'data must be plain bytes'
 
947
        start_pos = self._transport.append(self._filename, StringIO(raw_data))
 
948
        return start_pos, len(raw_data)
 
949
        
 
950
    def add_record(self, version_id, digest, lines):
 
951
        """Write new text record to disk.  Returns the position in the
 
952
        file where it was written."""
 
953
        size, sio = self._record_to_data(version_id, digest, lines)
942
954
        # cache
943
955
        self._records[version_id] = (digest, lines)
944
 
 
945
 
        content = sio.getvalue()
946
 
        sio.seek(0)
 
956
        # write to disk
947
957
        start_pos = self._transport.append(self._filename, sio)
948
 
        return start_pos, len(content)
949
 
 
950
 
    def _parse_record(self, version_id, data):
951
 
        df = GzipFile(mode='rb', fileobj=StringIO(data))
 
958
        return start_pos, size
 
959
 
 
960
    def _parse_record_header(self, version_id, raw_data):
 
961
        """Parse a record header for consistency.
 
962
 
 
963
        :return: the header and the decompressor stream.
 
964
                 as (stream, header_record)
 
965
        """
 
966
        df = GzipFile(mode='rb', fileobj=StringIO(raw_data))
952
967
        rec = df.readline().split()
953
968
        if len(rec) != 4:
954
 
            raise KnitCorrupt(self._filename, 'unexpected number of records')
955
 
        if rec[1] != version_id:
 
969
            raise KnitCorrupt(self._filename, 'unexpected number of elements in record header')
 
970
        if rec[1].decode('utf-8')!= version_id:
956
971
            raise KnitCorrupt(self._filename, 
957
972
                              'unexpected version, wanted %r, got %r' % (
958
973
                                version_id, rec[1]))
 
974
        return df, rec
 
975
 
 
976
    def _parse_record(self, version_id, data):
 
977
        df, rec = self._parse_record_header(version_id, data)
959
978
        lines = int(rec[2])
960
979
        record_contents = self._read_record_contents(df, lines)
961
980
        l = df.readline()
962
 
        if l != 'end %s\n' % version_id:
 
981
        if l.decode('utf-8') != 'end %s\n' % version_id:
963
982
            raise KnitCorrupt(self._filename, 'unexpected version end line %r, wanted %r' 
964
983
                        % (l, version_id))
 
984
        df.close()
965
985
        return record_contents, rec[3]
966
986
 
967
987
    def _read_record_contents(self, df, record_lines):
971
991
            r.append(df.readline())
972
992
        return r
973
993
 
 
994
    def read_records_iter_raw(self, records):
 
995
        """Read text records from data file and yield raw data.
 
996
 
 
997
        This unpacks enough of the text record to validate the id is
 
998
        as expected but thats all.
 
999
 
 
1000
        It will actively recompress currently cached records on the
 
1001
        basis that that is cheaper than I/O activity.
 
1002
        """
 
1003
        needed_records = []
 
1004
        for version_id, pos, size in records:
 
1005
            if version_id not in self._records:
 
1006
                needed_records.append((version_id, pos, size))
 
1007
 
 
1008
        # setup an iterator of the external records:
 
1009
        # uses readv so nice and fast we hope.
 
1010
        if len(needed_records):
 
1011
            # grab the disk data needed.
 
1012
            raw_records = self._transport.readv(self._filename,
 
1013
                [(pos, size) for version_id, pos, size in needed_records])
 
1014
 
 
1015
        for version_id, pos, size in records:
 
1016
            if version_id in self._records:
 
1017
                # compress a new version
 
1018
                size, sio = self._record_to_data(version_id,
 
1019
                                                 self._records[version_id][0],
 
1020
                                                 self._records[version_id][1])
 
1021
                yield version_id, sio.getvalue()
 
1022
            else:
 
1023
                pos, data = raw_records.next()
 
1024
                # validate the header
 
1025
                df, rec = self._parse_record_header(version_id, data)
 
1026
                df.close()
 
1027
                yield version_id, data
 
1028
 
 
1029
 
974
1030
    def read_records_iter(self, records):
975
1031
        """Read text records from data file and yield result.
976
1032
 
1075
1131
            version_list = [i for i in full_list if (not self.target.has_version(i)
1076
1132
                            and i in needed_versions)]
1077
1133
    
1078
 
            records = []
 
1134
            # plan the join:
 
1135
            copy_queue = []
 
1136
            copy_queue_records = []
 
1137
            copy_set = set()
1079
1138
            for version_id in version_list:
1080
 
                data_pos, data_size = self.source._index.get_position(version_id)
1081
 
                records.append((version_id, data_pos, data_size))
1082
 
    
1083
 
            count = 0
1084
 
            for version_id, lines, digest \
1085
 
                    in self.source._data.read_records_iter(records):
1086
1139
                options = self.source._index.get_options(version_id)
1087
1140
                parents = self.source._index.get_parents_with_ghosts(version_id)
1088
 
                
 
1141
                # check that its will be a consistent copy:
1089
1142
                for parent in parents:
1090
 
                    # if source has the parent, we must hav grabbed it first.
1091
 
                    assert (self.target.has_version(parent) or not
1092
 
                            self.source.has_version(parent))
1093
 
    
1094
 
                if self.target.factory.annotated:
1095
 
                    # FIXME jrydberg: it should be possible to skip
1096
 
                    # re-annotating components if we know that we are
1097
 
                    # going to pull all revisions in the same order.
1098
 
                    new_version_id = version_id
1099
 
                    new_version_idx = self.target._index.num_versions()
1100
 
                    if 'fulltext' in options:
1101
 
                        lines = self.target._reannotate_fulltext(self.source, lines,
1102
 
                            new_version_id, new_version_idx)
1103
 
                    elif 'line-delta' in options:
1104
 
                        lines = self.target._reannotate_line_delta(self.source, lines,
1105
 
                            new_version_id, new_version_idx)
1106
 
    
 
1143
                    # if source has the parent, we must :
 
1144
                    # * already have it or
 
1145
                    # * have it scheduled already
 
1146
                    # otherwise we dont care
 
1147
                    assert (self.target.has_version(parent) or
 
1148
                            parent in copy_set or
 
1149
                            not self.source.has_version(parent))
 
1150
                data_pos, data_size = self.source._index.get_position(version_id)
 
1151
                copy_queue_records.append((version_id, data_pos, data_size))
 
1152
                copy_queue.append((version_id, options, parents))
 
1153
                copy_set.add(version_id)
 
1154
 
 
1155
            # data suck the join:
 
1156
            count = 0
 
1157
            total = len(version_list)
 
1158
            # we want the raw gzip for bulk copying, but the record validated
 
1159
            # just enough to be sure its the right one.
 
1160
            # TODO: consider writev or write combining to reduce 
 
1161
            # death of a thousand cuts feeling.
 
1162
            for (version_id, raw_data), \
 
1163
                (version_id2, options, parents) in \
 
1164
                izip(self.source._data.read_records_iter_raw(copy_queue_records),
 
1165
                     copy_queue):
 
1166
                assert version_id == version_id2, 'logic error, inconsistent results'
1107
1167
                count = count + 1
1108
 
                pb.update("Joining knit", count, len(version_list))
1109
 
    
1110
 
                pos, size = self.target._data.add_record(version_id, digest, lines)
 
1168
                pb.update("Joining knit", count, total)
 
1169
                pos, size = self.target._data.add_raw_record(raw_data)
1111
1170
                self.target._index.add_version(version_id, options, pos, size, parents)
1112
 
    
 
1171
 
1113
1172
            for version in mismatched_versions:
 
1173
                # FIXME RBC 20060309 is this needed?
1114
1174
                n1 = set(self.target.get_parents_with_ghosts(version))
1115
1175
                n2 = set(self.source.get_parents_with_ghosts(version))
1116
1176
                # write a combined record to our history preserving the current