~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

Merge transaction finalisation and ensure iter_lines_added_or_present in knits does a old-to-new read in the knit.

Show diffs side-by-side

added added

removed removed

Lines of Context:
63
63
# record content length ?
64
64
                  
65
65
 
 
66
from copy import copy
66
67
from cStringIO import StringIO
67
68
import difflib
68
69
from difflib import SequenceMatcher
242
243
        """
243
244
        if access_mode is None:
244
245
            access_mode = 'w'
 
246
        super(KnitVersionedFile, self).__init__(access_mode)
245
247
        assert access_mode in ('r', 'w'), "invalid mode specified %r" % access_mode
246
248
        assert not basis_knit or isinstance(basis_knit, KnitVersionedFile), \
247
249
            type(basis_knit)
258
260
        self._data = _KnitData(transport, relpath + DATA_SUFFIX,
259
261
            access_mode, create=not len(self.versions()))
260
262
 
 
263
    def clear_cache(self):
 
264
        """Clear the data cache only."""
 
265
        self._data.clear_cache()
 
266
 
261
267
    def copy_to(self, name, transport):
262
268
        """See VersionedFile.copy_to()."""
263
269
        # copy the current index to a temp index to avoid racing with local
271
277
    def create_empty(self, name, transport, mode=None):
272
278
        return KnitVersionedFile(name, transport, factory=self.factory, delta=self.delta, create=True)
273
279
    
274
 
    def fix_parents(self, version, new_parents):
 
280
    def _fix_parents(self, version, new_parents):
275
281
        """Fix the parents list for version.
276
282
        
277
283
        This is done by appending a new version to the index
427
433
        if version_ids:
428
434
            raise RevisionNotPresent(list(version_ids)[0], self.filename)
429
435
 
430
 
    def add_lines_with_ghosts(self, version_id, parents, lines):
 
436
    def _add_lines_with_ghosts(self, version_id, parents, lines):
431
437
        """See VersionedFile.add_lines_with_ghosts()."""
432
438
        self._check_add(version_id, lines)
433
439
        return self._add(version_id, lines[:], parents, self.delta)
434
440
 
435
 
    def add_lines(self, version_id, parents, lines):
 
441
    def _add_lines(self, version_id, parents, lines):
436
442
        """See VersionedFile.add_lines."""
437
443
        self._check_add(version_id, lines)
438
444
        self._check_versions_present(parents)
513
519
    def check(self, progress_bar=None):
514
520
        """See VersionedFile.check()."""
515
521
 
516
 
    def clone_text(self, new_version_id, old_version_id, parents):
 
522
    def _clone_text(self, new_version_id, old_version_id, parents):
517
523
        """See VersionedFile.clone_text()."""
518
524
        # FIXME RBC 20060228 make fast by only inserting an index with null delta.
519
525
        self.add_lines(new_version_id, parents, self.get_lines(old_version_id))
530
536
        # but we need to setup a list of records to visit.
531
537
        # we need version_id, position, length
532
538
        version_id_records = []
533
 
        for version_id in version_ids:
 
539
        requested_versions = list(version_ids)
 
540
        # filter for available versions
 
541
        for version_id in requested_versions:
534
542
            if not self.has_version(version_id):
535
543
                raise RevisionNotPresent(version_id, self.filename)
536
 
            data_pos, length = self._index.get_position(version_id)
537
 
            version_id_records.append((version_id, data_pos, length))
 
544
        # get a in-component-order queue:
 
545
        version_ids = []
 
546
        for version_id in self.versions():
 
547
            if version_id in requested_versions:
 
548
                version_ids.append(version_id)
 
549
                data_pos, length = self._index.get_position(version_id)
 
550
                version_id_records.append((version_id, data_pos, length))
 
551
 
538
552
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
539
553
        count = 0
540
554
        total = len(version_id_records)
901
915
        self._checked = False
902
916
        if create:
903
917
            self._transport.put(self._filename, StringIO(''))
 
918
        self._records = {}
 
919
 
 
920
    def clear_cache(self):
 
921
        """Clear the record cache."""
 
922
        self._records = {}
904
923
 
905
924
    def _open_file(self):
906
925
        if self._file is None:
920
939
        print >>data_file, "end %s\n" % version_id
921
940
        data_file.close()
922
941
 
 
942
        # cache
 
943
        self._records[version_id] = (digest, lines)
 
944
 
923
945
        content = sio.getvalue()
924
 
        start_pos = self._transport.append(self._filename, StringIO(content))
 
946
        sio.seek(0)
 
947
        start_pos = self._transport.append(self._filename, sio)
925
948
        return start_pos, len(content)
926
949
 
927
950
    def _parse_record(self, version_id, data):
955
978
        contents, digest).
956
979
        """
957
980
 
958
 
        class ContinuousRange:
959
 
            def __init__(self, rec_id, pos, size):
960
 
                self.start_pos = pos
961
 
                self.end_pos = pos + size
962
 
                self.versions = [(rec_id, pos, size)]
963
 
 
964
 
            def add(self, rec_id, pos, size):
965
 
                if self.end_pos != pos:
966
 
                    return False
967
 
                self.end_pos = pos + size
968
 
                self.versions.append((rec_id, pos, size))
969
 
                return True
970
 
 
971
 
            def split(self, fp):
972
 
                for rec_id, pos, size in self.versions:
973
 
                    yield rec_id, fp.read(size)
974
 
 
975
 
        # We take it that the transport optimizes the fetching as good
976
 
        # as possible (ie, reads continous ranges.)
977
 
        response = self._transport.readv(self._filename,
978
 
            [(pos, size) for version_id, pos, size in records])
979
 
 
980
 
        for (record_id, pos, size), (pos, data) in izip(iter(records), response):
981
 
            content, digest = self._parse_record(record_id, data)
982
 
            yield record_id, content, digest
 
981
        needed_records = []
 
982
        for version_id, pos, size in records:
 
983
            if version_id not in self._records:
 
984
                needed_records.append((version_id, pos, size))
 
985
 
 
986
        if len(needed_records):
 
987
            # We take it that the transport optimizes the fetching as good
 
988
            # as possible (ie, reads continous ranges.)
 
989
            response = self._transport.readv(self._filename,
 
990
                [(pos, size) for version_id, pos, size in needed_records])
 
991
 
 
992
            for (record_id, pos, size), (pos, data) in izip(iter(records), response):
 
993
                content, digest = self._parse_record(record_id, data)
 
994
                self._records[record_id] = (digest, content)
 
995
    
 
996
        for version_id, pos, size in records:
 
997
            yield version_id, copy(self._records[version_id][1]), copy(self._records[version_id][0])
983
998
 
984
999
    def read_records(self, records):
985
1000
        """Read records into a dictionary."""
1020
1035
        if not version_ids:
1021
1036
            return 0
1022
1037
 
1023
 
        if pb is None:
1024
 
            from bzrlib.progress import DummyProgress
1025
 
            pb = DummyProgress()
1026
 
 
1027
 
        version_ids = list(version_ids)
1028
 
        if None in version_ids:
1029
 
            version_ids.remove(None)
1030
 
 
1031
 
        self.source_ancestry = set(self.source.get_ancestry(version_ids))
1032
 
        this_versions = set(self.target._index.get_versions())
1033
 
        needed_versions = self.source_ancestry - this_versions
1034
 
        cross_check_versions = self.source_ancestry.intersection(this_versions)
1035
 
        mismatched_versions = set()
1036
 
        for version in cross_check_versions:
1037
 
            # scan to include needed parents.
1038
 
            n1 = set(self.target.get_parents_with_ghosts(version))
1039
 
            n2 = set(self.source.get_parents_with_ghosts(version))
1040
 
            if n1 != n2:
1041
 
                # FIXME TEST this check for cycles being introduced works
1042
 
                # the logic is we have a cycle if in our graph we are an
1043
 
                # ancestor of any of the n2 revisions.
1044
 
                for parent in n2:
1045
 
                    if parent in n1:
1046
 
                        # safe
1047
 
                        continue
1048
 
                    else:
1049
 
                        parent_ancestors = self.source.get_ancestry(parent)
1050
 
                        if version in parent_ancestors:
1051
 
                            raise errors.GraphCycleError([parent, version])
1052
 
                # ensure this parent will be available later.
1053
 
                new_parents = n2.difference(n1)
1054
 
                needed_versions.update(new_parents.difference(this_versions))
1055
 
                mismatched_versions.add(version)
1056
 
 
1057
 
        if not needed_versions and not cross_check_versions:
1058
 
            return 0
1059
 
        full_list = topo_sort(self.source.get_graph())
1060
 
 
1061
 
        version_list = [i for i in full_list if (not self.target.has_version(i)
1062
 
                        and i in needed_versions)]
1063
 
 
1064
 
        records = []
1065
 
        for version_id in version_list:
1066
 
            data_pos, data_size = self.source._index.get_position(version_id)
1067
 
            records.append((version_id, data_pos, data_size))
1068
 
 
1069
 
        count = 0
1070
 
        for version_id, lines, digest \
1071
 
                in self.source._data.read_records_iter(records):
1072
 
            options = self.source._index.get_options(version_id)
1073
 
            parents = self.source._index.get_parents_with_ghosts(version_id)
1074
 
            
1075
 
            for parent in parents:
1076
 
                # if source has the parent, we must hav grabbed it first.
1077
 
                assert (self.target.has_version(parent) or not
1078
 
                        self.source.has_version(parent))
1079
 
 
1080
 
            if self.target.factory.annotated:
1081
 
                # FIXME jrydberg: it should be possible to skip
1082
 
                # re-annotating components if we know that we are
1083
 
                # going to pull all revisions in the same order.
1084
 
                new_version_id = version_id
1085
 
                new_version_idx = self.target._index.num_versions()
1086
 
                if 'fulltext' in options:
1087
 
                    lines = self.target._reannotate_fulltext(self.source, lines,
1088
 
                        new_version_id, new_version_idx)
1089
 
                elif 'line-delta' in options:
1090
 
                    lines = self.target._reannotate_line_delta(self.source, lines,
1091
 
                        new_version_id, new_version_idx)
1092
 
 
1093
 
            count = count + 1
1094
 
            pb.update("Joining knit", count, len(version_list))
1095
 
 
1096
 
            pos, size = self.target._data.add_record(version_id, digest, lines)
1097
 
            self.target._index.add_version(version_id, options, pos, size, parents)
1098
 
 
1099
 
        for version in mismatched_versions:
1100
 
            n1 = set(self.target.get_parents_with_ghosts(version))
1101
 
            n2 = set(self.source.get_parents_with_ghosts(version))
1102
 
            # write a combined record to our history preserving the current 
1103
 
            # parents as first in the list
1104
 
            new_parents = self.target.get_parents_with_ghosts(version) + list(n2.difference(n1))
1105
 
            self.target.fix_parents(version, new_parents)
1106
 
        pb.clear()
1107
 
        return count
 
1038
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
 
1039
        try:
 
1040
            version_ids = list(version_ids)
 
1041
            if None in version_ids:
 
1042
                version_ids.remove(None)
 
1043
    
 
1044
            self.source_ancestry = set(self.source.get_ancestry(version_ids))
 
1045
            this_versions = set(self.target._index.get_versions())
 
1046
            needed_versions = self.source_ancestry - this_versions
 
1047
            cross_check_versions = self.source_ancestry.intersection(this_versions)
 
1048
            mismatched_versions = set()
 
1049
            for version in cross_check_versions:
 
1050
                # scan to include needed parents.
 
1051
                n1 = set(self.target.get_parents_with_ghosts(version))
 
1052
                n2 = set(self.source.get_parents_with_ghosts(version))
 
1053
                if n1 != n2:
 
1054
                    # FIXME TEST this check for cycles being introduced works
 
1055
                    # the logic is we have a cycle if in our graph we are an
 
1056
                    # ancestor of any of the n2 revisions.
 
1057
                    for parent in n2:
 
1058
                        if parent in n1:
 
1059
                            # safe
 
1060
                            continue
 
1061
                        else:
 
1062
                            parent_ancestors = self.source.get_ancestry(parent)
 
1063
                            if version in parent_ancestors:
 
1064
                                raise errors.GraphCycleError([parent, version])
 
1065
                    # ensure this parent will be available later.
 
1066
                    new_parents = n2.difference(n1)
 
1067
                    needed_versions.update(new_parents.difference(this_versions))
 
1068
                    mismatched_versions.add(version)
 
1069
    
 
1070
            if not needed_versions and not cross_check_versions:
 
1071
                return 0
 
1072
            full_list = topo_sort(self.source.get_graph())
 
1073
    
 
1074
            version_list = [i for i in full_list if (not self.target.has_version(i)
 
1075
                            and i in needed_versions)]
 
1076
    
 
1077
            records = []
 
1078
            for version_id in version_list:
 
1079
                data_pos, data_size = self.source._index.get_position(version_id)
 
1080
                records.append((version_id, data_pos, data_size))
 
1081
    
 
1082
            count = 0
 
1083
            for version_id, lines, digest \
 
1084
                    in self.source._data.read_records_iter(records):
 
1085
                options = self.source._index.get_options(version_id)
 
1086
                parents = self.source._index.get_parents_with_ghosts(version_id)
 
1087
                
 
1088
                for parent in parents:
 
1089
                    # if source has the parent, we must hav grabbed it first.
 
1090
                    assert (self.target.has_version(parent) or not
 
1091
                            self.source.has_version(parent))
 
1092
    
 
1093
                if self.target.factory.annotated:
 
1094
                    # FIXME jrydberg: it should be possible to skip
 
1095
                    # re-annotating components if we know that we are
 
1096
                    # going to pull all revisions in the same order.
 
1097
                    new_version_id = version_id
 
1098
                    new_version_idx = self.target._index.num_versions()
 
1099
                    if 'fulltext' in options:
 
1100
                        lines = self.target._reannotate_fulltext(self.source, lines,
 
1101
                            new_version_id, new_version_idx)
 
1102
                    elif 'line-delta' in options:
 
1103
                        lines = self.target._reannotate_line_delta(self.source, lines,
 
1104
                            new_version_id, new_version_idx)
 
1105
    
 
1106
                count = count + 1
 
1107
                pb.update("Joining knit", count, len(version_list))
 
1108
    
 
1109
                pos, size = self.target._data.add_record(version_id, digest, lines)
 
1110
                self.target._index.add_version(version_id, options, pos, size, parents)
 
1111
    
 
1112
            for version in mismatched_versions:
 
1113
                n1 = set(self.target.get_parents_with_ghosts(version))
 
1114
                n2 = set(self.source.get_parents_with_ghosts(version))
 
1115
                # write a combined record to our history preserving the current 
 
1116
                # parents as first in the list
 
1117
                new_parents = self.target.get_parents_with_ghosts(version) + list(n2.difference(n1))
 
1118
                self.target.fix_parents(version, new_parents)
 
1119
            return count
 
1120
        finally:
 
1121
            pb.clear()
 
1122
            pb.finished()
1108
1123
 
1109
1124
 
1110
1125
InterVersionedFile.register_optimiser(InterKnit)