~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: John Arbash Meinel
  • Date: 2006-10-11 00:23:23 UTC
  • mfrom: (2070 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2071.
  • Revision ID: john@arbash-meinel.com-20061011002323-82ba88c293d7caff
[merge] bzr.dev 2070

Show diffs side-by-side

added added

removed removed

Lines of Context:
74
74
import warnings
75
75
 
76
76
import bzrlib
77
 
import bzrlib.errors as errors
 
77
from bzrlib import (
 
78
    cache_utf8,
 
79
    errors,
 
80
    progress,
 
81
    )
78
82
from bzrlib.errors import FileExists, NoSuchFile, KnitError, \
79
83
        InvalidRevisionId, KnitCorrupt, KnitHeaderError, \
80
84
        RevisionNotPresent, RevisionAlreadyPresent
82
86
from bzrlib.trace import mutter
83
87
from bzrlib.osutils import contains_whitespace, contains_linebreaks, \
84
88
     sha_strings
85
 
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
86
89
from bzrlib.symbol_versioning import DEPRECATED_PARAMETER, deprecated_passed
87
90
from bzrlib.tsort import topo_sort
88
91
import bzrlib.weave
 
92
from bzrlib.versionedfile import VersionedFile, InterVersionedFile
89
93
 
90
94
 
91
95
# TODO: Split out code specific to this format into an associated object.
162
166
        internal representation is of the format:
163
167
        (revid, plaintext)
164
168
        """
 
169
        decode_utf8 = cache_utf8.decode
165
170
        lines = []
166
171
        for line in content:
167
172
            origin, text = line.split(' ', 1)
168
 
            lines.append((origin.decode('utf-8'), text))
 
173
            lines.append((decode_utf8(origin), text))
169
174
        return KnitContent(lines)
170
175
 
171
176
    def parse_line_delta_iter(self, lines):
182
187
        internal representation is
183
188
        (start, end, count, [1..count tuples (revid, newline)])
184
189
        """
 
190
        decode_utf8 = cache_utf8.decode
185
191
        result = []
186
192
        lines = iter(lines)
187
193
        next = lines.next
193
199
            while remaining:
194
200
                origin, text = next().split(' ', 1)
195
201
                remaining -= 1
196
 
                contents.append((origin.decode('utf-8'), text))
 
202
                contents.append((decode_utf8(origin), text))
197
203
            result.append((start, end, count, contents))
198
204
        return result
199
205
 
202
208
 
203
209
        see parse_fulltext which this inverts.
204
210
        """
205
 
        return ['%s %s' % (o.encode('utf-8'), t) for o, t in content._lines]
 
211
        encode_utf8 = cache_utf8.encode
 
212
        return ['%s %s' % (encode_utf8(o), t) for o, t in content._lines]
206
213
 
207
214
    def lower_line_delta(self, delta):
208
215
        """convert a delta into a serializable form.
209
216
 
210
217
        See parse_line_delta which this inverts.
211
218
        """
 
219
        encode_utf8 = cache_utf8.encode
212
220
        out = []
213
221
        for start, end, c, lines in delta:
214
222
            out.append('%d,%d,%d\n' % (start, end, c))
215
 
            for origin, text in lines:
216
 
                out.append('%s %s' % (origin.encode('utf-8'), text))
 
223
            out.extend(encode_utf8(origin) + ' ' + text
 
224
                       for origin, text in lines)
217
225
        return out
218
226
 
219
227
 
272
280
    stored and retrieved.
273
281
    """
274
282
 
275
 
    def __init__(self, relpath, transport, file_mode=None, access_mode=None, 
 
283
    def __init__(self, relpath, transport, file_mode=None, access_mode=None,
276
284
                 factory=None, basis_knit=DEPRECATED_PARAMETER, delta=True,
277
 
                 create=False):
 
285
                 create=False, create_parent_dir=False, delay_create=False,
 
286
                 dir_mode=None):
278
287
        """Construct a knit at location specified by relpath.
279
288
        
280
289
        :param create: If not True, only open an existing knit.
 
290
        :param create_parent_dir: If True, create the parent directory if 
 
291
            creating the file fails. (This is used for stores with 
 
292
            hash-prefixes that may not exist yet)
 
293
        :param delay_create: The calling code is aware that the knit won't 
 
294
            actually be created until the first data is stored.
281
295
        """
282
296
        if deprecated_passed(basis_knit):
283
297
            warnings.warn("KnitVersionedFile.__(): The basis_knit parameter is"
294
308
        self.delta = delta
295
309
 
296
310
        self._index = _KnitIndex(transport, relpath + INDEX_SUFFIX,
297
 
            access_mode, create=create, file_mode=file_mode)
 
311
            access_mode, create=create, file_mode=file_mode,
 
312
            create_parent_dir=create_parent_dir, delay_create=delay_create,
 
313
            dir_mode=dir_mode)
298
314
        self._data = _KnitData(transport, relpath + DATA_SUFFIX,
299
 
            access_mode, create=create and not len(self), file_mode=file_mode)
 
315
            access_mode, create=create and not len(self), file_mode=file_mode,
 
316
            create_parent_dir=create_parent_dir, delay_create=delay_create,
 
317
            dir_mode=dir_mode)
300
318
 
301
319
    def __repr__(self):
302
320
        return '%s(%s)' % (self.__class__.__name__, 
374
392
        """
375
393
        # write all the data
376
394
        pos = self._data.add_raw_record(data)
 
395
        offset = 0
377
396
        index_entries = []
378
397
        for (version_id, options, parents, size) in records:
379
 
            index_entries.append((version_id, options, pos, size, parents))
380
 
            pos += size
 
398
            index_entries.append((version_id, options, pos+offset,
 
399
                                  size, parents))
 
400
            if self._data._do_cache:
 
401
                self._data._cache[version_id] = data[offset:offset+size]
 
402
            offset += size
381
403
        self._index.add_versions(index_entries)
382
404
 
 
405
    def enable_cache(self):
 
406
        """Start caching data for this knit"""
 
407
        self._data.enable_cache()
 
408
 
383
409
    def clear_cache(self):
384
410
        """Clear the data cache only."""
385
411
        self._data.clear_cache()
388
414
        """See VersionedFile.copy_to()."""
389
415
        # copy the current index to a temp index to avoid racing with local
390
416
        # writes
391
 
        transport.put(name + INDEX_SUFFIX + '.tmp', self.transport.get(self._index._filename),)
 
417
        transport.put_file_non_atomic(name + INDEX_SUFFIX + '.tmp',
 
418
                self.transport.get(self._index._filename))
392
419
        # copy the data file
393
420
        f = self._data._open_file()
394
421
        try:
395
 
            transport.put(name + DATA_SUFFIX, f)
 
422
            transport.put_file(name + DATA_SUFFIX, f)
396
423
        finally:
397
424
            f.close()
398
425
        # move the copied index into place
399
426
        transport.move(name + INDEX_SUFFIX + '.tmp', name + INDEX_SUFFIX)
400
427
 
401
428
    def create_empty(self, name, transport, mode=None):
402
 
        return KnitVersionedFile(name, transport, factory=self.factory, delta=self.delta, create=True)
 
429
        return KnitVersionedFile(name, transport, factory=self.factory,
 
430
                                 delta=self.delta, create=True)
403
431
    
404
432
    def _fix_parents(self, version, new_parents):
405
433
        """Fix the parents list for version.
695
723
        # c = component_id, m = method, p = position, s = size, n = next
696
724
        records = [(c, p, s) for c, (m, p, s, n) in position_map.iteritems()]
697
725
        record_map = {}
698
 
        for component_id, content, digest in\
699
 
            self._data.read_records_iter(records): 
 
726
        for component_id, content, digest in \
 
727
                self._data.read_records_iter(records):
700
728
            method, position, size, next = position_map[component_id]
701
729
            record_map[component_id] = method, content, digest, next
702
730
                          
771
799
            text_map[version_id] = text 
772
800
        return text_map, final_content 
773
801
 
774
 
    def iter_lines_added_or_present_in_versions(self, version_ids=None):
 
802
    def iter_lines_added_or_present_in_versions(self, version_ids=None, 
 
803
                                                pb=None):
775
804
        """See VersionedFile.iter_lines_added_or_present_in_versions()."""
776
805
        if version_ids is None:
777
806
            version_ids = self.versions()
 
807
        if pb is None:
 
808
            pb = progress.DummyProgress()
778
809
        # we don't care about inclusions, the caller cares.
779
810
        # but we need to setup a list of records to visit.
780
811
        # we need version_id, position, length
792
823
                data_pos, length = self._index.get_position(version_id)
793
824
                version_id_records.append((version_id, data_pos, length))
794
825
 
795
 
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
796
826
        count = 0
797
827
        total = len(version_id_records)
798
 
        try:
 
828
        pb.update('Walking content.', count, total)
 
829
        for version_id, data, sha_value in \
 
830
            self._data.read_records_iter(version_id_records):
799
831
            pb.update('Walking content.', count, total)
800
 
            for version_id, data, sha_value in \
801
 
                self._data.read_records_iter(version_id_records):
802
 
                pb.update('Walking content.', count, total)
803
 
                method = self._index.get_method(version_id)
804
 
                version_idx = self._index.lookup(version_id)
805
 
                assert method in ('fulltext', 'line-delta')
806
 
                if method == 'fulltext':
807
 
                    content = self.factory.parse_fulltext(data, version_idx)
808
 
                    for line in content.text():
 
832
            method = self._index.get_method(version_id)
 
833
            version_idx = self._index.lookup(version_id)
 
834
            assert method in ('fulltext', 'line-delta')
 
835
            if method == 'fulltext':
 
836
                content = self.factory.parse_fulltext(data, version_idx)
 
837
                for line in content.text():
 
838
                    yield line
 
839
            else:
 
840
                delta = self.factory.parse_line_delta(data, version_idx)
 
841
                for start, end, count, lines in delta:
 
842
                    for origin, line in lines:
809
843
                        yield line
810
 
                else:
811
 
                    delta = self.factory.parse_line_delta(data, version_idx)
812
 
                    for start, end, count, lines in delta:
813
 
                        for origin, line in lines:
814
 
                            yield line
815
 
                count +=1
816
 
            pb.update('Walking content.', total, total)
817
 
            pb.finished()
818
 
        except:
819
 
            pb.update('Walking content.', total, total)
820
 
            pb.finished()
821
 
            raise
 
844
            count +=1
 
845
        pb.update('Walking content.', total, total)
822
846
        
823
847
    def num_versions(self):
824
848
        """See VersionedFile.num_versions()."""
931
955
class _KnitComponentFile(object):
932
956
    """One of the files used to implement a knit database"""
933
957
 
934
 
    def __init__(self, transport, filename, mode, file_mode=None):
 
958
    def __init__(self, transport, filename, mode, file_mode=None,
 
959
                 create_parent_dir=False, dir_mode=None):
935
960
        self._transport = transport
936
961
        self._filename = filename
937
962
        self._mode = mode
938
 
        self._file_mode=file_mode
939
 
 
940
 
    def write_header(self):
941
 
        if self._transport.append(self._filename, StringIO(self.HEADER),
942
 
            mode=self._file_mode):
943
 
            raise KnitCorrupt(self._filename, 'misaligned after writing header')
 
963
        self._file_mode = file_mode
 
964
        self._dir_mode = dir_mode
 
965
        self._create_parent_dir = create_parent_dir
 
966
        self._need_to_create = False
944
967
 
945
968
    def check_header(self, fp):
946
969
        line = fp.readline()
1033
1056
                                   parents,
1034
1057
                                   index)
1035
1058
 
1036
 
    def __init__(self, transport, filename, mode, create=False, file_mode=None):
1037
 
        _KnitComponentFile.__init__(self, transport, filename, mode, file_mode)
 
1059
    def __init__(self, transport, filename, mode, create=False, file_mode=None,
 
1060
                 create_parent_dir=False, delay_create=False, dir_mode=None):
 
1061
        _KnitComponentFile.__init__(self, transport, filename, mode,
 
1062
                                    file_mode=file_mode,
 
1063
                                    create_parent_dir=create_parent_dir,
 
1064
                                    dir_mode=dir_mode)
1038
1065
        self._cache = {}
1039
1066
        # position in _history is the 'official' index for a revision
1040
1067
        # but the values may have come from a newer entry.
1109
1136
            except NoSuchFile, e:
1110
1137
                if mode != 'w' or not create:
1111
1138
                    raise
1112
 
                self.write_header()
 
1139
                if delay_create:
 
1140
                    self._need_to_create = True
 
1141
                else:
 
1142
                    self._transport.put_bytes_non_atomic(self._filename,
 
1143
                        self.HEADER, mode=self._file_mode)
 
1144
 
1113
1145
        finally:
1114
1146
            pb.update('read knit index', total, total)
1115
1147
            pb.finished()
1198
1230
        return self._cache[version_id][5]
1199
1231
 
1200
1232
    def _version_list_to_index(self, versions):
 
1233
        encode_utf8 = cache_utf8.encode
1201
1234
        result_list = []
1202
1235
        for version in versions:
1203
1236
            if version in self._cache:
1205
1238
                result_list.append(str(self._cache[version][5]))
1206
1239
                # -- end lookup () --
1207
1240
            else:
1208
 
                result_list.append('.' + version.encode('utf-8'))
 
1241
                result_list.append('.' + encode_utf8(version))
1209
1242
        return ' '.join(result_list)
1210
1243
 
1211
1244
    def add_version(self, version_id, options, pos, size, parents):
1219
1252
                         (version_id, options, pos, size, parents).
1220
1253
        """
1221
1254
        lines = []
 
1255
        encode_utf8 = cache_utf8.encode
1222
1256
        for version_id, options, pos, size, parents in versions:
1223
 
            line = "\n%s %s %s %s %s :" % (version_id.encode('utf-8'),
 
1257
            line = "\n%s %s %s %s %s :" % (encode_utf8(version_id),
1224
1258
                                           ','.join(options),
1225
1259
                                           pos,
1226
1260
                                           size,
1228
1262
            assert isinstance(line, str), \
1229
1263
                'content must be utf-8 encoded: %r' % (line,)
1230
1264
            lines.append(line)
1231
 
        self._transport.append(self._filename, StringIO(''.join(lines)))
 
1265
        if not self._need_to_create:
 
1266
            self._transport.append_bytes(self._filename, ''.join(lines))
 
1267
        else:
 
1268
            sio = StringIO()
 
1269
            sio.write(self.HEADER)
 
1270
            sio.writelines(lines)
 
1271
            sio.seek(0)
 
1272
            self._transport.put_file_non_atomic(self._filename, sio,
 
1273
                                create_parent_dir=self._create_parent_dir,
 
1274
                                mode=self._file_mode,
 
1275
                                dir_mode=self._dir_mode)
 
1276
            self._need_to_create = False
 
1277
 
1232
1278
        # cache after writing, so that a failed write leads to missing cache
1233
1279
        # entries not extra ones. XXX TODO: RBC 20060502 in the event of a 
1234
1280
        # failure, reload the index or flush it or some such, to prevent
1238
1284
        
1239
1285
    def has_version(self, version_id):
1240
1286
        """True if the version is in the index."""
1241
 
        return self._cache.has_key(version_id)
 
1287
        return (version_id in self._cache)
1242
1288
 
1243
1289
    def get_position(self, version_id):
1244
1290
        """Return data position and size of specified version."""
1279
1325
class _KnitData(_KnitComponentFile):
1280
1326
    """Contents of the knit data file"""
1281
1327
 
1282
 
    HEADER = "# bzr knit data 8\n"
1283
 
 
1284
 
    def __init__(self, transport, filename, mode, create=False, file_mode=None):
1285
 
        _KnitComponentFile.__init__(self, transport, filename, mode)
 
1328
    def __init__(self, transport, filename, mode, create=False, file_mode=None,
 
1329
                 create_parent_dir=False, delay_create=False,
 
1330
                 dir_mode=None):
 
1331
        _KnitComponentFile.__init__(self, transport, filename, mode,
 
1332
                                    file_mode=file_mode,
 
1333
                                    create_parent_dir=create_parent_dir,
 
1334
                                    dir_mode=dir_mode)
1286
1335
        self._checked = False
 
1336
        # TODO: jam 20060713 conceptually, this could spill to disk
 
1337
        #       if the cached size gets larger than a certain amount
 
1338
        #       but it complicates the model a bit, so for now just use
 
1339
        #       a simple dictionary
 
1340
        self._cache = {}
 
1341
        self._do_cache = False
1287
1342
        if create:
1288
 
            self._transport.put(self._filename, StringIO(''), mode=file_mode)
 
1343
            if delay_create:
 
1344
                self._need_to_create = create
 
1345
            else:
 
1346
                self._transport.put_bytes_non_atomic(self._filename, '',
 
1347
                                                     mode=self._file_mode)
 
1348
 
 
1349
    def enable_cache(self):
 
1350
        """Enable caching of reads."""
 
1351
        self._do_cache = True
1289
1352
 
1290
1353
    def clear_cache(self):
1291
1354
        """Clear the record cache."""
1292
 
        pass
 
1355
        self._do_cache = False
 
1356
        self._cache = {}
1293
1357
 
1294
1358
    def _open_file(self):
1295
1359
        try:
1305
1369
        """
1306
1370
        sio = StringIO()
1307
1371
        data_file = GzipFile(None, mode='wb', fileobj=sio)
 
1372
 
 
1373
        version_id_utf8 = cache_utf8.encode(version_id)
1308
1374
        data_file.writelines(chain(
1309
 
            ["version %s %d %s\n" % (version_id.encode('utf-8'), 
 
1375
            ["version %s %d %s\n" % (version_id_utf8,
1310
1376
                                     len(lines),
1311
1377
                                     digest)],
1312
1378
            lines,
1313
 
            ["end %s\n" % version_id.encode('utf-8')]))
 
1379
            ["end %s\n" % version_id_utf8]))
1314
1380
        data_file.close()
1315
1381
        length= sio.tell()
1316
1382
 
1323
1389
        :return: the offset in the data file raw_data was written.
1324
1390
        """
1325
1391
        assert isinstance(raw_data, str), 'data must be plain bytes'
1326
 
        return self._transport.append(self._filename, StringIO(raw_data))
 
1392
        if not self._need_to_create:
 
1393
            return self._transport.append_bytes(self._filename, raw_data)
 
1394
        else:
 
1395
            self._transport.put_bytes_non_atomic(self._filename, raw_data,
 
1396
                                   create_parent_dir=self._create_parent_dir,
 
1397
                                   mode=self._file_mode,
 
1398
                                   dir_mode=self._dir_mode)
 
1399
            self._need_to_create = False
 
1400
            return 0
1327
1401
        
1328
1402
    def add_record(self, version_id, digest, lines):
1329
1403
        """Write new text record to disk.  Returns the position in the
1330
1404
        file where it was written."""
1331
1405
        size, sio = self._record_to_data(version_id, digest, lines)
1332
1406
        # write to disk
1333
 
        start_pos = self._transport.append(self._filename, sio)
 
1407
        if not self._need_to_create:
 
1408
            start_pos = self._transport.append_file(self._filename, sio)
 
1409
        else:
 
1410
            self._transport.put_file_non_atomic(self._filename, sio,
 
1411
                               create_parent_dir=self._create_parent_dir,
 
1412
                               mode=self._file_mode,
 
1413
                               dir_mode=self._dir_mode)
 
1414
            self._need_to_create = False
 
1415
            start_pos = 0
 
1416
        if self._do_cache:
 
1417
            self._cache[version_id] = sio.getvalue()
1334
1418
        return start_pos, size
1335
1419
 
1336
1420
    def _parse_record_header(self, version_id, raw_data):
1343
1427
        rec = df.readline().split()
1344
1428
        if len(rec) != 4:
1345
1429
            raise KnitCorrupt(self._filename, 'unexpected number of elements in record header')
1346
 
        if rec[1].decode('utf-8')!= version_id:
 
1430
        if cache_utf8.decode(rec[1]) != version_id:
1347
1431
            raise KnitCorrupt(self._filename, 
1348
1432
                              'unexpected version, wanted %r, got %r' % (
1349
1433
                                version_id, rec[1]))
1358
1442
        record_contents = df.readlines()
1359
1443
        l = record_contents.pop()
1360
1444
        assert len(record_contents) == int(rec[2])
1361
 
        if l.decode('utf-8') != 'end %s\n' % version_id:
 
1445
        if l != 'end %s\n' % cache_utf8.encode(version_id):
1362
1446
            raise KnitCorrupt(self._filename, 'unexpected version end line %r, wanted %r' 
1363
1447
                        % (l, version_id))
1364
1448
        df.close()
1369
1453
 
1370
1454
        This unpacks enough of the text record to validate the id is
1371
1455
        as expected but thats all.
1372
 
 
1373
 
        It will actively recompress currently cached records on the
1374
 
        basis that that is cheaper than I/O activity.
1375
1456
        """
1376
1457
        # setup an iterator of the external records:
1377
1458
        # uses readv so nice and fast we hope.
1378
1459
        if len(records):
1379
1460
            # grab the disk data needed.
1380
 
            raw_records = self._transport.readv(self._filename,
1381
 
                [(pos, size) for version_id, pos, size in records])
 
1461
            if self._cache:
 
1462
                # Don't check _cache if it is empty
 
1463
                needed_offsets = [(pos, size) for version_id, pos, size
 
1464
                                              in records
 
1465
                                              if version_id not in self._cache]
 
1466
            else:
 
1467
                needed_offsets = [(pos, size) for version_id, pos, size
 
1468
                                               in records]
 
1469
 
 
1470
            raw_records = self._transport.readv(self._filename, needed_offsets)
 
1471
                
1382
1472
 
1383
1473
        for version_id, pos, size in records:
1384
 
            pos, data = raw_records.next()
1385
 
            # validate the header
1386
 
            df, rec = self._parse_record_header(version_id, data)
1387
 
            df.close()
 
1474
            if version_id in self._cache:
 
1475
                # This data has already been validated
 
1476
                data = self._cache[version_id]
 
1477
            else:
 
1478
                pos, data = raw_records.next()
 
1479
                if self._do_cache:
 
1480
                    self._cache[version_id] = data
 
1481
 
 
1482
                # validate the header
 
1483
                df, rec = self._parse_record_header(version_id, data)
 
1484
                df.close()
1388
1485
            yield version_id, data
1389
1486
 
1390
1487
    def read_records_iter(self, records):
1391
1488
        """Read text records from data file and yield result.
1392
1489
 
1393
 
        Each passed record is a tuple of (version_id, pos, len) and
1394
 
        will be read in the given order.  Yields (version_id,
1395
 
        contents, digest).
 
1490
        The result will be returned in whatever is the fastest to read.
 
1491
        Not by the order requested. Also, multiple requests for the same
 
1492
        record will only yield 1 response.
 
1493
        :param records: A list of (version_id, pos, len) entries
 
1494
        :return: Yields (version_id, contents, digest) in the order
 
1495
                 read, not the order requested
1396
1496
        """
1397
 
        if len(records) == 0:
1398
 
            return
1399
 
        # profiling notes:
1400
 
        # 60890  calls for 4168 extractions in 5045, 683 internal.
1401
 
        # 4168   calls to readv              in 1411
1402
 
        # 4168   calls to parse_record       in 2880
1403
 
 
1404
 
        # Get unique records, sorted by position
1405
 
        needed_records = sorted(set(records), key=operator.itemgetter(1))
1406
 
 
1407
 
        # We take it that the transport optimizes the fetching as good
1408
 
        # as possible (ie, reads continuous ranges.)
1409
 
        response = self._transport.readv(self._filename,
 
1497
        if not records:
 
1498
            return
 
1499
 
 
1500
        if self._cache:
 
1501
            # Skip records we have alread seen
 
1502
            yielded_records = set()
 
1503
            needed_records = set()
 
1504
            for record in records:
 
1505
                if record[0] in self._cache:
 
1506
                    if record[0] in yielded_records:
 
1507
                        continue
 
1508
                    yielded_records.add(record[0])
 
1509
                    data = self._cache[record[0]]
 
1510
                    content, digest = self._parse_record(record[0], data)
 
1511
                    yield (record[0], content, digest)
 
1512
                else:
 
1513
                    needed_records.add(record)
 
1514
            needed_records = sorted(needed_records, key=operator.itemgetter(1))
 
1515
        else:
 
1516
            needed_records = sorted(set(records), key=operator.itemgetter(1))
 
1517
 
 
1518
        if not needed_records:
 
1519
            return
 
1520
 
 
1521
        # The transport optimizes the fetching as well 
 
1522
        # (ie, reads continuous ranges.)
 
1523
        readv_response = self._transport.readv(self._filename,
1410
1524
            [(pos, size) for version_id, pos, size in needed_records])
1411
1525
 
1412
 
        record_map = {}
1413
 
        for (record_id, pos, size), (pos, data) in \
1414
 
            izip(iter(needed_records), response):
1415
 
            content, digest = self._parse_record(record_id, data)
1416
 
            record_map[record_id] = (digest, content)
1417
 
 
1418
 
        for version_id, pos, size in records:
1419
 
            digest, content = record_map[version_id]
 
1526
        for (version_id, pos, size), (pos, data) in \
 
1527
                izip(iter(needed_records), readv_response):
 
1528
            content, digest = self._parse_record(version_id, data)
 
1529
            if self._do_cache:
 
1530
                self._cache[version_id] = data
1420
1531
            yield version_id, content, digest
1421
1532
 
1422
1533
    def read_records(self, records):
1423
1534
        """Read records into a dictionary."""
1424
1535
        components = {}
1425
 
        for record_id, content, digest in self.read_records_iter(records):
 
1536
        for record_id, content, digest in \
 
1537
                self.read_records_iter(records):
1426
1538
            components[record_id] = (content, digest)
1427
1539
        return components
1428
1540