~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/chk_map.py

merge 2.0 branch rev 4647

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008 Canonical Ltd
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
 
27
27
Updates to a CHKMap are done preferentially via the apply_delta method, to
28
28
allow optimisation of the update operation; but individual map/unmap calls are
29
 
possible and supported. All changes via map/unmap are buffered in memory until
30
 
the _save method is called to force serialisation of the tree. apply_delta
31
 
performs a _save implicitly.
 
29
possible and supported. Individual changes via map/unmap are buffered in memory
 
30
until the _save method is called to force serialisation of the tree.
 
31
apply_delta records its changes immediately by performing an implicit _save.
32
32
 
33
33
TODO:
34
34
-----
38
38
"""
39
39
 
40
40
import heapq
41
 
import time
42
41
 
43
42
from bzrlib import lazy_import
44
43
lazy_import.lazy_import(globals(), """
45
 
from bzrlib import versionedfile
 
44
from bzrlib import (
 
45
    errors,
 
46
    versionedfile,
 
47
    )
46
48
""")
47
49
from bzrlib import (
48
 
    errors,
49
50
    lru_cache,
50
51
    osutils,
51
52
    registry,
59
60
# We are caching bytes so len(value) is perfectly accurate
60
61
_page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
61
62
 
 
63
def clear_cache():
 
64
    _page_cache.clear()
 
65
 
62
66
# If a ChildNode falls below this many bytes, we check for a remap
63
67
_INTERESTING_NEW_SIZE = 50
64
68
# If a ChildNode shrinks by more than this amount, we check for a remap
107
111
            of old_key is removed.
108
112
        """
109
113
        delete_count = 0
 
114
        # Check preconditions first.
 
115
        new_items = set([key for (old, key, value) in delta if key is not None
 
116
            and old is None])
 
117
        existing_new = list(self.iteritems(key_filter=new_items))
 
118
        if existing_new:
 
119
            raise errors.InconsistentDeltaDelta(delta,
 
120
                "New items are already in the map %r." % existing_new)
 
121
        # Now apply changes.
110
122
        for old, new, value in delta:
111
123
            if old is not None and old != new:
112
124
                self.unmap(old, check_remap=False)
203
215
            multiple pages.
204
216
        :return: The root chk of the resulting CHKMap.
205
217
        """
206
 
        result = CHKMap(store, None, search_key_func=search_key_func)
 
218
        root_key = klass._create_directly(store, initial_value,
 
219
            maximum_size=maximum_size, key_width=key_width,
 
220
            search_key_func=search_key_func)
 
221
        return root_key
 
222
 
 
223
    @classmethod
 
224
    def _create_via_map(klass, store, initial_value, maximum_size=0,
 
225
                        key_width=1, search_key_func=None):
 
226
        result = klass(store, None, search_key_func=search_key_func)
207
227
        result._root_node.set_maximum_size(maximum_size)
208
228
        result._root_node._key_width = key_width
209
229
        delta = []
210
230
        for key, value in initial_value.items():
211
231
            delta.append((None, key, value))
212
 
        return result.apply_delta(delta)
 
232
        root_key = result.apply_delta(delta)
 
233
        return root_key
 
234
 
 
235
    @classmethod
 
236
    def _create_directly(klass, store, initial_value, maximum_size=0,
 
237
                         key_width=1, search_key_func=None):
 
238
        node = LeafNode(search_key_func=search_key_func)
 
239
        node.set_maximum_size(maximum_size)
 
240
        node._key_width = key_width
 
241
        node._items = dict(initial_value)
 
242
        node._raw_size = sum([node._key_value_len(key, value)
 
243
                              for key,value in initial_value.iteritems()])
 
244
        node._len = len(node._items)
 
245
        node._compute_search_prefix()
 
246
        node._compute_serialised_prefix()
 
247
        if (node._len > 1
 
248
            and maximum_size
 
249
            and node._current_size() > maximum_size):
 
250
            prefix, node_details = node._split(store)
 
251
            if len(node_details) == 1:
 
252
                raise AssertionError('Failed to split using node._split')
 
253
            node = InternalNode(prefix, search_key_func=search_key_func)
 
254
            node.set_maximum_size(maximum_size)
 
255
            node._key_width = key_width
 
256
            for split, subnode in node_details:
 
257
                node.add_node(split, subnode)
 
258
        keys = list(node.serialise(store))
 
259
        return keys[-1]
213
260
 
214
261
    def iter_changes(self, basis):
215
262
        """Iterate over the changes between basis and self.
449
496
        return len(self._root_node)
450
497
 
451
498
    def map(self, key, value):
452
 
        """Map a key tuple to value."""
 
499
        """Map a key tuple to value.
 
500
        
 
501
        :param key: A key to map.
 
502
        :param value: The value to assign to key.
 
503
        """
453
504
        # Need a root object.
454
505
        self._ensure_root()
455
506
        prefix, node_details = self._root_node.map(self._store, key, value)
764
815
                result[prefix] = node
765
816
            else:
766
817
                node = result[prefix]
767
 
            node.map(store, key, value)
 
818
            sub_prefix, node_details = node.map(store, key, value)
 
819
            if len(node_details) > 1:
 
820
                if prefix != sub_prefix:
 
821
                    # This node has been split and is now found via a different
 
822
                    # path
 
823
                    result.pop(prefix)
 
824
                new_node = InternalNode(sub_prefix,
 
825
                    search_key_func=self._search_key_func)
 
826
                new_node.set_maximum_size(self._maximum_size)
 
827
                new_node._key_width = self._key_width
 
828
                for split, node in node_details:
 
829
                    new_node.add_node(split, node)
 
830
                result[prefix] = new_node
768
831
        return common_prefix, result.items()
769
832
 
770
833
    def map(self, store, key, value):
1351
1414
    return node
1352
1415
 
1353
1416
 
1354
 
def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
1355
 
    """Read the associated records, and determine what is interesting."""
1356
 
    uninteresting_keys = set(uninteresting_keys)
1357
 
    chks_to_read = uninteresting_keys.union(interesting_keys)
1358
 
    next_uninteresting = set()
1359
 
    next_interesting = set()
1360
 
    uninteresting_items = set()
1361
 
    interesting_items = set()
1362
 
    interesting_to_yield = []
1363
 
    for record in store.get_record_stream(chks_to_read, 'unordered', True):
1364
 
        # records_read.add(record.key())
1365
 
        if pb is not None:
1366
 
            pb.tick()
1367
 
        bytes = record.get_bytes_as('fulltext')
1368
 
        # We don't care about search_key_func for this code, because we only
1369
 
        # care about external references.
1370
 
        node = _deserialise(bytes, record.key, search_key_func=None)
1371
 
        if record.key in uninteresting_keys:
1372
 
            if type(node) is InternalNode:
1373
 
                next_uninteresting.update(node.refs())
1374
 
            else:
1375
 
                # We know we are at a LeafNode, so we can pass None for the
1376
 
                # store
1377
 
                uninteresting_items.update(node.iteritems(None))
1378
 
        else:
1379
 
            interesting_to_yield.append(record.key)
1380
 
            if type(node) is InternalNode:
1381
 
                next_interesting.update(node.refs())
1382
 
            else:
1383
 
                interesting_items.update(node.iteritems(None))
1384
 
    return (next_uninteresting, uninteresting_items,
1385
 
            next_interesting, interesting_to_yield, interesting_items)
1386
 
 
1387
 
 
1388
 
def _find_all_uninteresting(store, interesting_root_keys,
1389
 
                            uninteresting_root_keys, pb):
1390
 
    """Determine the full set of uninteresting keys."""
1391
 
    # What about duplicates between interesting_root_keys and
1392
 
    # uninteresting_root_keys?
1393
 
    if not uninteresting_root_keys:
1394
 
        # Shortcut case. We know there is nothing uninteresting to filter out
1395
 
        # So we just let the rest of the algorithm do the work
1396
 
        # We know there is nothing uninteresting, and we didn't have to read
1397
 
        # any interesting records yet.
1398
 
        return (set(), set(), set(interesting_root_keys), [], set())
1399
 
    all_uninteresting_chks = set(uninteresting_root_keys)
1400
 
    all_uninteresting_items = set()
1401
 
 
1402
 
    # First step, find the direct children of both the interesting and
1403
 
    # uninteresting set
1404
 
    (uninteresting_keys, uninteresting_items,
1405
 
     interesting_keys, interesting_to_yield,
1406
 
     interesting_items) = _find_children_info(store, interesting_root_keys,
1407
 
                                              uninteresting_root_keys,
1408
 
                                              pb=pb)
1409
 
    all_uninteresting_chks.update(uninteresting_keys)
1410
 
    all_uninteresting_items.update(uninteresting_items)
1411
 
    del uninteresting_items
1412
 
    # Note: Exact matches between interesting and uninteresting do not need
1413
 
    #       to be search further. Non-exact matches need to be searched in case
1414
 
    #       there is a future exact-match
1415
 
    uninteresting_keys.difference_update(interesting_keys)
1416
 
 
1417
 
    # Second, find the full set of uninteresting bits reachable by the
1418
 
    # uninteresting roots
1419
 
    chks_to_read = uninteresting_keys
1420
 
    while chks_to_read:
1421
 
        next_chks = set()
1422
 
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
1423
 
            # TODO: Handle 'absent'
1424
 
            if pb is not None:
1425
 
                pb.tick()
 
1417
class CHKMapDifference(object):
 
1418
    """Iterate the stored pages and key,value pairs for (new - old).
 
1419
 
 
1420
    This class provides a generator over the stored CHK pages and the
 
1421
    (key, value) pairs that are in any of the new maps and not in any of the
 
1422
    old maps.
 
1423
 
 
1424
    Note that it may yield chk pages that are common (especially root nodes),
 
1425
    but it won't yield (key,value) pairs that are common.
 
1426
    """
 
1427
 
 
1428
    def __init__(self, store, new_root_keys, old_root_keys,
 
1429
                 search_key_func, pb=None):
 
1430
        self._store = store
 
1431
        self._new_root_keys = new_root_keys
 
1432
        self._old_root_keys = old_root_keys
 
1433
        self._pb = pb
 
1434
        # All uninteresting chks that we have seen. By the time they are added
 
1435
        # here, they should be either fully ignored, or queued up for
 
1436
        # processing
 
1437
        self._all_old_chks = set(self._old_root_keys)
 
1438
        # All items that we have seen from the old_root_keys
 
1439
        self._all_old_items = set()
 
1440
        # These are interesting items which were either read, or already in the
 
1441
        # interesting queue (so we don't need to walk them again)
 
1442
        self._processed_new_refs = set()
 
1443
        self._search_key_func = search_key_func
 
1444
 
 
1445
        # The uninteresting and interesting nodes to be searched
 
1446
        self._old_queue = []
 
1447
        self._new_queue = []
 
1448
        # Holds the (key, value) items found when processing the root nodes,
 
1449
        # waiting for the uninteresting nodes to be walked
 
1450
        self._new_item_queue = []
 
1451
        self._state = None
 
1452
 
 
1453
    def _read_nodes_from_store(self, keys):
 
1454
        # We chose not to use _page_cache, because we think in terms of records
 
1455
        # to be yielded. Also, we expect to touch each page only 1 time during
 
1456
        # this code. (We may want to evaluate saving the raw bytes into the
 
1457
        # page cache, which would allow a working tree update after the fetch
 
1458
        # to not have to read the bytes again.)
 
1459
        stream = self._store.get_record_stream(keys, 'unordered', True)
 
1460
        for record in stream:
 
1461
            if self._pb is not None:
 
1462
                self._pb.tick()
 
1463
            if record.storage_kind == 'absent':
 
1464
                raise errors.NoSuchRevision(self._store, record.key)
1426
1465
            bytes = record.get_bytes_as('fulltext')
1427
 
            # We don't care about search_key_func for this code, because we
1428
 
            # only care about external references.
1429
 
            node = _deserialise(bytes, record.key, search_key_func=None)
 
1466
            node = _deserialise(bytes, record.key,
 
1467
                                search_key_func=self._search_key_func)
1430
1468
            if type(node) is InternalNode:
1431
 
                # uninteresting_prefix_chks.update(node._items.iteritems())
1432
 
                chks = node._items.values()
1433
 
                # TODO: We remove the entries that are already in
1434
 
                #       uninteresting_chks ?
1435
 
                next_chks.update(chks)
1436
 
                all_uninteresting_chks.update(chks)
 
1469
                # Note we don't have to do node.refs() because we know that
 
1470
                # there are no children that have been pushed into this node
 
1471
                prefix_refs = node._items.items()
 
1472
                items = []
1437
1473
            else:
1438
 
                all_uninteresting_items.update(node._items.iteritems())
1439
 
        chks_to_read = next_chks
1440
 
    return (all_uninteresting_chks, all_uninteresting_items,
1441
 
            interesting_keys, interesting_to_yield, interesting_items)
 
1474
                prefix_refs = []
 
1475
                items = node._items.items()
 
1476
            yield record, node, prefix_refs, items
 
1477
 
 
1478
    def _read_old_roots(self):
 
1479
        old_chks_to_enqueue = []
 
1480
        all_old_chks = self._all_old_chks
 
1481
        for record, node, prefix_refs, items in \
 
1482
                self._read_nodes_from_store(self._old_root_keys):
 
1483
            # Uninteresting node
 
1484
            prefix_refs = [p_r for p_r in prefix_refs
 
1485
                                if p_r[1] not in all_old_chks]
 
1486
            new_refs = [p_r[1] for p_r in prefix_refs]
 
1487
            all_old_chks.update(new_refs)
 
1488
            self._all_old_items.update(items)
 
1489
            # Queue up the uninteresting references
 
1490
            # Don't actually put them in the 'to-read' queue until we have
 
1491
            # finished checking the interesting references
 
1492
            old_chks_to_enqueue.extend(prefix_refs)
 
1493
        return old_chks_to_enqueue
 
1494
 
 
1495
    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
 
1496
        # At this point, we have read all the uninteresting and interesting
 
1497
        # items, so we can queue up the uninteresting stuff, knowing that we've
 
1498
        # handled the interesting ones
 
1499
        for prefix, ref in old_chks_to_enqueue:
 
1500
            not_interesting = True
 
1501
            for i in xrange(len(prefix), 0, -1):
 
1502
                if prefix[:i] in new_prefixes:
 
1503
                    not_interesting = False
 
1504
                    break
 
1505
            if not_interesting:
 
1506
                # This prefix is not part of the remaining 'interesting set'
 
1507
                continue
 
1508
            self._old_queue.append(ref)
 
1509
 
 
1510
    def _read_all_roots(self):
 
1511
        """Read the root pages.
 
1512
 
 
1513
        This is structured as a generator, so that the root records can be
 
1514
        yielded up to whoever needs them without any buffering.
 
1515
        """
 
1516
        # This is the bootstrap phase
 
1517
        if not self._old_root_keys:
 
1518
            # With no old_root_keys we can just shortcut and be ready
 
1519
            # for _flush_new_queue
 
1520
            self._new_queue = list(self._new_root_keys)
 
1521
            return
 
1522
        old_chks_to_enqueue = self._read_old_roots()
 
1523
        # filter out any root keys that are already known to be uninteresting
 
1524
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
 
1525
        # These are prefixes that are present in new_keys that we are
 
1526
        # thinking to yield
 
1527
        new_prefixes = set()
 
1528
        # We are about to yield all of these, so we don't want them getting
 
1529
        # added a second time
 
1530
        processed_new_refs = self._processed_new_refs
 
1531
        processed_new_refs.update(new_keys)
 
1532
        for record, node, prefix_refs, items in \
 
1533
                self._read_nodes_from_store(new_keys):
 
1534
            # At this level, we now know all the uninteresting references
 
1535
            # So we filter and queue up whatever is remaining
 
1536
            prefix_refs = [p_r for p_r in prefix_refs
 
1537
                           if p_r[1] not in self._all_old_chks
 
1538
                              and p_r[1] not in processed_new_refs]
 
1539
            refs = [p_r[1] for p_r in prefix_refs]
 
1540
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
 
1541
            self._new_queue.extend(refs)
 
1542
            # TODO: We can potentially get multiple items here, however the
 
1543
            #       current design allows for this, as callers will do the work
 
1544
            #       to make the results unique. We might profile whether we
 
1545
            #       gain anything by ensuring unique return values for items
 
1546
            new_items = [item for item in items
 
1547
                               if item not in self._all_old_items]
 
1548
            self._new_item_queue.extend(new_items)
 
1549
            new_prefixes.update([self._search_key_func(item[0])
 
1550
                                 for item in new_items])
 
1551
            processed_new_refs.update(refs)
 
1552
            yield record
 
1553
        # For new_prefixes we have the full length prefixes queued up.
 
1554
        # However, we also need possible prefixes. (If we have a known ref to
 
1555
        # 'ab', then we also need to include 'a'.) So expand the
 
1556
        # new_prefixes to include all shorter prefixes
 
1557
        for prefix in list(new_prefixes):
 
1558
            new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
 
1559
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
 
1560
 
 
1561
    def _flush_new_queue(self):
 
1562
        # No need to maintain the heap invariant anymore, just pull things out
 
1563
        # and process them
 
1564
        refs = set(self._new_queue)
 
1565
        self._new_queue = []
 
1566
        # First pass, flush all interesting items and convert to using direct refs
 
1567
        all_old_chks = self._all_old_chks
 
1568
        processed_new_refs = self._processed_new_refs
 
1569
        all_old_items = self._all_old_items
 
1570
        new_items = [item for item in self._new_item_queue
 
1571
                           if item not in all_old_items]
 
1572
        self._new_item_queue = []
 
1573
        if new_items:
 
1574
            yield None, new_items
 
1575
        refs = refs.difference(all_old_chks)
 
1576
        while refs:
 
1577
            next_refs = set()
 
1578
            next_refs_update = next_refs.update
 
1579
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
 
1580
            # from 1m54s to 1m51s. Consider it.
 
1581
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
 
1582
                items = [item for item in items
 
1583
                         if item not in all_old_items]
 
1584
                yield record, items
 
1585
                next_refs_update([p_r[1] for p_r in p_refs])
 
1586
            next_refs = next_refs.difference(all_old_chks)
 
1587
            next_refs = next_refs.difference(processed_new_refs)
 
1588
            processed_new_refs.update(next_refs)
 
1589
            refs = next_refs
 
1590
 
 
1591
    def _process_next_old(self):
 
1592
        # Since we don't filter uninteresting any further than during
 
1593
        # _read_all_roots, process the whole queue in a single pass.
 
1594
        refs = self._old_queue
 
1595
        self._old_queue = []
 
1596
        all_old_chks = self._all_old_chks
 
1597
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
 
1598
            self._all_old_items.update(items)
 
1599
            refs = [r for _,r in prefix_refs if r not in all_old_chks]
 
1600
            self._old_queue.extend(refs)
 
1601
            all_old_chks.update(refs)
 
1602
 
 
1603
    def _process_queues(self):
 
1604
        while self._old_queue:
 
1605
            self._process_next_old()
 
1606
        return self._flush_new_queue()
 
1607
 
 
1608
    def process(self):
 
1609
        for record in self._read_all_roots():
 
1610
            yield record, []
 
1611
        for record, items in self._process_queues():
 
1612
            yield record, items
1442
1613
 
1443
1614
 
1444
1615
def iter_interesting_nodes(store, interesting_root_keys,
1455
1626
    :return: Yield
1456
1627
        (interesting record, {interesting key:values})
1457
1628
    """
1458
 
    # TODO: consider that it may be more memory efficient to use the 20-byte
1459
 
    #       sha1 string, rather than tuples of hexidecimal sha1 strings.
1460
 
    # TODO: Try to factor out a lot of the get_record_stream() calls into a
1461
 
    #       helper function similar to _read_bytes. This function should be
1462
 
    #       able to use nodes from the _page_cache as well as actually
1463
 
    #       requesting bytes from the store.
1464
 
 
1465
 
    (all_uninteresting_chks, all_uninteresting_items, interesting_keys,
1466
 
     interesting_to_yield, interesting_items) = _find_all_uninteresting(store,
1467
 
        interesting_root_keys, uninteresting_root_keys, pb)
1468
 
 
1469
 
    # Now that we know everything uninteresting, we can yield information from
1470
 
    # our first request
1471
 
    interesting_items.difference_update(all_uninteresting_items)
1472
 
    interesting_to_yield = set(interesting_to_yield) - all_uninteresting_chks
1473
 
    if interesting_items:
1474
 
        yield None, interesting_items
1475
 
    if interesting_to_yield:
1476
 
        # We request these records again, rather than buffering the root
1477
 
        # records, most likely they are still in the _group_cache anyway.
1478
 
        for record in store.get_record_stream(interesting_to_yield,
1479
 
                                              'unordered', False):
1480
 
            yield record, []
1481
 
    all_uninteresting_chks.update(interesting_to_yield)
1482
 
    interesting_keys.difference_update(all_uninteresting_chks)
1483
 
 
1484
 
    chks_to_read = interesting_keys
1485
 
    counter = 0
1486
 
    while chks_to_read:
1487
 
        next_chks = set()
1488
 
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
1489
 
            counter += 1
1490
 
            if pb is not None:
1491
 
                pb.update('find chk pages', counter)
1492
 
            # TODO: Handle 'absent'?
1493
 
            bytes = record.get_bytes_as('fulltext')
1494
 
            # We don't care about search_key_func for this code, because we
1495
 
            # only care about external references.
1496
 
            node = _deserialise(bytes, record.key, search_key_func=None)
1497
 
            if type(node) is InternalNode:
1498
 
                # all_uninteresting_chks grows large, as it lists all nodes we
1499
 
                # don't want to process (including already seen interesting
1500
 
                # nodes).
1501
 
                # small.difference_update(large) scales O(large), but
1502
 
                # small.difference(large) scales O(small).
1503
 
                # Also, we know we just _deserialised this node, so we can
1504
 
                # access the dict directly.
1505
 
                chks = set(node._items.itervalues()).difference(
1506
 
                            all_uninteresting_chks)
1507
 
                # Is set() and .difference_update better than:
1508
 
                # chks = [chk for chk in node.refs()
1509
 
                #              if chk not in all_uninteresting_chks]
1510
 
                next_chks.update(chks)
1511
 
                # These are now uninteresting everywhere else
1512
 
                all_uninteresting_chks.update(chks)
1513
 
                interesting_items = []
1514
 
            else:
1515
 
                interesting_items = [item for item in node._items.iteritems()
1516
 
                                     if item not in all_uninteresting_items]
1517
 
                # TODO: Do we need to filter out items that we have already
1518
 
                #       seen on other pages? We don't really want to buffer the
1519
 
                #       whole thing, but it does mean that callers need to
1520
 
                #       understand they may get duplicate values.
1521
 
                # all_uninteresting_items.update(interesting_items)
1522
 
            yield record, interesting_items
1523
 
        chks_to_read = next_chks
 
1629
    iterator = CHKMapDifference(store, interesting_root_keys,
 
1630
                                uninteresting_root_keys,
 
1631
                                search_key_func=store._search_key_func,
 
1632
                                pb=pb)
 
1633
    return iterator.process()
1524
1634
 
1525
1635
 
1526
1636
try: