# Copyright (C) 2008, 2009, 2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by

from bzrlib.static_tuple import StaticTuple

# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
# out, it takes 3.1MB to cache the layer.
_PAGE_CACHE_SIZE = 4*1024*1024
# Per thread caches for 2 reasons:
# - in the server we may be serving very different content, so we get less
#   pollution.
# - we avoid locking on every cache lookup.
_thread_caches = threading.local()
# The page cache.
_thread_caches.page_cache = None


def _get_cache():
    """Get the per-thread page cache.

    We need a function to do this because in a new thread the _thread_caches
    threading.local object does not have the cache initialized yet.
    """
    page_cache = getattr(_thread_caches, 'page_cache', None)
    if page_cache is None:
        # We are caching bytes so len(value) is perfectly accurate
        page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
        _thread_caches.page_cache = page_cache
    return page_cache
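
# Illustrative sketch (not part of bzrlib): each thread lazily creates its own
# LRUSizeCache via _get_cache(), so server threads serving unrelated content
# do not thrash one shared cache, and no lock is taken on lookups.
def _example_per_thread_caches():
    import threading
    caches = []
    def worker():
        caches.append(_get_cache())
    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Two threads observed two distinct cache objects.
    assert caches[0] is not caches[1]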

# If a ChildNode falls below this many bytes, we check for a remap
_INTERESTING_NEW_SIZE = 50
# If a ChildNode shrinks by more than this amount, we check for a remap
_INTERESTING_SHRINKAGE_LIMIT = 20
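
# Illustrative sketch (not part of bzrlib): the kind of test the unmap paths
# apply with the thresholds above (function name and signature hypothetical).
def _example_worth_checking_remap(old_size, new_size):
    return (new_size < _INTERESTING_NEW_SIZE
            or old_size - new_size > _INTERESTING_SHRINKAGE_LIMIT)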

def _search_key_plain(key):
    """Map the key tuple into a search string that just uses the key bytes."""
    return '\x00'.join(key)


class CHKMap(object):
    """A persistent map from string to string backed by a CHK store."""

    __slots__ = ('_store', '_root_node', '_search_key_func')

    def __init__(self, store, root_key, search_key_func=None):
        """Create a CHKMap object.

        :param store: The store the CHKMap is stored in.
        :param root_key: The root key of the map. None to create an empty
            CHKMap.
        :param search_key_func: A function mapping a key => bytes. These
            bytes are then used by the internal nodes to split up leaf nodes
            into multiple pages.
        """
        self._store = store
        if search_key_func is None:
            search_key_func = _search_key_plain
        self._search_key_func = search_key_func
        if root_key is None:
            self._root_node = LeafNode(search_key_func=search_key_func)
        else:
            self._root_node = self._node_key(root_key)

    def apply_delta(self, delta):
        """Apply a delta to the map.

        :param delta: An iterable of old_key, new_key, new_value tuples.
            If new_key is not None, then new_key->new_value is inserted
            into the map; if old_key is not None, then the old mapping
            of old_key is removed.
        """
        has_deletes = False
        # Check preconditions first.
        as_st = StaticTuple.from_sequence
        new_items = set([as_st(key) for (old, key, value) in delta
                         if key is not None and old is None])
        existing_new = list(self.iteritems(key_filter=new_items))
        if existing_new:
            raise errors.InconsistentDeltaDelta(delta,
                "New items are already in the map %r." % existing_new)
        # Now apply changes.
        for old, new, value in delta:
            if old is not None and old != new:
                self.unmap(old, check_remap=False)
                has_deletes = True
        for old, new, value in delta:
            if new is not None:
                self.map(new, value)
        if has_deletes:
            self._check_remap()
        return self._save()
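
    # Illustrative usage (not part of bzrlib): one delta that inserts, moves
    # and deletes. Keys are 1-tuples because key_width is 1 here.
    #
    #     chkmap.apply_delta([
    #         (None, ('added-id',), 'new value'),        # insert
    #         (('old-id',), ('new-id',), 'same value'),  # move: old is unmapped
    #         (('gone-id',), None, None),                # delete
    #     ])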

    def _ensure_root(self):
        """Ensure that the root node is an object not a key."""
        if type(self._root_node) is StaticTuple:
            # Demand-load the root
            self._root_node = self._get_node(self._root_node)

    def _get_node(self, node):
        """Get a node.

        :param node: A tuple key or node object.
        :return: A node object.
        """
        if type(node) is StaticTuple:
            bytes = self._read_bytes(node)
            return _deserialise(bytes, node,
                search_key_func=self._search_key_func)
        else:
            return node

    def _read_bytes(self, key):
        try:
            return _get_cache()[key]
        except KeyError:
            stream = self._store.get_record_stream([key], 'unordered', True)
            bytes = stream.next().get_bytes_as('fulltext')
            _get_cache()[key] = bytes
            return bytes

    def _dump_tree(self, include_keys=False):
        """Return the tree in a string representation."""

            for key, value in sorted(node._items.iteritems()):
                # Don't use prefix nor indent here to line up when used in
                # tests in conjunction with assertEqualDiff
                result.append('      %r %r' % (tuple(key), value))

    @classmethod
    def from_dict(klass, store, initial_value, maximum_size=0, key_width=1,
                  search_key_func=None):
        """Create a CHKMap in store with initial_value as the content."""
        root_key = klass._create_directly(store, initial_value,
            maximum_size=maximum_size, key_width=key_width,
            search_key_func=search_key_func)
        if type(root_key) is not StaticTuple:
            raise AssertionError('we got a %s instead of a StaticTuple'
                                 % (type(root_key),))
        return root_key

    @classmethod
    def _create_directly(klass, store, initial_value, maximum_size=0,
                         key_width=1, search_key_func=None):
        node = LeafNode(search_key_func=search_key_func)
        node.set_maximum_size(maximum_size)
        node._key_width = key_width
        as_st = StaticTuple.from_sequence
        node._items = dict([(as_st(key), val) for key, val
                            in initial_value.iteritems()])
        node._raw_size = sum([node._key_value_len(key, value)
                              for key, value in node._items.iteritems()])
        node._len = len(node._items)
        node._compute_search_prefix()
        node._compute_serialised_prefix()
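
    # Illustrative usage (not part of bzrlib): 'store' is assumed to be a
    # CHK-capable VersionedFiles store.
    #
    #     root_key = CHKMap.from_dict(store, {('file-id',): 'value'},
    #                                 maximum_size=4096)
    #     chkmap = CHKMap(store, root_key)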

    def iteritems(self, key_filter=None):
        """Iterate over the entire CHKMap's contents."""
        self._ensure_root()
        if key_filter is not None:
            as_st = StaticTuple.from_sequence
            key_filter = [as_st(key) for key in key_filter]
        return self._root_node.iteritems(self._store, key_filter=key_filter)
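
    # Illustrative usage (not part of bzrlib): plain tuples are accepted in
    # key_filter; they are coerced to StaticTuple before the nodes see them.
    #
    #     for key, value in chkmap.iteritems(key_filter=[('file-id',)]):
    #         print key, value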

    def key(self):
        """Return the key for this map."""
        if type(self._root_node) is StaticTuple:
            return self._root_node
        else:
            return self._root_node._key

    def map(self, key, value):
        """Map a key tuple to value.

        :param key: A key to map.
        :param value: The value to assign to key.
        """
        key = StaticTuple.from_sequence(key)
        # Need a root object.
        self._ensure_root()
        prefix, node_details = self._root_node.map(self._store, key, value)
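
    # Illustrative usage (not part of bzrlib): map() also coerces plain
    # tuples, so callers never need to construct StaticTuples themselves.
    #
    #     chkmap.map(('file-id',), 'content')
    #     new_root_key = chkmap._save()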

    def _node_key(self, node):
        """Get the key for a node whether it's a tuple or node."""
        if type(node) is tuple:
            node = StaticTuple.from_sequence(node)
        if type(node) is StaticTuple:
            return node
        else:
            return node._key

    def unmap(self, key, check_remap=True):
        """Remove key from the map."""
        key = StaticTuple.from_sequence(key)
        self._ensure_root()
        if type(self._root_node) is InternalNode:
            unmapped = self._root_node.unmap(self._store, key,
                check_remap=check_remap)
        else:
            unmapped = self._root_node.unmap(self._store, key)
        self._root_node = unmapped

    def _check_remap(self):
        """Check if nodes can be collapsed."""
        self._ensure_root()
        if type(self._root_node) is InternalNode:
            self._root_node = self._root_node._check_remap(self._store)

    def _save(self):
        """Save the map completely.

        :return: The key of the root node.
        """
        if type(self._root_node) is StaticTuple:
            # Already saved.
            return self._root_node
        keys = list(self._root_node.serialise(self._store))
        return keys[-1]


class Node(object):
    """Base class defining the protocol for CHK Map nodes.

    The _raw_size is the total size of the serialized key:value data, before
    adding the header bytes, and without prefix compression.
    """

    __slots__ = ('_key', '_len', '_maximum_size', '_key_width',
                 '_raw_size', '_items', '_search_prefix', '_search_key_func'
                )

    def __init__(self, key_width=1):
        """Create a node.

        :param key_width: The width of keys for this node.
        """


class LeafNode(Node):
    """A node containing actual key:value pairs.

    :ivar _items: A dict of key->value. Keys are in tuple form.
    :ivar _size: The number of bytes that would be used by serializing all of
        the key/value pairs.
    """

    __slots__ = ('_common_serialised_prefix',)

    def __init__(self, search_key_func=None):
        Node.__init__(self)
        # All of the keys in this leaf node share this common prefix
        self._common_serialised_prefix = None
        if search_key_func is None:
            self._search_key_func = _search_key_plain
        else:
            self._search_key_func = search_key_func

    @classmethod
    def deserialise(klass, bytes, key, search_key_func=None):
        """Deserialise bytes, with key key, into a LeafNode.

        :param bytes: The bytes of the node.
        :param key: The key that the serialised node has.
        """
        key = static_tuple.expect_static_tuple(key)
        return _deserialise_leaf_node(bytes, key,
                                      search_key_func=search_key_func)

                raise AssertionError('%r must be known' % self._search_prefix)
            return self._search_prefix, [("", self)]

    _serialise_key = '\x00'.join

    def serialise(self, store):
        """Serialise the LeafNode to store.

        :param store: A VersionedFiles honouring the CHK extensions.
        :return: An iterable of the keys inserted by this operation.
        """
            lines.append(serialized[prefix_len:])
            lines.extend(value_lines)
        sha1, _, _ = store.add_lines((None,), (), lines)
        self._key = StaticTuple("sha1:" + sha1,).intern()
        bytes = ''.join(lines)
        if len(bytes) != self._current_size():
            raise AssertionError('Invalid _current_size')
        _get_cache().add(self._key, bytes)
        return [self._key]
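
    # Illustrative usage (not part of bzrlib): serialise() writes the node,
    # interns its ('sha1:...',) key as a StaticTuple, and seeds the
    # per-thread page cache so an immediate re-read avoids the store.
    #
    #     keys = list(node.serialise(store))
    #     assert keys == [node._key]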


class InternalNode(Node):
    """A node that contains references to other nodes.

    :ivar _items: serialised_key => node dictionary. node may be a tuple,
        LeafNode or InternalNode.
    """

    __slots__ = ('_node_width',)

    def __init__(self, prefix='', search_key_func=None):
        Node.__init__(self)
        # The size of an internalnode with default values and no children.

    @classmethod
    def deserialise(klass, bytes, key, search_key_func=None):
        """Deserialise bytes to an InternalNode, with key key.

        :param bytes: The bytes of the node.
        :param key: The key that the serialised node has.
        :return: An InternalNode instance.
        """
        key = static_tuple.expect_static_tuple(key)
        return _deserialise_internal_node(bytes, key,
                                          search_key_func=search_key_func)

    def _iter_nodes(self, store, key_filter=None, batch_size=None):
        """Iterate over node objects which match key_filter."""
        keys = {}
        shortcut = False
        if key_filter is None:
            # yielding all nodes, yield whatever we have, and queue up a read
            # for whatever we are missing
            shortcut = True
            for prefix, node in self._items.iteritems():
                if node.__class__ is StaticTuple:
                    keys[node] = (prefix, None)
                else:
                    yield node, None

                try:
                    node = self._items[search_prefix]
                except KeyError:
                    # A given key can only match 1 child node, if it isn't
                    # there, then we can just return nothing
                    return
                if node.__class__ is StaticTuple:
                    keys[node] = (search_prefix, [key])
                else:
                    # This is loaded, and the only thing that can match,
                    # yield this item and be done
                    yield node, [key]
                    return

                try:
                    node = self._items[search_prefix]
                except KeyError:
                    # We can ignore this one
                    continue
                node_key_filter = prefix_to_keys[search_prefix]
                if node.__class__ is StaticTuple:
                    keys[node] = (search_prefix, node_key_filter)
                else:
                    yield node, node_key_filter

                    if sub_prefix in length_filter:
                        node_key_filter.extend(prefix_to_keys[sub_prefix])
                if node_key_filter: # this key matched something, yield it
                    if node.__class__ is StaticTuple:
                        keys[node] = (prefix, node_key_filter)
                    else:
                        yield node, node_key_filter

                    prefix, node_key_filter = keys[record.key]
                    node_and_filters.append((node, node_key_filter))
                    self._items[prefix] = node
                    _get_cache().add(record.key, bytes)
                for info in node_and_filters:
                    yield info

        lines.append('%s\n' % (self._search_prefix,))
        prefix_len = len(self._search_prefix)
        for prefix, node in sorted(self._items.items()):
            if type(node) is StaticTuple:
                key = node[0]
            else:
                key = node._key[0]
            serialised = "%s\x00%s\n" % (prefix, key)
            if not serialised.startswith(self._search_prefix):
                raise AssertionError("prefixes mismatch: %s must start with %s"
                    % (serialised, self._search_prefix))
            lines.append(serialised[prefix_len:])
        sha1, _, _ = store.add_lines((None,), (), lines)
        self._key = StaticTuple("sha1:" + sha1,).intern()
        _get_cache().add(self._key, ''.join(lines))
        yield self._key

    def _search_key(self, key):
        """Return the serialised key for key in this node."""
        # search keys are fixed width. All will be self._node_width wide, so
        # we pad as necessary.
        return (self._search_key_func(key) + '\x00'*self._node_width)[:self._node_width]


class CHKMapDifference(object):
    """Iterate the stored pages and key,value pairs for (new - old).

    This class provides a generator over the stored CHK pages and the
    (key, value) pairs that are in any of the new maps and not in any of the
    old maps.
    """

    def __init__(self, store, new_root_keys, old_root_keys,
                 search_key_func, pb=None):
        # TODO: Should we add a StaticTuple barrier here? It would be nice to
        #       force callers to use StaticTuple, because there will often be
        #       lots of keys passed in here. And even if we cast it locally,
        #       that just means that we will have *both* a StaticTuple and a
        #       tuple() in memory, referring to the same object. (so a net
        #       increase in memory, not a decrease.)
        self._store = store
        self._new_root_keys = new_root_keys
        self._old_root_keys = old_root_keys
        self._pb = pb
        # All uninteresting chks that we have seen. By the time they are added
        # here, they should be either fully ignored, or queued up for
        # processing
        # TODO: This might grow to a large size if there are lots of merge
        #       parents, etc. However, it probably doesn't scale to O(history)
        #       like _processed_new_refs does.
        self._all_old_chks = set(self._old_root_keys)
        # All items that we have seen from the old_root_keys
        self._all_old_items = set()
        # These are interesting items which were either read, or already in the
        # interesting queue (so we don't need to walk them again)
        # TODO: processed_new_refs becomes O(all_chks), consider switching to
        #       SimpleSet here.
        self._processed_new_refs = set()
        self._search_key_func = search_key_func
        self._old_queue = []
        self._new_queue = []
        self._new_item_queue = []
        self._state = None

    def _read_nodes_from_store(self, keys):
        # We chose not to use _get_cache(), because we think in
        # terms of records to be yielded. Also, we expect to touch each page
        # only 1 time during this code. (We may want to evaluate saving the
        # raw bytes into the page cache, which would allow a working tree
        # update after the fetch to not have to read the bytes again.)
        as_st = StaticTuple.from_sequence
        stream = self._store.get_record_stream(keys, 'unordered', True)
        for record in stream:
            if self._pb is not None:
                self._pb.tick()
            if type(node) is InternalNode:
                # Note we don't have to do node.refs() because we know that
                # there are no children that have been pushed into this node
                # Note: Using as_st() here seemed to save 1.2MB, which would
                #       indicate that we keep 100k prefix_refs around while
                #       processing. They *should* be shorter lived than that...
                #       It does cost us ~10s of processing time
                #prefix_refs = [as_st(item) for item in node._items.iteritems()]
                prefix_refs = node._items.items()
                items = []
            else:
                prefix_refs = []
                # Note: We don't use a StaticTuple here. Profiling showed a
                #       minor memory improvement (0.8MB out of 335MB peak 0.2%)
                #       But a significant slowdown (15s / 145s, or 10%)
                items = node._items.items()
            yield record, node, prefix_refs, items

    def _read_old_roots(self):
        old_chks_to_enqueue = []
        all_old_chks = self._all_old_chks
        for record, node, prefix_refs, items in \
                self._read_nodes_from_store(self._old_root_keys):
            prefix_refs = [p_r for p_r in prefix_refs
                           if p_r[1] not in all_old_chks]
            new_refs = [p_r[1] for p_r in prefix_refs]
            all_old_chks.update(new_refs)
            # TODO: This might be a good time to turn items into StaticTuple
            #       instances and possibly intern them. However, this does not
            #       impact 'initial branch' performance, so I'm not worrying
            #       about it here
            self._all_old_items.update(items)
            # Queue up the uninteresting references
            # Don't actually put them in the 'to-read' queue until we have
            # finished checking for interesting things
            old_chks_to_enqueue.extend(prefix_refs)

            # current design allows for this, as callers will do the work
            # to make the results unique. We might profile whether we
            # gain anything by ensuring unique return values for items
            # TODO: This might be a good time to cast to StaticTuple, as
            #       self._new_item_queue will hold the contents of multiple
            #       records for an extended lifetime
            new_items = [item for item in items
                         if item not in self._all_old_items]
            self._new_item_queue.extend(new_items)

        if new_items:
            yield None, new_items
        refs = refs.difference(all_old_chks)
        processed_new_refs.update(refs)
        while refs:
            # TODO: Using a SimpleSet for self._processed_new_refs
            #       saved as much as 10MB of peak memory. However, it requires
            #       implementing a non-pyrex version.
            next_refs = set()
            next_refs_update = next_refs.update
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
            # from 1m54s to 1m51s. Consider it.
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
                if all_old_items:
                    # using the 'if' check saves about 145s => 141s, when
                    # streaming initial branch of Launchpad data.
                    items = [item for item in items
                             if item not in all_old_items]
                yield record, items
                next_refs_update([p_r[1] for p_r in p_refs])
            # set1.difference(set/dict) walks all of set1, and checks if it
            # exists in 'other'.
            # set1.difference(iterable) walks all of iterable, and does a
            # 'difference_update' on a clone of set1. Pick wisely based on the
            # expected sizes of objects.
            # in our case it is expected that 'new_refs' will always be quite
            # small.
            next_refs = next_refs.difference(all_old_chks)
            next_refs = next_refs.difference(processed_new_refs)
            processed_new_refs.update(next_refs)
            refs = next_refs
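
            # Illustrative sketch (not part of bzrlib) of the two behaviours
            # described above; 'other_set' is a hypothetical large set:
            #
            #     big = set(xrange(1000000))
            #     big.difference(other_set)   # cost scales with len(big)
            #     big.difference([1, 2, 3])   # cost scales with the iterable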

            self._old_queue = []
            all_old_chks = self._all_old_chks
            for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
                # TODO: Use StaticTuple here?
                self._all_old_items.update(items)
                refs = [r for _, r in prefix_refs if r not in all_old_chks]
                self._old_queue.extend(refs)


try:
    from bzrlib._chk_map_pyx import (
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
except ImportError, e:
    osutils.failed_to_load_extension(e)
    from bzrlib._chk_map_py import (
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
search_key_registry.register('hash-16-way', _search_key_16)
search_key_registry.register('hash-255-way', _search_key_255)
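
# Illustrative usage (not part of bzrlib): formats look search key functions
# up by name at runtime, e.g.:
#
#     search_key_func = search_key_registry.get('hash-16-way')
#     prefix = search_key_func(StaticTuple('file-id',))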


def _check_key(key):
    """Helper function to assert that a key is properly formatted.

    This generally shouldn't be used in production code, but it can be helpful
    to debug problems.
    """
    if type(key) is not StaticTuple:
        raise TypeError('key %r is not StaticTuple but %s' % (key, type(key)))
    if len(key) != 1:
        raise ValueError('key %r should have length 1, not %d' % (key, len(key),))
    if type(key[0]) is not str:
        raise TypeError('key %r should hold a str, not %r'
                        % (key, type(key[0])))
    if not key[0].startswith('sha1:'):
        raise ValueError('key %r should point to a sha1:' % (key,))
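
# Illustrative usage (not part of bzrlib): a well-formed key passes all of the
# checks above silently; the digest below is just an example value.
#
#     _check_key(StaticTuple('sha1:2aae6c35c94fcfb415dbe95f408b9ce91ee846ed',))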