~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/chk_map.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-07-14 17:33:13 UTC
  • mfrom: (4476.1.40 1.17-chk-multilevel)
  • Revision ID: pqm@pqm.ubuntu.com-20090714173313-3p3ytzlfuc3y2bm6
(jam) Some rework of the internals of chk_map.iter_interesting_nodes.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1396
1396
    return node
1397
1397
 
1398
1398
 
1399
 
def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
1400
 
    """Read the associated records, and determine what is interesting."""
1401
 
    uninteresting_keys = set(uninteresting_keys)
1402
 
    chks_to_read = uninteresting_keys.union(interesting_keys)
1403
 
    next_uninteresting = set()
1404
 
    next_interesting = set()
1405
 
    next_interesting_intersection = None
1406
 
    uninteresting_items = set()
1407
 
    interesting_items = set()
1408
 
    interesting_to_yield = []
1409
 
    for record in store.get_record_stream(chks_to_read, 'unordered', True):
1410
 
        # records_read.add(record.key())
1411
 
        if pb is not None:
1412
 
            pb.tick()
1413
 
        bytes = record.get_bytes_as('fulltext')
1414
 
        # We don't care about search_key_func for this code, because we only
1415
 
        # care about external references.
1416
 
        node = _deserialise(bytes, record.key, search_key_func=None)
1417
 
        if record.key in uninteresting_keys:
1418
 
            if type(node) is InternalNode:
1419
 
                next_uninteresting.update(node.refs())
1420
 
            else:
1421
 
                # We know we are at a LeafNode, so we can pass None for the
1422
 
                # store
1423
 
                uninteresting_items.update(node.iteritems(None))
1424
 
        else:
1425
 
            interesting_to_yield.append(record.key)
1426
 
            if type(node) is InternalNode:
1427
 
                if next_interesting_intersection is None:
1428
 
                    next_interesting_intersection = set(node.refs())
1429
 
                else:
1430
 
                    next_interesting_intersection = \
1431
 
                        next_interesting_intersection.intersection(node.refs())
1432
 
                next_interesting.update(node.refs())
1433
 
            else:
1434
 
                interesting_items.update(node.iteritems(None))
1435
 
    return (next_uninteresting, uninteresting_items,
1436
 
            next_interesting, interesting_to_yield, interesting_items,
1437
 
            next_interesting_intersection)
1438
 
 
1439
 
 
1440
 
def _find_all_uninteresting(store, interesting_root_keys,
1441
 
                            uninteresting_root_keys, pb):
1442
 
    """Determine the full set of uninteresting keys."""
1443
 
    # What about duplicates between interesting_root_keys and
1444
 
    # uninteresting_root_keys?
1445
 
    if not uninteresting_root_keys:
1446
 
        # Shortcut case. We know there is nothing uninteresting to filter out
1447
 
        # So we just let the rest of the algorithm do the work
1448
 
        # We know there is nothing uninteresting, and we didn't have to read
1449
 
        # any interesting records yet.
1450
 
        return (set(), set(), set(interesting_root_keys), [], set())
1451
 
    all_uninteresting_chks = set(uninteresting_root_keys)
1452
 
    all_uninteresting_items = set()
1453
 
 
1454
 
    # First step, find the direct children of both the interesting and
1455
 
    # uninteresting set
1456
 
    (uninteresting_keys, uninteresting_items,
1457
 
     interesting_keys, interesting_to_yield,
1458
 
     interesting_items, interesting_intersection,
1459
 
     ) = _find_children_info(store, interesting_root_keys,
1460
 
                                              uninteresting_root_keys,
1461
 
                                              pb=pb)
1462
 
    all_uninteresting_chks.update(uninteresting_keys)
1463
 
    all_uninteresting_items.update(uninteresting_items)
1464
 
    del uninteresting_items
1465
 
    # Do not examine in detail pages common to all interesting trees.
1466
 
    # Pages that are common to all interesting trees will have their
1467
 
    # older versions found via the uninteresting tree traversal. Some pages
1468
 
    # found via the interesting trees traversal will be uninteresting for
1469
 
    # other of the interesting trees, which is why we require the pages to be
1470
 
    # common for us to trim them.
1471
 
    if interesting_intersection is not None:
1472
 
        uninteresting_keys.difference_update(interesting_intersection)
1473
 
 
1474
 
    # Second, find the full set of uninteresting bits reachable by the
1475
 
    # uninteresting roots
1476
 
    chks_to_read = uninteresting_keys
1477
 
    while chks_to_read:
1478
 
        next_chks = set()
1479
 
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
1480
 
            # TODO: Handle 'absent'
1481
 
            if pb is not None:
1482
 
                pb.tick()
 
1399
class CHKMapDifference(object):
 
1400
    """Iterate the stored pages and key,value pairs for (new - old).
 
1401
 
 
1402
    This class provides a generator over the stored CHK pages and the
 
1403
    (key, value) pairs that are in any of the new maps and not in any of the
 
1404
    old maps.
 
1405
 
 
1406
    Note that it may yield chk pages that are common (especially root nodes),
 
1407
    but it won't yield (key,value) pairs that are common.
 
1408
    """
 
1409
 
 
1410
    def __init__(self, store, new_root_keys, old_root_keys,
 
1411
                 search_key_func, pb=None):
 
1412
        self._store = store
 
1413
        self._new_root_keys = new_root_keys
 
1414
        self._old_root_keys = old_root_keys
 
1415
        self._pb = pb
 
1416
        # All uninteresting chks that we have seen. By the time they are added
 
1417
        # here, they should be either fully ignored, or queued up for
 
1418
        # processing
 
1419
        self._all_old_chks = set(self._old_root_keys)
 
1420
        # All items that we have seen from the old_root_keys
 
1421
        self._all_old_items = set()
 
1422
        # These are interesting items which were either read, or already in the
 
1423
        # interesting queue (so we don't need to walk them again)
 
1424
        self._processed_new_refs = set()
 
1425
        self._search_key_func = search_key_func
 
1426
 
 
1427
        # The uninteresting and interesting nodes to be searched
 
1428
        self._old_queue = []
 
1429
        self._new_queue = []
 
1430
        # Holds the (key, value) items found when processing the root nodes,
 
1431
        # waiting for the uninteresting nodes to be walked
 
1432
        self._new_item_queue = []
 
1433
        self._state = None
 
1434
 
 
1435
    def _read_nodes_from_store(self, keys):
 
1436
        # We chose not to use _page_cache, because we think in terms of records
 
1437
        # to be yielded. Also, we expect to touch each page only 1 time during
 
1438
        # this code. (We may want to evaluate saving the raw bytes into the
 
1439
        # page cache, which would allow a working tree update after the fetch
 
1440
        # to not have to read the bytes again.)
 
1441
        stream = self._store.get_record_stream(keys, 'unordered', True)
 
1442
        for record in stream:
 
1443
            if self._pb is not None:
 
1444
                self._pb.tick()
 
1445
            if record.storage_kind == 'absent':
 
1446
                raise errors.NoSuchRevision(self._store, record.key)
1483
1447
            bytes = record.get_bytes_as('fulltext')
1484
 
            # We don't care about search_key_func for this code, because we
1485
 
            # only care about external references.
1486
 
            node = _deserialise(bytes, record.key, search_key_func=None)
 
1448
            node = _deserialise(bytes, record.key,
 
1449
                                search_key_func=self._search_key_func)
1487
1450
            if type(node) is InternalNode:
1488
 
                # uninteresting_prefix_chks.update(node._items.iteritems())
1489
 
                chks = node._items.values()
1490
 
                # TODO: We remove the entries that are already in
1491
 
                #       uninteresting_chks ?
1492
 
                next_chks.update(chks)
1493
 
                all_uninteresting_chks.update(chks)
 
1451
                # Note we don't have to do node.refs() because we know that
 
1452
                # there are no children that have been pushed into this node
 
1453
                prefix_refs = node._items.items()
 
1454
                items = []
1494
1455
            else:
1495
 
                all_uninteresting_items.update(node._items.iteritems())
1496
 
        chks_to_read = next_chks
1497
 
    return (all_uninteresting_chks, all_uninteresting_items,
1498
 
            interesting_keys, interesting_to_yield, interesting_items)
 
1456
                prefix_refs = []
 
1457
                items = node._items.items()
 
1458
            yield record, node, prefix_refs, items
 
1459
 
 
1460
    def _read_old_roots(self):
 
1461
        old_chks_to_enqueue = []
 
1462
        all_old_chks = self._all_old_chks
 
1463
        for record, node, prefix_refs, items in \
 
1464
                self._read_nodes_from_store(self._old_root_keys):
 
1465
            # Uninteresting node
 
1466
            prefix_refs = [p_r for p_r in prefix_refs
 
1467
                                if p_r[1] not in all_old_chks]
 
1468
            new_refs = [p_r[1] for p_r in prefix_refs]
 
1469
            all_old_chks.update(new_refs)
 
1470
            self._all_old_items.update(items)
 
1471
            # Queue up the uninteresting references
 
1472
            # Don't actually put them in the 'to-read' queue until we have
 
1473
            # finished checking the interesting references
 
1474
            old_chks_to_enqueue.extend(prefix_refs)
 
1475
        return old_chks_to_enqueue
 
1476
 
 
1477
    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
 
1478
        # At this point, we have read all the uninteresting and interesting
 
1479
        # items, so we can queue up the uninteresting stuff, knowing that we've
 
1480
        # handled the interesting ones
 
1481
        for prefix, ref in old_chks_to_enqueue:
 
1482
            not_interesting = True
 
1483
            for i in xrange(len(prefix), 0, -1):
 
1484
                if prefix[:i] in new_prefixes:
 
1485
                    not_interesting = False
 
1486
                    break
 
1487
            if not_interesting:
 
1488
                # This prefix is not part of the remaining 'interesting set'
 
1489
                continue
 
1490
            self._old_queue.append(ref)
 
1491
 
 
1492
    def _read_all_roots(self):
 
1493
        """Read the root pages.
 
1494
 
 
1495
        This is structured as a generator, so that the root records can be
 
1496
        yielded up to whoever needs them without any buffering.
 
1497
        """
 
1498
        # This is the bootstrap phase
 
1499
        if not self._old_root_keys:
 
1500
            # With no old_root_keys we can just shortcut and be ready
 
1501
            # for _flush_new_queue
 
1502
            self._new_queue = list(self._new_root_keys)
 
1503
            return
 
1504
        old_chks_to_enqueue = self._read_old_roots()
 
1505
        # filter out any root keys that are already known to be uninteresting
 
1506
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
 
1507
        # These are prefixes that are present in new_keys that we are
 
1508
        # thinking to yield
 
1509
        new_prefixes = set()
 
1510
        # We are about to yield all of these, so we don't want them getting
 
1511
        # added a second time
 
1512
        processed_new_refs = self._processed_new_refs
 
1513
        processed_new_refs.update(new_keys)
 
1514
        for record, node, prefix_refs, items in \
 
1515
                self._read_nodes_from_store(new_keys):
 
1516
            # At this level, we now know all the uninteresting references
 
1517
            # So we filter and queue up whatever is remaining
 
1518
            prefix_refs = [p_r for p_r in prefix_refs
 
1519
                           if p_r[1] not in self._all_old_chks
 
1520
                              and p_r[1] not in processed_new_refs]
 
1521
            refs = [p_r[1] for p_r in prefix_refs]
 
1522
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
 
1523
            self._new_queue.extend(refs)
 
1524
            # TODO: We can potentially get multiple items here, however the
 
1525
            #       current design allows for this, as callers will do the work
 
1526
            #       to make the results unique. We might profile whether we
 
1527
            #       gain anything by ensuring unique return values for items
 
1528
            new_items = [item for item in items
 
1529
                               if item not in self._all_old_items]
 
1530
            self._new_item_queue.extend(new_items)
 
1531
            new_prefixes.update([self._search_key_func(item[0])
 
1532
                                 for item in new_items])
 
1533
            processed_new_refs.update(refs)
 
1534
            yield record
 
1535
        # For new_prefixes we have the full length prefixes queued up.
 
1536
        # However, we also need possible prefixes. (If we have a known ref to
 
1537
        # 'ab', then we also need to include 'a'.) So expand the
 
1538
        # new_prefixes to include all shorter prefixes
 
1539
        for prefix in list(new_prefixes):
 
1540
            new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
 
1541
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
 
1542
 
 
1543
    def _flush_new_queue(self):
 
1544
        # No need to maintain the heap invariant anymore, just pull things out
 
1545
        # and process them
 
1546
        refs = set(self._new_queue)
 
1547
        self._new_queue = []
 
1548
        # First pass, flush all interesting items and convert to using direct refs
 
1549
        all_old_chks = self._all_old_chks
 
1550
        processed_new_refs = self._processed_new_refs
 
1551
        all_old_items = self._all_old_items
 
1552
        new_items = [item for item in self._new_item_queue
 
1553
                           if item not in all_old_items]
 
1554
        self._new_item_queue = []
 
1555
        if new_items:
 
1556
            yield None, new_items
 
1557
        refs = refs.difference(all_old_chks)
 
1558
        while refs:
 
1559
            next_refs = set()
 
1560
            next_refs_update = next_refs.update
 
1561
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
 
1562
            # from 1m54s to 1m51s. Consider it.
 
1563
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
 
1564
                items = [item for item in items
 
1565
                         if item not in all_old_items]
 
1566
                yield record, items
 
1567
                next_refs_update([p_r[1] for p_r in p_refs])
 
1568
            next_refs = next_refs.difference(all_old_chks)
 
1569
            next_refs = next_refs.difference(processed_new_refs)
 
1570
            processed_new_refs.update(next_refs)
 
1571
            refs = next_refs
 
1572
 
 
1573
    def _process_next_old(self):
 
1574
        # Since we don't filter uninteresting any further than during
 
1575
        # _read_all_roots, process the whole queue in a single pass.
 
1576
        refs = self._old_queue
 
1577
        self._old_queue = []
 
1578
        all_old_chks = self._all_old_chks
 
1579
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
 
1580
            self._all_old_items.update(items)
 
1581
            refs = [r for _,r in prefix_refs if r not in all_old_chks]
 
1582
            self._old_queue.extend(refs)
 
1583
            all_old_chks.update(refs)
 
1584
 
 
1585
    def _process_queues(self):
 
1586
        while self._old_queue:
 
1587
            self._process_next_old()
 
1588
        return self._flush_new_queue()
 
1589
 
 
1590
    def process(self):
 
1591
        for record in self._read_all_roots():
 
1592
            yield record, []
 
1593
        for record, items in self._process_queues():
 
1594
            yield record, items
1499
1595
 
1500
1596
 
1501
1597
def iter_interesting_nodes(store, interesting_root_keys,
1512
1608
    :return: Yield
1513
1609
        (interesting record, {interesting key:values})
1514
1610
    """
1515
 
    # TODO: consider that it may be more memory efficient to use the 20-byte
1516
 
    #       sha1 string, rather than tuples of hexidecimal sha1 strings.
1517
 
    # TODO: Try to factor out a lot of the get_record_stream() calls into a
1518
 
    #       helper function similar to _read_bytes. This function should be
1519
 
    #       able to use nodes from the _page_cache as well as actually
1520
 
    #       requesting bytes from the store.
1521
 
 
1522
 
    (all_uninteresting_chks, all_uninteresting_items, interesting_keys,
1523
 
     interesting_to_yield, interesting_items) = _find_all_uninteresting(store,
1524
 
        interesting_root_keys, uninteresting_root_keys, pb)
1525
 
 
1526
 
    # Now that we know everything uninteresting, we can yield information from
1527
 
    # our first request
1528
 
    interesting_items.difference_update(all_uninteresting_items)
1529
 
    interesting_to_yield = set(interesting_to_yield) - all_uninteresting_chks
1530
 
    if interesting_items:
1531
 
        yield None, interesting_items
1532
 
    if interesting_to_yield:
1533
 
        # We request these records again, rather than buffering the root
1534
 
        # records, most likely they are still in the _group_cache anyway.
1535
 
        for record in store.get_record_stream(interesting_to_yield,
1536
 
                                              'unordered', False):
1537
 
            yield record, []
1538
 
    all_uninteresting_chks.update(interesting_to_yield)
1539
 
    interesting_keys.difference_update(all_uninteresting_chks)
1540
 
 
1541
 
    chks_to_read = interesting_keys
1542
 
    counter = 0
1543
 
    while chks_to_read:
1544
 
        next_chks = set()
1545
 
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
1546
 
            counter += 1
1547
 
            if pb is not None:
1548
 
                pb.update('find chk pages', counter)
1549
 
            # TODO: Handle 'absent'?
1550
 
            bytes = record.get_bytes_as('fulltext')
1551
 
            # We don't care about search_key_func for this code, because we
1552
 
            # only care about external references.
1553
 
            node = _deserialise(bytes, record.key, search_key_func=None)
1554
 
            if type(node) is InternalNode:
1555
 
                # all_uninteresting_chks grows large, as it lists all nodes we
1556
 
                # don't want to process (including already seen interesting
1557
 
                # nodes).
1558
 
                # small.difference_update(large) scales O(large), but
1559
 
                # small.difference(large) scales O(small).
1560
 
                # Also, we know we just _deserialised this node, so we can
1561
 
                # access the dict directly.
1562
 
                chks = set(node._items.itervalues()).difference(
1563
 
                            all_uninteresting_chks)
1564
 
                # Is set() and .difference_update better than:
1565
 
                # chks = [chk for chk in node.refs()
1566
 
                #              if chk not in all_uninteresting_chks]
1567
 
                next_chks.update(chks)
1568
 
                # These are now uninteresting everywhere else
1569
 
                all_uninteresting_chks.update(chks)
1570
 
                interesting_items = []
1571
 
            else:
1572
 
                interesting_items = [item for item in node._items.iteritems()
1573
 
                                     if item not in all_uninteresting_items]
1574
 
                # TODO: Do we need to filter out items that we have already
1575
 
                #       seen on other pages? We don't really want to buffer the
1576
 
                #       whole thing, but it does mean that callers need to
1577
 
                #       understand they may get duplicate values.
1578
 
                # all_uninteresting_items.update(interesting_items)
1579
 
            yield record, interesting_items
1580
 
        chks_to_read = next_chks
 
1611
    iterator = CHKMapDifference(store, interesting_root_keys,
 
1612
                                uninteresting_root_keys,
 
1613
                                search_key_func=store._search_key_func,
 
1614
                                pb=pb)
 
1615
    return iterator.process()
1581
1616
 
1582
1617
 
1583
1618
try: