~bzr-pqm/bzr/bzr.dev

Viewing changes to bzrlib/groupcompress.py

  • Committer: Andrew Bennetts
  • Date: 2009-08-26 03:58:00 UTC
  • mto: This revision was merged to the branch mainline in revision 4657.
  • Revision ID: andrew.bennetts@canonical.com-20090826035800-fg2xunjvzbw6ng8x
Some changes prompted by John's review.

--- bzrlib/groupcompress.py
+++ bzrlib/groupcompress.py
@@ -984,25 +984,45 @@
         self.gcvf = gcvf
         self.locations = locations
         self.keys = []
+        self.batch_memos = {}
+        self.memos_to_get = []
         self.total_bytes = 0
         self.last_read_memo = None
         self.manager = None
 
     def add_key(self, key):
-        """Add another to key to fetch."""
+        """Add another to key to fetch.
+
+        :return: The estimated number of bytes needed to fetch the batch so
+            far.
+        """
         self.keys.append(key)
         index_memo, _, _, _ = self.locations[key]
         read_memo = index_memo[0:3]
-        # This looks a bit dangerous, but it's ok: we're assuming that memos in
-        # _group_cache now will still be there when yield_factories is called
-        # (and that uncached memos don't become cached).  This ought to be
-        # true.  But if it isn't that's ok, yield_factories will still work.
-        # The only negative effect is that the estimated 'total_bytes' value
-        # here will be wrong, so we might fetch bigger/smaller batches than
-        # intended.
-        if read_memo not in self.gcvf._group_cache:
+        # Three possibilities for this read_memo:
+        #  - it's already part of this batch; or
+        #  - it's not yet part of this batch, but is already cached; or
+        #  - it's not yet part of this batch and will need to be fetched.
+        if read_memo in self.batch_memos:
+            # This read memo is already in this batch.
+            return
+        try:
+            cached_block = self.gcvf._group_cache[read_memo]
+        except KeyError:
+            # This read memo is new to this batch, and the data isn't cached
+            # either.
+            self.batch_memos[read_memo] = None
+            self.memos_to_get.append(read_memo)
             byte_length = read_memo[2]
             self.total_bytes += byte_length
+        else:
+            # This read memo is new to this batch, but cached.
+            # Keep a reference to the cached block in batch_memos because it's
+            # certain that we'll use it when this batch is processed, but
+            # there's a risk that it would fall out of _group_cache between now
+            # and then.
+            self.batch_memos[read_memo] = cached_block
+        return self.total_bytes
 
     def _flush_manager(self):
         if self.manager is not None:
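
The reworked add_key above swaps the old membership test on gcvf._group_cache for a try/except lookup, so a cached block is found and pinned in a single step instead of being looked up again (or lost) later. A minimal sketch of that idiom, assuming plain dicts in place of the bzrlib cache and batch state (the names here are illustrative only, not the bzrlib API):

    # Sketch only: 'cache' stands in for self.gcvf._group_cache, 'batch' for
    # self.batch_memos, and 'to_fetch' for self.memos_to_get.
    def remember(read_memo, cache, batch, to_fetch):
        if read_memo in batch:
            return              # already part of this batch
        try:
            block = cache[read_memo]
        except KeyError:
            batch[read_memo] = None       # will need a real read
            to_fetch.append(read_memo)
        else:
            # Pin the cached block now: it may be evicted from 'cache'
            # before the batch is processed.
            batch[read_memo] = block

Holding the reference in batch_memos is what lets yield_factories serve a cached memo later even if it has since fallen out of _group_cache.
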
@@ -1021,18 +1041,11 @@
         """
         if self.manager is None and not self.keys:
             return
-        # First, determine the list of memos to get.
-        memos_to_get = []
-        last_read_memo = self.last_read_memo
-        for key in self.keys:
-            index_memo = self.locations[key][0]
-            read_memo = index_memo[:3]
-            if last_read_memo != read_memo:
-                memos_to_get.append(read_memo)
-                last_read_memo = read_memo
-        # Second, we fetch all those memos in one batch.
-        blocks = self.gcvf._get_blocks(memos_to_get)
-        # Finally, we turn blocks into factories and yield them.
+        # Fetch all memos in this batch.
+        blocks = self.gcvf._get_blocks(self.memos_to_get)
+        # Turn blocks into factories and yield them.
+        memos_to_get_stack = list(self.memos_to_get)
+        memos_to_get_stack.reverse()
         for key in self.keys:
             index_memo, _, parents, _ = self.locations[key]
             read_memo = index_memo[:3]
@@ -1042,9 +1055,18 @@
                 # now, so yield records
                 for factory in self._flush_manager():
                     yield factory
-                # Now start a new manager.  The next block from _get_blocks
-                # will be the block we need.
-                block = blocks.next()
+                # Now start a new manager.
+                if memos_to_get_stack and memos_to_get_stack[-1] == read_memo:
+                    # The next block from _get_blocks will be the block we
+                    # need.
+                    block_read_memo, block = blocks.next()
+                    if block_read_memo != read_memo:
+                        raise AssertionError(
+                            "block_read_memo out of sync with read_memo")
+                    self.batch_memos[read_memo] = block
+                    memos_to_get_stack.pop()
+                else:
+                    block = self.batch_memos[read_memo]
                 self.manager = _LazyGroupContentManager(block)
                 self.last_read_memo = read_memo
             start, end = index_memo[3:5]
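
Since _get_blocks now returns only the blocks that actually had to be read, yield_factories keeps a reversed copy of memos_to_get as a stack: when the memo for the current key sits at the top of that stack, the next fetched block must belong to it; otherwise the block was already cached and is taken from batch_memos. A compact sketch of that consumption pattern, with hypothetical names (wanted is the reversed stack, fetched_pairs the iterator of (read_memo, block) pairs, batch the batch_memos dict):

    def block_for(read_memo, wanted, fetched_pairs, batch):
        # 'wanted' holds the memos still expected from the fetch layer,
        # reversed so the next one due is at the end.
        if wanted and wanted[-1] == read_memo:
            got_memo, block = next(fetched_pairs)
            if got_memo != read_memo:
                raise AssertionError("fetched memo out of sync")
            batch[read_memo] = block
            wanted.pop()
        else:
            # This memo was already cached when it was added to the batch.
            block = batch[read_memo]
        return block

The AssertionError mirrors the check in the diff: it guards against the fetch layer delivering blocks in a different order than the memos were requested.
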
@@ -1053,9 +1075,12 @@
             for factory in self._flush_manager():
                 yield factory
         del self.keys[:]
+        self.batch_memos.clear()
+        del self.memos_to_get[:]
         self.total_bytes = 0
 
 
+
 class GroupCompressVersionedFiles(VersionedFiles):
     """A group-compress based VersionedFiles implementation."""
 
@@ -1222,7 +1247,8 @@
     def _get_blocks(self, read_memos):
         """Get GroupCompressBlocks for the given read_memos.
 
-        Blocks are returned in the order specified in read_memos.
+        :returns: a series of (read_memo, block) pairs, in the order they were
+            originally passed.
         """
         cached = {}
         for read_memo in read_memos:
@@ -1246,7 +1272,7 @@
         raw_records = self._access.get_raw_records(not_cached)
         for read_memo in read_memos:
             try:
-                yield cached[read_memo]
+                yield read_memos, cached[read_memo]
             except KeyError:
                 # read the group
                 zdata = raw_records.next()
@@ -1257,7 +1283,7 @@
                 block = GroupCompressBlock.from_bytes(zdata)
                 self._group_cache[read_memo] = block
                 cached[read_memo] = block
-                yield block
+                yield read_memo, block
 
     def get_missing_compression_parent_keys(self):
         """Return the keys of missing compression parents.
@@ -1441,25 +1467,24 @@
                     if key in self._unadded_refs:
                         # Flush batch, then yield unadded ref from
                         # self._compressor.
-                        for _ in batcher.yield_factories(full_flush=True):
-                            yield _
+                        for factory in batcher.yield_factories(full_flush=True):
+                            yield factory
                         bytes, sha1 = self._compressor.extract(key)
                         parents = self._unadded_refs[key]
                         yield FulltextContentFactory(key, parents, sha1, bytes)
                         continue
-                    batcher.add_key(key)
-                    if batcher.total_bytes > BATCH_SIZE:
+                    if batcher.add_key(key) > BATCH_SIZE:
                         # Ok, this batch is big enough.  Yield some results.
-                        for _ in batcher.yield_factories():
-                            yield _
+                        for factory in batcher.yield_factories():
+                            yield factory
             else:
-                for _ in batcher.yield_factories(full_flush=True):
-                    yield _
+                for factory in batcher.yield_factories(full_flush=True):
+                    yield factory
                 for record in source.get_record_stream(keys, ordering,
                                                        include_delta_closure):
                     yield record
-        for _ in batcher.yield_factories(full_flush=True):
-            yield _
+        for factory in batcher.yield_factories(full_flush=True):
+            yield factory
 
     def get_sha1s(self, keys):
         """See VersionedFiles.get_sha1s()."""