        # Map from chk key ('sha1:...',) to (prefix, key_filter)
        # prefix is the key in self._items to use, key_filter is the key_filter
        # entries that would match this node
        keys = {}
        shortcut = False
        if key_filter is None:
            # yielding all nodes, yield whatever we have, and queue up a read
            # for whatever we are missing
            shortcut = True
            for prefix, node in self._items.iteritems():
                if node.__class__ is StaticTuple:
                    keys[node] = (prefix, None)
                else:
                    yield node, None
        elif len(key_filter) == 1:
            # Technically, this path could also be handled by the first check
            # in 'self._node_width' in length_filters. However, we can handle
            # this case without spending any time building up the
            # prefix_to_keys, etc state.

            # This is a bit ugly, but TIMEIT showed it to be by far the fastest
            # 0.626us   list(key_filter)[0]
            #       is a func() for list(), 2 mallocs, and a getitem
            # 0.489us   [k for k in key_filter][0]
            #       still has the mallocs, avoids the func() call
            # 0.350us   iter(key_filter).next()
            #       has a func() call, and mallocs an iterator
            # 0.125us   for key in key_filter: pass
            #       no func() overhead, might malloc an iterator
            # 0.105us   for key in key_filter: break
            #       no func() overhead, might malloc an iterator, probably
            #       avoids checking an 'else' clause as part of the for
            for key in key_filter:
                break
            search_prefix = self._search_prefix_filter(key)
            if len(search_prefix) == self._node_width:
                # This item will match exactly, so just do a dict lookup, and
                # see what we can return
                shortcut = True
                try:
                    node = self._items[search_prefix]
                except KeyError:
                    # A given key can only match 1 child node; if it isn't
                    # there, then we can just return nothing
                    return
                if node.__class__ is StaticTuple:
                    keys[node] = (search_prefix, [key])
                else:
                    # This is loaded, and the only thing that can match,
                    # so return it
                    yield node, [key]
                    return
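
        # The timings quoted above can be re-checked with the stdlib timeit
        # module, for example (illustrative only; absolute numbers vary by
        # interpreter and hardware, only the relative ordering is expected
        # to hold):
        #   python -m timeit -s "key_filter = set([('sha1:abcd',)])" \
        #       "for key in key_filter: break"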
        if not shortcut:
            # First, convert all keys into a list of search prefixes
            # Aggregate common prefixes, and track the keys they come from
            prefix_to_keys = {}
            length_filters = {}
            for key in key_filter:
                search_prefix = self._search_prefix_filter(key)
                length_filter = length_filters.setdefault(
                                    len(search_prefix), set())
                length_filter.add(search_prefix)
                prefix_to_keys.setdefault(search_prefix, []).append(key)

            if (self._node_width in length_filters
                and len(length_filters) == 1):
                # all of the search prefixes match exactly _node_width. This
                # means that everything is an exact match, and we can do a
                # lookup into self._items, rather than iterating over the items
                search_prefixes = length_filters[self._node_width]
                for search_prefix in search_prefixes:
                    try:
                        node = self._items[search_prefix]
                    except KeyError:
                        # We can ignore this one
                        continue
                    node_key_filter = prefix_to_keys[search_prefix]
                    if node.__class__ is StaticTuple:
                        keys[node] = (search_prefix, node_key_filter)
                    else:
                        yield node, node_key_filter
            else:
                # The slow way. We walk every item in self._items, and check to
                # see if there are any matches
                length_filters = length_filters.items()
                for prefix, node in self._items.iteritems():
                    node_key_filter = []
                    for length, length_filter in length_filters:
                        sub_prefix = prefix[:length]
                        if sub_prefix in length_filter:
                            node_key_filter.extend(prefix_to_keys[sub_prefix])
                    if node_key_filter: # this key matched something, yield it
                        if node.__class__ is StaticTuple:
                            keys[node] = (prefix, node_key_filter)
                        else:
                            yield node, node_key_filter
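
        # Worked example (illustrative, assuming a search key function for
        # which the filtered search prefixes come out as shown): given
        # key_filter = [('foo',), ('fox',), ('bar',)] with search prefixes
        # 'f', 'f' and 'ba', the lookup state built above (before the final
        # .items() conversion) is
        #     prefix_to_keys == {'f': [('foo',), ('fox',)], 'ba': [('bar',)]}
        #     length_filters == {1: set(['f']), 2: set(['ba'])}
        # The mixed prefix lengths force the slow path: a child stored under a
        # prefix starting with 'ba' matches via sub_prefix 'ba', one under
        # 'f...' matches via sub_prefix 'f', and any other child ends up with
        # an empty node_key_filter and is skipped.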
        if keys:
            # Look in the page cache for some more bytes
            found_keys = set()
            for key in keys:
                try:
                    bytes = _get_cache()[key]
                except KeyError:
                    continue


def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
    """Read the associated records, and determine what is interesting."""
    uninteresting_keys = set(uninteresting_keys)
    chks_to_read = uninteresting_keys.union(interesting_keys)
    next_uninteresting = set()
    next_interesting = set()
    uninteresting_items = set()
    interesting_items = set()
    interesting_records = []
    # records_read = set()
    for record in store.get_record_stream(chks_to_read, 'unordered', True):
        # records_read.add(record.key())
        if pb is not None:
            pb.tick()
        bytes = record.get_bytes_as('fulltext')
        # We don't care about search_key_func for this code, because we only
        # care about external references.
        node = _deserialise(bytes, record.key, search_key_func=None)
        if record.key in uninteresting_keys:
            if type(node) is InternalNode:
                next_uninteresting.update(node.refs())
            else:
                # We know we are at a LeafNode, so we can pass None for the
                # store
                uninteresting_items.update(node.iteritems(None))
        else:
            interesting_records.append(record)
            if type(node) is InternalNode:
                next_interesting.update(node.refs())
            else:
                interesting_items.update(node.iteritems(None))
    # TODO: Filter out records that have already been read, as node splitting
    #       can cause us to reference the same nodes via shorter and longer
    #       paths
    return (next_uninteresting, uninteresting_items,
            next_interesting, interesting_records, interesting_items)
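
# Illustrative consumer of the 5-tuple above (hypothetical code; the local
# variable names are not from this module):
#
#   (next_uninteresting, uninteresting_items,
#    next_interesting, interesting_records,
#    interesting_items) = _find_children_info(store, interesting_root_keys,
#                                             uninteresting_root_keys, pb=None)
#   # next_* hold chk keys referenced from internal nodes, still to be read;
#   # *_items hold (key, value) pairs found directly on leaf nodes.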


def _find_all_uninteresting(store, interesting_root_keys,
                            uninteresting_root_keys, adapter, pb):
    """Determine the full set of uninteresting keys."""
    # What about duplicates between interesting_root_keys and
    # uninteresting_root_keys?
    if not uninteresting_root_keys:
        # Shortcut case. We know there is nothing uninteresting to filter out
        # So we just let the rest of the algorithm do the work
        # We know there is nothing uninteresting, and we didn't have to read
        # any interesting records yet.
        return (set(), set(), set(interesting_root_keys), [], set())
    all_uninteresting_chks = set(uninteresting_root_keys)
    all_uninteresting_items = set()

    # First step, find the direct children of both the interesting and
    # uninteresting sets
    (uninteresting_keys, uninteresting_items,
     interesting_keys, interesting_records,
     interesting_items) = _find_children_info(store, interesting_root_keys,
                                              uninteresting_root_keys,
                                              pb=pb)
    all_uninteresting_chks.update(uninteresting_keys)
    all_uninteresting_items.update(uninteresting_items)
    del uninteresting_items
    # Note: Exact matches between interesting and uninteresting do not need
    #       to be searched further. Non-exact matches need to be searched in
    #       case there is a future exact-match.
    uninteresting_keys.difference_update(interesting_keys)

    # Second, find the full set of uninteresting bits reachable by the
    # uninteresting roots
    chks_to_read = uninteresting_keys
    while chks_to_read:
        next_chks = set()
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
            # TODO: Handle 'absent'
            if pb is not None:
                pb.tick()
            try:
                bytes = record.get_bytes_as('fulltext')
            except errors.UnavailableRepresentation:
                bytes = adapter.get_bytes(record)
            # We don't care about search_key_func for this code, because we
            # only care about external references.
            node = _deserialise(bytes, record.key, search_key_func=None)
            if type(node) is InternalNode:
                # uninteresting_prefix_chks.update(node._items.iteritems())
                chks = node._items.values()
                # TODO: Should we remove the entries that are already in
                #       all_uninteresting_chks?
                next_chks.update(chks)
                all_uninteresting_chks.update(chks)
            else:
                all_uninteresting_items.update(node._items.iteritems())
        chks_to_read = next_chks
    return (all_uninteresting_chks, all_uninteresting_items,
            interesting_keys, interesting_records, interesting_items)
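
# Sketch of the intended consumption of the tuple above (illustrative): the
# caller subtracts everything reachable from the uninteresting roots before
# yielding anything, along the lines of
#
#   interesting_items.difference_update(all_uninteresting_items)
#   records = dict((record.key, record) for record in interesting_records
#                  if record.key not in all_uninteresting_chks)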


class CHKMapDifference(object):
    """Iterate the stored pages and key,value pairs for (new - old).

    This class provides a generator over the stored CHK pages and the
    (key, value) pairs that are in any of the new maps and not in any of the
    old maps.

    Note that it may yield chk pages that are common (especially root nodes),
    but it won't yield (key,value) pairs that are common.
    """

    def __init__(self, store, new_root_keys, old_root_keys,
                 search_key_func, pb=None):
        # TODO: Should we add a StaticTuple barrier here? It would be nice to
        #       force callers to use StaticTuple, because there will often be
        #       lots of keys passed in here. And even if we cast it locally,
        #       that just means that we will have *both* a StaticTuple and a
        #       tuple() in memory, referring to the same object. (so a net
        #       increase in memory, not a decrease.)
        self._store = store
        self._new_root_keys = new_root_keys
        self._old_root_keys = old_root_keys
        self._pb = pb
        # All uninteresting chks that we have seen. By the time they are added
        # here, they should be either fully ignored, or queued up for
        # processing
        # TODO: This might grow to a large size if there are lots of merge
        #       parents, etc. However, it probably doesn't scale to O(history)
        #       like _processed_new_refs does.
        self._all_old_chks = set(self._old_root_keys)
        # All items that we have seen from the old_root_keys
        self._all_old_items = set()
        # These are interesting items which were either read, or already in the
        # interesting queue (so we don't need to walk them again)
        # TODO: processed_new_refs becomes O(all_chks), consider switching to
        #       SimpleSet here.
        self._processed_new_refs = set()
        self._search_key_func = search_key_func

        # The uninteresting and interesting nodes to be searched
        self._old_queue = []
        self._new_queue = []
        # Holds the (key, value) items found when processing the root nodes,
        # waiting for the uninteresting nodes to be walked
        self._new_item_queue = []

    def _read_nodes_from_store(self, keys):
        # We chose not to use _get_cache(), because we think in
        # terms of records to be yielded. Also, we expect to touch each page
        # only 1 time during this code. (We may want to evaluate saving the
        # raw bytes into the page cache, which would allow a working tree
        # update after the fetch to not have to read the bytes again.)
        as_st = StaticTuple.from_sequence
        stream = self._store.get_record_stream(keys, 'unordered', True)
        for record in stream:
            if self._pb is not None:
                self._pb.tick()
            if record.storage_kind == 'absent':
                raise errors.NoSuchRevision(self._store, record.key)
            bytes = record.get_bytes_as('fulltext')
            node = _deserialise(bytes, record.key,
                                search_key_func=self._search_key_func)
            if type(node) is InternalNode:
                # Note we don't have to do node.refs() because we know that
                # there are no children that have been pushed into this node
                # Note: Using as_st() here seemed to save 1.2MB, which would
                #       indicate that we keep 100k prefix_refs around while
                #       processing. They *should* be shorter lived than that...
                #       It does cost us ~10s of processing time
                #prefix_refs = [as_st(item) for item in node._items.iteritems()]
                prefix_refs = node._items.items()
                items = []
            else:
                prefix_refs = []
                # Note: We don't use a StaticTuple here. Profiling showed a
                #       minor memory improvement (0.8MB out of a 335MB peak,
                #       or 0.2%), but a significant slowdown (15s / 145s, or
                #       10%).
                items = node._items.items()
            yield record, node, prefix_refs, items
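
        # Shape of the yielded tuples (illustrative): for an InternalNode,
        # prefix_refs is a list of (serialised_prefix, child_chk_key) pairs
        # and items is empty; for a LeafNode, prefix_refs is empty and items
        # is a list of (key_tuple, value_string) pairs, e.g. something like
        #   (record, node, [], [(('file-id', 'rev-id'), 'value bytes')])
        # The exact key and value layout depends on the map being stored.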

    def _read_old_roots(self):
        old_chks_to_enqueue = []
        all_old_chks = self._all_old_chks
        for record, node, prefix_refs, items in \
                self._read_nodes_from_store(self._old_root_keys):
            # Uninteresting node
            prefix_refs = [p_r for p_r in prefix_refs
                           if p_r[1] not in all_old_chks]
            new_refs = [p_r[1] for p_r in prefix_refs]
            all_old_chks.update(new_refs)
            # TODO: This might be a good time to turn items into StaticTuple
            #       instances and possibly intern them. However, this does not
            #       impact 'initial branch' performance, so I'm not worrying
            #       about it yet.
            self._all_old_items.update(items)
            # Queue up the uninteresting references
            # Don't actually put them in the 'to-read' queue until we have
            # finished checking the interesting references
            old_chks_to_enqueue.extend(prefix_refs)
        return old_chks_to_enqueue

    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
        # At this point, we have read all the uninteresting and interesting
        # items, so we can queue up the uninteresting stuff, knowing that we've
        # handled the interesting ones
        for prefix, ref in old_chks_to_enqueue:
            not_interesting = True
            for i in xrange(len(prefix), 0, -1):
                if prefix[:i] in new_prefixes:
                    not_interesting = False
                    break
            if not_interesting:
                # This prefix is not part of the remaining 'interesting set'
                continue
            self._old_queue.append(ref)
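
        # Worked example (illustrative): with new_prefixes = set(['a', 'ab']),
        # an old ref recorded under prefix 'abc' is checked as 'abc', then
        # 'ab', then 'a'; 'ab' is present, so the old subtree may still shadow
        # new content and the ref is queued. A ref recorded under 'xy'
        # overlaps no new prefix and is dropped.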

    def _read_all_roots(self):
        """Read the root pages.

        This is structured as a generator, so that the root records can be
        yielded up to whoever needs them without any buffering.
        """
        # This is the bootstrap phase
        if not self._old_root_keys:
            # With no old_root_keys we can just shortcut and be ready
            # for _flush_new_queue
            self._new_queue = list(self._new_root_keys)
            return
        old_chks_to_enqueue = self._read_old_roots()
        # filter out any root keys that are already known to be uninteresting
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
        # These are prefixes that are present in new_keys that we are
        # thinking to yield
        new_prefixes = set()
        # We are about to yield all of these, so we don't want them getting
        # added a second time
        processed_new_refs = self._processed_new_refs
        processed_new_refs.update(new_keys)
        for record, node, prefix_refs, items in \
                self._read_nodes_from_store(new_keys):
            # At this level, we now know all the uninteresting references
            # So we filter and queue up whatever is remaining
            prefix_refs = [p_r for p_r in prefix_refs
                           if p_r[1] not in self._all_old_chks
                              and p_r[1] not in processed_new_refs]
            refs = [p_r[1] for p_r in prefix_refs]
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
            self._new_queue.extend(refs)
            # TODO: We can potentially get multiple items here, however the
            #       current design allows for this, as callers will do the work
            #       to make the results unique. We might profile whether we
            #       gain anything by ensuring unique return values for items
            # TODO: This might be a good time to cast to StaticTuple, as
            #       self._new_item_queue will hold the contents of multiple
            #       records for an extended lifetime
            new_items = [item for item in items
                         if item not in self._all_old_items]
            self._new_item_queue.extend(new_items)
            new_prefixes.update([self._search_key_func(item[0])
                                 for item in new_items])
            processed_new_refs.update(refs)
            yield record
        # For new_prefixes we have the full length prefixes queued up.
        # However, we also need possible prefixes. (If we have a known ref to
        # 'ab', then we also need to include 'a'.) So expand the
        # new_prefixes to include all shorter prefixes
        for prefix in list(new_prefixes):
            new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
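
        # Expansion example (illustrative): new_prefixes = set(['abc', 'x'])
        # becomes set(['abc', 'ab', 'a', 'x']) after the loop above, so an old
        # ref recorded under the shorter prefix 'ab' is still considered by
        # _enqueue_old.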

    def _flush_new_queue(self):
        # No need to maintain the heap invariant anymore, just pull things out
        # and process them
        refs = set(self._new_queue)
        self._new_queue = []
        # First pass, flush all interesting items and convert to using direct refs
        all_old_chks = self._all_old_chks
        processed_new_refs = self._processed_new_refs
        all_old_items = self._all_old_items
        new_items = [item for item in self._new_item_queue
                     if item not in all_old_items]
        self._new_item_queue = []
        if new_items:
            yield None, new_items
        refs = refs.difference(all_old_chks)
        processed_new_refs.update(refs)
        while refs:
            # TODO: Using a SimpleSet for self._processed_new_refs
            #       saved as much as 10MB of peak memory. However, it requires
            #       implementing a non-pyrex version.
            next_refs = set()
            next_refs_update = next_refs.update
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
            # from 1m54s to 1m51s. Consider it.
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
                if all_old_items:
                    # using the 'if' check saves about 4s (145s => 141s) when
                    # streaming an initial branch of Launchpad data.
                    items = [item for item in items
                             if item not in all_old_items]
                yield record, items
                next_refs_update([p_r[1] for p_r in p_refs])
                del p_refs
            # set1.difference(set/dict) walks all of set1, and checks if it
            # exists in 'other'.
            # set1.difference(iterable) walks all of iterable, and does a
            # 'difference_update' on a clone of set1. Pick wisely based on the
            # expected sizes of objects.
            # in our case it is expected that 'new_refs' will always be quite
            # small.
            next_refs = next_refs.difference(all_old_chks)
            next_refs = next_refs.difference(processed_new_refs)
            processed_new_refs.update(next_refs)
            refs = next_refs
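
        # Illustrative consequence of the note above: with a small next_refs
        # and a large all_old_chks,
        #   next_refs.difference(all_old_chks)        # set argument: walks
        #                                             # only next_refs
        #   next_refs.difference(iter(all_old_chks))  # generic iterable: walks
        #                                             # all of all_old_chks
        # so keeping the argument a real set keeps the cost proportional to
        # the small side.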

    def _process_next_old(self):
        # Since we don't filter uninteresting any further than during
        # _read_all_roots, process the whole queue in a single pass.
        refs = self._old_queue
        self._old_queue = []
        all_old_chks = self._all_old_chks
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
            # TODO: Use StaticTuple here?
            self._all_old_items.update(items)
            refs = [r for _, r in prefix_refs if r not in all_old_chks]
            self._old_queue.extend(refs)
            all_old_chks.update(refs)

    def _process_queues(self):
        while self._old_queue:
            self._process_next_old()
        return self._flush_new_queue()

    def process(self):
        for record in self._read_all_roots():
            yield record, []
        for record, items in self._process_queues():
            yield record, items


def iter_interesting_nodes(store, interesting_root_keys,
                           uninteresting_root_keys, pb=None):
    """Given root keys, find interesting nodes.

    :param uninteresting_root_keys: keys which should be filtered out of the
        result set.
    :return: Yield
        (interesting record, {interesting key:values})
    """
    iterator = CHKMapDifference(store, interesting_root_keys,
                                uninteresting_root_keys,
                                search_key_func=store._search_key_func,
                                pb=pb)
    return iterator.process()
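
# Illustrative use of the generator above (hypothetical caller; assumes
# `store` is a chk-bytes VersionedFiles-like object exposing
# _search_key_func, and that root keys are ('sha1:...',) tuples):
#
#   for record, items in iter_interesting_nodes(store, [new_root], [old_root]):
#       if record is not None:
#           pass  # a raw CHK page present in the new map(s)
#       for key, value in items:
#           pass  # a (key, value) pair in the new map(s) but not the old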