~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/chk_map.py

  • Committer: Matt Nordhoff
  • Date: 2009-06-23 05:12:07 UTC
  • mto: This revision was merged to the branch mainline in revision 4474.
  • Revision ID: mnordhoff@mattnordhoff.com-20090623051207-fksdtbzkwtnrw9dd
Update _add_text docstrings that still referred to add_text.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
 
27
27
Updates to a CHKMap are done preferentially via the apply_delta method, to
28
28
allow optimisation of the update operation; but individual map/unmap calls are
29
 
possible and supported. Individual changes via map/unmap are buffered in memory
30
 
until the _save method is called to force serialisation of the tree.
31
 
apply_delta records its changes immediately by performing an implicit _save.
 
29
possible and supported. All changes via map/unmap are buffered in memory until
 
30
the _save method is called to force serialisation of the tree. apply_delta
 
31
performs a _save implicitly.
32
32
 
33
33
TODO:
34
34
-----
38
38
"""
39
39
 
40
40
import heapq
 
41
import time
41
42
 
42
43
from bzrlib import lazy_import
43
44
lazy_import.lazy_import(globals(), """
 
45
from bzrlib import versionedfile
 
46
""")
44
47
from bzrlib import (
45
48
    errors,
46
 
    versionedfile,
47
 
    )
48
 
""")
49
 
from bzrlib import (
50
49
    lru_cache,
51
50
    osutils,
52
51
    registry,
60
59
# We are caching bytes so len(value) is perfectly accurate
61
60
_page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
62
61
 
63
 
def clear_cache():
64
 
    _page_cache.clear()
65
 
 
66
62
# If a ChildNode falls below this many bytes, we check for a remap
67
63
_INTERESTING_NEW_SIZE = 50
68
64
# If a ChildNode shrinks by more than this amount, we check for a remap
111
107
            of old_key is removed.
112
108
        """
113
109
        delete_count = 0
114
 
        # Check preconditions first.
115
 
        new_items = set([key for (old, key, value) in delta if key is not None
116
 
            and old is None])
117
 
        existing_new = list(self.iteritems(key_filter=new_items))
118
 
        if existing_new:
119
 
            raise errors.InconsistentDeltaDelta(delta,
120
 
                "New items are already in the map %r." % existing_new)
121
 
        # Now apply changes.
122
110
        for old, new, value in delta:
123
111
            if old is not None and old != new:
124
112
                self.unmap(old, check_remap=False)
133
121
 
134
122
    def _ensure_root(self):
135
123
        """Ensure that the root node is an object not a key."""
136
 
        if type(self._root_node) is tuple:
 
124
        if type(self._root_node) == tuple:
137
125
            # Demand-load the root
138
126
            self._root_node = self._get_node(self._root_node)
139
127
 
147
135
        :param node: A tuple key or node object.
148
136
        :return: A node object.
149
137
        """
150
 
        if type(node) is tuple:
 
138
        if type(node) == tuple:
151
139
            bytes = self._read_bytes(node)
152
140
            return _deserialise(bytes, node,
153
141
                search_key_func=self._search_key_func)
215
203
            multiple pages.
216
204
        :return: The root chk of the resulting CHKMap.
217
205
        """
218
 
        root_key = klass._create_directly(store, initial_value,
219
 
            maximum_size=maximum_size, key_width=key_width,
220
 
            search_key_func=search_key_func)
221
 
        return root_key
222
 
 
223
 
    @classmethod
224
 
    def _create_via_map(klass, store, initial_value, maximum_size=0,
225
 
                        key_width=1, search_key_func=None):
226
 
        result = klass(store, None, search_key_func=search_key_func)
 
206
        result = CHKMap(store, None, search_key_func=search_key_func)
227
207
        result._root_node.set_maximum_size(maximum_size)
228
208
        result._root_node._key_width = key_width
229
209
        delta = []
230
210
        for key, value in initial_value.items():
231
211
            delta.append((None, key, value))
232
 
        root_key = result.apply_delta(delta)
233
 
        return root_key
234
 
 
235
 
    @classmethod
236
 
    def _create_directly(klass, store, initial_value, maximum_size=0,
237
 
                         key_width=1, search_key_func=None):
238
 
        node = LeafNode(search_key_func=search_key_func)
239
 
        node.set_maximum_size(maximum_size)
240
 
        node._key_width = key_width
241
 
        node._items = dict(initial_value)
242
 
        node._raw_size = sum([node._key_value_len(key, value)
243
 
                              for key,value in initial_value.iteritems()])
244
 
        node._len = len(node._items)
245
 
        node._compute_search_prefix()
246
 
        node._compute_serialised_prefix()
247
 
        if (node._len > 1
248
 
            and maximum_size
249
 
            and node._current_size() > maximum_size):
250
 
            prefix, node_details = node._split(store)
251
 
            if len(node_details) == 1:
252
 
                raise AssertionError('Failed to split using node._split')
253
 
            node = InternalNode(prefix, search_key_func=search_key_func)
254
 
            node.set_maximum_size(maximum_size)
255
 
            node._key_width = key_width
256
 
            for split, subnode in node_details:
257
 
                node.add_node(split, subnode)
258
 
        keys = list(node.serialise(store))
259
 
        return keys[-1]
 
212
        return result.apply_delta(delta)
260
213
 
261
214
    def iter_changes(self, basis):
262
215
        """Iterate over the changes between basis and self.
496
449
        return len(self._root_node)
497
450
 
498
451
    def map(self, key, value):
499
 
        """Map a key tuple to value.
500
 
        
501
 
        :param key: A key to map.
502
 
        :param value: The value to assign to key.
503
 
        """
 
452
        """Map a key tuple to value."""
504
453
        # Need a root object.
505
454
        self._ensure_root()
506
455
        prefix, node_details = self._root_node.map(self._store, key, value)
516
465
 
517
466
    def _node_key(self, node):
518
467
        """Get the key for a node whether it's a tuple or node."""
519
 
        if type(node) is tuple:
 
468
        if type(node) == tuple:
520
469
            return node
521
470
        else:
522
471
            return node._key
542
491
 
543
492
        :return: The key of the root node.
544
493
        """
545
 
        if type(self._root_node) is tuple:
 
494
        if type(self._root_node) == tuple:
546
495
            # Already saved.
547
496
            return self._root_node
548
497
        keys = list(self._root_node.serialise(self._store))
815
764
                result[prefix] = node
816
765
            else:
817
766
                node = result[prefix]
818
 
            sub_prefix, node_details = node.map(store, key, value)
819
 
            if len(node_details) > 1:
820
 
                if prefix != sub_prefix:
821
 
                    # This node has been split and is now found via a different
822
 
                    # path
823
 
                    result.pop(prefix)
824
 
                new_node = InternalNode(sub_prefix,
825
 
                    search_key_func=self._search_key_func)
826
 
                new_node.set_maximum_size(self._maximum_size)
827
 
                new_node._key_width = self._key_width
828
 
                for split, node in node_details:
829
 
                    new_node.add_node(split, node)
830
 
                result[prefix] = new_node
 
767
            node.map(store, key, value)
831
768
        return common_prefix, result.items()
832
769
 
833
770
    def map(self, store, key, value):
1018
955
        # prefix is the key in self._items to use, key_filter is the key_filter
1019
956
        # entries that would match this node
1020
957
        keys = {}
1021
 
        shortcut = False
1022
958
        if key_filter is None:
1023
 
            # yielding all nodes, yield whatever we have, and queue up a read
1024
 
            # for whatever we are missing
1025
 
            shortcut = True
1026
959
            for prefix, node in self._items.iteritems():
1027
 
                if node.__class__ is tuple:
 
960
                if type(node) == tuple:
1028
961
                    keys[node] = (prefix, None)
1029
962
                else:
1030
963
                    yield node, None
1031
 
        elif len(key_filter) == 1:
1032
 
            # Technically, this path could also be handled by the first check
1033
 
            # in 'self._node_width' in length_filters. However, we can handle
1034
 
            # this case without spending any time building up the
1035
 
            # prefix_to_keys, etc state.
1036
 
 
1037
 
            # This is a bit ugly, but TIMEIT showed it to be by far the fastest
1038
 
            # 0.626us   list(key_filter)[0]
1039
 
            #       is a func() for list(), 2 mallocs, and a getitem
1040
 
            # 0.489us   [k for k in key_filter][0]
1041
 
            #       still has the mallocs, avoids the func() call
1042
 
            # 0.350us   iter(key_filter).next()
1043
 
            #       has a func() call, and mallocs an iterator
1044
 
            # 0.125us   for key in key_filter: pass
1045
 
            #       no func() overhead, might malloc an iterator
1046
 
            # 0.105us   for key in key_filter: break
1047
 
            #       no func() overhead, might malloc an iterator, probably
1048
 
            #       avoids checking an 'else' clause as part of the for
1049
 
            for key in key_filter:
1050
 
                break
1051
 
            search_prefix = self._search_prefix_filter(key)
1052
 
            if len(search_prefix) == self._node_width:
1053
 
                # This item will match exactly, so just do a dict lookup, and
1054
 
                # see what we can return
1055
 
                shortcut = True
1056
 
                try:
1057
 
                    node = self._items[search_prefix]
1058
 
                except KeyError:
1059
 
                    # A given key can only match 1 child node, if it isn't
1060
 
                    # there, then we can just return nothing
1061
 
                    return
1062
 
                if node.__class__ is tuple:
1063
 
                    keys[node] = (search_prefix, [key])
1064
 
                else:
1065
 
                    # This is loaded, and the only thing that can match,
1066
 
                    # return
1067
 
                    yield node, [key]
1068
 
                    return
1069
 
        if not shortcut:
1070
 
            # First, convert all keys into a list of search prefixes
1071
 
            # Aggregate common prefixes, and track the keys they come from
 
964
        else:
 
965
            # XXX defaultdict ?
1072
966
            prefix_to_keys = {}
1073
967
            length_filters = {}
1074
968
            for key in key_filter:
1075
 
                search_prefix = self._search_prefix_filter(key)
 
969
                search_key = self._search_prefix_filter(key)
1076
970
                length_filter = length_filters.setdefault(
1077
 
                                    len(search_prefix), set())
1078
 
                length_filter.add(search_prefix)
1079
 
                prefix_to_keys.setdefault(search_prefix, []).append(key)
1080
 
 
1081
 
            if (self._node_width in length_filters
1082
 
                and len(length_filters) == 1):
1083
 
                # all of the search prefixes match exactly _node_width. This
1084
 
                # means that everything is an exact match, and we can do a
1085
 
                # lookup into self._items, rather than iterating over the items
1086
 
                # dict.
1087
 
                search_prefixes = length_filters[self._node_width]
1088
 
                for search_prefix in search_prefixes:
1089
 
                    try:
1090
 
                        node = self._items[search_prefix]
1091
 
                    except KeyError:
1092
 
                        # We can ignore this one
1093
 
                        continue
1094
 
                    node_key_filter = prefix_to_keys[search_prefix]
1095
 
                    if node.__class__ is tuple:
1096
 
                        keys[node] = (search_prefix, node_key_filter)
 
971
                                    len(search_key), set())
 
972
                length_filter.add(search_key)
 
973
                prefix_to_keys.setdefault(search_key, []).append(key)
 
974
            length_filters = length_filters.items()
 
975
            for prefix, node in self._items.iteritems():
 
976
                node_key_filter = []
 
977
                for length, length_filter in length_filters:
 
978
                    sub_prefix = prefix[:length]
 
979
                    if sub_prefix in length_filter:
 
980
                        node_key_filter.extend(prefix_to_keys[sub_prefix])
 
981
                if node_key_filter: # this key matched something, yield it
 
982
                    if type(node) == tuple:
 
983
                        keys[node] = (prefix, node_key_filter)
1097
984
                    else:
1098
985
                        yield node, node_key_filter
1099
 
            else:
1100
 
                # The slow way. We walk every item in self._items, and check to
1101
 
                # see if there are any matches
1102
 
                length_filters = length_filters.items()
1103
 
                for prefix, node in self._items.iteritems():
1104
 
                    node_key_filter = []
1105
 
                    for length, length_filter in length_filters:
1106
 
                        sub_prefix = prefix[:length]
1107
 
                        if sub_prefix in length_filter:
1108
 
                            node_key_filter.extend(prefix_to_keys[sub_prefix])
1109
 
                    if node_key_filter: # this key matched something, yield it
1110
 
                        if node.__class__ is tuple:
1111
 
                            keys[node] = (prefix, node_key_filter)
1112
 
                        else:
1113
 
                            yield node, node_key_filter
1114
986
        if keys:
1115
987
            # Look in the page cache for some more bytes
1116
988
            found_keys = set()
1245
1117
        :return: An iterable of the keys inserted by this operation.
1246
1118
        """
1247
1119
        for node in self._items.itervalues():
1248
 
            if type(node) is tuple:
 
1120
            if type(node) == tuple:
1249
1121
                # Never deserialised.
1250
1122
                continue
1251
1123
            if node._key is not None:
1262
1134
        lines.append('%s\n' % (self._search_prefix,))
1263
1135
        prefix_len = len(self._search_prefix)
1264
1136
        for prefix, node in sorted(self._items.items()):
1265
 
            if type(node) is tuple:
 
1137
            if type(node) == tuple:
1266
1138
                key = node[0]
1267
1139
            else:
1268
1140
                key = node._key[0]
1307
1179
            raise AssertionError("unserialised nodes have no refs.")
1308
1180
        refs = []
1309
1181
        for value in self._items.itervalues():
1310
 
            if type(value) is tuple:
 
1182
            if type(value) == tuple:
1311
1183
                refs.append(value)
1312
1184
            else:
1313
1185
                refs.append(value.key())
1414
1286
    return node
1415
1287
 
1416
1288
 
1417
 
class CHKMapDifference(object):
1418
 
    """Iterate the stored pages and key,value pairs for (new - old).
1419
 
 
1420
 
    This class provides a generator over the stored CHK pages and the
1421
 
    (key, value) pairs that are in any of the new maps and not in any of the
1422
 
    old maps.
1423
 
 
1424
 
    Note that it may yield chk pages that are common (especially root nodes),
1425
 
    but it won't yield (key,value) pairs that are common.
1426
 
    """
1427
 
 
1428
 
    def __init__(self, store, new_root_keys, old_root_keys,
1429
 
                 search_key_func, pb=None):
1430
 
        self._store = store
1431
 
        self._new_root_keys = new_root_keys
1432
 
        self._old_root_keys = old_root_keys
1433
 
        self._pb = pb
1434
 
        # All uninteresting chks that we have seen. By the time they are added
1435
 
        # here, they should be either fully ignored, or queued up for
1436
 
        # processing
1437
 
        self._all_old_chks = set(self._old_root_keys)
1438
 
        # All items that we have seen from the old_root_keys
1439
 
        self._all_old_items = set()
1440
 
        # These are interesting items which were either read, or already in the
1441
 
        # interesting queue (so we don't need to walk them again)
1442
 
        self._processed_new_refs = set()
1443
 
        self._search_key_func = search_key_func
1444
 
 
1445
 
        # The uninteresting and interesting nodes to be searched
1446
 
        self._old_queue = []
1447
 
        self._new_queue = []
1448
 
        # Holds the (key, value) items found when processing the root nodes,
1449
 
        # waiting for the uninteresting nodes to be walked
1450
 
        self._new_item_queue = []
1451
 
        self._state = None
1452
 
 
1453
 
    def _read_nodes_from_store(self, keys):
1454
 
        # We chose not to use _page_cache, because we think in terms of records
1455
 
        # to be yielded. Also, we expect to touch each page only 1 time during
1456
 
        # this code. (We may want to evaluate saving the raw bytes into the
1457
 
        # page cache, which would allow a working tree update after the fetch
1458
 
        # to not have to read the bytes again.)
1459
 
        stream = self._store.get_record_stream(keys, 'unordered', True)
1460
 
        for record in stream:
1461
 
            if self._pb is not None:
1462
 
                self._pb.tick()
1463
 
            if record.storage_kind == 'absent':
1464
 
                raise errors.NoSuchRevision(self._store, record.key)
 
1289
def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
 
1290
    """Read the associated records, and determine what is interesting."""
 
1291
    uninteresting_keys = set(uninteresting_keys)
 
1292
    chks_to_read = uninteresting_keys.union(interesting_keys)
 
1293
    next_uninteresting = set()
 
1294
    next_interesting = set()
 
1295
    uninteresting_items = set()
 
1296
    interesting_items = set()
 
1297
    interesting_to_yield = []
 
1298
    for record in store.get_record_stream(chks_to_read, 'unordered', True):
 
1299
        # records_read.add(record.key())
 
1300
        if pb is not None:
 
1301
            pb.tick()
 
1302
        bytes = record.get_bytes_as('fulltext')
 
1303
        # We don't care about search_key_func for this code, because we only
 
1304
        # care about external references.
 
1305
        node = _deserialise(bytes, record.key, search_key_func=None)
 
1306
        if record.key in uninteresting_keys:
 
1307
            if type(node) is InternalNode:
 
1308
                next_uninteresting.update(node.refs())
 
1309
            else:
 
1310
                # We know we are at a LeafNode, so we can pass None for the
 
1311
                # store
 
1312
                uninteresting_items.update(node.iteritems(None))
 
1313
        else:
 
1314
            interesting_to_yield.append(record.key)
 
1315
            if type(node) is InternalNode:
 
1316
                next_interesting.update(node.refs())
 
1317
            else:
 
1318
                interesting_items.update(node.iteritems(None))
 
1319
    return (next_uninteresting, uninteresting_items,
 
1320
            next_interesting, interesting_to_yield, interesting_items)
 
1321
 
 
1322
 
 
1323
def _find_all_uninteresting(store, interesting_root_keys,
 
1324
                            uninteresting_root_keys, pb):
 
1325
    """Determine the full set of uninteresting keys."""
 
1326
    # What about duplicates between interesting_root_keys and
 
1327
    # uninteresting_root_keys?
 
1328
    if not uninteresting_root_keys:
 
1329
        # Shortcut case. We know there is nothing uninteresting to filter out
 
1330
        # So we just let the rest of the algorithm do the work
 
1331
        # We know there is nothing uninteresting, and we didn't have to read
 
1332
        # any interesting records yet.
 
1333
        return (set(), set(), set(interesting_root_keys), [], set())
 
1334
    all_uninteresting_chks = set(uninteresting_root_keys)
 
1335
    all_uninteresting_items = set()
 
1336
 
 
1337
    # First step, find the direct children of both the interesting and
 
1338
    # uninteresting set
 
1339
    (uninteresting_keys, uninteresting_items,
 
1340
     interesting_keys, interesting_to_yield,
 
1341
     interesting_items) = _find_children_info(store, interesting_root_keys,
 
1342
                                              uninteresting_root_keys,
 
1343
                                              pb=pb)
 
1344
    all_uninteresting_chks.update(uninteresting_keys)
 
1345
    all_uninteresting_items.update(uninteresting_items)
 
1346
    del uninteresting_items
 
1347
    # Note: Exact matches between interesting and uninteresting do not need
 
1348
    #       to be search further. Non-exact matches need to be searched in case
 
1349
    #       there is a future exact-match
 
1350
    uninteresting_keys.difference_update(interesting_keys)
 
1351
 
 
1352
    # Second, find the full set of uninteresting bits reachable by the
 
1353
    # uninteresting roots
 
1354
    chks_to_read = uninteresting_keys
 
1355
    while chks_to_read:
 
1356
        next_chks = set()
 
1357
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
 
1358
            # TODO: Handle 'absent'
 
1359
            if pb is not None:
 
1360
                pb.tick()
1465
1361
            bytes = record.get_bytes_as('fulltext')
1466
 
            node = _deserialise(bytes, record.key,
1467
 
                                search_key_func=self._search_key_func)
 
1362
            # We don't care about search_key_func for this code, because we
 
1363
            # only care about external references.
 
1364
            node = _deserialise(bytes, record.key, search_key_func=None)
1468
1365
            if type(node) is InternalNode:
1469
 
                # Note we don't have to do node.refs() because we know that
1470
 
                # there are no children that have been pushed into this node
1471
 
                prefix_refs = node._items.items()
1472
 
                items = []
 
1366
                # uninteresting_prefix_chks.update(node._items.iteritems())
 
1367
                chks = node._items.values()
 
1368
                # TODO: We remove the entries that are already in
 
1369
                #       uninteresting_chks ?
 
1370
                next_chks.update(chks)
 
1371
                all_uninteresting_chks.update(chks)
1473
1372
            else:
1474
 
                prefix_refs = []
1475
 
                items = node._items.items()
1476
 
            yield record, node, prefix_refs, items
1477
 
 
1478
 
    def _read_old_roots(self):
1479
 
        old_chks_to_enqueue = []
1480
 
        all_old_chks = self._all_old_chks
1481
 
        for record, node, prefix_refs, items in \
1482
 
                self._read_nodes_from_store(self._old_root_keys):
1483
 
            # Uninteresting node
1484
 
            prefix_refs = [p_r for p_r in prefix_refs
1485
 
                                if p_r[1] not in all_old_chks]
1486
 
            new_refs = [p_r[1] for p_r in prefix_refs]
1487
 
            all_old_chks.update(new_refs)
1488
 
            self._all_old_items.update(items)
1489
 
            # Queue up the uninteresting references
1490
 
            # Don't actually put them in the 'to-read' queue until we have
1491
 
            # finished checking the interesting references
1492
 
            old_chks_to_enqueue.extend(prefix_refs)
1493
 
        return old_chks_to_enqueue
1494
 
 
1495
 
    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
1496
 
        # At this point, we have read all the uninteresting and interesting
1497
 
        # items, so we can queue up the uninteresting stuff, knowing that we've
1498
 
        # handled the interesting ones
1499
 
        for prefix, ref in old_chks_to_enqueue:
1500
 
            not_interesting = True
1501
 
            for i in xrange(len(prefix), 0, -1):
1502
 
                if prefix[:i] in new_prefixes:
1503
 
                    not_interesting = False
1504
 
                    break
1505
 
            if not_interesting:
1506
 
                # This prefix is not part of the remaining 'interesting set'
1507
 
                continue
1508
 
            self._old_queue.append(ref)
1509
 
 
1510
 
    def _read_all_roots(self):
1511
 
        """Read the root pages.
1512
 
 
1513
 
        This is structured as a generator, so that the root records can be
1514
 
        yielded up to whoever needs them without any buffering.
1515
 
        """
1516
 
        # This is the bootstrap phase
1517
 
        if not self._old_root_keys:
1518
 
            # With no old_root_keys we can just shortcut and be ready
1519
 
            # for _flush_new_queue
1520
 
            self._new_queue = list(self._new_root_keys)
1521
 
            return
1522
 
        old_chks_to_enqueue = self._read_old_roots()
1523
 
        # filter out any root keys that are already known to be uninteresting
1524
 
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
1525
 
        # These are prefixes that are present in new_keys that we are
1526
 
        # thinking to yield
1527
 
        new_prefixes = set()
1528
 
        # We are about to yield all of these, so we don't want them getting
1529
 
        # added a second time
1530
 
        processed_new_refs = self._processed_new_refs
1531
 
        processed_new_refs.update(new_keys)
1532
 
        for record, node, prefix_refs, items in \
1533
 
                self._read_nodes_from_store(new_keys):
1534
 
            # At this level, we now know all the uninteresting references
1535
 
            # So we filter and queue up whatever is remaining
1536
 
            prefix_refs = [p_r for p_r in prefix_refs
1537
 
                           if p_r[1] not in self._all_old_chks
1538
 
                              and p_r[1] not in processed_new_refs]
1539
 
            refs = [p_r[1] for p_r in prefix_refs]
1540
 
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
1541
 
            self._new_queue.extend(refs)
1542
 
            # TODO: We can potentially get multiple items here, however the
1543
 
            #       current design allows for this, as callers will do the work
1544
 
            #       to make the results unique. We might profile whether we
1545
 
            #       gain anything by ensuring unique return values for items
1546
 
            new_items = [item for item in items
1547
 
                               if item not in self._all_old_items]
1548
 
            self._new_item_queue.extend(new_items)
1549
 
            new_prefixes.update([self._search_key_func(item[0])
1550
 
                                 for item in new_items])
1551
 
            processed_new_refs.update(refs)
1552
 
            yield record
1553
 
        # For new_prefixes we have the full length prefixes queued up.
1554
 
        # However, we also need possible prefixes. (If we have a known ref to
1555
 
        # 'ab', then we also need to include 'a'.) So expand the
1556
 
        # new_prefixes to include all shorter prefixes
1557
 
        for prefix in list(new_prefixes):
1558
 
            new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
1559
 
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
1560
 
 
1561
 
    def _flush_new_queue(self):
1562
 
        # No need to maintain the heap invariant anymore, just pull things out
1563
 
        # and process them
1564
 
        refs = set(self._new_queue)
1565
 
        self._new_queue = []
1566
 
        # First pass, flush all interesting items and convert to using direct refs
1567
 
        all_old_chks = self._all_old_chks
1568
 
        processed_new_refs = self._processed_new_refs
1569
 
        all_old_items = self._all_old_items
1570
 
        new_items = [item for item in self._new_item_queue
1571
 
                           if item not in all_old_items]
1572
 
        self._new_item_queue = []
1573
 
        if new_items:
1574
 
            yield None, new_items
1575
 
        refs = refs.difference(all_old_chks)
1576
 
        while refs:
1577
 
            next_refs = set()
1578
 
            next_refs_update = next_refs.update
1579
 
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
1580
 
            # from 1m54s to 1m51s. Consider it.
1581
 
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
1582
 
                items = [item for item in items
1583
 
                         if item not in all_old_items]
1584
 
                yield record, items
1585
 
                next_refs_update([p_r[1] for p_r in p_refs])
1586
 
            next_refs = next_refs.difference(all_old_chks)
1587
 
            next_refs = next_refs.difference(processed_new_refs)
1588
 
            processed_new_refs.update(next_refs)
1589
 
            refs = next_refs
1590
 
 
1591
 
    def _process_next_old(self):
1592
 
        # Since we don't filter uninteresting any further than during
1593
 
        # _read_all_roots, process the whole queue in a single pass.
1594
 
        refs = self._old_queue
1595
 
        self._old_queue = []
1596
 
        all_old_chks = self._all_old_chks
1597
 
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
1598
 
            self._all_old_items.update(items)
1599
 
            refs = [r for _,r in prefix_refs if r not in all_old_chks]
1600
 
            self._old_queue.extend(refs)
1601
 
            all_old_chks.update(refs)
1602
 
 
1603
 
    def _process_queues(self):
1604
 
        while self._old_queue:
1605
 
            self._process_next_old()
1606
 
        return self._flush_new_queue()
1607
 
 
1608
 
    def process(self):
1609
 
        for record in self._read_all_roots():
1610
 
            yield record, []
1611
 
        for record, items in self._process_queues():
1612
 
            yield record, items
 
1373
                all_uninteresting_items.update(node._items.iteritems())
 
1374
        chks_to_read = next_chks
 
1375
    return (all_uninteresting_chks, all_uninteresting_items,
 
1376
            interesting_keys, interesting_to_yield, interesting_items)
1613
1377
 
1614
1378
 
1615
1379
def iter_interesting_nodes(store, interesting_root_keys,
1626
1390
    :return: Yield
1627
1391
        (interesting record, {interesting key:values})
1628
1392
    """
1629
 
    iterator = CHKMapDifference(store, interesting_root_keys,
1630
 
                                uninteresting_root_keys,
1631
 
                                search_key_func=store._search_key_func,
1632
 
                                pb=pb)
1633
 
    return iterator.process()
 
1393
    # TODO: consider that it may be more memory efficient to use the 20-byte
 
1394
    #       sha1 string, rather than tuples of hexidecimal sha1 strings.
 
1395
    # TODO: Try to factor out a lot of the get_record_stream() calls into a
 
1396
    #       helper function similar to _read_bytes. This function should be
 
1397
    #       able to use nodes from the _page_cache as well as actually
 
1398
    #       requesting bytes from the store.
 
1399
 
 
1400
    (all_uninteresting_chks, all_uninteresting_items, interesting_keys,
 
1401
     interesting_to_yield, interesting_items) = _find_all_uninteresting(store,
 
1402
        interesting_root_keys, uninteresting_root_keys, pb)
 
1403
 
 
1404
    # Now that we know everything uninteresting, we can yield information from
 
1405
    # our first request
 
1406
    interesting_items.difference_update(all_uninteresting_items)
 
1407
    interesting_to_yield = set(interesting_to_yield) - all_uninteresting_chks
 
1408
    if interesting_items:
 
1409
        yield None, interesting_items
 
1410
    if interesting_to_yield:
 
1411
        # We request these records again, rather than buffering the root
 
1412
        # records, most likely they are still in the _group_cache anyway.
 
1413
        for record in store.get_record_stream(interesting_to_yield,
 
1414
                                              'unordered', False):
 
1415
            yield record, []
 
1416
    all_uninteresting_chks.update(interesting_to_yield)
 
1417
    interesting_keys.difference_update(all_uninteresting_chks)
 
1418
 
 
1419
    chks_to_read = interesting_keys
 
1420
    counter = 0
 
1421
    while chks_to_read:
 
1422
        next_chks = set()
 
1423
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
 
1424
            counter += 1
 
1425
            if pb is not None:
 
1426
                pb.update('find chk pages', counter)
 
1427
            # TODO: Handle 'absent'?
 
1428
            bytes = record.get_bytes_as('fulltext')
 
1429
            # We don't care about search_key_func for this code, because we
 
1430
            # only care about external references.
 
1431
            node = _deserialise(bytes, record.key, search_key_func=None)
 
1432
            if type(node) is InternalNode:
 
1433
                # all_uninteresting_chks grows large, as it lists all nodes we
 
1434
                # don't want to process (including already seen interesting
 
1435
                # nodes).
 
1436
                # small.difference_update(large) scales O(large), but
 
1437
                # small.difference(large) scales O(small).
 
1438
                # Also, we know we just _deserialised this node, so we can
 
1439
                # access the dict directly.
 
1440
                chks = set(node._items.itervalues()).difference(
 
1441
                            all_uninteresting_chks)
 
1442
                # Is set() and .difference_update better than:
 
1443
                # chks = [chk for chk in node.refs()
 
1444
                #              if chk not in all_uninteresting_chks]
 
1445
                next_chks.update(chks)
 
1446
                # These are now uninteresting everywhere else
 
1447
                all_uninteresting_chks.update(chks)
 
1448
                interesting_items = []
 
1449
            else:
 
1450
                interesting_items = [item for item in node._items.iteritems()
 
1451
                                     if item not in all_uninteresting_items]
 
1452
                # TODO: Do we need to filter out items that we have already
 
1453
                #       seen on other pages? We don't really want to buffer the
 
1454
                #       whole thing, but it does mean that callers need to
 
1455
                #       understand they may get duplicate values.
 
1456
                # all_uninteresting_items.update(interesting_items)
 
1457
            yield record, interesting_items
 
1458
        chks_to_read = next_chks
1634
1459
 
1635
1460
 
1636
1461
try: