def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
    """Read the associated records, and determine what is interesting."""
    uninteresting_keys = set(uninteresting_keys)
    chks_to_read = uninteresting_keys.union(interesting_keys)
    next_uninteresting = set()
    next_interesting = set()
    uninteresting_items = set()
    interesting_items = set()
    interesting_to_yield = []
    for record in store.get_record_stream(chks_to_read, 'unordered', True):
        # records_read.add(record.key())
        if pb is not None:
            pb.tick()
        bytes = record.get_bytes_as('fulltext')
        # We don't care about search_key_func for this code, because we only
        # care about external references.
        node = _deserialise(bytes, record.key, search_key_func=None)
        if record.key in uninteresting_keys:
            if type(node) is InternalNode:
                next_uninteresting.update(node.refs())
            else:
                # We know we are at a LeafNode, so we can pass None for the
                # store
                uninteresting_items.update(node.iteritems(None))
        else:
            interesting_to_yield.append(record.key)
            if type(node) is InternalNode:
                next_interesting.update(node.refs())
            else:
                interesting_items.update(node.iteritems(None))
    return (next_uninteresting, uninteresting_items,
            next_interesting, interesting_to_yield, interesting_items)
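
# The tuple returned by _find_children_info() above is, in order:
#   (uninteresting child keys, uninteresting (key, value) items,
#    interesting child keys, interesting record keys to yield,
#    interesting (key, value) items)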

def _find_all_uninteresting(store, interesting_root_keys,
                            uninteresting_root_keys, pb):
    """Determine the full set of uninteresting keys."""
    # What about duplicates between interesting_root_keys and
    # uninteresting_root_keys?
    if not uninteresting_root_keys:
        # Shortcut case. We know there is nothing uninteresting to filter out
        # So we just let the rest of the algorithm do the work
        # We know there is nothing uninteresting, and we didn't have to read
        # any interesting records yet.
        return (set(), set(), set(interesting_root_keys), [], set())
    all_uninteresting_chks = set(uninteresting_root_keys)
    all_uninteresting_items = set()

    # First step, find the direct children of both the interesting and
    # uninteresting set
    (uninteresting_keys, uninteresting_items,
     interesting_keys, interesting_to_yield,
     interesting_items) = _find_children_info(store, interesting_root_keys,
                                              uninteresting_root_keys,
                                              pb)
    all_uninteresting_chks.update(uninteresting_keys)
    all_uninteresting_items.update(uninteresting_items)
    del uninteresting_items
    # Note: Exact matches between interesting and uninteresting do not need
    #       to be searched further. Non-exact matches need to be searched in
    #       case there is a future exact-match.
    uninteresting_keys.difference_update(interesting_keys)

    # Second, find the full set of uninteresting bits reachable by the
    # uninteresting roots
    chks_to_read = uninteresting_keys
    while chks_to_read:
        next_chks = set()
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
            # TODO: Handle 'absent'
            if pb is not None:
                pb.tick()
            bytes = record.get_bytes_as('fulltext')
            # We don't care about search_key_func for this code, because we
            # only care about external references.
            node = _deserialise(bytes, record.key, search_key_func=None)
            if type(node) is InternalNode:
                # uninteresting_prefix_chks.update(node._items.iteritems())
                chks = node._items.values()
                # TODO: Should we filter out the entries that are already in
                #       all_uninteresting_chks?
                next_chks.update(chks)
                all_uninteresting_chks.update(chks)
            else:
                all_uninteresting_items.update(node._items.iteritems())
        chks_to_read = next_chks
    return (all_uninteresting_chks, all_uninteresting_items,
            interesting_keys, interesting_to_yield, interesting_items)


class CHKMapDifference(object):
    """Iterate the stored pages and key,value pairs for (new - old).

    This class provides a generator over the stored CHK pages and the
    (key, value) pairs that are in any of the new maps and not in any of the
    old maps.

    Note that it may yield chk pages that are common (especially root nodes),
    but it won't yield (key,value) pairs that are common.
    """
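
    # Illustrative usage (a sketch only; it assumes ``store`` is the
    # VersionedFiles object holding the CHK pages and that the supplied
    # ``search_key_func`` matches the one the maps were serialised with):
    #
    #   diff = CHKMapDifference(store, new_root_keys, old_root_keys,
    #                           search_key_func=store._search_key_func)
    #   for record, items in diff.process():
    #       if record is not None:
    #           pass    # a CHK page reachable from the new roots
    #       for key, value in items:
    #           pass    # a (key, value) pair present only in the new maps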

    def __init__(self, store, new_root_keys, old_root_keys,
                 search_key_func, pb=None):
        # TODO: Should we add a StaticTuple barrier here? It would be nice to
        #       force callers to use StaticTuple, because there will often be
        #       lots of keys passed in here. And even if we cast it locally,
        #       that just means that we will have *both* a StaticTuple and a
        #       tuple() in memory, referring to the same object. (so a net
        #       increase in memory, not a decrease.)
        self._store = store
        self._new_root_keys = new_root_keys
        self._old_root_keys = old_root_keys
        self._pb = pb
        # All uninteresting chks that we have seen. By the time they are added
        # here, they should be either fully ignored, or queued up for
        # processing.
        # TODO: This might grow to a large size if there are lots of merge
        #       parents, etc. However, it probably doesn't scale to O(history)
        #       like _processed_new_refs does.
        self._all_old_chks = set(self._old_root_keys)
        # All items that we have seen from the old_root_keys
        self._all_old_items = set()
        # These are interesting items which were either read, or already in
        # the interesting queue (so we don't need to walk them again)
        # TODO: processed_new_refs becomes O(all_chks), consider switching to
        #       SimpleSet (see the note in _flush_new_queue).
        self._processed_new_refs = set()
        self._search_key_func = search_key_func
        # The uninteresting and interesting nodes to be searched
        self._old_queue = []
        self._new_queue = []
        # Holds the (key, value) items found when processing the root nodes,
        # waiting for the uninteresting nodes to be walked
        self._new_item_queue = []
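
    # A quick map of the state set up above:
    #   _all_old_chks / _all_old_items -- everything seen so far that is
    #       reachable from the old roots (page keys and (key, value) items).
    #   _old_queue / _new_queue -- page keys still waiting to be read.
    #   _new_item_queue -- items from the new root pages, held back until the
    #       old pages have been walked.
    #   _processed_new_refs -- new page keys already read or queued, so that
    #       no page is fetched twice.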

    def _read_nodes_from_store(self, keys):
        # We chose not to use _get_cache(), because we think in
        # terms of records to be yielded. Also, we expect to touch each page
        # only 1 time during this code. (We may want to evaluate saving the
        # raw bytes into the page cache, which would allow a working tree
        # update after the fetch to not have to read the bytes again.)
        as_st = StaticTuple.from_sequence
        stream = self._store.get_record_stream(keys, 'unordered', True)
        for record in stream:
            if self._pb is not None:
                self._pb.tick()
            if record.storage_kind == 'absent':
                raise errors.NoSuchRevision(self._store, record.key)
            bytes = record.get_bytes_as('fulltext')
            node = _deserialise(bytes, record.key,
                                search_key_func=self._search_key_func)
            if type(node) is InternalNode:
                # Note we don't have to do node.refs() because we know that
                # there are no children that have been pushed into this node.
                # Note: Using as_st() here seemed to save 1.2MB, which would
                #       indicate that we keep 100k prefix_refs around while
                #       processing. They *should* be shorter lived than that...
                #       It does cost us ~10s of processing time
                #prefix_refs = [as_st(item) for item in node._items.iteritems()]
                prefix_refs = node._items.items()
                items = []
            else:
                prefix_refs = []
                # Note: We don't use a StaticTuple here. Profiling showed a
                #       minor memory improvement (0.8MB out of a 335MB peak,
                #       0.2%), but a significant slowdown (15s / 145s, or 10%)
                items = node._items.items()
            yield record, node, prefix_refs, items
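
    # Each tuple yielded above is (record, node, prefix_refs, items): for an
    # InternalNode, prefix_refs is the node's (prefix, child key) list and
    # items is empty; for a LeafNode it is the other way around, with items
    # holding the node's (key, value) pairs.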

    def _read_old_roots(self):
        old_chks_to_enqueue = []
        all_old_chks = self._all_old_chks
        for record, node, prefix_refs, items in \
                self._read_nodes_from_store(self._old_root_keys):
            # Uninteresting node
            prefix_refs = [p_r for p_r in prefix_refs
                                if p_r[1] not in all_old_chks]
            new_refs = [p_r[1] for p_r in prefix_refs]
            all_old_chks.update(new_refs)
            # TODO: This might be a good time to turn items into StaticTuple
            #       instances and possibly intern them. However, this does not
            #       impact 'initial branch' performance, so I'm not worrying
            #       about it here.
            self._all_old_items.update(items)
            # Queue up the uninteresting references
            # Don't actually put them in the 'to-read' queue until we have
            # finished checking the interesting references
            old_chks_to_enqueue.extend(prefix_refs)
        return old_chks_to_enqueue
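
    # The list returned above holds (prefix, ref) pairs for children of the
    # old root pages that were not already known to be old. They are only
    # queued for reading once _read_all_roots() knows which prefixes are
    # still interesting (see _enqueue_old below).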

    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
        # At this point, we have read all the uninteresting and interesting
        # items, so we can queue up the uninteresting stuff, knowing that we've
        # handled the interesting ones
        for prefix, ref in old_chks_to_enqueue:
            not_interesting = True
            for i in xrange(len(prefix), 0, -1):
                if prefix[:i] in new_prefixes:
                    not_interesting = False
                    break
            if not_interesting:
                # This prefix is not part of the remaining 'interesting set'
                continue
            self._old_queue.append(ref)
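
    # Illustrative example (hypothetical values): with new_prefixes containing
    # 'a' and 'ab', an old ref stored under prefix 'abd' is appended to
    # _old_queue (its leading 'ab' is still interesting), while one stored
    # under 'c' is skipped as not_interesting.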

    def _read_all_roots(self):
        """Read the root pages.

        This is structured as a generator, so that the root records can be
        yielded up to whoever needs them without any buffering.
        """
        # This is the bootstrap phase
        if not self._old_root_keys:
            # With no old_root_keys we can just shortcut and be ready
            # for _flush_new_queue
            self._new_queue = list(self._new_root_keys)
            return
        old_chks_to_enqueue = self._read_old_roots()
        # filter out any root keys that are already known to be uninteresting
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
        # These are prefixes that are present in new_keys that we are
        # about to yield
        new_prefixes = set()
        # We are about to yield all of these, so we don't want them getting
        # added a second time
        processed_new_refs = self._processed_new_refs
        processed_new_refs.update(new_keys)
        for record, node, prefix_refs, items in \
                self._read_nodes_from_store(new_keys):
            # At this level, we now know all the uninteresting references
            # So we filter and queue up whatever is remaining
            prefix_refs = [p_r for p_r in prefix_refs
                           if p_r[1] not in self._all_old_chks
                              and p_r[1] not in processed_new_refs]
            refs = [p_r[1] for p_r in prefix_refs]
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
            self._new_queue.extend(refs)
            # TODO: We can potentially get multiple items here, however the
            #       current design allows for this, as callers will do the work
            #       to make the results unique. We might profile whether we
            #       gain anything by ensuring unique return values for items
            # TODO: This might be a good time to cast to StaticTuple, as
            #       self._new_item_queue will hold the contents of multiple
            #       records for an extended lifetime
            new_items = [item for item in items
                              if item not in self._all_old_items]
            self._new_item_queue.extend(new_items)
            new_prefixes.update([self._search_key_func(item[0])
                                 for item in new_items])
            processed_new_refs.update(refs)
            yield record
        # For new_prefixes we have the full length prefixes queued up.
        # However, we also need possible prefixes. (If we have a known ref to
        # 'ab', then we also need to include 'a'.) So expand the
        # new_prefixes to include all shorter prefixes
        for prefix in list(new_prefixes):
            new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
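
    # Once _read_all_roots() finishes, the new root records have been yielded,
    # _new_queue and _new_item_queue hold the still-unexplored new refs and
    # items, and _old_queue has been seeded (via _enqueue_old) with the old
    # refs whose prefixes may still overlap the interesting set.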

    def _flush_new_queue(self):
        # No need to maintain the heap invariant anymore, just pull things out
        # and process them
        refs = set(self._new_queue)
        self._new_queue = []
        # First pass, flush all interesting items and convert to using direct
        # refs
        all_old_chks = self._all_old_chks
        processed_new_refs = self._processed_new_refs
        all_old_items = self._all_old_items
        new_items = [item for item in self._new_item_queue
                          if item not in all_old_items]
        self._new_item_queue = []
        if new_items:
            yield None, new_items
        refs = refs.difference(all_old_chks)
        processed_new_refs.update(refs)
        while refs:
            # TODO: Using a SimpleSet for self._processed_new_refs and
            #       next_refs saved as much as 10MB of peak memory. However,
            #       it requires implementing a non-pyrex version.
            next_refs = set()
            next_refs_update = next_refs.update
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
            # from 1m54s to 1m51s. Consider it.
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
                if all_old_items:
                    # using the 'if' check saves about 145s => 141s, when
                    # streaming initial branch of Launchpad data.
                    items = [item for item in items
                             if item not in all_old_items]
                yield record, items
                next_refs_update([p_r[1] for p_r in p_refs])
            # set1.difference(set/dict) walks all of set1, and checks if it
            # exists in 'other'.
            # set1.difference(iterable) walks all of iterable, and does a
            # 'difference_update' on a clone of set1. Pick wisely based on the
            # expected sizes of objects.
            # in our case it is expected that 'new_refs' will always be quite
            # small.
            next_refs = next_refs.difference(all_old_chks)
            next_refs = next_refs.difference(processed_new_refs)
            processed_new_refs.update(next_refs)
            refs = next_refs

    def _process_next_old(self):
        # Since we don't filter uninteresting any further than during
        # _read_all_roots, process the whole queue in a single pass.
        refs = self._old_queue
        self._old_queue = []
        all_old_chks = self._all_old_chks
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
            # TODO: Use StaticTuple here?
            self._all_old_items.update(items)
            refs = [r for _, r in prefix_refs if r not in all_old_chks]
            self._old_queue.extend(refs)
            all_old_chks.update(refs)

    def _process_queues(self):
        while self._old_queue:
            self._process_next_old()
        return self._flush_new_queue()

    def process(self):
        for record in self._read_all_roots():
            yield record, []
        for record, items in self._process_queues():
            yield record, items


def iter_interesting_nodes(store, interesting_root_keys,
                           uninteresting_root_keys, pb=None):
    """Given root keys, find interesting nodes.

    We search for nodes that are reachable from interesting_root_keys, but not
    reachable from uninteresting_root_keys.

    :return: Yield
        (interesting record, {interesting key:values})
    """
    # TODO: consider that it may be more memory efficient to use the 20-byte
    #       sha1 string, rather than tuples of hexidecimal sha1 strings.
    # TODO: Try to factor out a lot of the get_record_stream() calls into a
    #       helper function similar to _read_bytes. This function should be
    #       able to use nodes from the _page_cache as well as actually
    #       requesting bytes from the store.
    (all_uninteresting_chks, all_uninteresting_items, interesting_keys,
     interesting_to_yield, interesting_items) = _find_all_uninteresting(store,
        interesting_root_keys, uninteresting_root_keys, pb)

    # Now that we know everything uninteresting, we can yield information from
    # the interesting set
    interesting_items.difference_update(all_uninteresting_items)
    interesting_to_yield = set(interesting_to_yield) - all_uninteresting_chks
    if interesting_items:
        yield None, interesting_items
    if interesting_to_yield:
        # We request these records again, rather than buffering the root
        # records, most likely they are still in the _group_cache anyway.
        for record in store.get_record_stream(interesting_to_yield,
                                              'unordered', False):
            yield record, []
    all_uninteresting_chks.update(interesting_to_yield)
    interesting_keys.difference_update(all_uninteresting_chks)

    chks_to_read = interesting_keys
    counter = 0
    while chks_to_read:
        next_chks = set()
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
            counter += 1
            if pb is not None:
                pb.update('find chk pages', counter)
            # TODO: Handle 'absent'?
            bytes = record.get_bytes_as('fulltext')
            # We don't care about search_key_func for this code, because we
            # only care about external references.
            node = _deserialise(bytes, record.key, search_key_func=None)
            if type(node) is InternalNode:
                # all_uninteresting_chks grows large, as it lists all nodes we
                # don't want to process (including already seen interesting
                # chks)
                # small.difference_update(large) scales O(large), but
                # small.difference(large) scales O(small).
                # Also, we know we just _deserialised this node, so we can
                # access the dict directly.
                chks = set(node._items.itervalues()).difference(
                            all_uninteresting_chks)
                # Is set() and .difference_update better than:
                # chks = [chk for chk in node.refs()
                #         if chk not in all_uninteresting_chks]
                next_chks.update(chks)
                # These are now uninteresting everywhere else
                all_uninteresting_chks.update(chks)
                interesting_items = []
            else:
                interesting_items = [item for item in node._items.iteritems()
                                     if item not in all_uninteresting_items]
                # TODO: Do we need to filter out items that we have already
                #       seen on other pages? We don't really want to buffer
                #       the whole thing, but it does mean that callers need
                #       to understand they may get duplicate values.
                # all_uninteresting_items.update(interesting_items)
            yield record, interesting_items
        chks_to_read = next_chks

# Note: the CHKMapDifference class above performs the same (new - old) search.
# An equivalent implementation of iter_interesting_nodes() in terms of that
# class is:
#
#     iterator = CHKMapDifference(store, interesting_root_keys,
#                                 uninteresting_root_keys,
#                                 search_key_func=store._search_key_func,
#                                 pb=pb)
#     return iterator.process()
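
# Example use of iter_interesting_nodes() (an illustrative sketch; it assumes
# ``store`` is the repository's chk_bytes store and that the two root keys
# come from already-saved CHKMap instances named new_map and old_map):
#
#   for record, items in iter_interesting_nodes(store,
#                                               [new_map.key()],
#                                               [old_map.key()]):
#       if record is not None:
#           pass    # a CHK page worth transmitting or caching
#       for key, value in items:
#           pass    # content present in new_map but not in old_map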


try:
    from bzrlib._chk_map_pyx import (
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
except ImportError, e:
    osutils.failed_to_load_extension(e)
    from bzrlib._chk_map_py import (
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )