~bzr-pqm/bzr/bzr.dev

Viewing changes to bzrlib/groupcompress.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-08-27 02:27:19 UTC
  • mfrom: (4634.3.19 gc-batching)
  • Revision ID: pqm@pqm.ubuntu.com-20090827022719-bl2yoqhpj3fcfczu
(andrew) Fix #402657: 2a fetch over dumb transport reads one group at a time.

@@ -33,7 +33,6 @@
     pack,
     trace,
     )
-from bzrlib.graph import Graph
 from bzrlib.btree_index import BTreeBuilder
 from bzrlib.lru_cache import LRUSizeCache
 from bzrlib.tsort import topo_sort
@@ -45,12 +44,15 @@
     VersionedFiles,
     )
 
+# Minimum number of uncompressed bytes to try fetch at once when retrieving
+# groupcompress blocks.
+BATCH_SIZE = 2**16
+
 _USE_LZMA = False and (pylzma is not None)
 
 # osutils.sha_string('')
 _null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
 
-
 def sort_gc_optimal(parent_map):
     """Sort and group the keys in parent_map into groupcompress order.
 
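For scale, 2**16 bytes is 64 KiB. The accumulate-and-flush pattern the rest of the patch builds around this constant can be sketched as follows; this is an illustration rather than code from the patch, and `batcher` stands in for the _BatchingBlockFetcher introduced below:

    def stream_batched(batcher, keys):
        # Add keys until the estimated batch size crosses BATCH_SIZE,
        # then flush that batch as record factories.
        for key in keys:
            if batcher.add_key(key) > BATCH_SIZE:
                for factory in batcher.yield_factories():
                    yield factory
        # A final full flush returns whatever is still pending.
        for factory in batcher.yield_factories(full_flush=True):
            yield factory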
@@ -974,6 +976,114 @@
     versioned_files.stream.close()
 
 
+class _BatchingBlockFetcher(object):
+    """Fetch group compress blocks in batches.
+
+    :ivar total_bytes: int of expected number of bytes needed to fetch the
+        currently pending batch.
+    """
+
+    def __init__(self, gcvf, locations):
+        self.gcvf = gcvf
+        self.locations = locations
+        self.keys = []
+        self.batch_memos = {}
+        self.memos_to_get = []
+        self.total_bytes = 0
+        self.last_read_memo = None
+        self.manager = None
+
+    def add_key(self, key):
+        """Add another key to fetch.
+
+        :return: The estimated number of bytes needed to fetch the batch so
+            far.
+        """
+        self.keys.append(key)
+        index_memo, _, _, _ = self.locations[key]
+        read_memo = index_memo[0:3]
+        # Three possibilities for this read_memo:
+        #  - it's already part of this batch; or
+        #  - it's not yet part of this batch, but is already cached; or
+        #  - it's not yet part of this batch and will need to be fetched.
+        if read_memo in self.batch_memos:
+            # This read memo is already in this batch.
+            return self.total_bytes
+        try:
+            cached_block = self.gcvf._group_cache[read_memo]
+        except KeyError:
+            # This read memo is new to this batch, and the data isn't cached
+            # either.
+            self.batch_memos[read_memo] = None
+            self.memos_to_get.append(read_memo)
+            byte_length = read_memo[2]
+            self.total_bytes += byte_length
+        else:
+            # This read memo is new to this batch, but cached.
+            # Keep a reference to the cached block in batch_memos because it's
+            # certain that we'll use it when this batch is processed, but
+            # there's a risk that it would fall out of _group_cache between now
+            # and then.
+            self.batch_memos[read_memo] = cached_block
+        return self.total_bytes
+
+    def _flush_manager(self):
+        if self.manager is not None:
+            for factory in self.manager.get_record_stream():
+                yield factory
+            self.manager = None
+            self.last_read_memo = None
+
+    def yield_factories(self, full_flush=False):
+        """Yield factories for keys added since the last yield.  They will be
+        returned in the order they were added via add_key.
+
+        :param full_flush: by default, some results may not be returned in case
+            they can be part of the next batch.  If full_flush is True, then
+            all results are returned.
+        """
+        if self.manager is None and not self.keys:
+            return
+        # Fetch all memos in this batch.
+        blocks = self.gcvf._get_blocks(self.memos_to_get)
+        # Turn blocks into factories and yield them.
+        memos_to_get_stack = list(self.memos_to_get)
+        memos_to_get_stack.reverse()
+        for key in self.keys:
+            index_memo, _, parents, _ = self.locations[key]
+            read_memo = index_memo[:3]
+            if self.last_read_memo != read_memo:
+                # We are starting a new block. If we have a
+                # manager, we have found everything that fits for
+                # now, so yield records
+                for factory in self._flush_manager():
+                    yield factory
+                # Now start a new manager.
+                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
+                    # The next block from _get_blocks will be the block we
+                    # need.
+                    block_read_memo, block = blocks.next()
+                    if block_read_memo != read_memo:
+                        raise AssertionError(
+                            "block_read_memo out of sync with read_memo "
+                            "(%r != %r)" % (block_read_memo, read_memo))
+                    self.batch_memos[read_memo] = block
+                    memos_to_get_stack.pop()
+                else:
+                    block = self.batch_memos[read_memo]
+                self.manager = _LazyGroupContentManager(block)
+                self.last_read_memo = read_memo
+            start, end = index_memo[3:5]
+            self.manager.add_factory(key, parents, start, end)
+        if full_flush:
+            for factory in self._flush_manager():
+                yield factory
+        del self.keys[:]
+        self.batch_memos.clear()
+        del self.memos_to_get[:]
+        self.total_bytes = 0
+
+
 class GroupCompressVersionedFiles(VersionedFiles):
     """A group-compress based VersionedFiles implementation."""
 
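A note on the tuples this class slices: index_memo[0:3] is the "read memo" that identifies one compressed group (and is the key into _group_cache), while index_memo[3:5] locates a single text inside the decompressed group. Schematically, with hypothetical field names that are not from the patch:

    # Illustrative layout only:
    #   index_memo = (index, position, group_byte_length, start, end)
    index_memo = ('pack-1', 0, 4096, 17, 250)
    read_memo = index_memo[0:3]    # which group to read; read_memo[2] is the
                                   # byte length that add_key accumulates
    start, end = index_memo[3:5]   # the text's span within that group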
@@ -1137,26 +1247,42 @@
             missing.difference_update(set(new_result))
         return result, source_results
 
-    def _get_block(self, index_memo):
-        read_memo = index_memo[0:3]
-        # get the group:
-        try:
-            block = self._group_cache[read_memo]
-        except KeyError:
-            # read the group
-            zdata = self._access.get_raw_records([read_memo]).next()
-            # decompress - whole thing - this is not a bug, as it
-            # permits caching. We might want to store the partially
-            # decompresed group and decompress object, so that recent
-            # texts are not penalised by big groups.
-            block = GroupCompressBlock.from_bytes(zdata)
-            self._group_cache[read_memo] = block
-        # cheapo debugging:
-        # print len(zdata), len(plain)
-        # parse - requires split_lines, better to have byte offsets
-        # here (but not by much - we only split the region for the
-        # recipe, and we often want to end up with lines anyway.
-        return block
+    def _get_blocks(self, read_memos):
+        """Get GroupCompressBlocks for the given read_memos.
+
+        :returns: a series of (read_memo, block) pairs, in the order they were
+            originally passed.
+        """
+        cached = {}
+        for read_memo in read_memos:
+            try:
+                block = self._group_cache[read_memo]
+            except KeyError:
+                pass
+            else:
+                cached[read_memo] = block
+        not_cached = []
+        not_cached_seen = set()
+        for read_memo in read_memos:
+            if read_memo in cached:
+                # Don't fetch what we already have
+                continue
+            if read_memo in not_cached_seen:
+                # Don't try to fetch the same data twice
+                continue
+            not_cached.append(read_memo)
+            not_cached_seen.add(read_memo)
+        raw_records = self._access.get_raw_records(not_cached)
+        for read_memo in read_memos:
+            try:
+                yield read_memo, cached[read_memo]
+            except KeyError:
+                # Read the block, and cache it.
+                zdata = raw_records.next()
+                block = GroupCompressBlock.from_bytes(zdata)
+                self._group_cache[read_memo] = block
+                cached[read_memo] = block
+                yield read_memo, block
 
     def get_missing_compression_parent_keys(self):
         """Return the keys of missing compression parents.
@@ -1328,55 +1454,35 @@
                 unadded_keys, source_result)
         for key in missing:
             yield AbsentContentFactory(key)
-        manager = None
-        last_read_memo = None
-        # TODO: This works fairly well at batching up existing groups into a
-        #       streamable format, and possibly allowing for taking one big
-        #       group and splitting it when it isn't fully utilized.
-        #       However, it doesn't allow us to find under-utilized groups and
-        #       combine them into a bigger group on the fly.
-        #       (Consider the issue with how chk_map inserts texts
-        #       one-at-a-time.) This could be done at insert_record_stream()
-        #       time, but it probably would decrease the number of
-        #       bytes-on-the-wire for fetch.
+        # Batch up as many keys as we can until either:
+        #  - we encounter an unadded ref, or
+        #  - we run out of keys, or
+        #  - the total bytes to retrieve for this batch > BATCH_SIZE
+        batcher = _BatchingBlockFetcher(self, locations)
         for source, keys in source_keys:
             if source is self:
                 for key in keys:
                     if key in self._unadded_refs:
-                        if manager is not None:
-                            for factory in manager.get_record_stream():
-                                yield factory
-                            last_read_memo = manager = None
+                        # Flush batch, then yield unadded ref from
+                        # self._compressor.
+                        for factory in batcher.yield_factories(full_flush=True):
+                            yield factory
                         bytes, sha1 = self._compressor.extract(key)
                         parents = self._unadded_refs[key]
                         yield FulltextContentFactory(key, parents, sha1, bytes)
-                    else:
-                        index_memo, _, parents, (method, _) = locations[key]
-                        read_memo = index_memo[0:3]
-                        if last_read_memo != read_memo:
-                            # We are starting a new block. If we have a
-                            # manager, we have found everything that fits for
-                            # now, so yield records
-                            if manager is not None:
-                                for factory in manager.get_record_stream():
-                                    yield factory
-                            # Now start a new manager
-                            block = self._get_block(index_memo)
-                            manager = _LazyGroupContentManager(block)
-                            last_read_memo = read_memo
-                        start, end = index_memo[3:5]
-                        manager.add_factory(key, parents, start, end)
+                        continue
+                    if batcher.add_key(key) > BATCH_SIZE:
+                        # Ok, this batch is big enough.  Yield some results.
+                        for factory in batcher.yield_factories():
+                            yield factory
             else:
-                if manager is not None:
-                    for factory in manager.get_record_stream():
-                        yield factory
-                    last_read_memo = manager = None
+                for factory in batcher.yield_factories(full_flush=True):
+                    yield factory
                 for record in source.get_record_stream(keys, ordering,
                                                        include_delta_closure):
                     yield record
-        if manager is not None:
-            for factory in manager.get_record_stream():
-                yield factory
+        for factory in batcher.yield_factories(full_flush=True):
+            yield factory
 
     def get_sha1s(self, keys):
         """See VersionedFiles.get_sha1s()."""