        :return: The root chk of the resulting CHKMap.
        """
        root_key = klass._create_directly(store, initial_value,
            maximum_size=maximum_size, key_width=key_width,
            search_key_func=search_key_func)
        return root_key
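    # Usage sketch (illustrative, not part of the original source): given a
    # CHK-capable VersionedFiles store `chk_store`, from_dict() records the
    # initial content and returns the root key, which can later be used to
    # reopen the map:
    #
    #   root_key = CHKMap.from_dict(chk_store, {('name',): 'value'},
    #                               maximum_size=255, key_width=1)
    #   chkmap = CHKMap(chk_store, root_key)
    #   assert dict(chkmap.iteritems()) == {('name',): 'value'}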

    @classmethod
    def _create_via_map(klass, store, initial_value, maximum_size=0,
                        key_width=1, search_key_func=None):
        result = klass(store, None, search_key_func=search_key_func)
        result._root_node.set_maximum_size(maximum_size)
        result._root_node._key_width = key_width
        delta = []
        for key, value in initial_value.items():
            delta.append((None, key, value))
        root_key = result.apply_delta(delta)
        return root_key
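    # Note (editorial sketch): _create_via_map and _create_directly are meant
    # to be interchangeable ways of building the same map, so for the same
    # inputs they should produce the same root chk, e.g. (illustrative):
    #
    #   assert (CHKMap._create_via_map(chk_store, {('k',): 'v'}) ==
    #           CHKMap._create_directly(chk_store, {('k',): 'v'}))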

    @classmethod
    def _create_directly(klass, store, initial_value, maximum_size=0,
                         key_width=1, search_key_func=None):
        node = LeafNode(search_key_func=search_key_func)
        node.set_maximum_size(maximum_size)
        node._key_width = key_width
        node._items = dict(initial_value)
        node._raw_size = sum([node._key_value_len(key, value)
                              for key, value in initial_value.iteritems()])
        node._len = len(node._items)
        node._compute_search_prefix()
        node._compute_serialised_prefix()
        if (node._len > 1
            and maximum_size
            and node._current_size() > maximum_size):
            prefix, node_details = node._split(store)
            if len(node_details) == 1:
                raise AssertionError('Failed to split using node._split')
            node = InternalNode(prefix, search_key_func=search_key_func)
            node.set_maximum_size(maximum_size)
            node._key_width = key_width
            for split, subnode in node_details:
                node.add_node(split, subnode)
        keys = list(node.serialise(store))
        return keys[-1]

    def iter_changes(self, basis):
        """Iterate over the changes between basis and self.

        :return: An iterator of tuples: (key, old_value, new_value). Old_value
            is None for keys only in self; new_value is None for keys only in
            basis.
        """

        # prefix is the key in self._items to use, key_filter is the key_filter
        # entries that would match this node
        keys = {}
        if key_filter is None:
            # yielding all nodes, yield whatever we have, and queue up a read
            # for whatever we are missing
            for prefix, node in self._items.iteritems():
                if node.__class__ is tuple:
                    keys[node] = (prefix, None)
                else:
                    yield node, None
        elif len(key_filter) == 1:
            # Technically, this path could also be handled by the first check
            # in 'self._node_width' in length_filters. However, we can handle
            # this case without spending any time building up the
            # prefix_to_keys, etc state.

            # This is a bit ugly, but TIMEIT showed it to be by far the fastest
            # way to pick the single key out of key_filter:
            #   0.626us   list(key_filter)[0]
            #       is a func() for list(), 2 mallocs, and a getitem
            #   0.489us   [k for k in key_filter][0]
            #       still has the mallocs, avoids the func() call
            #   0.350us   iter(key_filter).next()
            #       has a func() call, and mallocs an iterator
            #   0.125us   for key in key_filter: pass
            #       no func() overhead, might malloc an iterator
            #   0.105us   for key in key_filter: break
            #       no func() overhead, might malloc an iterator, probably
            #       avoids checking an 'else' clause as part of the for
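            # (Illustrative: numbers of that sort can be reproduced with the
            #  stdlib timeit module, e.g.
            #      python -m timeit -s "key_filter = set([('k',)])" \
            #          "for key in key_filter: break"
            #  though the exact figures depend on interpreter and hardware.)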
            for key in key_filter:
                break
            search_prefix = self._search_prefix_filter(key)
            if len(search_prefix) == self._node_width:
                # This item will match exactly, so just do a dict lookup, and
                # see what we can return
                try:
                    node = self._items[search_prefix]
                except KeyError:
                    # A given key can only match 1 child node, if it isn't
                    # there, then we can just return nothing
                    return
                if node.__class__ is tuple:
                    keys[node] = (search_prefix, [key])
                else:
                    # This is loaded, and the only thing that can match,
                    # so yield it and stop
                    yield node, [key]
                    return
        else:
            # First, convert all keys into a list of search prefixes
            # Aggregate common prefixes, and track the keys they come from
            prefix_to_keys = {}
            length_filters = {}
            for key in key_filter:
                search_prefix = self._search_prefix_filter(key)
                length_filter = length_filters.setdefault(
                                    len(search_prefix), set())
                length_filter.add(search_prefix)
                prefix_to_keys.setdefault(search_prefix, []).append(key)
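            # For example (illustrative values): with _node_width == 2 and two
            # keys whose search prefixes are 'ab' and 'a9', this builds
            #     length_filters == {2: set(['ab', 'a9'])}
            #     prefix_to_keys == {'ab': [key1], 'a9': [key2]}
            # so every prefix is exactly _node_width long and the dict-lookup
            # branch below applies.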
            if (self._node_width in length_filters
                and len(length_filters) == 1):
                # all of the search prefixes match exactly _node_width. This
                # means that everything is an exact match, and we can do a
                # lookup into self._items, rather than iterating over the items
                search_prefixes = length_filters[self._node_width]
                for search_prefix in search_prefixes:
                    try:
                        node = self._items[search_prefix]
                    except KeyError:
                        # We can ignore this one
                        continue
                    node_key_filter = prefix_to_keys[search_prefix]
                    if node.__class__ is tuple:
                        keys[node] = (search_prefix, node_key_filter)
                    else:
                        yield node, node_key_filter
            else:
                # The slow way. We walk every item in self._items, and check to
                # see if there are any matches
                length_filters = length_filters.items()
                for prefix, node in self._items.iteritems():
                    node_key_filter = []
                    for length, length_filter in length_filters:
                        sub_prefix = prefix[:length]
                        if sub_prefix in length_filter:
                            node_key_filter.extend(prefix_to_keys[sub_prefix])
                    if node_key_filter: # this key matched something, yield it
                        if node.__class__ is tuple:
                            keys[node] = (prefix, node_key_filter)
                        else:
                            yield node, node_key_filter
        # Look in the page cache for some more bytes
        found_keys = set()


class CHKMapDifference(object):
    """Iterate the stored pages and key,value pairs for (new - old).

    This class provides a generator over the stored CHK pages and the
    (key, value) pairs that are in any of the new maps and not in any of the
    old maps.

    Note that it may yield chk pages that are common (especially root nodes),
    but it won't yield (key,value) pairs that are common.
    """
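    # Usage sketch (illustrative; variable names are made up): stream
    # everything that is in the new maps but not in the old ones:
    #
    #   diff = CHKMapDifference(chk_store, [new_root_key], [old_root_key],
    #                           search_key_func=_search_key_plain)
    #   for record, items in diff.process():
    #       ...  # record is None when only loose (key, value) items remain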

    def __init__(self, store, new_root_keys, old_root_keys,
                 search_key_func, pb=None):
        self._store = store
        self._new_root_keys = new_root_keys
        self._old_root_keys = old_root_keys
        self._pb = pb
        # All uninteresting chks that we have seen. By the time they are added
        # here, they should be either fully ignored, or queued up for
        # processing
        self._all_old_chks = set(self._old_root_keys)
        # All items that we have seen from the old_root_keys
        self._all_old_items = set()
        # These are interesting items which were either read, or already in the
        # interesting queue (so we don't need to walk them again)
        self._processed_new_refs = set()
        self._search_key_func = search_key_func
        # The uninteresting and interesting nodes to be searched
        self._old_queue = []
        self._new_queue = []
        # Holds the (key, value) items found when processing the root nodes,
        # waiting for the uninteresting nodes to be walked
        self._new_item_queue = []

    def _read_nodes_from_store(self, keys):
        # We chose not to use _page_cache, because we think in terms of records
        # to be yielded. Also, we expect to touch each page only 1 time during
        # this code. (We may want to evaluate saving the raw bytes into the
        # page cache, which would allow a working tree update after the fetch
        # to not have to read the bytes again.)
        stream = self._store.get_record_stream(keys, 'unordered', True)
        for record in stream:
            if self._pb is not None:
                self._pb.tick()
            if record.storage_kind == 'absent':
                raise errors.NoSuchRevision(self._store, record.key)
            bytes = record.get_bytes_as('fulltext')
            node = _deserialise(bytes, record.key,
                                search_key_func=self._search_key_func)
            if type(node) is InternalNode:
                # Note we don't have to do node.refs() because we know that
                # there are no children that have been pushed into this node
                prefix_refs = node._items.items()
                items = []
            else:
                prefix_refs = []
                items = node._items.items()
            yield record, node, prefix_refs, items
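    # For illustration (made-up values): an InternalNode yields something like
    #     (record, node, [('a', ('sha1:aaaa...',))], [])
    # while a LeafNode yields
    #     (record, node, [], [(('file-id', 'rev-id'), 'serialised value')])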

    def _read_old_roots(self):
        old_chks_to_enqueue = []
        all_old_chks = self._all_old_chks
        for record, node, prefix_refs, items in \
                self._read_nodes_from_store(self._old_root_keys):
            # Uninteresting node
            prefix_refs = [p_r for p_r in prefix_refs
                           if p_r[1] not in all_old_chks]
            new_refs = [p_r[1] for p_r in prefix_refs]
            all_old_chks.update(new_refs)
            self._all_old_items.update(items)
            # Queue up the uninteresting references
            # Don't actually put them in the 'to-read' queue until we have
            # finished checking the interesting references
            old_chks_to_enqueue.extend(prefix_refs)
        return old_chks_to_enqueue

    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
        # At this point, we have read all the uninteresting and interesting
        # items, so we can queue up the uninteresting stuff, knowing that we've
        # handled the interesting ones
        for prefix, ref in old_chks_to_enqueue:
            not_interesting = True
            for i in xrange(len(prefix), 0, -1):
                if prefix[:i] in new_prefixes:
                    not_interesting = False
                    break
            if not_interesting:
                # This prefix is not part of the remaining 'interesting set'
                continue
            self._old_queue.append(ref)
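    # Worked example (made-up values): with new_prefixes == set(['a', 'ab'])
    # an old ref queued under prefix 'abc' matches via its shorter prefix
    # 'ab' and is appended to _old_queue, while one queued under 'zz'
    # matches nothing and is skipped.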

    def _read_all_roots(self):
        """Read the root pages.

        This is structured as a generator, so that the root records can be
        yielded up to whoever needs them without any buffering.
        """
        # This is the bootstrap phase
        if not self._old_root_keys:
            # With no old_root_keys we can just shortcut and be ready
            # for _flush_new_queue
            self._new_queue = list(self._new_root_keys)
            return
        old_chks_to_enqueue = self._read_old_roots()
        # filter out any root keys that are already known to be uninteresting
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
        # These are prefixes that are present in new_keys that we are
        # going to yield
        new_prefixes = set()
        # We are about to yield all of these, so we don't want them getting
        # added a second time
        processed_new_refs = self._processed_new_refs
        processed_new_refs.update(new_keys)
        for record, node, prefix_refs, items in \
                self._read_nodes_from_store(new_keys):
            # At this level, we now know all the uninteresting references
            # So we filter and queue up whatever is remaining
            prefix_refs = [p_r for p_r in prefix_refs
                           if p_r[1] not in self._all_old_chks
                           and p_r[1] not in processed_new_refs]
            refs = [p_r[1] for p_r in prefix_refs]
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
            self._new_queue.extend(refs)
            # TODO: We can potentially get multiple items here, however the
            #       current design allows for this, as callers will do the work
            #       to make the results unique. We might profile whether we
            #       gain anything by ensuring unique return values for items
            new_items = [item for item in items
                         if item not in self._all_old_items]
            self._new_item_queue.extend(new_items)
            new_prefixes.update([self._search_key_func(item[0])
                                 for item in new_items])
            processed_new_refs.update(refs)
            yield record
        # For new_prefixes we have the full length prefixes queued up.
        # However, we also need possible prefixes. (If we have a known ref to
        # 'ab', then we also need to include 'a'.) So expand the
        # new_prefixes to include all shorter prefixes
        for prefix in list(new_prefixes):
            new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
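        # e.g. (illustrative): if new_prefixes is set(['abc']), the loop above
        # adds 'a' and 'ab', so old refs queued under those shorter prefixes
        # are still treated as overlapping by _enqueue_old().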
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)

    def _flush_new_queue(self):
        # No need to maintain the heap invariant anymore, just pull things out
        # and process them
        refs = set(self._new_queue)
        self._new_queue = []
        # First pass, flush all interesting items and convert to using direct refs
        all_old_chks = self._all_old_chks
        processed_new_refs = self._processed_new_refs
        all_old_items = self._all_old_items
        new_items = [item for item in self._new_item_queue
                     if item not in all_old_items]
        self._new_item_queue = []
        if new_items:
            yield None, new_items
        refs = refs.difference(all_old_chks)
        while refs:
            next_refs = set()
            next_refs_update = next_refs.update
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
            # from 1m54s to 1m51s. Consider it.
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
                items = [item for item in items
                         if item not in all_old_items]
                yield record, items
                next_refs_update([p_r[1] for p_r in p_refs])
            next_refs = next_refs.difference(all_old_chks)
            next_refs = next_refs.difference(processed_new_refs)
            processed_new_refs.update(next_refs)
            refs = next_refs

    def _process_next_old(self):
        # Since we don't filter uninteresting any further than during
        # _read_all_roots, process the whole queue in a single pass.
        refs = self._old_queue
        self._old_queue = []
        all_old_chks = self._all_old_chks
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
            self._all_old_items.update(items)
            refs = [r for _, r in prefix_refs if r not in all_old_chks]
            self._old_queue.extend(refs)
            all_old_chks.update(refs)

    def _process_queues(self):
        while self._old_queue:
            self._process_next_old()
        return self._flush_new_queue()

    def process(self):
        for record in self._read_all_roots():
            yield record, []
        for record, items in self._process_queues():
            yield record, items


def iter_interesting_nodes(store, interesting_root_keys,
                           uninteresting_root_keys, pb=None):
    """Given root keys, find interesting nodes.

    Evaluate nodes referenced by interesting_root_keys. Ones that are also
    referenced from uninteresting_root_keys are not considered interesting.

    :param interesting_root_keys: keys which should be part of the
        "interesting" nodes (which will be yielded)
    :param uninteresting_root_keys: keys which should be filtered out of the
        result set.
    :return: Yield
        (interesting record, {interesting key:values})
    """
    iterator = CHKMapDifference(store, interesting_root_keys,
                                uninteresting_root_keys,
                                search_key_func=store._search_key_func,
                                pb=pb)
    return iterator.process()
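# Usage sketch (illustrative; the helper names are hypothetical): stream the
# pages and items that a new inventory adds over a basis inventory:
#
#   for record, items in iter_interesting_nodes(chk_store,
#           [new_inv_root_key], [basis_inv_root_key]):
#       if record is not None:
#           send_record(record)          # hypothetical transport helper
#       for key, value in items:
#           handle_new_item(key, value)  # hypothetical callback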