~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

Make use of the transaction finalisation warning support to implement in-knit caching.

Show diffs side-by-side

added added

removed removed

Lines of Context:
63
63
# record content length ?
64
64
                  
65
65
 
 
66
from copy import copy
66
67
from cStringIO import StringIO
67
68
import difflib
68
69
from difflib import SequenceMatcher
259
260
        self._data = _KnitData(transport, relpath + DATA_SUFFIX,
260
261
            access_mode, create=not len(self.versions()))
261
262
 
 
263
    def clear_cache(self):
 
264
        """Clear the data cache only."""
 
265
        self._data.clear_cache()
 
266
 
262
267
    def copy_to(self, name, transport):
263
268
        """See VersionedFile.copy_to()."""
264
269
        # copy the current index to a temp index to avoid racing with local
514
519
    def check(self, progress_bar=None):
515
520
        """See VersionedFile.check()."""
516
521
 
517
 
    def clone_text(self, new_version_id, old_version_id, parents):
 
522
    def _clone_text(self, new_version_id, old_version_id, parents):
518
523
        """See VersionedFile.clone_text()."""
519
524
        # FIXME RBC 20060228 make fast by only inserting an index with null delta.
520
525
        self.add_lines(new_version_id, parents, self.get_lines(old_version_id))
902
907
        self._checked = False
903
908
        if create:
904
909
            self._transport.put(self._filename, StringIO(''))
 
910
        self._records = {}
 
911
 
 
912
    def clear_cache(self):
 
913
        """Clear the record cache."""
 
914
        self._records = {}
905
915
 
906
916
    def _open_file(self):
907
917
        if self._file is None:
921
931
        print >>data_file, "end %s\n" % version_id
922
932
        data_file.close()
923
933
 
 
934
        # cache
 
935
        self._records[version_id] = (digest, lines)
 
936
 
924
937
        content = sio.getvalue()
925
 
        start_pos = self._transport.append(self._filename, StringIO(content))
 
938
        sio.seek(0)
 
939
        start_pos = self._transport.append(self._filename, sio)
926
940
        return start_pos, len(content)
927
941
 
928
942
    def _parse_record(self, version_id, data):
956
970
        contents, digest).
957
971
        """
958
972
 
959
 
        class ContinuousRange:
960
 
            def __init__(self, rec_id, pos, size):
961
 
                self.start_pos = pos
962
 
                self.end_pos = pos + size
963
 
                self.versions = [(rec_id, pos, size)]
964
 
 
965
 
            def add(self, rec_id, pos, size):
966
 
                if self.end_pos != pos:
967
 
                    return False
968
 
                self.end_pos = pos + size
969
 
                self.versions.append((rec_id, pos, size))
970
 
                return True
971
 
 
972
 
            def split(self, fp):
973
 
                for rec_id, pos, size in self.versions:
974
 
                    yield rec_id, fp.read(size)
975
 
 
976
 
        # We take it that the transport optimizes the fetching as well
977
 
        # as possible (i.e., reads continuous ranges.)
978
 
        response = self._transport.readv(self._filename,
979
 
            [(pos, size) for version_id, pos, size in records])
980
 
 
981
 
        for (record_id, pos, size), (pos, data) in izip(iter(records), response):
982
 
            content, digest = self._parse_record(record_id, data)
983
 
            yield record_id, content, digest
 
973
        needed_records = []
 
974
        for version_id, pos, size in records:
 
975
            if version_id not in self._records:
 
976
                needed_records.append((version_id, pos, size))
 
977
 
 
978
        if len(needed_records):
 
979
            # We take it that the transport optimizes the fetching as well
 
980
            # as possible (i.e., reads continuous ranges.)
 
981
            response = self._transport.readv(self._filename,
 
982
                [(pos, size) for version_id, pos, size in needed_records])
 
983
 
 
984
            for (record_id, pos, size), (pos, data) in izip(iter(records), response):
 
985
                content, digest = self._parse_record(record_id, data)
 
986
                self._records[record_id] = (digest, content)
 
987
    
 
988
        for version_id, pos, size in records:
 
989
            yield version_id, copy(self._records[version_id][1]), copy(self._records[version_id][0])
984
990
 
985
991
    def read_records(self, records):
986
992
        """Read records into a dictionary."""
1021
1027
        if not version_ids:
1022
1028
            return 0
1023
1029
 
1024
 
        if pb is None:
1025
 
            from bzrlib.progress import DummyProgress
1026
 
            pb = DummyProgress()
1027
 
 
1028
 
        version_ids = list(version_ids)
1029
 
        if None in version_ids:
1030
 
            version_ids.remove(None)
1031
 
 
1032
 
        self.source_ancestry = set(self.source.get_ancestry(version_ids))
1033
 
        this_versions = set(self.target._index.get_versions())
1034
 
        needed_versions = self.source_ancestry - this_versions
1035
 
        cross_check_versions = self.source_ancestry.intersection(this_versions)
1036
 
        mismatched_versions = set()
1037
 
        for version in cross_check_versions:
1038
 
            # scan to include needed parents.
1039
 
            n1 = set(self.target.get_parents_with_ghosts(version))
1040
 
            n2 = set(self.source.get_parents_with_ghosts(version))
1041
 
            if n1 != n2:
1042
 
                # FIXME TEST this check for cycles being introduced works
1043
 
                # the logic is we have a cycle if in our graph we are an
1044
 
                # ancestor of any of the n2 revisions.
1045
 
                for parent in n2:
1046
 
                    if parent in n1:
1047
 
                        # safe
1048
 
                        continue
1049
 
                    else:
1050
 
                        parent_ancestors = self.source.get_ancestry(parent)
1051
 
                        if version in parent_ancestors:
1052
 
                            raise errors.GraphCycleError([parent, version])
1053
 
                # ensure this parent will be available later.
1054
 
                new_parents = n2.difference(n1)
1055
 
                needed_versions.update(new_parents.difference(this_versions))
1056
 
                mismatched_versions.add(version)
1057
 
 
1058
 
        if not needed_versions and not cross_check_versions:
1059
 
            return 0
1060
 
        full_list = topo_sort(self.source.get_graph())
1061
 
 
1062
 
        version_list = [i for i in full_list if (not self.target.has_version(i)
1063
 
                        and i in needed_versions)]
1064
 
 
1065
 
        records = []
1066
 
        for version_id in version_list:
1067
 
            data_pos, data_size = self.source._index.get_position(version_id)
1068
 
            records.append((version_id, data_pos, data_size))
1069
 
 
1070
 
        count = 0
1071
 
        for version_id, lines, digest \
1072
 
                in self.source._data.read_records_iter(records):
1073
 
            options = self.source._index.get_options(version_id)
1074
 
            parents = self.source._index.get_parents_with_ghosts(version_id)
1075
 
            
1076
 
            for parent in parents:
1077
 
                # if source has the parent, we must have grabbed it first.
1078
 
                assert (self.target.has_version(parent) or not
1079
 
                        self.source.has_version(parent))
1080
 
 
1081
 
            if self.target.factory.annotated:
1082
 
                # FIXME jrydberg: it should be possible to skip
1083
 
                # re-annotating components if we know that we are
1084
 
                # going to pull all revisions in the same order.
1085
 
                new_version_id = version_id
1086
 
                new_version_idx = self.target._index.num_versions()
1087
 
                if 'fulltext' in options:
1088
 
                    lines = self.target._reannotate_fulltext(self.source, lines,
1089
 
                        new_version_id, new_version_idx)
1090
 
                elif 'line-delta' in options:
1091
 
                    lines = self.target._reannotate_line_delta(self.source, lines,
1092
 
                        new_version_id, new_version_idx)
1093
 
 
1094
 
            count = count + 1
1095
 
            pb.update("Joining knit", count, len(version_list))
1096
 
 
1097
 
            pos, size = self.target._data.add_record(version_id, digest, lines)
1098
 
            self.target._index.add_version(version_id, options, pos, size, parents)
1099
 
 
1100
 
        for version in mismatched_versions:
1101
 
            n1 = set(self.target.get_parents_with_ghosts(version))
1102
 
            n2 = set(self.source.get_parents_with_ghosts(version))
1103
 
            # write a combined record to our history preserving the current 
1104
 
            # parents as first in the list
1105
 
            new_parents = self.target.get_parents_with_ghosts(version) + list(n2.difference(n1))
1106
 
            self.target.fix_parents(version, new_parents)
1107
 
        pb.clear()
1108
 
        return count
 
1030
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
 
1031
        try:
 
1032
            version_ids = list(version_ids)
 
1033
            if None in version_ids:
 
1034
                version_ids.remove(None)
 
1035
    
 
1036
            self.source_ancestry = set(self.source.get_ancestry(version_ids))
 
1037
            this_versions = set(self.target._index.get_versions())
 
1038
            needed_versions = self.source_ancestry - this_versions
 
1039
            cross_check_versions = self.source_ancestry.intersection(this_versions)
 
1040
            mismatched_versions = set()
 
1041
            for version in cross_check_versions:
 
1042
                # scan to include needed parents.
 
1043
                n1 = set(self.target.get_parents_with_ghosts(version))
 
1044
                n2 = set(self.source.get_parents_with_ghosts(version))
 
1045
                if n1 != n2:
 
1046
                    # FIXME TEST this check for cycles being introduced works
 
1047
                    # the logic is we have a cycle if in our graph we are an
 
1048
                    # ancestor of any of the n2 revisions.
 
1049
                    for parent in n2:
 
1050
                        if parent in n1:
 
1051
                            # safe
 
1052
                            continue
 
1053
                        else:
 
1054
                            parent_ancestors = self.source.get_ancestry(parent)
 
1055
                            if version in parent_ancestors:
 
1056
                                raise errors.GraphCycleError([parent, version])
 
1057
                    # ensure this parent will be available later.
 
1058
                    new_parents = n2.difference(n1)
 
1059
                    needed_versions.update(new_parents.difference(this_versions))
 
1060
                    mismatched_versions.add(version)
 
1061
    
 
1062
            if not needed_versions and not cross_check_versions:
 
1063
                return 0
 
1064
            full_list = topo_sort(self.source.get_graph())
 
1065
    
 
1066
            version_list = [i for i in full_list if (not self.target.has_version(i)
 
1067
                            and i in needed_versions)]
 
1068
    
 
1069
            records = []
 
1070
            for version_id in version_list:
 
1071
                data_pos, data_size = self.source._index.get_position(version_id)
 
1072
                records.append((version_id, data_pos, data_size))
 
1073
    
 
1074
            count = 0
 
1075
            for version_id, lines, digest \
 
1076
                    in self.source._data.read_records_iter(records):
 
1077
                options = self.source._index.get_options(version_id)
 
1078
                parents = self.source._index.get_parents_with_ghosts(version_id)
 
1079
                
 
1080
                for parent in parents:
 
1081
                    # if source has the parent, we must have grabbed it first.
 
1082
                    assert (self.target.has_version(parent) or not
 
1083
                            self.source.has_version(parent))
 
1084
    
 
1085
                if self.target.factory.annotated:
 
1086
                    # FIXME jrydberg: it should be possible to skip
 
1087
                    # re-annotating components if we know that we are
 
1088
                    # going to pull all revisions in the same order.
 
1089
                    new_version_id = version_id
 
1090
                    new_version_idx = self.target._index.num_versions()
 
1091
                    if 'fulltext' in options:
 
1092
                        lines = self.target._reannotate_fulltext(self.source, lines,
 
1093
                            new_version_id, new_version_idx)
 
1094
                    elif 'line-delta' in options:
 
1095
                        lines = self.target._reannotate_line_delta(self.source, lines,
 
1096
                            new_version_id, new_version_idx)
 
1097
    
 
1098
                count = count + 1
 
1099
                pb.update("Joining knit", count, len(version_list))
 
1100
    
 
1101
                pos, size = self.target._data.add_record(version_id, digest, lines)
 
1102
                self.target._index.add_version(version_id, options, pos, size, parents)
 
1103
    
 
1104
            for version in mismatched_versions:
 
1105
                n1 = set(self.target.get_parents_with_ghosts(version))
 
1106
                n2 = set(self.source.get_parents_with_ghosts(version))
 
1107
                # write a combined record to our history preserving the current 
 
1108
                # parents as first in the list
 
1109
                new_parents = self.target.get_parents_with_ghosts(version) + list(n2.difference(n1))
 
1110
                self.target.fix_parents(version, new_parents)
 
1111
            return count
 
1112
        finally:
 
1113
            pb.clear()
 
1114
            pb.finished()
1109
1115
 
1110
1116
 
1111
1117
InterVersionedFile.register_optimiser(InterKnit)