~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/chk_map.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-04-09 20:23:07 UTC
  • mfrom: (4265.1.4 bbc-merge)
  • Revision ID: pqm@pqm.ubuntu.com-20090409202307-n0depb16qepoe21o
(jam) Change _fetch_uses_deltas = False for CHK repos until we can
        write a better fix.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
 
27
27
Updates to a CHKMap are done preferentially via the apply_delta method, to
28
28
allow optimisation of the update operation; but individual map/unmap calls are
29
 
possible and supported. Individual changes via map/unmap are buffered in memory
30
 
until the _save method is called to force serialisation of the tree.
31
 
apply_delta records its changes immediately by performing an implicit _save.
 
29
possible and supported. All changes via map/unmap are buffered in memory until
 
30
the _save method is called to force serialisation of the tree. apply_delta
 
31
performs a _save implicitly.
32
32
 
33
33
TODO:
34
34
-----
38
38
"""
39
39
 
40
40
import heapq
 
41
import time
41
42
 
42
43
from bzrlib import lazy_import
43
44
lazy_import.lazy_import(globals(), """
 
45
from bzrlib import versionedfile
 
46
""")
44
47
from bzrlib import (
45
48
    errors,
46
 
    versionedfile,
47
 
    )
48
 
""")
49
 
from bzrlib import (
50
49
    lru_cache,
51
50
    osutils,
52
51
    registry,
60
59
# We are caching bytes so len(value) is perfectly accurate
61
60
_page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
62
61
 
63
 
def clear_cache():
64
 
    _page_cache.clear()
65
 
 
66
62
# If a ChildNode falls below this many bytes, we check for a remap
67
63
_INTERESTING_NEW_SIZE = 50
68
64
# If a ChildNode shrinks by more than this amount, we check for a remap
111
107
            of old_key is removed.
112
108
        """
113
109
        delete_count = 0
114
 
        # Check preconditions first.
115
 
        new_items = set([key for (old, key, value) in delta if key is not None
116
 
            and old is None])
117
 
        existing_new = list(self.iteritems(key_filter=new_items))
118
 
        if existing_new:
119
 
            raise errors.InconsistentDeltaDelta(delta,
120
 
                "New items are already in the map %r." % existing_new)
121
 
        # Now apply changes.
122
110
        for old, new, value in delta:
123
111
            if old is not None and old != new:
124
112
                self.unmap(old, check_remap=False)
133
121
 
134
122
    def _ensure_root(self):
135
123
        """Ensure that the root node is an object not a key."""
136
 
        if type(self._root_node) is tuple:
 
124
        if type(self._root_node) == tuple:
137
125
            # Demand-load the root
138
126
            self._root_node = self._get_node(self._root_node)
139
127
 
147
135
        :param node: A tuple key or node object.
148
136
        :return: A node object.
149
137
        """
150
 
        if type(node) is tuple:
 
138
        if type(node) == tuple:
151
139
            bytes = self._read_bytes(node)
152
140
            return _deserialise(bytes, node,
153
141
                search_key_func=self._search_key_func)
215
203
            multiple pages.
216
204
        :return: The root chk of the resulting CHKMap.
217
205
        """
218
 
        root_key = klass._create_directly(store, initial_value,
219
 
            maximum_size=maximum_size, key_width=key_width,
220
 
            search_key_func=search_key_func)
221
 
        return root_key
222
 
 
223
 
    @classmethod
224
 
    def _create_via_map(klass, store, initial_value, maximum_size=0,
225
 
                        key_width=1, search_key_func=None):
226
 
        result = klass(store, None, search_key_func=search_key_func)
 
206
        result = CHKMap(store, None, search_key_func=search_key_func)
227
207
        result._root_node.set_maximum_size(maximum_size)
228
208
        result._root_node._key_width = key_width
229
209
        delta = []
230
210
        for key, value in initial_value.items():
231
211
            delta.append((None, key, value))
232
 
        root_key = result.apply_delta(delta)
233
 
        return root_key
234
 
 
235
 
    @classmethod
236
 
    def _create_directly(klass, store, initial_value, maximum_size=0,
237
 
                         key_width=1, search_key_func=None):
238
 
        node = LeafNode(search_key_func=search_key_func)
239
 
        node.set_maximum_size(maximum_size)
240
 
        node._key_width = key_width
241
 
        node._items = dict(initial_value)
242
 
        node._raw_size = sum([node._key_value_len(key, value)
243
 
                              for key,value in initial_value.iteritems()])
244
 
        node._len = len(node._items)
245
 
        node._compute_search_prefix()
246
 
        node._compute_serialised_prefix()
247
 
        if (node._len > 1
248
 
            and maximum_size
249
 
            and node._current_size() > maximum_size):
250
 
            prefix, node_details = node._split(store)
251
 
            if len(node_details) == 1:
252
 
                raise AssertionError('Failed to split using node._split')
253
 
            node = InternalNode(prefix, search_key_func=search_key_func)
254
 
            node.set_maximum_size(maximum_size)
255
 
            node._key_width = key_width
256
 
            for split, subnode in node_details:
257
 
                node.add_node(split, subnode)
258
 
        keys = list(node.serialise(store))
259
 
        return keys[-1]
 
212
        return result.apply_delta(delta)
260
213
 
261
214
    def iter_changes(self, basis):
262
215
        """Iterate over the changes between basis and self.
496
449
        return len(self._root_node)
497
450
 
498
451
    def map(self, key, value):
499
 
        """Map a key tuple to value.
500
 
        
501
 
        :param key: A key to map.
502
 
        :param value: The value to assign to key.
503
 
        """
 
452
        """Map a key tuple to value."""
504
453
        # Need a root object.
505
454
        self._ensure_root()
506
455
        prefix, node_details = self._root_node.map(self._store, key, value)
516
465
 
517
466
    def _node_key(self, node):
518
467
        """Get the key for a node whether it's a tuple or node."""
519
 
        if type(node) is tuple:
 
468
        if type(node) == tuple:
520
469
            return node
521
470
        else:
522
471
            return node._key
542
491
 
543
492
        :return: The key of the root node.
544
493
        """
545
 
        if type(self._root_node) is tuple:
 
494
        if type(self._root_node) == tuple:
546
495
            # Already saved.
547
496
            return self._root_node
548
497
        keys = list(self._root_node.serialise(self._store))
610
559
        """
611
560
        if key.startswith(prefix):
612
561
            return prefix
613
 
        pos = -1
614
562
        # Is there a better way to do this?
615
563
        for pos, (left, right) in enumerate(zip(prefix, key)):
616
564
            if left != right:
815
763
                result[prefix] = node
816
764
            else:
817
765
                node = result[prefix]
818
 
            sub_prefix, node_details = node.map(store, key, value)
819
 
            if len(node_details) > 1:
820
 
                if prefix != sub_prefix:
821
 
                    # This node has been split and is now found via a different
822
 
                    # path
823
 
                    result.pop(prefix)
824
 
                new_node = InternalNode(sub_prefix,
825
 
                    search_key_func=self._search_key_func)
826
 
                new_node.set_maximum_size(self._maximum_size)
827
 
                new_node._key_width = self._key_width
828
 
                for split, node in node_details:
829
 
                    new_node.add_node(split, node)
830
 
                result[prefix] = new_node
 
766
            node.map(store, key, value)
831
767
        return common_prefix, result.items()
832
768
 
833
769
    def map(self, store, key, value):
1018
954
        # prefix is the key in self._items to use, key_filter is the key_filter
1019
955
        # entries that would match this node
1020
956
        keys = {}
1021
 
        shortcut = False
1022
957
        if key_filter is None:
1023
 
            # yielding all nodes, yield whatever we have, and queue up a read
1024
 
            # for whatever we are missing
1025
 
            shortcut = True
1026
958
            for prefix, node in self._items.iteritems():
1027
 
                if node.__class__ is tuple:
 
959
                if type(node) == tuple:
1028
960
                    keys[node] = (prefix, None)
1029
961
                else:
1030
962
                    yield node, None
1031
 
        elif len(key_filter) == 1:
1032
 
            # Technically, this path could also be handled by the first check
1033
 
            # in 'self._node_width' in length_filters. However, we can handle
1034
 
            # this case without spending any time building up the
1035
 
            # prefix_to_keys, etc state.
1036
 
 
1037
 
            # This is a bit ugly, but TIMEIT showed it to be by far the fastest
1038
 
            # 0.626us   list(key_filter)[0]
1039
 
            #       is a func() for list(), 2 mallocs, and a getitem
1040
 
            # 0.489us   [k for k in key_filter][0]
1041
 
            #       still has the mallocs, avoids the func() call
1042
 
            # 0.350us   iter(key_filter).next()
1043
 
            #       has a func() call, and mallocs an iterator
1044
 
            # 0.125us   for key in key_filter: pass
1045
 
            #       no func() overhead, might malloc an iterator
1046
 
            # 0.105us   for key in key_filter: break
1047
 
            #       no func() overhead, might malloc an iterator, probably
1048
 
            #       avoids checking an 'else' clause as part of the for
1049
 
            for key in key_filter:
1050
 
                break
1051
 
            search_prefix = self._search_prefix_filter(key)
1052
 
            if len(search_prefix) == self._node_width:
1053
 
                # This item will match exactly, so just do a dict lookup, and
1054
 
                # see what we can return
1055
 
                shortcut = True
1056
 
                try:
1057
 
                    node = self._items[search_prefix]
1058
 
                except KeyError:
1059
 
                    # A given key can only match 1 child node, if it isn't
1060
 
                    # there, then we can just return nothing
1061
 
                    return
1062
 
                if node.__class__ is tuple:
1063
 
                    keys[node] = (search_prefix, [key])
1064
 
                else:
1065
 
                    # This is loaded, and the only thing that can match,
1066
 
                    # return
1067
 
                    yield node, [key]
1068
 
                    return
1069
 
        if not shortcut:
1070
 
            # First, convert all keys into a list of search prefixes
1071
 
            # Aggregate common prefixes, and track the keys they come from
 
963
        else:
 
964
            # XXX defaultdict ?
1072
965
            prefix_to_keys = {}
1073
966
            length_filters = {}
1074
967
            for key in key_filter:
1075
 
                search_prefix = self._search_prefix_filter(key)
 
968
                search_key = self._search_prefix_filter(key)
1076
969
                length_filter = length_filters.setdefault(
1077
 
                                    len(search_prefix), set())
1078
 
                length_filter.add(search_prefix)
1079
 
                prefix_to_keys.setdefault(search_prefix, []).append(key)
1080
 
 
1081
 
            if (self._node_width in length_filters
1082
 
                and len(length_filters) == 1):
1083
 
                # all of the search prefixes match exactly _node_width. This
1084
 
                # means that everything is an exact match, and we can do a
1085
 
                # lookup into self._items, rather than iterating over the items
1086
 
                # dict.
1087
 
                search_prefixes = length_filters[self._node_width]
1088
 
                for search_prefix in search_prefixes:
1089
 
                    try:
1090
 
                        node = self._items[search_prefix]
1091
 
                    except KeyError:
1092
 
                        # We can ignore this one
1093
 
                        continue
1094
 
                    node_key_filter = prefix_to_keys[search_prefix]
1095
 
                    if node.__class__ is tuple:
1096
 
                        keys[node] = (search_prefix, node_key_filter)
 
970
                                    len(search_key), set())
 
971
                length_filter.add(search_key)
 
972
                prefix_to_keys.setdefault(search_key, []).append(key)
 
973
            length_filters = length_filters.items()
 
974
            for prefix, node in self._items.iteritems():
 
975
                node_key_filter = []
 
976
                for length, length_filter in length_filters:
 
977
                    sub_prefix = prefix[:length]
 
978
                    if sub_prefix in length_filter:
 
979
                        node_key_filter.extend(prefix_to_keys[sub_prefix])
 
980
                if node_key_filter: # this key matched something, yield it
 
981
                    if type(node) == tuple:
 
982
                        keys[node] = (prefix, node_key_filter)
1097
983
                    else:
1098
984
                        yield node, node_key_filter
1099
 
            else:
1100
 
                # The slow way. We walk every item in self._items, and check to
1101
 
                # see if there are any matches
1102
 
                length_filters = length_filters.items()
1103
 
                for prefix, node in self._items.iteritems():
1104
 
                    node_key_filter = []
1105
 
                    for length, length_filter in length_filters:
1106
 
                        sub_prefix = prefix[:length]
1107
 
                        if sub_prefix in length_filter:
1108
 
                            node_key_filter.extend(prefix_to_keys[sub_prefix])
1109
 
                    if node_key_filter: # this key matched something, yield it
1110
 
                        if node.__class__ is tuple:
1111
 
                            keys[node] = (prefix, node_key_filter)
1112
 
                        else:
1113
 
                            yield node, node_key_filter
1114
985
        if keys:
1115
986
            # Look in the page cache for some more bytes
1116
987
            found_keys = set()
1245
1116
        :return: An iterable of the keys inserted by this operation.
1246
1117
        """
1247
1118
        for node in self._items.itervalues():
1248
 
            if type(node) is tuple:
 
1119
            if type(node) == tuple:
1249
1120
                # Never deserialised.
1250
1121
                continue
1251
1122
            if node._key is not None:
1262
1133
        lines.append('%s\n' % (self._search_prefix,))
1263
1134
        prefix_len = len(self._search_prefix)
1264
1135
        for prefix, node in sorted(self._items.items()):
1265
 
            if type(node) is tuple:
 
1136
            if type(node) == tuple:
1266
1137
                key = node[0]
1267
1138
            else:
1268
1139
                key = node._key[0]
1307
1178
            raise AssertionError("unserialised nodes have no refs.")
1308
1179
        refs = []
1309
1180
        for value in self._items.itervalues():
1310
 
            if type(value) is tuple:
 
1181
            if type(value) == tuple:
1311
1182
                refs.append(value)
1312
1183
            else:
1313
1184
                refs.append(value.key())
1414
1285
    return node
1415
1286
 
1416
1287
 
1417
 
class CHKMapDifference(object):
1418
 
    """Iterate the stored pages and key,value pairs for (new - old).
1419
 
 
1420
 
    This class provides a generator over the stored CHK pages and the
1421
 
    (key, value) pairs that are in any of the new maps and not in any of the
1422
 
    old maps.
1423
 
 
1424
 
    Note that it may yield chk pages that are common (especially root nodes),
1425
 
    but it won't yield (key,value) pairs that are common.
1426
 
    """
1427
 
 
1428
 
    def __init__(self, store, new_root_keys, old_root_keys,
1429
 
                 search_key_func, pb=None):
1430
 
        self._store = store
1431
 
        self._new_root_keys = new_root_keys
1432
 
        self._old_root_keys = old_root_keys
1433
 
        self._pb = pb
1434
 
        # All uninteresting chks that we have seen. By the time they are added
1435
 
        # here, they should be either fully ignored, or queued up for
1436
 
        # processing
1437
 
        self._all_old_chks = set(self._old_root_keys)
1438
 
        # All items that we have seen from the old_root_keys
1439
 
        self._all_old_items = set()
1440
 
        # These are interesting items which were either read, or already in the
1441
 
        # interesting queue (so we don't need to walk them again)
1442
 
        self._processed_new_refs = set()
1443
 
        self._search_key_func = search_key_func
1444
 
 
1445
 
        # The uninteresting and interesting nodes to be searched
1446
 
        self._old_queue = []
1447
 
        self._new_queue = []
1448
 
        # Holds the (key, value) items found when processing the root nodes,
1449
 
        # waiting for the uninteresting nodes to be walked
1450
 
        self._new_item_queue = []
1451
 
        self._state = None
1452
 
 
1453
 
    def _read_nodes_from_store(self, keys):
1454
 
        # We chose not to use _page_cache, because we think in terms of records
1455
 
        # to be yielded. Also, we expect to touch each page only 1 time during
1456
 
        # this code. (We may want to evaluate saving the raw bytes into the
1457
 
        # page cache, which would allow a working tree update after the fetch
1458
 
        # to not have to read the bytes again.)
1459
 
        stream = self._store.get_record_stream(keys, 'unordered', True)
1460
 
        for record in stream:
1461
 
            if self._pb is not None:
1462
 
                self._pb.tick()
1463
 
            if record.storage_kind == 'absent':
1464
 
                raise errors.NoSuchRevision(self._store, record.key)
 
1288
def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
 
1289
    """Read the associated records, and determine what is interesting."""
 
1290
    uninteresting_keys = set(uninteresting_keys)
 
1291
    chks_to_read = uninteresting_keys.union(interesting_keys)
 
1292
    next_uninteresting = set()
 
1293
    next_interesting = set()
 
1294
    uninteresting_items = set()
 
1295
    interesting_items = set()
 
1296
    interesting_to_yield = []
 
1297
    for record in store.get_record_stream(chks_to_read, 'unordered', True):
 
1298
        # records_read.add(record.key())
 
1299
        if pb is not None:
 
1300
            pb.tick()
 
1301
        bytes = record.get_bytes_as('fulltext')
 
1302
        # We don't care about search_key_func for this code, because we only
 
1303
        # care about external references.
 
1304
        node = _deserialise(bytes, record.key, search_key_func=None)
 
1305
        if record.key in uninteresting_keys:
 
1306
            if type(node) is InternalNode:
 
1307
                next_uninteresting.update(node.refs())
 
1308
            else:
 
1309
                # We know we are at a LeafNode, so we can pass None for the
 
1310
                # store
 
1311
                uninteresting_items.update(node.iteritems(None))
 
1312
        else:
 
1313
            interesting_to_yield.append(record.key)
 
1314
            if type(node) is InternalNode:
 
1315
                next_interesting.update(node.refs())
 
1316
            else:
 
1317
                interesting_items.update(node.iteritems(None))
 
1318
    return (next_uninteresting, uninteresting_items,
 
1319
            next_interesting, interesting_to_yield, interesting_items)
 
1320
 
 
1321
 
 
1322
def _find_all_uninteresting(store, interesting_root_keys,
 
1323
                            uninteresting_root_keys, pb):
 
1324
    """Determine the full set of uninteresting keys."""
 
1325
    # What about duplicates between interesting_root_keys and
 
1326
    # uninteresting_root_keys?
 
1327
    if not uninteresting_root_keys:
 
1328
        # Shortcut case. We know there is nothing uninteresting to filter out
 
1329
        # So we just let the rest of the algorithm do the work
 
1330
        # We know there is nothing uninteresting, and we didn't have to read
 
1331
        # any interesting records yet.
 
1332
        return (set(), set(), set(interesting_root_keys), [], set())
 
1333
    all_uninteresting_chks = set(uninteresting_root_keys)
 
1334
    all_uninteresting_items = set()
 
1335
 
 
1336
    # First step, find the direct children of both the interesting and
 
1337
    # uninteresting set
 
1338
    (uninteresting_keys, uninteresting_items,
 
1339
     interesting_keys, interesting_to_yield,
 
1340
     interesting_items) = _find_children_info(store, interesting_root_keys,
 
1341
                                              uninteresting_root_keys,
 
1342
                                              pb=pb)
 
1343
    all_uninteresting_chks.update(uninteresting_keys)
 
1344
    all_uninteresting_items.update(uninteresting_items)
 
1345
    del uninteresting_items
 
1346
    # Note: Exact matches between interesting and uninteresting do not need
 
1347
    #       to be search further. Non-exact matches need to be searched in case
 
1348
    #       there is a future exact-match
 
1349
    uninteresting_keys.difference_update(interesting_keys)
 
1350
 
 
1351
    # Second, find the full set of uninteresting bits reachable by the
 
1352
    # uninteresting roots
 
1353
    chks_to_read = uninteresting_keys
 
1354
    while chks_to_read:
 
1355
        next_chks = set()
 
1356
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
 
1357
            # TODO: Handle 'absent'
 
1358
            if pb is not None:
 
1359
                pb.tick()
1465
1360
            bytes = record.get_bytes_as('fulltext')
1466
 
            node = _deserialise(bytes, record.key,
1467
 
                                search_key_func=self._search_key_func)
 
1361
            # We don't care about search_key_func for this code, because we
 
1362
            # only care about external references.
 
1363
            node = _deserialise(bytes, record.key, search_key_func=None)
1468
1364
            if type(node) is InternalNode:
1469
 
                # Note we don't have to do node.refs() because we know that
1470
 
                # there are no children that have been pushed into this node
1471
 
                prefix_refs = node._items.items()
1472
 
                items = []
 
1365
                # uninteresting_prefix_chks.update(node._items.iteritems())
 
1366
                chks = node._items.values()
 
1367
                # TODO: We remove the entries that are already in
 
1368
                #       uninteresting_chks ?
 
1369
                next_chks.update(chks)
 
1370
                all_uninteresting_chks.update(chks)
1473
1371
            else:
1474
 
                prefix_refs = []
1475
 
                items = node._items.items()
1476
 
            yield record, node, prefix_refs, items
1477
 
 
1478
 
    def _read_old_roots(self):
1479
 
        old_chks_to_enqueue = []
1480
 
        all_old_chks = self._all_old_chks
1481
 
        for record, node, prefix_refs, items in \
1482
 
                self._read_nodes_from_store(self._old_root_keys):
1483
 
            # Uninteresting node
1484
 
            prefix_refs = [p_r for p_r in prefix_refs
1485
 
                                if p_r[1] not in all_old_chks]
1486
 
            new_refs = [p_r[1] for p_r in prefix_refs]
1487
 
            all_old_chks.update(new_refs)
1488
 
            self._all_old_items.update(items)
1489
 
            # Queue up the uninteresting references
1490
 
            # Don't actually put them in the 'to-read' queue until we have
1491
 
            # finished checking the interesting references
1492
 
            old_chks_to_enqueue.extend(prefix_refs)
1493
 
        return old_chks_to_enqueue
1494
 
 
1495
 
    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
1496
 
        # At this point, we have read all the uninteresting and interesting
1497
 
        # items, so we can queue up the uninteresting stuff, knowing that we've
1498
 
        # handled the interesting ones
1499
 
        for prefix, ref in old_chks_to_enqueue:
1500
 
            not_interesting = True
1501
 
            for i in xrange(len(prefix), 0, -1):
1502
 
                if prefix[:i] in new_prefixes:
1503
 
                    not_interesting = False
1504
 
                    break
1505
 
            if not_interesting:
1506
 
                # This prefix is not part of the remaining 'interesting set'
1507
 
                continue
1508
 
            self._old_queue.append(ref)
1509
 
 
1510
 
    def _read_all_roots(self):
1511
 
        """Read the root pages.
1512
 
 
1513
 
        This is structured as a generator, so that the root records can be
1514
 
        yielded up to whoever needs them without any buffering.
1515
 
        """
1516
 
        # This is the bootstrap phase
1517
 
        if not self._old_root_keys:
1518
 
            # With no old_root_keys we can just shortcut and be ready
1519
 
            # for _flush_new_queue
1520
 
            self._new_queue = list(self._new_root_keys)
1521
 
            return
1522
 
        old_chks_to_enqueue = self._read_old_roots()
1523
 
        # filter out any root keys that are already known to be uninteresting
1524
 
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
1525
 
        # These are prefixes that are present in new_keys that we are
1526
 
        # thinking to yield
1527
 
        new_prefixes = set()
1528
 
        # We are about to yield all of these, so we don't want them getting
1529
 
        # added a second time
1530
 
        processed_new_refs = self._processed_new_refs
1531
 
        processed_new_refs.update(new_keys)
1532
 
        for record, node, prefix_refs, items in \
1533
 
                self._read_nodes_from_store(new_keys):
1534
 
            # At this level, we now know all the uninteresting references
1535
 
            # So we filter and queue up whatever is remaining
1536
 
            prefix_refs = [p_r for p_r in prefix_refs
1537
 
                           if p_r[1] not in self._all_old_chks
1538
 
                              and p_r[1] not in processed_new_refs]
1539
 
            refs = [p_r[1] for p_r in prefix_refs]
1540
 
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
1541
 
            self._new_queue.extend(refs)
1542
 
            # TODO: We can potentially get multiple items here, however the
1543
 
            #       current design allows for this, as callers will do the work
1544
 
            #       to make the results unique. We might profile whether we
1545
 
            #       gain anything by ensuring unique return values for items
1546
 
            new_items = [item for item in items
1547
 
                               if item not in self._all_old_items]
1548
 
            self._new_item_queue.extend(new_items)
1549
 
            new_prefixes.update([self._search_key_func(item[0])
1550
 
                                 for item in new_items])
1551
 
            processed_new_refs.update(refs)
1552
 
            yield record
1553
 
        # For new_prefixes we have the full length prefixes queued up.
1554
 
        # However, we also need possible prefixes. (If we have a known ref to
1555
 
        # 'ab', then we also need to include 'a'.) So expand the
1556
 
        # new_prefixes to include all shorter prefixes
1557
 
        for prefix in list(new_prefixes):
1558
 
            new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
1559
 
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
1560
 
 
1561
 
    def _flush_new_queue(self):
1562
 
        # No need to maintain the heap invariant anymore, just pull things out
1563
 
        # and process them
1564
 
        refs = set(self._new_queue)
1565
 
        self._new_queue = []
1566
 
        # First pass, flush all interesting items and convert to using direct refs
1567
 
        all_old_chks = self._all_old_chks
1568
 
        processed_new_refs = self._processed_new_refs
1569
 
        all_old_items = self._all_old_items
1570
 
        new_items = [item for item in self._new_item_queue
1571
 
                           if item not in all_old_items]
1572
 
        self._new_item_queue = []
1573
 
        if new_items:
1574
 
            yield None, new_items
1575
 
        refs = refs.difference(all_old_chks)
1576
 
        while refs:
1577
 
            next_refs = set()
1578
 
            next_refs_update = next_refs.update
1579
 
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
1580
 
            # from 1m54s to 1m51s. Consider it.
1581
 
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
1582
 
                items = [item for item in items
1583
 
                         if item not in all_old_items]
1584
 
                yield record, items
1585
 
                next_refs_update([p_r[1] for p_r in p_refs])
1586
 
            next_refs = next_refs.difference(all_old_chks)
1587
 
            next_refs = next_refs.difference(processed_new_refs)
1588
 
            processed_new_refs.update(next_refs)
1589
 
            refs = next_refs
1590
 
 
1591
 
    def _process_next_old(self):
1592
 
        # Since we don't filter uninteresting any further than during
1593
 
        # _read_all_roots, process the whole queue in a single pass.
1594
 
        refs = self._old_queue
1595
 
        self._old_queue = []
1596
 
        all_old_chks = self._all_old_chks
1597
 
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
1598
 
            self._all_old_items.update(items)
1599
 
            refs = [r for _,r in prefix_refs if r not in all_old_chks]
1600
 
            self._old_queue.extend(refs)
1601
 
            all_old_chks.update(refs)
1602
 
 
1603
 
    def _process_queues(self):
1604
 
        while self._old_queue:
1605
 
            self._process_next_old()
1606
 
        return self._flush_new_queue()
1607
 
 
1608
 
    def process(self):
1609
 
        for record in self._read_all_roots():
1610
 
            yield record, []
1611
 
        for record, items in self._process_queues():
1612
 
            yield record, items
 
1372
                all_uninteresting_items.update(node._items.iteritems())
 
1373
        chks_to_read = next_chks
 
1374
    return (all_uninteresting_chks, all_uninteresting_items,
 
1375
            interesting_keys, interesting_to_yield, interesting_items)
1613
1376
 
1614
1377
 
1615
1378
def iter_interesting_nodes(store, interesting_root_keys,
1626
1389
    :return: Yield
1627
1390
        (interesting record, {interesting key:values})
1628
1391
    """
1629
 
    iterator = CHKMapDifference(store, interesting_root_keys,
1630
 
                                uninteresting_root_keys,
1631
 
                                search_key_func=store._search_key_func,
1632
 
                                pb=pb)
1633
 
    return iterator.process()
 
1392
    # TODO: consider that it may be more memory efficient to use the 20-byte
 
1393
    #       sha1 string, rather than tuples of hexidecimal sha1 strings.
 
1394
    # TODO: Try to factor out a lot of the get_record_stream() calls into a
 
1395
    #       helper function similar to _read_bytes. This function should be
 
1396
    #       able to use nodes from the _page_cache as well as actually
 
1397
    #       requesting bytes from the store.
 
1398
 
 
1399
    (all_uninteresting_chks, all_uninteresting_items, interesting_keys,
 
1400
     interesting_to_yield, interesting_items) = _find_all_uninteresting(store,
 
1401
        interesting_root_keys, uninteresting_root_keys, pb)
 
1402
 
 
1403
    # Now that we know everything uninteresting, we can yield information from
 
1404
    # our first request
 
1405
    interesting_items.difference_update(all_uninteresting_items)
 
1406
    interesting_to_yield = set(interesting_to_yield) - all_uninteresting_chks
 
1407
    if interesting_items:
 
1408
        yield None, interesting_items
 
1409
    if interesting_to_yield:
 
1410
        # We request these records again, rather than buffering the root
 
1411
        # records, most likely they are still in the _group_cache anyway.
 
1412
        for record in store.get_record_stream(interesting_to_yield,
 
1413
                                              'unordered', False):
 
1414
            yield record, []
 
1415
    all_uninteresting_chks.update(interesting_to_yield)
 
1416
    interesting_keys.difference_update(all_uninteresting_chks)
 
1417
 
 
1418
    chks_to_read = interesting_keys
 
1419
    counter = 0
 
1420
    while chks_to_read:
 
1421
        next_chks = set()
 
1422
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
 
1423
            counter += 1
 
1424
            if pb is not None:
 
1425
                pb.update('find chk pages', counter)
 
1426
            # TODO: Handle 'absent'?
 
1427
            bytes = record.get_bytes_as('fulltext')
 
1428
            # We don't care about search_key_func for this code, because we
 
1429
            # only care about external references.
 
1430
            node = _deserialise(bytes, record.key, search_key_func=None)
 
1431
            if type(node) is InternalNode:
 
1432
                # all_uninteresting_chks grows large, as it lists all nodes we
 
1433
                # don't want to process (including already seen interesting
 
1434
                # nodes).
 
1435
                # small.difference_update(large) scales O(large), but
 
1436
                # small.difference(large) scales O(small).
 
1437
                # Also, we know we just _deserialised this node, so we can
 
1438
                # access the dict directly.
 
1439
                chks = set(node._items.itervalues()).difference(
 
1440
                            all_uninteresting_chks)
 
1441
                # Is set() and .difference_update better than:
 
1442
                # chks = [chk for chk in node.refs()
 
1443
                #              if chk not in all_uninteresting_chks]
 
1444
                next_chks.update(chks)
 
1445
                # These are now uninteresting everywhere else
 
1446
                all_uninteresting_chks.update(chks)
 
1447
                interesting_items = []
 
1448
            else:
 
1449
                interesting_items = [item for item in node._items.iteritems()
 
1450
                                     if item not in all_uninteresting_items]
 
1451
                # TODO: Do we need to filter out items that we have already
 
1452
                #       seen on other pages? We don't really want to buffer the
 
1453
                #       whole thing, but it does mean that callers need to
 
1454
                #       understand they may get duplicate values.
 
1455
                # all_uninteresting_items.update(interesting_items)
 
1456
            yield record, interesting_items
 
1457
        chks_to_read = next_chks
1634
1458
 
1635
1459
 
1636
1460
try: