~bzr-pqm/bzr/bzr.dev

Viewing changes to bzrlib/groupcompress.py

  • Committer: Andrew Bennetts
  • Date: 2009-08-24 09:53:11 UTC
  • mto: This revision was merged to the branch mainline in revision 4657.
  • Revision ID: andrew.bennetts@canonical.com-20090824095311-8t7dgl20b7w5d9ep
Fix bug, add docstrings, improve clarity.

@@ -974,7 +974,12 @@
     versioned_files.stream.close()


-class _Batcher(object):
+class _BatchingBlockFetcher(object):
+    """Fetch group compress blocks in batches.
+
+    :ivar total_bytes: int of expected number of bytes needed to fetch the
+        currently pending batch.
+    """

     def __init__(self, gcvf, locations):
         self.gcvf = gcvf
@@ -985,51 +990,58 @@
         self.manager = None

     def add_key(self, key):
+        """Add another key to fetch."""
         self.keys.append(key)
         index_memo, _, _, _ = self.locations[key]
-        start, end = index_memo[3:5]
-        # XXX: if this key has already been fetched in another group, it
-        # shouldn't count towards this total.
-        self.total_bytes += end - start
+        read_memo = index_memo[0:3]
+        if read_memo not in self.gcvf._group_cache:
+            start, end = index_memo[3:5]
+            self.total_bytes += end - start

-    def _frob_batch(self, last_read_memo, manager, full_flush=False):
-        blocks_to_get = []
-        last_read_memo_tmp = last_read_memo
+    def empty_manager(self):
+        if self.manager is not None:
+            for factory in self.manager.get_record_stream():
+                yield factory
+            self.manager = None
+
+    def yield_factories(self, full_flush=False):
+        if self.manager is None and not self.keys:
+            return
+        memos_to_get = []
+        keys_to_get = []
+        last_read_memo = self.last_read_memo
         for key in self.keys:
-            index_memo, _, parents, (method, _) = self.locations[key]
-            read_memo = index_memo[0:3]
+            index_memo = self.locations[key][0]
+            read_memo = index_memo[:3]
             if last_read_memo != read_memo:
-                blocks_to_get.append((key, read_memo))
+                memos_to_get.append(read_memo)
+                keys_to_get.append(key)
                 last_read_memo = read_memo
-        last_read_memo = last_read_memo_tmp
-        blocks = self.gcvf._get_blocks(
-            [read_memo for _, read_memo in blocks_to_get])
+        blocks = self.gcvf._get_blocks(memos_to_get)
         block_map = {}
-        for block, (key, _) in zip(blocks, blocks_to_get):
+        for block, key in zip(blocks, keys_to_get):
             block_map[key] = block
+        last_read_memo = self.last_read_memo
         for key in self.keys:
-            index_memo, _, parents, (method, _) = self.locations[key]
-            read_memo = index_memo[0:3]
+            index_memo, _, parents, _ = self.locations[key]
+            read_memo = index_memo[:3]
             if last_read_memo != read_memo:
                 # We are starting a new block. If we have a
                 # manager, we have found everything that fits for
                 # now, so yield records
-                if manager is not None:
-                    for factory in manager.get_record_stream():
-                        yield factory
+                for factory in self.empty_manager():
+                    yield factory
                 # Now start a new manager
                 block = block_map[key]
-                manager = _LazyGroupContentManager(block)
+                self.manager = _LazyGroupContentManager(block)
                 last_read_memo = read_memo
             start, end = index_memo[3:5]
-            manager.add_factory(key, parents, start, end)
+            self.manager.add_factory(key, parents, start, end)
         if full_flush:
-            if manager is not None:
-                for factory in manager.get_record_stream():
-                    yield factory
-                last_read_memo = manager = None
+            for factory in self.empty_manager():
+                yield factory
+            last_read_memo = None
         self.last_read_memo = last_read_memo
-        self.manager = manager
         del self.keys[:]
         self.total_bytes = 0

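The add-then-flush protocol above is easy to get wrong, so here is a minimal, self-contained sketch of the same pattern. Everything in it (InMemoryStore, the 3-tuple location format, the class and method names) is illustrative scaffolding rather than bzrlib API; only the shape of the logic mirrors _BatchingBlockFetcher.

class InMemoryStore(object):
    """Stand-in for the group-compress store: read_memo -> block."""

    def __init__(self, blocks):
        self.blocks = blocks
        self.cache = {}  # plays the role of gcvf._group_cache

    def get_blocks(self, read_memos):
        for memo in read_memos:
            # Cache every block we hand out, like _get_blocks does.
            yield self.cache.setdefault(memo, self.blocks[memo])


class BatchingFetcher(object):
    """Queue keys, then read all their blocks in one bulk request."""

    def __init__(self, store, locations):
        self.store = store
        self.locations = locations  # key -> (read_memo, start, end)
        self.keys = []
        self.total_bytes = 0

    def add_key(self, key):
        """Queue a key, counting bytes only for blocks not yet cached."""
        self.keys.append(key)
        read_memo, start, end = self.locations[key]
        if read_memo not in self.store.cache:
            self.total_bytes += end - start

    def flush(self):
        """Yield (key, block) for each pending key, reading in bulk."""
        memos_to_get = []
        last = None
        for key in self.keys:
            memo = self.locations[key][0]
            if memo != last:  # adjacent keys usually share a block
                memos_to_get.append(memo)
                last = memo
        block_map = dict(zip(memos_to_get,
                             self.store.get_blocks(memos_to_get)))
        for key in self.keys:
            yield key, block_map[self.locations[key][0]]
        del self.keys[:]
        self.total_bytes = 0

# Callers add keys until total_bytes crosses a threshold, then flush:
store = InMemoryStore({'m1': 'block one', 'm2': 'block two'})
locations = {'a': ('m1', 0, 10), 'b': ('m1', 10, 30), 'c': ('m2', 0, 5)}
fetcher = BatchingFetcher(store, locations)
for key in ('a', 'b', 'c'):
    fetcher.add_key(key)
    if fetcher.total_bytes > 2 ** 16:  # same 64k threshold as the diff
        for pair in fetcher.flush():
            print(pair)
for pair in fetcher.flush():  # final full flush
    print(pair)
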
@@ -1207,8 +1219,17 @@
                 pass
             else:
                 cached[read_memo] = block
-        not_cached = [
-            read_memo for read_memo in read_memos if read_memo not in cached]
+        not_cached = []
+        not_cached_seen = set()
+        for read_memo in read_memos:
+            if read_memo in cached:
+                # Don't fetch what we already have
+                continue
+            if read_memo in not_cached_seen:
+                # Don't try to fetch the same data twice
+                continue
+            not_cached.append(read_memo)
+            not_cached_seen.add(read_memo)
         raw_records = self._access.get_raw_records(not_cached)
         for read_memo in read_memos:
             try:
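The replacement loop is an instance of a general pattern: order-preserving deduplication with an exclusion set. A hedged sketch of that pattern follows; the helper name unique_pending is made up for illustration and is not part of bzrlib.

def unique_pending(items, already_have):
    """Return first occurrences of items, skipping already_have.

    The list preserves request order; the set gives O(1) duplicate checks.
    """
    pending = []
    seen = set()
    for item in items:
        if item in already_have:
            continue  # don't fetch what we already have
        if item in seen:
            continue  # don't try to fetch the same data twice
        pending.append(item)
        seen.add(item)
    return pending

print(unique_pending(['a', 'b', 'a', 'c'], already_have=set(['b'])))
# -> ['a', 'c']
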
@@ -1222,6 +1243,7 @@
                 # texts are not penalised by big groups.
                 block = GroupCompressBlock.from_bytes(zdata)
                 self._group_cache[read_memo] = block
+                cached[read_memo] = block
             yield block

     def _get_block(self, index_memo):
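The single added line matters because of the deduplication above: once not_cached holds each read_memo at most once, raw_records yields each distinct block only once, so later repeats of a memo must be answered from the local cached dict rather than from the raw stream. A rough sketch of that invariant, with fetch_raw and parse as hypothetical stand-ins for _access.get_raw_records and GroupCompressBlock.from_bytes (without the cached[memo] write, the repeated memo below would exhaust the raw stream):

def get_blocks(read_memos, fetch_raw, parse, cached):
    # Queue each distinct memo we don't already have, preserving order.
    pending = []
    seen = set()
    for memo in read_memos:
        if memo not in cached and memo not in seen:
            pending.append(memo)
            seen.add(memo)
    raw = iter(fetch_raw(pending))  # exactly one record per queued memo
    for memo in read_memos:
        if memo not in cached:
            # First occurrence: parse the next raw record and remember it
            # so that later repeats of this memo are served from `cached`.
            cached[memo] = parse(next(raw))
        yield cached[memo]

# Duplicate memos cost only one read:
blocks = get_blocks(['m1', 'm2', 'm1'],
                    fetch_raw=lambda memos: ['raw-' + m for m in memos],
                    parse=lambda raw: raw.upper(),
                    cached={})
print(list(blocks))  # -> ['RAW-M1', 'RAW-M2', 'RAW-M1']
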
@@ -1415,34 +1437,19 @@
                 unadded_keys, source_result)
         for key in missing:
             yield AbsentContentFactory(key)
-        manager = None
-        last_read_memo = None
-        # TODO: This works fairly well at batching up existing groups into a
-        #       streamable format, and possibly allowing for taking one big
-        #       group and splitting it when it isn't fully utilized.
-        #       However, it doesn't allow us to find under-utilized groups and
-        #       combine them into a bigger group on the fly.
-        #       (Consider the issue with how chk_map inserts texts
-        #       one-at-a-time.) This could be done at insert_record_stream()
-        #       time, but it probably would decrease the number of
-        #       bytes-on-the-wire for fetch.
+        # Batch up as many keys as we can until either:
+        #  - we encounter an unadded ref, or
+        #  - we run out of keys, or
+        #  - the total bytes to retrieve for this batch > 64k
+        batcher = _BatchingBlockFetcher(self, locations)
         for source, keys in source_keys:
             if source is self:
-                # Batch up as many keys as we can until either:
-                #  - we encounter an unadded ref, or
-                #  - we run out of keys, or
-                #  - the total bytes to retrieve for this batch > 64k
-                batch_bytes_total = 0
-                batcher = _Batcher(self, locations)
                 for key in keys:
                     if key in self._unadded_refs:
                         # flush batch, then yield unadded ref from
                         # self._compressor
-                        for _ in batcher._frob_batch(last_read_memo, manager,
-                                full_flush=True):
+                        for _ in batcher.yield_factories(full_flush=True):
                             yield _
-                        last_read_memo = batcher.last_read_memo
-                        manager = batcher.manager
                         bytes, sha1 = self._compressor.extract(key)
                         parents = self._unadded_refs[key]
                         yield FulltextContentFactory(key, parents, sha1, bytes)
@@ -1450,27 +1457,17 @@
                     batcher.add_key(key)
                     if batcher.total_bytes > 2**16:
                         # Ok!  Our batch is full.  Let's do it.
-                        for _ in batcher._frob_batch(last_read_memo, manager):
+                        for _ in batcher.yield_factories():
                             yield _
-                        last_read_memo = batcher.last_read_memo
-                        manager = batcher.manager
-                if batcher.keys:
-                    for _ in batcher._frob_batch(last_read_memo, manager,
-                            full_flush=True):
-                        yield _
-                    last_read_memo = batcher.last_read_memo
-                    manager = batcher.manager
             else:
-                if manager is not None:
-                    for factory in manager.get_record_stream():
-                        yield factory
-                    last_read_memo = manager = None
+                for _ in batcher.yield_factories(full_flush=True):
+                    yield _
+                batcher.last_read_memo = None
                 for record in source.get_record_stream(keys, ordering,
                                                        include_delta_closure):
                     yield record
-        if manager is not None:
-            for factory in manager.get_record_stream():
-                yield factory
+        for _ in batcher.yield_factories(full_flush=True):
+            yield _

     def get_sha1s(self, keys):
         """See VersionedFiles.get_sha1s()."""