1463
class CHKMapDifference(object):
    """Iterate the stored pages and key,value pairs for (new - old).

    This class provides a generator over the stored CHK pages and the
    (key, value) pairs that are in any of the new maps and not in any of the
    old maps.

    Note that it may yield chk pages that are common (especially root nodes),
    but it won't yield (key,value) pairs that are common.
    """

    def __init__(self, store, new_root_keys, old_root_keys,
                 search_key_func, pb=None):
        # TODO: Should we add a StaticTuple barrier here? It would be nice to
        #       force callers to use StaticTuple, because there will often be
        #       lots of keys passed in here. And even if we cast it locally,
        #       that just means that we will have *both* a StaticTuple and a
        #       tuple() in memory, referring to the same object. (so a net
        #       increase in memory, not a decrease.)
        self._store = store
        self._new_root_keys = new_root_keys
        self._old_root_keys = old_root_keys
        self._pb = pb
        # All uninteresting chks that we have seen. By the time they are added
        # here, they should be either fully ignored, or queued up for
        # processing.
        # TODO: This might grow to a large size if there are lots of merge
        #       parents, etc. However, it probably doesn't scale to O(history)
        #       like _processed_new_refs does.
        self._all_old_chks = set(self._old_root_keys)
        # All items that we have seen from the old_root_keys
        self._all_old_items = set()
        # These are interesting items which were either read, or already in the
        # interesting queue (so we don't need to walk them again)
        # TODO: processed_new_refs becomes O(all_chks), consider switching to
        #       a more memory-efficient set implementation.
        self._processed_new_refs = set()
        self._search_key_func = search_key_func
        # The uninteresting and interesting nodes to be searched
        self._old_queue = []
        self._new_queue = []
        # Holds the (key, value) items found when processing the root nodes,
        # waiting for the uninteresting nodes to be walked
        self._new_item_queue = []
1510
def _read_nodes_from_store(self, keys):
1511
# We chose not to use _get_cache(), because we think in
1512
# terms of records to be yielded. Also, we expect to touch each page
1513
# only 1 time during this code. (We may want to evaluate saving the
1514
# raw bytes into the page cache, which would allow a working tree
1515
# update after the fetch to not have to read the bytes again.)
1516
as_st = StaticTuple.from_sequence
1517
stream = self._store.get_record_stream(keys, 'unordered', True)
1518
for record in stream:
1519
if self._pb is not None:
1521
if record.storage_kind == 'absent':
1522
raise errors.NoSuchRevision(self._store, record.key)
1401
def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
1402
"""Read the associated records, and determine what is interesting."""
1403
uninteresting_keys = set(uninteresting_keys)
1404
chks_to_read = uninteresting_keys.union(interesting_keys)
1405
next_uninteresting = set()
1406
next_interesting = set()
1407
uninteresting_items = set()
1408
interesting_items = set()
1409
interesting_to_yield = []
1410
for record in store.get_record_stream(chks_to_read, 'unordered', True):
1411
# records_read.add(record.key())
1414
bytes = record.get_bytes_as('fulltext')
1415
# We don't care about search_key_func for this code, because we only
1416
# care about external references.
1417
node = _deserialise(bytes, record.key, search_key_func=None)
1418
if record.key in uninteresting_keys:
1419
if type(node) is InternalNode:
1420
next_uninteresting.update(node.refs())
1422
# We know we are at a LeafNode, so we can pass None for the
1424
uninteresting_items.update(node.iteritems(None))
1426
interesting_to_yield.append(record.key)
1427
if type(node) is InternalNode:
1428
next_interesting.update(node.refs())
1430
interesting_items.update(node.iteritems(None))
1431
return (next_uninteresting, uninteresting_items,
1432
next_interesting, interesting_to_yield, interesting_items)
1435
def _find_all_uninteresting(store, interesting_root_keys,
1436
uninteresting_root_keys, pb):
1437
"""Determine the full set of uninteresting keys."""
1438
# What about duplicates between interesting_root_keys and
1439
# uninteresting_root_keys?
1440
if not uninteresting_root_keys:
1441
# Shortcut case. We know there is nothing uninteresting to filter out
1442
# So we just let the rest of the algorithm do the work
1443
# We know there is nothing uninteresting, and we didn't have to read
1444
# any interesting records yet.
1445
return (set(), set(), set(interesting_root_keys), [], set())
1446
all_uninteresting_chks = set(uninteresting_root_keys)
1447
all_uninteresting_items = set()
1449
# First step, find the direct children of both the interesting and
1451
(uninteresting_keys, uninteresting_items,
1452
interesting_keys, interesting_to_yield,
1453
interesting_items) = _find_children_info(store, interesting_root_keys,
1454
uninteresting_root_keys,
1456
all_uninteresting_chks.update(uninteresting_keys)
1457
all_uninteresting_items.update(uninteresting_items)
1458
del uninteresting_items
1459
# Note: Exact matches between interesting and uninteresting do not need
1460
# to be search further. Non-exact matches need to be searched in case
1461
# there is a future exact-match
1462
uninteresting_keys.difference_update(interesting_keys)
1464
# Second, find the full set of uninteresting bits reachable by the
1465
# uninteresting roots
1466
chks_to_read = uninteresting_keys
1469
for record in store.get_record_stream(chks_to_read, 'unordered', False):
1470
# TODO: Handle 'absent'
1523
1473
bytes = record.get_bytes_as('fulltext')
1524
node = _deserialise(bytes, record.key,
1525
search_key_func=self._search_key_func)
1474
# We don't care about search_key_func for this code, because we
1475
# only care about external references.
1476
node = _deserialise(bytes, record.key, search_key_func=None)
1526
1477
if type(node) is InternalNode:
1527
# Note we don't have to do node.refs() because we know that
1528
# there are no children that have been pushed into this node
1529
# Note: Using as_st() here seemed to save 1.2MB, which would
1530
# indicate that we keep 100k prefix_refs around while
1531
# processing. They *should* be shorter lived than that...
1532
# It does cost us ~10s of processing time
1533
#prefix_refs = [as_st(item) for item in node._items.iteritems()]
1534
prefix_refs = node._items.items()
1478
# uninteresting_prefix_chks.update(node._items.iteritems())
1479
chks = node._items.values()
1480
# TODO: We remove the entries that are already in
1481
# uninteresting_chks ?
1482
next_chks.update(chks)
1483
all_uninteresting_chks.update(chks)
1538
# Note: We don't use a StaticTuple here. Profiling showed a
1539
# minor memory improvement (0.8MB out of 335MB peak 0.2%)
1540
# But a significant slowdown (15s / 145s, or 10%)
1541
items = node._items.items()
1542
yield record, node, prefix_refs, items
1544
def _read_old_roots(self):
1545
old_chks_to_enqueue = []
1546
all_old_chks = self._all_old_chks
1547
for record, node, prefix_refs, items in \
1548
self._read_nodes_from_store(self._old_root_keys):
1549
# Uninteresting node
1550
prefix_refs = [p_r for p_r in prefix_refs
1551
if p_r[1] not in all_old_chks]
1552
new_refs = [p_r[1] for p_r in prefix_refs]
1553
all_old_chks.update(new_refs)
1554
# TODO: This might be a good time to turn items into StaticTuple
1555
# instances and possibly intern them. However, this does not
1556
# impact 'initial branch' performance, so I'm not worrying
1558
self._all_old_items.update(items)
1559
# Queue up the uninteresting references
1560
# Don't actually put them in the 'to-read' queue until we have
1561
# finished checking the interesting references
1562
old_chks_to_enqueue.extend(prefix_refs)
1563
return old_chks_to_enqueue
1565
def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
1566
# At this point, we have read all the uninteresting and interesting
1567
# items, so we can queue up the uninteresting stuff, knowing that we've
1568
# handled the interesting ones
1569
for prefix, ref in old_chks_to_enqueue:
1570
not_interesting = True
1571
for i in xrange(len(prefix), 0, -1):
1572
if prefix[:i] in new_prefixes:
1573
not_interesting = False
1576
# This prefix is not part of the remaining 'interesting set'
1578
self._old_queue.append(ref)
1580
def _read_all_roots(self):
1581
"""Read the root pages.
1583
This is structured as a generator, so that the root records can be
1584
yielded up to whoever needs them without any buffering.
1586
# This is the bootstrap phase
1587
if not self._old_root_keys:
1588
# With no old_root_keys we can just shortcut and be ready
1589
# for _flush_new_queue
1590
self._new_queue = list(self._new_root_keys)
1592
old_chks_to_enqueue = self._read_old_roots()
1593
# filter out any root keys that are already known to be uninteresting
1594
new_keys = set(self._new_root_keys).difference(self._all_old_chks)
1595
# These are prefixes that are present in new_keys that we are
1597
new_prefixes = set()
1598
# We are about to yield all of these, so we don't want them getting
1599
# added a second time
1600
processed_new_refs = self._processed_new_refs
1601
processed_new_refs.update(new_keys)
1602
for record, node, prefix_refs, items in \
1603
self._read_nodes_from_store(new_keys):
1604
# At this level, we now know all the uninteresting references
1605
# So we filter and queue up whatever is remaining
1606
prefix_refs = [p_r for p_r in prefix_refs
1607
if p_r[1] not in self._all_old_chks
1608
and p_r[1] not in processed_new_refs]
1609
refs = [p_r[1] for p_r in prefix_refs]
1610
new_prefixes.update([p_r[0] for p_r in prefix_refs])
1611
self._new_queue.extend(refs)
1612
# TODO: We can potentially get multiple items here, however the
1613
# current design allows for this, as callers will do the work
1614
# to make the results unique. We might profile whether we
1615
# gain anything by ensuring unique return values for items
1616
# TODO: This might be a good time to cast to StaticTuple, as
1617
# self._new_item_queue will hold the contents of multiple
1618
# records for an extended lifetime
1619
new_items = [item for item in items
1620
if item not in self._all_old_items]
1621
self._new_item_queue.extend(new_items)
1622
new_prefixes.update([self._search_key_func(item[0])
1623
for item in new_items])
1624
processed_new_refs.update(refs)
1626
# For new_prefixes we have the full length prefixes queued up.
1627
# However, we also need possible prefixes. (If we have a known ref to
1628
# 'ab', then we also need to include 'a'.) So expand the
1629
# new_prefixes to include all shorter prefixes
1630
for prefix in list(new_prefixes):
1631
new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
1632
self._enqueue_old(new_prefixes, old_chks_to_enqueue)
1634
def _flush_new_queue(self):
1635
# No need to maintain the heap invariant anymore, just pull things out
1637
refs = set(self._new_queue)
1638
self._new_queue = []
1639
# First pass, flush all interesting items and convert to using direct refs
1640
all_old_chks = self._all_old_chks
1641
processed_new_refs = self._processed_new_refs
1642
all_old_items = self._all_old_items
1643
new_items = [item for item in self._new_item_queue
1644
if item not in all_old_items]
1645
self._new_item_queue = []
1647
yield None, new_items
1648
refs = refs.difference(all_old_chks)
1649
processed_new_refs.update(refs)
1651
# TODO: Using a SimpleSet for self._processed_new_refs and
1652
# saved as much as 10MB of peak memory. However, it requires
1653
# implementing a non-pyrex version.
1655
next_refs_update = next_refs.update
1656
# Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
1657
# from 1m54s to 1m51s. Consider it.
1658
for record, _, p_refs, items in self._read_nodes_from_store(refs):
1660
# using the 'if' check saves about 145s => 141s, when
1661
# streaming initial branch of Launchpad data.
1662
items = [item for item in items
1663
if item not in all_old_items]
1665
next_refs_update([p_r[1] for p_r in p_refs])
1667
# set1.difference(set/dict) walks all of set1, and checks if it
1668
# exists in 'other'.
1669
# set1.difference(iterable) walks all of iterable, and does a
1670
# 'difference_update' on a clone of set1. Pick wisely based on the
1671
# expected sizes of objects.
1672
# in our case it is expected that 'new_refs' will always be quite
1674
next_refs = next_refs.difference(all_old_chks)
1675
next_refs = next_refs.difference(processed_new_refs)
1676
processed_new_refs.update(next_refs)
1679
def _process_next_old(self):
1680
# Since we don't filter uninteresting any further than during
1681
# _read_all_roots, process the whole queue in a single pass.
1682
refs = self._old_queue
1683
self._old_queue = []
1684
all_old_chks = self._all_old_chks
1685
for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
1686
# TODO: Use StaticTuple here?
1687
self._all_old_items.update(items)
1688
refs = [r for _,r in prefix_refs if r not in all_old_chks]
1689
self._old_queue.extend(refs)
1690
all_old_chks.update(refs)
1692
def _process_queues(self):
1693
while self._old_queue:
1694
self._process_next_old()
1695
return self._flush_new_queue()
1698
for record in self._read_all_roots():
1700
for record, items in self._process_queues():
1485
all_uninteresting_items.update(node._items.iteritems())
1486
chks_to_read = next_chks
1487
return (all_uninteresting_chks, all_uninteresting_items,
1488
interesting_keys, interesting_to_yield, interesting_items)
1704
1491
def iter_interesting_nodes(store, interesting_root_keys,
                           uninteresting_root_keys, pb=None):
    """Given root keys, find interesting nodes.

    Evaluates nodes referenced by interesting_root_keys; anything reachable
    from uninteresting_root_keys is filtered out.

    :param store: The store to read the pages from.
    :param interesting_root_keys: keys which should be part of the
        "interesting" nodes (which will be yielded)
    :param uninteresting_root_keys: keys which should be filtered out of the
        result set.
    :param pb: an optional progress indicator.
    :return: Yield
        (interesting record, {interesting key:values})
    """
    iterator = CHKMapDifference(store, interesting_root_keys,
                                uninteresting_root_keys,
                                search_key_func=store._search_key_func,
                                pb=pb)
    return iterator.process()
1505
# TODO: consider that it may be more memory efficient to use the 20-byte
1506
# sha1 string, rather than tuples of hexidecimal sha1 strings.
1507
# TODO: Try to factor out a lot of the get_record_stream() calls into a
1508
# helper function similar to _read_bytes. This function should be
1509
# able to use nodes from the _page_cache as well as actually
1510
# requesting bytes from the store.
1512
(all_uninteresting_chks, all_uninteresting_items, interesting_keys,
1513
interesting_to_yield, interesting_items) = _find_all_uninteresting(store,
1514
interesting_root_keys, uninteresting_root_keys, pb)
1516
# Now that we know everything uninteresting, we can yield information from
1518
interesting_items.difference_update(all_uninteresting_items)
1519
interesting_to_yield = set(interesting_to_yield) - all_uninteresting_chks
1520
if interesting_items:
1521
yield None, interesting_items
1522
if interesting_to_yield:
1523
# We request these records again, rather than buffering the root
1524
# records, most likely they are still in the _group_cache anyway.
1525
for record in store.get_record_stream(interesting_to_yield,
1526
'unordered', False):
1528
all_uninteresting_chks.update(interesting_to_yield)
1529
interesting_keys.difference_update(all_uninteresting_chks)
1531
chks_to_read = interesting_keys
1535
for record in store.get_record_stream(chks_to_read, 'unordered', False):
1538
pb.update('find chk pages', counter)
1539
# TODO: Handle 'absent'?
1540
bytes = record.get_bytes_as('fulltext')
1541
# We don't care about search_key_func for this code, because we
1542
# only care about external references.
1543
node = _deserialise(bytes, record.key, search_key_func=None)
1544
if type(node) is InternalNode:
1545
# all_uninteresting_chks grows large, as it lists all nodes we
1546
# don't want to process (including already seen interesting
1548
# small.difference_update(large) scales O(large), but
1549
# small.difference(large) scales O(small).
1550
# Also, we know we just _deserialised this node, so we can
1551
# access the dict directly.
1552
chks = set(node._items.itervalues()).difference(
1553
all_uninteresting_chks)
1554
# Is set() and .difference_update better than:
1555
# chks = [chk for chk in node.refs()
1556
# if chk not in all_uninteresting_chks]
1557
next_chks.update(chks)
1558
# These are now uninteresting everywhere else
1559
all_uninteresting_chks.update(chks)
1560
interesting_items = []
1562
interesting_items = [item for item in node._items.iteritems()
1563
if item not in all_uninteresting_items]
1564
# TODO: Do we need to filter out items that we have already
1565
# seen on other pages? We don't really want to buffer the
1566
# whole thing, but it does mean that callers need to
1567
# understand they may get duplicate values.
1568
# all_uninteresting_items.update(interesting_items)
1569
yield record, interesting_items
1570
chks_to_read = next_chks
1726
1574
from bzrlib._chk_map_pyx import (
1728
1575
_search_key_16,
1729
1576
_search_key_255,
1730
1577
_deserialise_leaf_node,
1731
1578
_deserialise_internal_node,
1733
except ImportError, e:
1734
osutils.failed_to_load_extension(e)
1735
1581
from bzrlib._chk_map_py import (
1737
1582
_search_key_16,
1738
1583
_search_key_255,
1739
1584
_deserialise_leaf_node,