~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-05-12 03:07:05 UTC
  • mfrom: (3350.3.22 data_stream_revamp)
  • Revision ID: pqm@pqm.ubuntu.com-20080512030705-nvl2q1tuls904eru
Deprecate bzrlib.versionedfiles.VersionedFile.join.

Show diffs side-by-side

added added

removed removed

Lines of Context:
107
107
    contains_linebreaks,
108
108
    sha_string,
109
109
    sha_strings,
 
110
    split_lines,
110
111
    )
111
112
from bzrlib.symbol_versioning import (
112
113
    DEPRECATED_PARAMETER,
117
118
from bzrlib.tsort import topo_sort
118
119
from bzrlib.tuned_gzip import GzipFile, bytes_to_gzip
119
120
import bzrlib.ui
120
 
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
 
121
from bzrlib.versionedfile import (
 
122
    AbsentContentFactory,
 
123
    adapter_registry,
 
124
    ContentFactory,
 
125
    InterVersionedFile,
 
126
    VersionedFile,
 
127
    )
121
128
import bzrlib.weave
122
129
 
123
130
 
138
145
INDEX_SUFFIX = '.kndx'
139
146
 
140
147
 
 
148
class KnitAdapter(object):
 
149
    """Base class for knit record adaption."""
 
150
 
 
151
    def __init__(self, basis_vf):
 
152
        """Create an adapter which accesses full texts from basis_vf.
 
153
        
 
154
        :param basis_vf: A versioned file to access basis texts of deltas from.
 
155
            May be None for adapters that do not need to access basis texts.
 
156
        """
 
157
        self._data = _KnitData(None)
 
158
        self._annotate_factory = KnitAnnotateFactory()
 
159
        self._plain_factory = KnitPlainFactory()
 
160
        self._basis_vf = basis_vf
 
161
 
 
162
 
 
163
class FTAnnotatedToUnannotated(KnitAdapter):
 
164
    """An adapter from FT annotated knits to unannotated ones."""
 
165
 
 
166
    def get_bytes(self, factory, annotated_compressed_bytes):
 
167
        rec, contents = \
 
168
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
169
        content = self._annotate_factory.parse_fulltext(contents, rec[1])
 
170
        size, bytes = self._data._record_to_data(rec[1], rec[3], content.text())
 
171
        return bytes
 
172
 
 
173
 
 
174
class DeltaAnnotatedToUnannotated(KnitAdapter):
 
175
    """An adapter for deltas from annotated to unannotated."""
 
176
 
 
177
    def get_bytes(self, factory, annotated_compressed_bytes):
 
178
        rec, contents = \
 
179
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
180
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
 
181
            plain=True)
 
182
        contents = self._plain_factory.lower_line_delta(delta)
 
183
        size, bytes = self._data._record_to_data(rec[1], rec[3], contents)
 
184
        return bytes
 
185
 
 
186
 
 
187
class FTAnnotatedToFullText(KnitAdapter):
 
188
    """An adapter from FT annotated knits to unannotated ones."""
 
189
 
 
190
    def get_bytes(self, factory, annotated_compressed_bytes):
 
191
        rec, contents = \
 
192
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
193
        content, delta = self._annotate_factory.parse_record(factory.key[0],
 
194
            contents, factory._build_details, None)
 
195
        return ''.join(content.text())
 
196
 
 
197
 
 
198
class DeltaAnnotatedToFullText(KnitAdapter):
 
199
    """An adapter for deltas from annotated to unannotated."""
 
200
 
 
201
    def get_bytes(self, factory, annotated_compressed_bytes):
 
202
        rec, contents = \
 
203
            self._data._parse_record_unchecked(annotated_compressed_bytes)
 
204
        delta = self._annotate_factory.parse_line_delta(contents, rec[1],
 
205
            plain=True)
 
206
        compression_parent = factory.parents[0][0]
 
207
        basis_lines = self._basis_vf.get_lines(compression_parent)
 
208
        # Manually apply the delta because we have one annotated content and
 
209
        # one plain.
 
210
        basis_content = PlainKnitContent(basis_lines, compression_parent)
 
211
        basis_content.apply_delta(delta, rec[1])
 
212
        basis_content._should_strip_eol = factory._build_details[1]
 
213
        return ''.join(basis_content.text())
 
214
 
 
215
 
 
216
class FTPlainToFullText(KnitAdapter):
 
217
    """An adapter from FT plain knits to unannotated ones."""
 
218
 
 
219
    def get_bytes(self, factory, compressed_bytes):
 
220
        rec, contents = \
 
221
            self._data._parse_record_unchecked(compressed_bytes)
 
222
        content, delta = self._plain_factory.parse_record(factory.key[0],
 
223
            contents, factory._build_details, None)
 
224
        return ''.join(content.text())
 
225
 
 
226
 
 
227
class DeltaPlainToFullText(KnitAdapter):
 
228
    """An adapter for deltas from annotated to unannotated."""
 
229
 
 
230
    def get_bytes(self, factory, compressed_bytes):
 
231
        rec, contents = \
 
232
            self._data._parse_record_unchecked(compressed_bytes)
 
233
        delta = self._plain_factory.parse_line_delta(contents, rec[1])
 
234
        compression_parent = factory.parents[0][0]
 
235
        basis_lines = self._basis_vf.get_lines(compression_parent)
 
236
        basis_content = PlainKnitContent(basis_lines, compression_parent)
 
237
        # Manually apply the delta because we have one annotated content and
 
238
        # one plain.
 
239
        content, _ = self._plain_factory.parse_record(rec[1], contents,
 
240
            factory._build_details, basis_content)
 
241
        return ''.join(content.text())
 
242
 
 
243
 
 
244
class KnitContentFactory(ContentFactory):
 
245
    """Content factory for streaming from knits.
 
246
    
 
247
    :seealso ContentFactory:
 
248
    """
 
249
 
 
250
    def __init__(self, version, parents, build_details, sha1, raw_record,
 
251
        annotated, knit=None):
 
252
        """Create a KnitContentFactory for version.
 
253
        
 
254
        :param version: The version.
 
255
        :param parents: The parents.
 
256
        :param build_details: The build details as returned from
 
257
            get_build_details.
 
258
        :param sha1: The sha1 expected from the full text of this object.
 
259
        :param raw_record: The bytes of the knit data from disk.
 
260
        :param annotated: True if the raw data is annotated.
 
261
        """
 
262
        ContentFactory.__init__(self)
 
263
        self.sha1 = sha1
 
264
        self.key = (version,)
 
265
        self.parents = tuple((parent,) for parent in parents)
 
266
        if build_details[0] == 'line-delta':
 
267
            kind = 'delta'
 
268
        else:
 
269
            kind = 'ft'
 
270
        if annotated:
 
271
            annotated_kind = 'annotated-'
 
272
        else:
 
273
            annotated_kind = ''
 
274
        self.storage_kind = 'knit-%s%s-gz' % (annotated_kind, kind)
 
275
        self._raw_record = raw_record
 
276
        self._build_details = build_details
 
277
        self._knit = knit
 
278
 
 
279
    def get_bytes_as(self, storage_kind):
 
280
        if storage_kind == self.storage_kind:
 
281
            return self._raw_record
 
282
        if storage_kind == 'fulltext' and self._knit is not None:
 
283
            return self._knit.get_text(self.key[0])
 
284
        else:
 
285
            raise errors.UnavailableRepresentation(self.key, storage_kind,
 
286
                self.storage_kind)
 
287
 
 
288
 
141
289
class KnitContent(object):
142
290
    """Content of a knit version to which deltas can be applied."""
143
291
 
231
379
                % (e,))
232
380
 
233
381
        if self._should_strip_eol:
234
 
            anno, line = lines[-1]
235
 
            lines[-1] = (anno, line.rstrip('\n'))
 
382
            lines[-1] = lines[-1].rstrip('\n')
236
383
        return lines
237
384
 
238
385
    def copy(self):
499
646
    """Factory to create a KnitVersionedFile for a .knit/.kndx file pair."""
500
647
    if factory is None:
501
648
        factory = KnitAnnotateFactory()
502
 
    else:
503
 
        factory = KnitPlainFactory()
504
649
    if get_scope is None:
505
650
        get_scope = lambda:None
506
651
    index = _KnitIndex(transport, name + INDEX_SUFFIX,
612
757
        # write all the data
613
758
        raw_record_sizes = [record[3] for record in records]
614
759
        positions = self._data.add_raw_records(raw_record_sizes, data)
615
 
        offset = 0
616
760
        index_entries = []
617
 
        for (version_id, options, parents, size), access_memo in zip(
 
761
        for (version_id, options, parents, _), access_memo in zip(
618
762
            records, positions):
619
763
            index_entries.append((version_id, options, access_memo, parents))
620
 
            offset += size
621
764
        self._index.add_versions(index_entries)
622
765
 
623
766
    def copy_to(self, name, transport):
698
841
        # know the length of all the data a-priori.
699
842
        raw_datum = []
700
843
        result_version_list = []
701
 
        for (version_id, raw_data), \
 
844
        for (version_id, raw_data, _), \
702
845
            (version_id2, options, _, parents) in \
703
846
            izip(self._data.read_records_iter_raw(copy_queue_records),
704
847
                 temp_version_list):
716
859
                return pseudo_file.read(length)
717
860
        return (self.get_format_signature(), result_version_list, read)
718
861
 
 
862
    def get_record_stream(self, versions, ordering, include_delta_closure):
 
863
        """Get a stream of records for versions.
 
864
 
 
865
        :param versions: The versions to include. Each version is a tuple
 
866
            (version,).
 
867
        :param ordering: Either 'unordered' or 'topological'. A topologically
 
868
            sorted stream has compression parents strictly before their
 
869
            children.
 
870
        :param include_delta_closure: If True then the closure across any
 
871
            compression parents will be included (in the opaque data).
 
872
        :return: An iterator of ContentFactory objects, each of which is only
 
873
            valid until the iterator is advanced.
 
874
        """
 
875
        if include_delta_closure:
 
876
            # Nb: what we should do is plan the data to stream to allow
 
877
            # reconstruction of all the texts without excessive buffering,
 
878
            # including re-sending common bases as needed. This makes the most
 
879
            # sense when we start serialising these streams though, so for now
 
880
            # we just fallback to individual text construction behind the
 
881
            # abstraction barrier.
 
882
            knit = self
 
883
        else:
 
884
            knit = None
 
885
        # We end up doing multiple index lookups here for parents details and
 
886
        # disk layout details - we need a unified api ?
 
887
        parent_map = self.get_parent_map(versions)
 
888
        absent_versions = set(versions) - set(parent_map)
 
889
        if ordering == 'topological':
 
890
            present_versions = topo_sort(parent_map)
 
891
        else:
 
892
            # List comprehension to keep the requested order (as that seems
 
893
            # marginally useful, at least until we start doing IO optimising
 
894
            # here.
 
895
            present_versions = [version for version in versions if version in
 
896
                parent_map]
 
897
        position_map = self._get_components_positions(present_versions)
 
898
        records = [(version, position_map[version][1]) for version in
 
899
            present_versions]
 
900
        record_map = {}
 
901
        for version in absent_versions:
 
902
            yield AbsentContentFactory((version,))
 
903
        for version, raw_data, sha1 in \
 
904
                self._data.read_records_iter_raw(records):
 
905
            (record_details, index_memo, _) = position_map[version]
 
906
            yield KnitContentFactory(version, parent_map[version],
 
907
                record_details, sha1, raw_data, self.factory.annotated, knit)
 
908
 
719
909
    def _extract_blocks(self, version_id, source, target):
720
910
        if self._index.get_method(version_id) != 'line-delta':
721
911
            return None
797
987
                    'incompatible format signature inserting to %r', self)
798
988
            source = self._knit_from_datastream(
799
989
                (format, data_list, reader_callable))
800
 
            self.join(source)
 
990
            stream = source.get_record_stream(source.versions(), 'unordered', False)
 
991
            self.insert_record_stream(stream)
801
992
            return
802
993
 
803
994
        for version_id, options, length, parents in data_list:
886
1077
        return KnitVersionedFile(self.filename, self.transport,
887
1078
            factory=factory, index=index, access_method=access)
888
1079
 
 
1080
    def insert_record_stream(self, stream):
 
1081
        """Insert a record stream into this versioned file.
 
1082
 
 
1083
        :param stream: A stream of records to insert. 
 
1084
        :return: None
 
1085
        :seealso VersionedFile.get_record_stream:
 
1086
        """
 
1087
        def get_adapter(adapter_key):
 
1088
            try:
 
1089
                return adapters[adapter_key]
 
1090
            except KeyError:
 
1091
                adapter_factory = adapter_registry.get(adapter_key)
 
1092
                adapter = adapter_factory(self)
 
1093
                adapters[adapter_key] = adapter
 
1094
                return adapter
 
1095
        if self.factory.annotated:
 
1096
            # self is annotated, we need annotated knits to use directly.
 
1097
            annotated = "annotated-"
 
1098
            convertibles = []
 
1099
        else:
 
1100
            # self is not annotated, but we can strip annotations cheaply.
 
1101
            annotated = ""
 
1102
            convertibles = set(["knit-annotated-delta-gz",
 
1103
                "knit-annotated-ft-gz"])
 
1104
        # The set of types we can cheaply adapt without needing basis texts.
 
1105
        native_types = set()
 
1106
        native_types.add("knit-%sdelta-gz" % annotated)
 
1107
        native_types.add("knit-%sft-gz" % annotated)
 
1108
        knit_types = native_types.union(convertibles)
 
1109
        adapters = {}
 
1110
        # Buffer all index entries that we can't add immediately because their
 
1111
        # basis parent is missing. We don't buffer all because generating
 
1112
        # annotations may require access to some of the new records. However we
 
1113
        # can't generate annotations from new deltas until their basis parent
 
1114
        # is present anyway, so we get away with not needing an index that
 
1115
        # includes the new keys.
 
1116
        # key = basis_parent, value = index entry to add
 
1117
        buffered_index_entries = {}
 
1118
        for record in stream:
 
1119
            # Raise an error when a record is missing.
 
1120
            if record.storage_kind == 'absent':
 
1121
                raise RevisionNotPresent([record.key[0]], self)
 
1122
            # adapt to non-tuple interface
 
1123
            parents = [parent[0] for parent in record.parents]
 
1124
            if record.storage_kind in knit_types:
 
1125
                if record.storage_kind not in native_types:
 
1126
                    try:
 
1127
                        adapter_key = (record.storage_kind, "knit-delta-gz")
 
1128
                        adapter = get_adapter(adapter_key)
 
1129
                    except KeyError:
 
1130
                        adapter_key = (record.storage_kind, "knit-ft-gz")
 
1131
                        adapter = get_adapter(adapter_key)
 
1132
                    bytes = adapter.get_bytes(
 
1133
                        record, record.get_bytes_as(record.storage_kind))
 
1134
                else:
 
1135
                    bytes = record.get_bytes_as(record.storage_kind)
 
1136
                options = [record._build_details[0]]
 
1137
                if record._build_details[1]:
 
1138
                    options.append('no-eol')
 
1139
                # Just blat it across.
 
1140
                # Note: This does end up adding data on duplicate keys. As
 
1141
                # modern repositories use atomic insertions this should not
 
1142
                # lead to excessive growth in the event of interrupted fetches.
 
1143
                # 'knit' repositories may suffer excessive growth, but as a
 
1144
                # deprecated format this is tolerable. It can be fixed if
 
1145
                # needed by in the kndx index support raising on a duplicate
 
1146
                # add with identical parents and options.
 
1147
                access_memo = self._data.add_raw_records([len(bytes)], bytes)[0]
 
1148
                index_entry = (record.key[0], options, access_memo, parents)
 
1149
                buffered = False
 
1150
                if 'fulltext' not in options:
 
1151
                    basis_parent = parents[0]
 
1152
                    if not self.has_version(basis_parent):
 
1153
                        pending = buffered_index_entries.setdefault(
 
1154
                            basis_parent, [])
 
1155
                        pending.append(index_entry)
 
1156
                        buffered = True
 
1157
                if not buffered:
 
1158
                    self._index.add_versions([index_entry])
 
1159
            elif record.storage_kind == 'fulltext':
 
1160
                self.add_lines(record.key[0], parents,
 
1161
                    split_lines(record.get_bytes_as('fulltext')))
 
1162
            else:
 
1163
                adapter_key = record.storage_kind, 'fulltext'
 
1164
                adapter = get_adapter(adapter_key)
 
1165
                lines = split_lines(adapter.get_bytes(
 
1166
                    record, record.get_bytes_as(record.storage_kind)))
 
1167
                try:
 
1168
                    self.add_lines(record.key[0], parents, lines)
 
1169
                except errors.RevisionAlreadyPresent:
 
1170
                    pass
 
1171
            # Add any records whose basis parent is now available.
 
1172
            added_keys = [record.key[0]]
 
1173
            while added_keys:
 
1174
                key = added_keys.pop(0)
 
1175
                if key in buffered_index_entries:
 
1176
                    index_entries = buffered_index_entries[key]
 
1177
                    self._index.add_versions(index_entries)
 
1178
                    added_keys.extend(
 
1179
                        [index_entry[0] for index_entry in index_entries])
 
1180
                    del buffered_index_entries[key]
 
1181
        # If there were any deltas which had a missing basis parent, error.
 
1182
        if buffered_index_entries:
 
1183
            raise errors.RevisionNotPresent(buffered_index_entries.keys()[0],
 
1184
                self)
 
1185
 
889
1186
    def versions(self):
890
1187
        """See VersionedFile.versions."""
891
1188
        if 'evil' in debug.debug_flags:
1101
1398
 
1102
1399
    def check(self, progress_bar=None):
1103
1400
        """See VersionedFile.check()."""
 
1401
        # This doesn't actually test extraction of everything, but that will
 
1402
        # impact 'bzr check' substantially, and needs to be integrated with
 
1403
        # care. However, it does check for the obvious problem of a delta with
 
1404
        # no basis.
 
1405
        versions = self.versions()
 
1406
        parent_map = self.get_parent_map(versions)
 
1407
        for version in versions:
 
1408
            if self._index.get_method(version) != 'fulltext':
 
1409
                compression_parent = parent_map[version][0]
 
1410
                if compression_parent not in parent_map:
 
1411
                    raise errors.KnitCorrupt(self,
 
1412
                        "Missing basis parent %s for %s" % (
 
1413
                        compression_parent, version))
1104
1414
 
1105
1415
    def get_lines(self, version_id):
1106
1416
        """See VersionedFile.get_lines()."""
2441
2751
                              % (version_id, e.__class__.__name__, str(e)))
2442
2752
        return df, rec
2443
2753
 
2444
 
    def _check_header(self, version_id, line):
 
2754
    def _split_header(self, line):
2445
2755
        rec = line.split()
2446
2756
        if len(rec) != 4:
2447
2757
            raise KnitCorrupt(self._access,
2448
2758
                              'unexpected number of elements in record header')
 
2759
        return rec
 
2760
 
 
2761
    def _check_header_version(self, rec, version_id):
2449
2762
        if rec[1] != version_id:
2450
2763
            raise KnitCorrupt(self._access,
2451
2764
                              'unexpected version, wanted %r, got %r'
2452
2765
                              % (version_id, rec[1]))
 
2766
 
 
2767
    def _check_header(self, version_id, line):
 
2768
        rec = self._split_header(line)
 
2769
        self._check_header_version(rec, version_id)
2453
2770
        return rec
2454
2771
 
2455
 
    def _parse_record(self, version_id, data):
 
2772
    def _parse_record_unchecked(self, data):
2456
2773
        # profiling notes:
2457
2774
        # 4168 calls in 2880 217 internal
2458
2775
        # 4168 calls to _parse_record_header in 2121
2459
2776
        # 4168 calls to readlines in 330
2460
2777
        df = GzipFile(mode='rb', fileobj=StringIO(data))
2461
 
 
2462
2778
        try:
2463
2779
            record_contents = df.readlines()
2464
2780
        except Exception, e:
2465
 
            raise KnitCorrupt(self._access,
2466
 
                              "While reading {%s} got %s(%s)"
2467
 
                              % (version_id, e.__class__.__name__, str(e)))
 
2781
            raise KnitCorrupt(self._access, "Corrupt compressed record %r, got %s(%s)" %
 
2782
                (data, e.__class__.__name__, str(e)))
2468
2783
        header = record_contents.pop(0)
2469
 
        rec = self._check_header(version_id, header)
2470
 
 
 
2784
        rec = self._split_header(header)
2471
2785
        last_line = record_contents.pop()
2472
2786
        if len(record_contents) != int(rec[2]):
2473
2787
            raise KnitCorrupt(self._access,
2474
2788
                              'incorrect number of lines %s != %s'
2475
2789
                              ' for version {%s}'
2476
2790
                              % (len(record_contents), int(rec[2]),
2477
 
                                 version_id))
 
2791
                                 rec[1]))
2478
2792
        if last_line != 'end %s\n' % rec[1]:
2479
2793
            raise KnitCorrupt(self._access,
2480
2794
                              'unexpected version end line %r, wanted %r' 
2481
 
                              % (last_line, version_id))
 
2795
                              % (last_line, rec[1]))
2482
2796
        df.close()
 
2797
        return rec, record_contents
 
2798
 
 
2799
    def _parse_record(self, version_id, data):
 
2800
        rec, record_contents = self._parse_record_unchecked(data)
 
2801
        self._check_header_version(rec, version_id)
2483
2802
        return record_contents, rec[3]
2484
2803
 
2485
2804
    def read_records_iter_raw(self, records):
2487
2806
 
2488
2807
        This unpacks enough of the text record to validate the id is
2489
2808
        as expected but thats all.
 
2809
 
 
2810
        Each item the iterator yields is (version_id, bytes,
 
2811
        sha1_of_full_text).
2490
2812
        """
2491
2813
        # setup an iterator of the external records:
2492
2814
        # uses readv so nice and fast we hope.
2501
2823
            # validate the header
2502
2824
            df, rec = self._parse_record_header(version_id, data)
2503
2825
            df.close()
2504
 
            yield version_id, data
 
2826
            yield version_id, data, rec[3]
2505
2827
 
2506
2828
    def read_records_iter(self, records):
2507
2829
        """Read text records from data file and yield result.
2666
2988
            total = len(version_list)
2667
2989
            raw_datum = []
2668
2990
            raw_records = []
2669
 
            for (version_id, raw_data), \
 
2991
            for (version_id, raw_data, _), \
2670
2992
                (version_id2, options, parents) in \
2671
2993
                izip(self.source._data.read_records_iter_raw(copy_queue_records),
2672
2994
                     copy_queue):