~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Robert Collins
  • Date: 2009-02-17 06:21:52 UTC
  • mto: (4022.1.4 fetch.RemoteSink)
  • mto: This revision was merged to the branch mainline in revision 4026.
  • Revision ID: robertc@robertcollins.net-20090217062152-8j02ev4tjhaidj54
Support delta_closure=True with NetworkRecordStream to transmit deltas over the wire when full text extraction is required on the far end.

Show diffs side-by-side

added added

removed removed

Lines of Context:
314
314
            self.storage_kind)
315
315
 
316
316
 
 
317
class LazyKnitContentFactory(ContentFactory):
 
318
    """A ContentFactory which can either generate full text or a wire form.
 
319
 
 
320
    :seealso ContentFactory:
 
321
    """
 
322
 
 
323
    def __init__(self, key, parents, generator, first):
 
324
        """Create a LazyKnitContentFactory.
 
325
        
 
326
        :param key: The key of the record.
 
327
        :param parents: The parents of the record.
 
328
        :param generator: A _ContentMapGenerator containing the record for this
 
329
            key.
 
330
        :param first: Is this the first content object returned from generator?
 
331
            if it is, its storage kind is knit-delta-closure, otherwise it is
 
332
            knit-delta-closure-ref
 
333
        """
 
334
        self.key = key
 
335
        self.parents = parents
 
336
        self.sha1 = None
 
337
        self._generator = generator
 
338
        self.storage_kind = "knit-delta-closure"
 
339
        if not first:
 
340
            self.storage_kind = self.storage_kind + "-ref"
 
341
        self._first = first
 
342
 
 
343
    def get_bytes_as(self, storage_kind):
 
344
        if storage_kind == self.storage_kind:
 
345
            if self._first:
 
346
                return self._generator._wire_bytes()
 
347
            else:
 
348
                # all the keys etc are contained in the bytes returned in the
 
349
                # first record.
 
350
                return ''
 
351
        if storage_kind in ('chunked', 'fulltext'):
 
352
            chunks = self._generator._get_one_work(self.key).text()
 
353
            if storage_kind == 'chunked':
 
354
                return chunks
 
355
            else:
 
356
                return ''.join(chunks)
 
357
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
358
            self.storage_kind)
 
359
 
 
360
 
 
361
def knit_delta_closure_to_records(storage_kind, bytes, line_end):
 
362
    """Convert a network record to a iterator over stream records.
 
363
 
 
364
    :param storage_kind: The storage kind of the record.
 
365
        Must be 'knit-delta-closure'.
 
366
    :param bytes: The bytes of the record on the network.
 
367
    """
 
368
    generator = _NetworkContentMapGenerator(bytes, line_end)
 
369
    return generator.get_record_stream()
 
370
 
 
371
 
317
372
def knit_network_to_record(storage_kind, bytes, line_end):
318
373
    """Convert a network record to a record object.
319
374
 
342
397
    start = start + 1
343
398
    raw_record = bytes[start:]
344
399
    annotated = 'annotated' in storage_kind
345
 
    return KnitContentFactory(key, parents, build_details, None, raw_record,
346
 
        annotated, network_bytes=bytes)
 
400
    return [KnitContentFactory(key, parents, build_details, None, raw_record,
 
401
        annotated, network_bytes=bytes)]
347
402
 
348
403
 
349
404
class KnitContent(object):
1045
1100
            if not self.get_parent_map([key]):
1046
1101
                raise RevisionNotPresent(key, self)
1047
1102
            return cached_version
1048
 
        text_map, contents_map = self._get_content_maps([key])
1049
 
        return contents_map[key]
1050
 
 
1051
 
    def _get_content_maps(self, keys, nonlocal_keys=None):
1052
 
        """Produce maps of text and KnitContents
1053
 
        
1054
 
        :param keys: The keys to produce content maps for.
1055
 
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
1056
 
            which are known to not be in this knit, but rather in one of the
1057
 
            fallback knits.
1058
 
        :return: (text_map, content_map) where text_map contains the texts for
1059
 
            the requested versions and content_map contains the KnitContents.
1060
 
        """
1061
 
        # FUTURE: This function could be improved for the 'extract many' case
1062
 
        # by tracking each component and only doing the copy when the number of
1063
 
        # children than need to apply delta's to it is > 1 or it is part of the
1064
 
        # final output.
1065
 
        keys = list(keys)
1066
 
        multiple_versions = len(keys) != 1
1067
 
        record_map = self._get_record_map(keys, allow_missing=True)
1068
 
 
1069
 
        text_map = {}
1070
 
        content_map = {}
1071
 
        final_content = {}
1072
 
        if nonlocal_keys is None:
1073
 
            nonlocal_keys = set()
1074
 
        else:
1075
 
            nonlocal_keys = frozenset(nonlocal_keys)
1076
 
        missing_keys = set(nonlocal_keys)
1077
 
        for source in self._fallback_vfs:
1078
 
            if not missing_keys:
1079
 
                break
1080
 
            for record in source.get_record_stream(missing_keys,
1081
 
                'unordered', True):
1082
 
                if record.storage_kind == 'absent':
1083
 
                    continue
1084
 
                missing_keys.remove(record.key)
1085
 
                lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
1086
 
                text_map[record.key] = lines
1087
 
                content_map[record.key] = PlainKnitContent(lines, record.key)
1088
 
                if record.key in keys:
1089
 
                    final_content[record.key] = content_map[record.key]
1090
 
        for key in keys:
1091
 
            if key in nonlocal_keys:
1092
 
                # already handled
1093
 
                continue
1094
 
            components = []
1095
 
            cursor = key
1096
 
            while cursor is not None:
1097
 
                try:
1098
 
                    record, record_details, digest, next = record_map[cursor]
1099
 
                except KeyError:
1100
 
                    raise RevisionNotPresent(cursor, self)
1101
 
                components.append((cursor, record, record_details, digest))
1102
 
                cursor = next
1103
 
                if cursor in content_map:
1104
 
                    # no need to plan further back
1105
 
                    components.append((cursor, None, None, None))
1106
 
                    break
1107
 
 
1108
 
            content = None
1109
 
            for (component_id, record, record_details,
1110
 
                 digest) in reversed(components):
1111
 
                if component_id in content_map:
1112
 
                    content = content_map[component_id]
1113
 
                else:
1114
 
                    content, delta = self._factory.parse_record(key[-1],
1115
 
                        record, record_details, content,
1116
 
                        copy_base_content=multiple_versions)
1117
 
                    if multiple_versions:
1118
 
                        content_map[component_id] = content
1119
 
 
1120
 
            final_content[key] = content
1121
 
 
1122
 
            # digest here is the digest from the last applied component.
1123
 
            text = content.text()
1124
 
            actual_sha = sha_strings(text)
1125
 
            if actual_sha != digest:
1126
 
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
1127
 
            text_map[key] = text
1128
 
        return text_map, final_content
 
1103
        generator = _VFContentMapGenerator(self, [key])
 
1104
        return generator._get_content(key)
1129
1105
 
1130
1106
    def get_parent_map(self, keys):
1131
1107
        """Get a map of the graph parents of keys.
1164
1140
        
1165
1141
        :return: {key:(record, record_details, digest, next)}
1166
1142
            record
1167
 
                data returned from read_records
 
1143
                data returned from read_records (a KnitContentobject)
1168
1144
            record_details
1169
1145
                opaque information to pass to parse_record
1170
1146
            digest
1176
1152
        :param allow_missing: If some records are missing, rather than 
1177
1153
            error, just return the data that could be generated.
1178
1154
        """
 
1155
        raw_map = self._get_record_map_unparsed(keys,
 
1156
            allow_missing=allow_missing)
 
1157
        return self._raw_map_to_record_map(raw_map)
 
1158
 
 
1159
    def _raw_map_to_record_map(self, raw_map):
 
1160
        """Parse the contents of _get_record_map_unparsed.
 
1161
        
 
1162
        :return: see _get_record_map.
 
1163
        """
 
1164
        result = {}
 
1165
        for key in raw_map:
 
1166
            data, record_details, next = raw_map[key]
 
1167
            content, digest = self._parse_record(key[-1], data)
 
1168
            result[key] = content, record_details, digest, next
 
1169
        return result
 
1170
 
 
1171
    def _get_record_map_unparsed(self, keys, allow_missing=False):
 
1172
        """Get the raw data for reconstructing keys without parsing it.
 
1173
        
 
1174
        :return: A dict suitable for parsing via _raw_map_to_record_map.
 
1175
            key-> raw_bytes, (method, noeol), compression_parent
 
1176
        """
1179
1177
        # This retries the whole request if anything fails. Potentially we
1180
1178
        # could be a bit more selective. We could track the keys whose records
1181
1179
        # we have successfully found, and then only request the new records
1182
1180
        # from there. However, _get_components_positions grabs the whole build
1183
1181
        # chain, which means we'll likely try to grab the same records again
1184
 
        # anyway. Also, can the build chains change as part of a pack
 
1182
        # anyway. Also, can the build chains change as art of a pack
1185
1183
        # operation? We wouldn't want to end up with a broken chain.
1186
1184
        while True:
1187
1185
            try:
1191
1189
                # n = next
1192
1190
                records = [(key, i_m) for key, (r, i_m, n)
1193
1191
                                       in position_map.iteritems()]
1194
 
                record_map = {}
1195
 
                for key, record, digest in self._read_records_iter(records):
 
1192
                raw_record_map = {}
 
1193
                for key, data in self._read_records_iter_unchecked(records):
1196
1194
                    (record_details, index_memo, next) = position_map[key]
1197
 
                    record_map[key] = record, record_details, digest, next
1198
 
                return record_map
 
1195
                    raw_record_map[key] = data, record_details, next
 
1196
                return raw_record_map
1199
1197
            except errors.RetryWithNewPacks, e:
1200
1198
                self._access.reload_or_raise(e)
1201
1199
 
1265
1263
        absent_keys = keys.difference(set(positions))
1266
1264
        # There may be more absent keys : if we're missing the basis component
1267
1265
        # and are trying to include the delta closure.
 
1266
        # XXX: We should not ever need to examine remote sources because we do
 
1267
        # not permit deltas across versioned files boundaries.
1268
1268
        if include_delta_closure:
1269
1269
            needed_from_fallback = set()
1270
1270
            # Build up reconstructable_keys dict.  key:True in this dict means
1347
1347
            for prefix, keys in prefix_split_keys.iteritems():
1348
1348
                non_local = prefix_split_non_local_keys.get(prefix, [])
1349
1349
                non_local = set(non_local)
1350
 
                text_map, _ = self._get_content_maps(keys, non_local)
1351
 
                for key in keys:
1352
 
                    lines = text_map.pop(key)
1353
 
                    yield ChunkedContentFactory(key, global_map[key], None,
1354
 
                                                lines)
 
1350
                generator = _VFContentMapGenerator(self, keys, non_local,
 
1351
                    global_map)
 
1352
                for record in generator.get_record_stream():
 
1353
                    yield record
1355
1354
        else:
1356
1355
            for source, keys in source_keys:
1357
1356
                if source is parent_maps[0]:
1759
1758
        This unpacks enough of the text record to validate the id is
1760
1759
        as expected but thats all.
1761
1760
 
1762
 
        Each item the iterator yields is (key, bytes, sha1_of_full_text).
 
1761
        Each item the iterator yields is (key, bytes,
 
1762
            expected_sha1_of_full_text).
 
1763
        """
 
1764
        for key, data in self._read_records_iter_unchecked(records):
 
1765
            # validate the header (note that we can only use the suffix in
 
1766
            # current knit records).
 
1767
            df, rec = self._parse_record_header(key, data)
 
1768
            df.close()
 
1769
            yield key, data, rec[3]
 
1770
 
 
1771
    def _read_records_iter_unchecked(self, records):
 
1772
        """Read text records from data file and yield raw data.
 
1773
 
 
1774
        No validation is done.
 
1775
 
 
1776
        Yields tuples of (key, data).
1763
1777
        """
1764
1778
        # setup an iterator of the external records:
1765
1779
        # uses readv so nice and fast we hope.
1771
1785
 
1772
1786
        for key, index_memo in records:
1773
1787
            data = raw_records.next()
1774
 
            # validate the header (note that we can only use the suffix in
1775
 
            # current knit records).
1776
 
            df, rec = self._parse_record_header(key, data)
1777
 
            df.close()
1778
 
            yield key, data, rec[3]
 
1788
            yield key, data
1779
1789
 
1780
1790
    def _record_to_data(self, key, digest, lines, dense_lines=None):
1781
1791
        """Convert key, digest, lines into a raw data block.
1825
1835
        return result
1826
1836
 
1827
1837
 
 
1838
class _ContentMapGenerator(object):
 
1839
    """Generate texts or expose raw deltas for a set of texts."""
 
1840
 
 
1841
    def _get_content(self, key):
 
1842
        """Get the content object for key."""
 
1843
        if key in self.nonlocal_keys:
 
1844
            record = self.get_record_stream().next()
 
1845
            # Create a content object on the fly
 
1846
            lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
1847
            return PlainKnitContent(lines, record.key)
 
1848
        else:
 
1849
            # local keys we can ask for directly
 
1850
            return self._get_one_work(key)
 
1851
 
 
1852
    def get_record_stream(self):
 
1853
        """Get a record stream for the keys requested during __init__."""
 
1854
        for record in self._work():
 
1855
            yield record
 
1856
 
 
1857
    def _work(self):
 
1858
        """Produce maps of text and KnitContents as dicts.
 
1859
        
 
1860
        :return: (text_map, content_map) where text_map contains the texts for
 
1861
            the requested versions and content_map contains the KnitContents.
 
1862
        """
 
1863
        # NB: By definition we never need to read remote sources unless texts
 
1864
        # are requested from them: we don't delta across stores - and we
 
1865
        # explicitly do not want to to prevent data loss situations.
 
1866
        if self.global_map is None:
 
1867
            self.global_map = self.vf.get_parent_map(self.keys)
 
1868
        nonlocal_keys = self.nonlocal_keys
 
1869
 
 
1870
        missing_keys = set(nonlocal_keys)
 
1871
        # Read from remote versioned file instances and provide to our caller.
 
1872
        for source in self.vf._fallback_vfs:
 
1873
            if not missing_keys:
 
1874
                break
 
1875
            # Loop over fallback repositories asking them for texts - ignore
 
1876
            # any missing from a particular fallback.
 
1877
            for record in source.get_record_stream(missing_keys,
 
1878
                'unordered', True):
 
1879
                if record.storage_kind == 'absent':
 
1880
                    # Not in thie particular stream, may be in one of the
 
1881
                    # other fallback vfs objects.
 
1882
                    continue
 
1883
                missing_keys.remove(record.key)
 
1884
                yield record
 
1885
 
 
1886
        self._raw_record_map = self.vf._get_record_map_unparsed(self.keys,
 
1887
            allow_missing=True)
 
1888
        first = True
 
1889
        for key in self.keys:
 
1890
            if key in self.nonlocal_keys:
 
1891
                continue
 
1892
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
1893
            first = False
 
1894
 
 
1895
    def _get_one_work(self, requested_key):
 
1896
        # Now, if we have calculated everything already, just return the
 
1897
        # desired text.
 
1898
        if requested_key in self._contents_map:
 
1899
            return self._contents_map[requested_key]
 
1900
        # To simply things, parse everything at once - code that wants one text
 
1901
        # probably wants them all.
 
1902
        # FUTURE: This function could be improved for the 'extract many' case
 
1903
        # by tracking each component and only doing the copy when the number of
 
1904
        # children than need to apply delta's to it is > 1 or it is part of the
 
1905
        # final output.
 
1906
        multiple_versions = len(self.keys) != 1
 
1907
        if self._record_map is None:
 
1908
            self._record_map = self.vf._raw_map_to_record_map(
 
1909
                self._raw_record_map)
 
1910
        record_map = self._record_map
 
1911
        # raw_record_map is key:
 
1912
        # Have read and parsed records at this point. 
 
1913
        for key in self.keys:
 
1914
            if key in self.nonlocal_keys:
 
1915
                # already handled
 
1916
                continue
 
1917
            components = []
 
1918
            cursor = key
 
1919
            while cursor is not None:
 
1920
                try:
 
1921
                    record, record_details, digest, next = record_map[cursor]
 
1922
                except KeyError:
 
1923
                    raise RevisionNotPresent(cursor, self)
 
1924
                components.append((cursor, record, record_details, digest))
 
1925
                cursor = next
 
1926
                if cursor in self._contents_map:
 
1927
                    # no need to plan further back
 
1928
                    components.append((cursor, None, None, None))
 
1929
                    break
 
1930
 
 
1931
            content = None
 
1932
            for (component_id, record, record_details,
 
1933
                 digest) in reversed(components):
 
1934
                if component_id in self._contents_map:
 
1935
                    content = self._contents_map[component_id]
 
1936
                else:
 
1937
                    content, delta = self._factory.parse_record(key[-1],
 
1938
                        record, record_details, content,
 
1939
                        copy_base_content=multiple_versions)
 
1940
                    if multiple_versions:
 
1941
                        self._contents_map[component_id] = content
 
1942
 
 
1943
            # digest here is the digest from the last applied component.
 
1944
            text = content.text()
 
1945
            actual_sha = sha_strings(text)
 
1946
            if actual_sha != digest:
 
1947
                raise SHA1KnitCorrupt(self, actual_sha, digest, key, text)
 
1948
        if multiple_versions:
 
1949
            return self._contents_map[requested_key]
 
1950
        else:
 
1951
            return content
 
1952
 
 
1953
    def _wire_bytes(self):
 
1954
        """Get the bytes to put on the wire for 'key'.
 
1955
 
 
1956
        The first collection of bytes asked for returns the serialised
 
1957
        raw_record_map and the additional details (key, parent) for key.
 
1958
        Subsequent calls return just the additional details (key, parent).
 
1959
        The wire storage_kind given for the first key is 'knit-delta-closure',
 
1960
        For subsequent keys it is 'knit-delta-closure-ref'.
 
1961
 
 
1962
        :param key: A key from the content generator.
 
1963
        :return: Bytes to put on the wire.
 
1964
        """
 
1965
        lines = []
 
1966
        # kind marker for dispatch on the far side,
 
1967
        lines.append('knit-delta-closure')
 
1968
        # Annotated or not
 
1969
        if self.vf._factory.annotated:
 
1970
            lines.append('annotated')
 
1971
        else:
 
1972
            lines.append('')
 
1973
        # then the list of keys
 
1974
        lines.append('\t'.join(['\x00'.join(key) for key in self.keys
 
1975
            if key not in self.nonlocal_keys]))
 
1976
        # then the _raw_record_map in serialised form:
 
1977
        map_byte_list = []
 
1978
        # for each item in the map:
 
1979
        # 1 line with key
 
1980
        # 1 line with parents if the key is to be yielded (None: for None, '' for ())
 
1981
        # one line with method
 
1982
        # one line with noeol
 
1983
        # one line with next ('' for None)
 
1984
        # one line with byte count of the record bytes
 
1985
        # the record bytes
 
1986
        for key, (record_bytes, (method, noeol), next) in \
 
1987
            self._raw_record_map.iteritems():
 
1988
            key_bytes = '\x00'.join(key)
 
1989
            parents = self.global_map.get(key, None)
 
1990
            if parents is None:
 
1991
                parent_bytes = 'None:'
 
1992
            else:
 
1993
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
 
1994
            method_bytes = method
 
1995
            if noeol:
 
1996
                noeol_bytes = "T"
 
1997
            else:
 
1998
                noeol_bytes = "F"
 
1999
            if next:
 
2000
                next_bytes = '\x00'.join(next)
 
2001
            else:
 
2002
                next_bytes = ''
 
2003
            map_byte_list.append('%s\n%s\n%s\n%s\n%s\n%d\n%s' % (
 
2004
                key_bytes, parent_bytes, method_bytes, noeol_bytes, next_bytes,
 
2005
                len(record_bytes), record_bytes))
 
2006
        map_bytes = ''.join(map_byte_list)
 
2007
        lines.append(map_bytes)
 
2008
        bytes = '\n'.join(lines)
 
2009
        return bytes
 
2010
 
 
2011
 
 
2012
class _VFContentMapGenerator(_ContentMapGenerator):
 
2013
    """Content map generator reading from a VersionedFiles object."""
 
2014
 
 
2015
    def __init__(self, versioned_files, keys, nonlocal_keys=None,
 
2016
        global_map=None, raw_record_map=None):
 
2017
        """Create a _ContentMapGenerator.
 
2018
        
 
2019
        :param versioned_files: The versioned files that the texts are being
 
2020
            extracted from.
 
2021
        :param keys: The keys to produce content maps for.
 
2022
        :param nonlocal_keys: An iterable of keys(possibly intersecting keys)
 
2023
            which are known to not be in this knit, but rather in one of the
 
2024
            fallback knits.
 
2025
        :param global_map: The result of get_parent_map(keys) (or a supermap).
 
2026
            This is required if get_record_stream() is to be used.
 
2027
        :param raw_record_map: A unparsed raw record map to use for answering
 
2028
            contents.
 
2029
        """
 
2030
        # The vf to source data from
 
2031
        self.vf = versioned_files
 
2032
        # The keys desired
 
2033
        self.keys = list(keys)
 
2034
        # Keys known to be in fallback vfs objects
 
2035
        if nonlocal_keys is None:
 
2036
            self.nonlocal_keys = set()
 
2037
        else:
 
2038
            self.nonlocal_keys = frozenset(nonlocal_keys)
 
2039
        # Parents data for keys to be returned in get_record_stream
 
2040
        self.global_map = global_map
 
2041
        # The chunked lists for self.keys in text form
 
2042
        self._text_map = {}
 
2043
        # A cache of KnitContent objects used in extracting texts.
 
2044
        self._contents_map = {}
 
2045
        # All the knit records needed to assemble the requested keys as full
 
2046
        # texts.
 
2047
        self._record_map = None
 
2048
        if raw_record_map is None:
 
2049
            self._raw_record_map = self.vf._get_record_map_unparsed(keys,
 
2050
                allow_missing=True)
 
2051
        else:
 
2052
            self._raw_record_map = raw_record_map
 
2053
        # the factory for parsing records
 
2054
        self._factory = self.vf._factory
 
2055
 
 
2056
 
 
2057
class _NetworkContentMapGenerator(_ContentMapGenerator):
 
2058
    """Content map generator sourced from a network stream."""
 
2059
 
 
2060
    def __init__(self, bytes, line_end):
 
2061
        """Construct a _NetworkContentMapGenerator from a bytes block."""
 
2062
        self._bytes = bytes
 
2063
        self.global_map = {}
 
2064
        self._raw_record_map = {}
 
2065
        self._contents_map = {}
 
2066
        self._record_map = None
 
2067
        self.nonlocal_keys = []
 
2068
        # Get access to record parsing facilities
 
2069
        self.vf = KnitVersionedFiles(None, None)
 
2070
        start = line_end
 
2071
        # Annotated or not
 
2072
        line_end = bytes.find('\n', start)
 
2073
        line = bytes[start:line_end]
 
2074
        start = line_end + 1
 
2075
        if line == 'annotated':
 
2076
            self._factory = KnitAnnotateFactory()
 
2077
        else:
 
2078
            self._factory = KnitPlainFactory()
 
2079
        # list of keys to emit in get_record_stream
 
2080
        line_end = bytes.find('\n', start)
 
2081
        line = bytes[start:line_end]
 
2082
        start = line_end + 1
 
2083
        self.keys = [
 
2084
            tuple(segment.split('\x00')) for segment in line.split('\t')
 
2085
            if segment]
 
2086
        # now a loop until the end. XXX: It would be nice if this was just a
 
2087
        # bunch of the same records as get_record_stream(..., False) gives, but
 
2088
        # there is a decent sized gap stopping that at the moment.
 
2089
        end = len(bytes)
 
2090
        while start < end:
 
2091
            # 1 line with key
 
2092
            line_end = bytes.find('\n', start)
 
2093
            key = tuple(bytes[start:line_end].split('\x00'))
 
2094
            start = line_end + 1
 
2095
            # 1 line with parents (None: for None, '' for ())
 
2096
            line_end = bytes.find('\n', start)
 
2097
            line = bytes[start:line_end]
 
2098
            if line == 'None:':
 
2099
                parents = None
 
2100
            else:
 
2101
                parents = tuple(
 
2102
                    [tuple(segment.split('\x00')) for segment in line.split('\t')
 
2103
                     if segment])
 
2104
            self.global_map[key] = parents
 
2105
            start = line_end + 1
 
2106
            # one line with method
 
2107
            line_end = bytes.find('\n', start)
 
2108
            line = bytes[start:line_end]
 
2109
            method = line
 
2110
            start = line_end + 1
 
2111
            # one line with noeol
 
2112
            line_end = bytes.find('\n', start)
 
2113
            line = bytes[start:line_end]
 
2114
            noeol = line == "T"
 
2115
            start = line_end + 1
 
2116
            # one line with next ('' for None)
 
2117
            line_end = bytes.find('\n', start)
 
2118
            line = bytes[start:line_end]
 
2119
            if not line:
 
2120
                next = None
 
2121
            else:
 
2122
                next = tuple(bytes[start:line_end].split('\x00'))
 
2123
            start = line_end + 1
 
2124
            # one line with byte count of the record bytes
 
2125
            line_end = bytes.find('\n', start)
 
2126
            line = bytes[start:line_end]
 
2127
            count = int(line)
 
2128
            start = line_end + 1
 
2129
            # the record bytes
 
2130
            record_bytes = bytes[start:start+count]
 
2131
            start = start + count
 
2132
            # put it in the map
 
2133
            self._raw_record_map[key] = (record_bytes, (method, noeol), next)
 
2134
 
 
2135
    def get_record_stream(self):
 
2136
        """Get a record stream for for keys requested by the bytestream."""
 
2137
        first = True
 
2138
        for key in self.keys:
 
2139
            yield LazyKnitContentFactory(key, self.global_map[key], self, first)
 
2140
            first = False
 
2141
 
 
2142
    def _wire_bytes(self):
 
2143
        return self._bytes
 
2144
 
 
2145
 
1828
2146
class _KndxIndex(object):
1829
2147
    """Manages knit index files
1830
2148