~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/chk_map.py

  • Committer: Robert Collins
  • Date: 2009-08-04 04:36:34 UTC
  • mfrom: (4583 +trunk)
  • mto: This revision was merged to the branch mainline in revision 4593.
  • Revision ID: robertc@robertcollins.net-20090804043634-2iu9wpcgs273i97s
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008 Canonical Ltd
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
26
26
 
27
27
Updates to a CHKMap are done preferentially via the apply_delta method, to
28
28
allow optimisation of the update operation; but individual map/unmap calls are
29
 
possible and supported. All changes via map/unmap are buffered in memory until
30
 
the _save method is called to force serialisation of the tree. apply_delta
31
 
performs a _save implicitly.
 
29
possible and supported. Individual changes via map/unmap are buffered in memory
 
30
until the _save method is called to force serialisation of the tree.
 
31
apply_delta records its changes immediately by performing an implicit _save.
32
32
 
33
33
TODO:
34
34
-----
38
38
"""
39
39
 
40
40
import heapq
41
 
import time
42
41
 
43
42
from bzrlib import lazy_import
44
43
lazy_import.lazy_import(globals(), """
45
 
from bzrlib import versionedfile
 
44
from bzrlib import (
 
45
    errors,
 
46
    versionedfile,
 
47
    )
46
48
""")
47
49
from bzrlib import (
48
 
    errors,
49
50
    lru_cache,
50
51
    osutils,
51
52
    registry,
107
108
            of old_key is removed.
108
109
        """
109
110
        delete_count = 0
 
111
        # Check preconditions first.
 
112
        new_items = set([key for (old, key, value) in delta if key is not None
 
113
            and old is None])
 
114
        existing_new = list(self.iteritems(key_filter=new_items))
 
115
        if existing_new:
 
116
            raise errors.InconsistentDeltaDelta(delta,
 
117
                "New items are already in the map %r." % existing_new)
 
118
        # Now apply changes.
110
119
        for old, new, value in delta:
111
120
            if old is not None and old != new:
112
121
                self.unmap(old, check_remap=False)
203
212
            multiple pages.
204
213
        :return: The root chk of the resulting CHKMap.
205
214
        """
206
 
        result = CHKMap(store, None, search_key_func=search_key_func)
 
215
        root_key = klass._create_directly(store, initial_value,
 
216
            maximum_size=maximum_size, key_width=key_width,
 
217
            search_key_func=search_key_func)
 
218
        return root_key
 
219
 
 
220
    @classmethod
 
221
    def _create_via_map(klass, store, initial_value, maximum_size=0,
 
222
                        key_width=1, search_key_func=None):
 
223
        result = klass(store, None, search_key_func=search_key_func)
207
224
        result._root_node.set_maximum_size(maximum_size)
208
225
        result._root_node._key_width = key_width
209
226
        delta = []
210
227
        for key, value in initial_value.items():
211
228
            delta.append((None, key, value))
212
 
        return result.apply_delta(delta)
 
229
        root_key = result.apply_delta(delta)
 
230
        return root_key
 
231
 
 
232
    @classmethod
 
233
    def _create_directly(klass, store, initial_value, maximum_size=0,
 
234
                         key_width=1, search_key_func=None):
 
235
        node = LeafNode(search_key_func=search_key_func)
 
236
        node.set_maximum_size(maximum_size)
 
237
        node._key_width = key_width
 
238
        node._items = dict(initial_value)
 
239
        node._raw_size = sum([node._key_value_len(key, value)
 
240
                              for key,value in initial_value.iteritems()])
 
241
        node._len = len(node._items)
 
242
        node._compute_search_prefix()
 
243
        node._compute_serialised_prefix()
 
244
        if (node._len > 1
 
245
            and maximum_size
 
246
            and node._current_size() > maximum_size):
 
247
            prefix, node_details = node._split(store)
 
248
            if len(node_details) == 1:
 
249
                raise AssertionError('Failed to split using node._split')
 
250
            node = InternalNode(prefix, search_key_func=search_key_func)
 
251
            node.set_maximum_size(maximum_size)
 
252
            node._key_width = key_width
 
253
            for split, subnode in node_details:
 
254
                node.add_node(split, subnode)
 
255
        keys = list(node.serialise(store))
 
256
        return keys[-1]
213
257
 
214
258
    def iter_changes(self, basis):
215
259
        """Iterate over the changes between basis and self.
449
493
        return len(self._root_node)
450
494
 
451
495
    def map(self, key, value):
452
 
        """Map a key tuple to value."""
 
496
        """Map a key tuple to value.
 
497
        
 
498
        :param key: A key to map.
 
499
        :param value: The value to assign to key.
 
500
        """
453
501
        # Need a root object.
454
502
        self._ensure_root()
455
503
        prefix, node_details = self._root_node.map(self._store, key, value)
764
812
                result[prefix] = node
765
813
            else:
766
814
                node = result[prefix]
767
 
            node.map(store, key, value)
 
815
            sub_prefix, node_details = node.map(store, key, value)
 
816
            if len(node_details) > 1:
 
817
                if prefix != sub_prefix:
 
818
                    # This node has been split and is now found via a different
 
819
                    # path
 
820
                    result.pop(prefix)
 
821
                new_node = InternalNode(sub_prefix,
 
822
                    search_key_func=self._search_key_func)
 
823
                new_node.set_maximum_size(self._maximum_size)
 
824
                new_node._key_width = self._key_width
 
825
                for split, node in node_details:
 
826
                    new_node.add_node(split, node)
 
827
                result[prefix] = new_node
768
828
        return common_prefix, result.items()
769
829
 
770
830
    def map(self, store, key, value):
1351
1411
    return node
1352
1412
 
1353
1413
 
1354
 
def _find_children_info(store, interesting_keys, uninteresting_keys, pb):
1355
 
    """Read the associated records, and determine what is interesting."""
1356
 
    uninteresting_keys = set(uninteresting_keys)
1357
 
    chks_to_read = uninteresting_keys.union(interesting_keys)
1358
 
    next_uninteresting = set()
1359
 
    next_interesting = set()
1360
 
    uninteresting_items = set()
1361
 
    interesting_items = set()
1362
 
    interesting_to_yield = []
1363
 
    for record in store.get_record_stream(chks_to_read, 'unordered', True):
1364
 
        # records_read.add(record.key())
1365
 
        if pb is not None:
1366
 
            pb.tick()
1367
 
        bytes = record.get_bytes_as('fulltext')
1368
 
        # We don't care about search_key_func for this code, because we only
1369
 
        # care about external references.
1370
 
        node = _deserialise(bytes, record.key, search_key_func=None)
1371
 
        if record.key in uninteresting_keys:
1372
 
            if type(node) is InternalNode:
1373
 
                next_uninteresting.update(node.refs())
1374
 
            else:
1375
 
                # We know we are at a LeafNode, so we can pass None for the
1376
 
                # store
1377
 
                uninteresting_items.update(node.iteritems(None))
1378
 
        else:
1379
 
            interesting_to_yield.append(record.key)
1380
 
            if type(node) is InternalNode:
1381
 
                next_interesting.update(node.refs())
1382
 
            else:
1383
 
                interesting_items.update(node.iteritems(None))
1384
 
    return (next_uninteresting, uninteresting_items,
1385
 
            next_interesting, interesting_to_yield, interesting_items)
1386
 
 
1387
 
 
1388
 
def _find_all_uninteresting(store, interesting_root_keys,
1389
 
                            uninteresting_root_keys, pb):
1390
 
    """Determine the full set of uninteresting keys."""
1391
 
    # What about duplicates between interesting_root_keys and
1392
 
    # uninteresting_root_keys?
1393
 
    if not uninteresting_root_keys:
1394
 
        # Shortcut case. We know there is nothing uninteresting to filter out
1395
 
        # So we just let the rest of the algorithm do the work
1396
 
        # We know there is nothing uninteresting, and we didn't have to read
1397
 
        # any interesting records yet.
1398
 
        return (set(), set(), set(interesting_root_keys), [], set())
1399
 
    all_uninteresting_chks = set(uninteresting_root_keys)
1400
 
    all_uninteresting_items = set()
1401
 
 
1402
 
    # First step, find the direct children of both the interesting and
1403
 
    # uninteresting set
1404
 
    (uninteresting_keys, uninteresting_items,
1405
 
     interesting_keys, interesting_to_yield,
1406
 
     interesting_items) = _find_children_info(store, interesting_root_keys,
1407
 
                                              uninteresting_root_keys,
1408
 
                                              pb=pb)
1409
 
    all_uninteresting_chks.update(uninteresting_keys)
1410
 
    all_uninteresting_items.update(uninteresting_items)
1411
 
    del uninteresting_items
1412
 
    # Note: Exact matches between interesting and uninteresting do not need
1413
 
    #       to be search further. Non-exact matches need to be searched in case
1414
 
    #       there is a future exact-match
1415
 
    uninteresting_keys.difference_update(interesting_keys)
1416
 
 
1417
 
    # Second, find the full set of uninteresting bits reachable by the
1418
 
    # uninteresting roots
1419
 
    chks_to_read = uninteresting_keys
1420
 
    while chks_to_read:
1421
 
        next_chks = set()
1422
 
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
1423
 
            # TODO: Handle 'absent'
1424
 
            if pb is not None:
1425
 
                pb.tick()
 
1414
class CHKMapDifference(object):
 
1415
    """Iterate the stored pages and key,value pairs for (new - old).
 
1416
 
 
1417
    This class provides a generator over the stored CHK pages and the
 
1418
    (key, value) pairs that are in any of the new maps and not in any of the
 
1419
    old maps.
 
1420
 
 
1421
    Note that it may yield chk pages that are common (especially root nodes),
 
1422
    but it won't yield (key,value) pairs that are common.
 
1423
    """
 
1424
 
 
1425
    def __init__(self, store, new_root_keys, old_root_keys,
 
1426
                 search_key_func, pb=None):
 
1427
        self._store = store
 
1428
        self._new_root_keys = new_root_keys
 
1429
        self._old_root_keys = old_root_keys
 
1430
        self._pb = pb
 
1431
        # All uninteresting chks that we have seen. By the time they are added
 
1432
        # here, they should be either fully ignored, or queued up for
 
1433
        # processing
 
1434
        self._all_old_chks = set(self._old_root_keys)
 
1435
        # All items that we have seen from the old_root_keys
 
1436
        self._all_old_items = set()
 
1437
        # These are interesting items which were either read, or already in the
 
1438
        # interesting queue (so we don't need to walk them again)
 
1439
        self._processed_new_refs = set()
 
1440
        self._search_key_func = search_key_func
 
1441
 
 
1442
        # The uninteresting and interesting nodes to be searched
 
1443
        self._old_queue = []
 
1444
        self._new_queue = []
 
1445
        # Holds the (key, value) items found when processing the root nodes,
 
1446
        # waiting for the uninteresting nodes to be walked
 
1447
        self._new_item_queue = []
 
1448
        self._state = None
 
1449
 
 
1450
    def _read_nodes_from_store(self, keys):
 
1451
        # We chose not to use _page_cache, because we think in terms of records
 
1452
        # to be yielded. Also, we expect to touch each page only 1 time during
 
1453
        # this code. (We may want to evaluate saving the raw bytes into the
 
1454
        # page cache, which would allow a working tree update after the fetch
 
1455
        # to not have to read the bytes again.)
 
1456
        stream = self._store.get_record_stream(keys, 'unordered', True)
 
1457
        for record in stream:
 
1458
            if self._pb is not None:
 
1459
                self._pb.tick()
 
1460
            if record.storage_kind == 'absent':
 
1461
                raise errors.NoSuchRevision(self._store, record.key)
1426
1462
            bytes = record.get_bytes_as('fulltext')
1427
 
            # We don't care about search_key_func for this code, because we
1428
 
            # only care about external references.
1429
 
            node = _deserialise(bytes, record.key, search_key_func=None)
 
1463
            node = _deserialise(bytes, record.key,
 
1464
                                search_key_func=self._search_key_func)
1430
1465
            if type(node) is InternalNode:
1431
 
                # uninteresting_prefix_chks.update(node._items.iteritems())
1432
 
                chks = node._items.values()
1433
 
                # TODO: We remove the entries that are already in
1434
 
                #       uninteresting_chks ?
1435
 
                next_chks.update(chks)
1436
 
                all_uninteresting_chks.update(chks)
 
1466
                # Note we don't have to do node.refs() because we know that
 
1467
                # there are no children that have been pushed into this node
 
1468
                prefix_refs = node._items.items()
 
1469
                items = []
1437
1470
            else:
1438
 
                all_uninteresting_items.update(node._items.iteritems())
1439
 
        chks_to_read = next_chks
1440
 
    return (all_uninteresting_chks, all_uninteresting_items,
1441
 
            interesting_keys, interesting_to_yield, interesting_items)
 
1471
                prefix_refs = []
 
1472
                items = node._items.items()
 
1473
            yield record, node, prefix_refs, items
 
1474
 
 
1475
    def _read_old_roots(self):
 
1476
        old_chks_to_enqueue = []
 
1477
        all_old_chks = self._all_old_chks
 
1478
        for record, node, prefix_refs, items in \
 
1479
                self._read_nodes_from_store(self._old_root_keys):
 
1480
            # Uninteresting node
 
1481
            prefix_refs = [p_r for p_r in prefix_refs
 
1482
                                if p_r[1] not in all_old_chks]
 
1483
            new_refs = [p_r[1] for p_r in prefix_refs]
 
1484
            all_old_chks.update(new_refs)
 
1485
            self._all_old_items.update(items)
 
1486
            # Queue up the uninteresting references
 
1487
            # Don't actually put them in the 'to-read' queue until we have
 
1488
            # finished checking the interesting references
 
1489
            old_chks_to_enqueue.extend(prefix_refs)
 
1490
        return old_chks_to_enqueue
 
1491
 
 
1492
    def _enqueue_old(self, new_prefixes, old_chks_to_enqueue):
 
1493
        # At this point, we have read all the uninteresting and interesting
 
1494
        # items, so we can queue up the uninteresting stuff, knowing that we've
 
1495
        # handled the interesting ones
 
1496
        for prefix, ref in old_chks_to_enqueue:
 
1497
            not_interesting = True
 
1498
            for i in xrange(len(prefix), 0, -1):
 
1499
                if prefix[:i] in new_prefixes:
 
1500
                    not_interesting = False
 
1501
                    break
 
1502
            if not_interesting:
 
1503
                # This prefix is not part of the remaining 'interesting set'
 
1504
                continue
 
1505
            self._old_queue.append(ref)
 
1506
 
 
1507
    def _read_all_roots(self):
 
1508
        """Read the root pages.
 
1509
 
 
1510
        This is structured as a generator, so that the root records can be
 
1511
        yielded up to whoever needs them without any buffering.
 
1512
        """
 
1513
        # This is the bootstrap phase
 
1514
        if not self._old_root_keys:
 
1515
            # With no old_root_keys we can just shortcut and be ready
 
1516
            # for _flush_new_queue
 
1517
            self._new_queue = list(self._new_root_keys)
 
1518
            return
 
1519
        old_chks_to_enqueue = self._read_old_roots()
 
1520
        # filter out any root keys that are already known to be uninteresting
 
1521
        new_keys = set(self._new_root_keys).difference(self._all_old_chks)
 
1522
        # These are prefixes that are present in new_keys that we are
 
1523
        # thinking to yield
 
1524
        new_prefixes = set()
 
1525
        # We are about to yield all of these, so we don't want them getting
 
1526
        # added a second time
 
1527
        processed_new_refs = self._processed_new_refs
 
1528
        processed_new_refs.update(new_keys)
 
1529
        for record, node, prefix_refs, items in \
 
1530
                self._read_nodes_from_store(new_keys):
 
1531
            # At this level, we now know all the uninteresting references
 
1532
            # So we filter and queue up whatever is remaining
 
1533
            prefix_refs = [p_r for p_r in prefix_refs
 
1534
                           if p_r[1] not in self._all_old_chks
 
1535
                              and p_r[1] not in processed_new_refs]
 
1536
            refs = [p_r[1] for p_r in prefix_refs]
 
1537
            new_prefixes.update([p_r[0] for p_r in prefix_refs])
 
1538
            self._new_queue.extend(refs)
 
1539
            # TODO: We can potentially get multiple items here, however the
 
1540
            #       current design allows for this, as callers will do the work
 
1541
            #       to make the results unique. We might profile whether we
 
1542
            #       gain anything by ensuring unique return values for items
 
1543
            new_items = [item for item in items
 
1544
                               if item not in self._all_old_items]
 
1545
            self._new_item_queue.extend(new_items)
 
1546
            new_prefixes.update([self._search_key_func(item[0])
 
1547
                                 for item in new_items])
 
1548
            processed_new_refs.update(refs)
 
1549
            yield record
 
1550
        # For new_prefixes we have the full length prefixes queued up.
 
1551
        # However, we also need possible prefixes. (If we have a known ref to
 
1552
        # 'ab', then we also need to include 'a'.) So expand the
 
1553
        # new_prefixes to include all shorter prefixes
 
1554
        for prefix in list(new_prefixes):
 
1555
            new_prefixes.update([prefix[:i] for i in xrange(1, len(prefix))])
 
1556
        self._enqueue_old(new_prefixes, old_chks_to_enqueue)
 
1557
 
 
1558
    def _flush_new_queue(self):
 
1559
        # No need to maintain the heap invariant anymore, just pull things out
 
1560
        # and process them
 
1561
        refs = set(self._new_queue)
 
1562
        self._new_queue = []
 
1563
        # First pass, flush all interesting items and convert to using direct refs
 
1564
        all_old_chks = self._all_old_chks
 
1565
        processed_new_refs = self._processed_new_refs
 
1566
        all_old_items = self._all_old_items
 
1567
        new_items = [item for item in self._new_item_queue
 
1568
                           if item not in all_old_items]
 
1569
        self._new_item_queue = []
 
1570
        if new_items:
 
1571
            yield None, new_items
 
1572
        refs = refs.difference(all_old_chks)
 
1573
        while refs:
 
1574
            next_refs = set()
 
1575
            next_refs_update = next_refs.update
 
1576
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
 
1577
            # from 1m54s to 1m51s. Consider it.
 
1578
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
 
1579
                items = [item for item in items
 
1580
                         if item not in all_old_items]
 
1581
                yield record, items
 
1582
                next_refs_update([p_r[1] for p_r in p_refs])
 
1583
            next_refs = next_refs.difference(all_old_chks)
 
1584
            next_refs = next_refs.difference(processed_new_refs)
 
1585
            processed_new_refs.update(next_refs)
 
1586
            refs = next_refs
 
1587
 
 
1588
    def _process_next_old(self):
 
1589
        # Since we don't filter uninteresting any further than during
 
1590
        # _read_all_roots, process the whole queue in a single pass.
 
1591
        refs = self._old_queue
 
1592
        self._old_queue = []
 
1593
        all_old_chks = self._all_old_chks
 
1594
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
 
1595
            self._all_old_items.update(items)
 
1596
            refs = [r for _,r in prefix_refs if r not in all_old_chks]
 
1597
            self._old_queue.extend(refs)
 
1598
            all_old_chks.update(refs)
 
1599
 
 
1600
    def _process_queues(self):
 
1601
        while self._old_queue:
 
1602
            self._process_next_old()
 
1603
        return self._flush_new_queue()
 
1604
 
 
1605
    def process(self):
 
1606
        for record in self._read_all_roots():
 
1607
            yield record, []
 
1608
        for record, items in self._process_queues():
 
1609
            yield record, items
1442
1610
 
1443
1611
 
1444
1612
def iter_interesting_nodes(store, interesting_root_keys,
1455
1623
    :return: Yield
1456
1624
        (interesting record, {interesting key:values})
1457
1625
    """
1458
 
    # TODO: consider that it may be more memory efficient to use the 20-byte
1459
 
    #       sha1 string, rather than tuples of hexidecimal sha1 strings.
1460
 
    # TODO: Try to factor out a lot of the get_record_stream() calls into a
1461
 
    #       helper function similar to _read_bytes. This function should be
1462
 
    #       able to use nodes from the _page_cache as well as actually
1463
 
    #       requesting bytes from the store.
1464
 
 
1465
 
    (all_uninteresting_chks, all_uninteresting_items, interesting_keys,
1466
 
     interesting_to_yield, interesting_items) = _find_all_uninteresting(store,
1467
 
        interesting_root_keys, uninteresting_root_keys, pb)
1468
 
 
1469
 
    # Now that we know everything uninteresting, we can yield information from
1470
 
    # our first request
1471
 
    interesting_items.difference_update(all_uninteresting_items)
1472
 
    interesting_to_yield = set(interesting_to_yield) - all_uninteresting_chks
1473
 
    if interesting_items:
1474
 
        yield None, interesting_items
1475
 
    if interesting_to_yield:
1476
 
        # We request these records again, rather than buffering the root
1477
 
        # records, most likely they are still in the _group_cache anyway.
1478
 
        for record in store.get_record_stream(interesting_to_yield,
1479
 
                                              'unordered', False):
1480
 
            yield record, []
1481
 
    all_uninteresting_chks.update(interesting_to_yield)
1482
 
    interesting_keys.difference_update(all_uninteresting_chks)
1483
 
 
1484
 
    chks_to_read = interesting_keys
1485
 
    counter = 0
1486
 
    while chks_to_read:
1487
 
        next_chks = set()
1488
 
        for record in store.get_record_stream(chks_to_read, 'unordered', False):
1489
 
            counter += 1
1490
 
            if pb is not None:
1491
 
                pb.update('find chk pages', counter)
1492
 
            # TODO: Handle 'absent'?
1493
 
            bytes = record.get_bytes_as('fulltext')
1494
 
            # We don't care about search_key_func for this code, because we
1495
 
            # only care about external references.
1496
 
            node = _deserialise(bytes, record.key, search_key_func=None)
1497
 
            if type(node) is InternalNode:
1498
 
                # all_uninteresting_chks grows large, as it lists all nodes we
1499
 
                # don't want to process (including already seen interesting
1500
 
                # nodes).
1501
 
                # small.difference_update(large) scales O(large), but
1502
 
                # small.difference(large) scales O(small).
1503
 
                # Also, we know we just _deserialised this node, so we can
1504
 
                # access the dict directly.
1505
 
                chks = set(node._items.itervalues()).difference(
1506
 
                            all_uninteresting_chks)
1507
 
                # Is set() and .difference_update better than:
1508
 
                # chks = [chk for chk in node.refs()
1509
 
                #              if chk not in all_uninteresting_chks]
1510
 
                next_chks.update(chks)
1511
 
                # These are now uninteresting everywhere else
1512
 
                all_uninteresting_chks.update(chks)
1513
 
                interesting_items = []
1514
 
            else:
1515
 
                interesting_items = [item for item in node._items.iteritems()
1516
 
                                     if item not in all_uninteresting_items]
1517
 
                # TODO: Do we need to filter out items that we have already
1518
 
                #       seen on other pages? We don't really want to buffer the
1519
 
                #       whole thing, but it does mean that callers need to
1520
 
                #       understand they may get duplicate values.
1521
 
                # all_uninteresting_items.update(interesting_items)
1522
 
            yield record, interesting_items
1523
 
        chks_to_read = next_chks
 
1626
    iterator = CHKMapDifference(store, interesting_root_keys,
 
1627
                                uninteresting_root_keys,
 
1628
                                search_key_func=store._search_key_func,
 
1629
                                pb=pb)
 
1630
    return iterator.process()
1524
1631
 
1525
1632
 
1526
1633
try: