~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

Change _LazyGroupContentManager.get_record_stream() to invalidate each record (factory) after it has been yielded.
This means that records are genuinely only valid until you go to the next one.
This breaks the reference cycle with the manager, and drops peak memory by a decent amount
(as otherwise we have to wait until gc.collect() runs).
Also, change get_bytes_as() so that it caches the bytes (though this cache is also removed
when we go to the next object).
This is mostly useful for CHK pages, which get processed at several layers. The
get_record_stream() code uses it to determine what to send next, and the
insert_record_stream() needs the raw bytes to put into the target repo.
This saves an extract() on every CHK page during 'pack'. apply_delta is rather fast,
but there is no reason to waste calls.

Show diffs side-by-side

added added

removed removed

Lines of Context:
339
339
        :param sha1: TODO (should we validate only when sha1 is supplied?)
340
340
        :return: The bytes for the content
341
341
        """
342
 
        # Handle the 'Empty Content' record, even if we don't always write it
343
 
        # yet.
344
342
        if start == end == 0:
345
343
            return ''
346
344
        self._ensure_content(end)
477
475
        #       get_bytes_as call? After Manager.get_record_stream() returns
478
476
        #       the object?
479
477
        self._manager = manager
 
478
        self._bytes = None
480
479
        self.storage_kind = 'groupcompress-block'
481
480
        if not first:
482
481
            self.storage_kind = 'groupcompress-block-ref'
496
495
            else:
497
496
                return ''
498
497
        if storage_kind in ('fulltext', 'chunked'):
499
 
            self._manager._prepare_for_extract()
500
 
            block = self._manager._block
501
 
            bytes = block.extract(self.key, self._start, self._end)
 
498
            if self._bytes is None:
 
499
                # Grab and cache the raw bytes for this entry
 
500
                # and break the ref-cycle with _manager since we don't need it
 
501
                # anymore
 
502
                self._manager._prepare_for_extract()
 
503
                block = self._manager._block
 
504
                self._bytes = block.extract(self.key, self._start, self._end)
 
505
                self._manager = None
502
506
            if storage_kind == 'fulltext':
503
 
                return bytes
 
507
                return self._bytes
504
508
            else:
505
 
                return [bytes]
 
509
                return [self._bytes]
506
510
        raise errors.UnavailableRepresentation(self.key, storage_kind,
507
 
            self.storage_kind)
 
511
                                               self.storage_kind)
508
512
 
509
513
 
510
514
class _LazyGroupContentManager(object):
531
535
        """Get a record for all keys added so far."""
532
536
        for factory in self._factories:
533
537
            yield factory
 
538
            # Break the ref-cycle
 
539
            factory._bytes = None
 
540
            factory._manager = None
534
541
        # TODO: Consider setting self._factories = None after the above loop,
535
542
        #       as it will break the reference cycle
536
543
 
1281
1288
        for key in missing:
1282
1289
            yield AbsentContentFactory(key)
1283
1290
        manager = None
 
1291
        last_read_memo = None
1284
1292
        # TODO: This works fairly well at batching up existing groups into a
1285
1293
        #       streamable format, and possibly allowing for taking one big
1286
1294
        #       group and splitting it when it isn't fully utilized.
1295
1303
                for key in keys:
1296
1304
                    if key in self._unadded_refs:
1297
1305
                        if manager is not None:
1298
 
                            # Yield everything buffered so far
1299
1306
                            for factory in manager.get_record_stream():
1300
1307
                                yield factory
1301
 
                            manager = None
 
1308
                            last_read_memo = manager = None
1302
1309
                        bytes, sha1 = self._compressor.extract(key)
1303
1310
                        parents = self._unadded_refs[key]
1304
1311
                        yield FulltextContentFactory(key, parents, sha1, bytes)
1305
1312
                    else:
1306
1313
                        index_memo, _, parents, (method, _) = locations[key]
1307
 
                        block = self._get_block(index_memo)
 
1314
                        read_memo = index_memo[0:3]
 
1315
                        if last_read_memo != read_memo:
 
1316
                            # We are starting a new block. If we have a
 
1317
                            # manager, we have found everything that fits for
 
1318
                            # now, so yield records
 
1319
                            if manager is not None:
 
1320
                                for factory in manager.get_record_stream():
 
1321
                                    yield factory
 
1322
                            # Now start a new manager
 
1323
                            block = self._get_block(index_memo)
 
1324
                            manager = _LazyGroupContentManager(block)
 
1325
                            last_read_memo = read_memo
1308
1326
                        start, end = index_memo[3:5]
1309
 
                        if manager is None:
1310
 
                            manager = _LazyGroupContentManager(block)
1311
 
                        elif manager._block is not block:
1312
 
                            # Flush and create a new manager
1313
 
                            for factory in manager.get_record_stream():
1314
 
                                yield factory
1315
 
                            manager = _LazyGroupContentManager(block)
1316
1327
                        manager.add_factory(key, parents, start, end)
1317
1328
            else:
1318
1329
                if manager is not None:
1319
 
                    # Yield everything buffered so far
1320
1330
                    for factory in manager.get_record_stream():
1321
1331
                        yield factory
1322
 
                    manager = None
 
1332
                    last_read_memo = manager = None
1323
1333
                for record in source.get_record_stream(keys, ordering,
1324
1334
                                                       include_delta_closure):
1325
1335
                    yield record
1326
1336
        if manager is not None:
1327
 
            # Yield everything buffered so far
1328
1337
            for factory in manager.get_record_stream():
1329
1338
                yield factory
1330
 
            manager = None
1331
1339
 
1332
1340
    def get_sha1s(self, keys):
1333
1341
        """See VersionedFiles.get_sha1s()."""