# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
from __future__ import absolute_import

import threading

from bzrlib import lazy_import
lazy_import.lazy_import(globals(), """
from bzrlib import (
""")
from bzrlib import (
from bzrlib.static_tuple import StaticTuple
# If each line is 50 bytes, and you have 255 internal pages, with 255-way fan
# out, it takes 3.1MB to cache the layer.
_PAGE_CACHE_SIZE = 4*1024*1024
# Per thread caches for 2 reasons:
# - in the server we may be serving very different content, so we get less
#   cache thrashing.
# - we avoid locking on every cache lookup.
_thread_caches = threading.local()
_thread_caches.page_cache = None
def _get_cache():
    """Get the per-thread page cache.

    We need a function to do this because in a new thread the _thread_caches
    threading.local object does not have the cache initialized yet.
    """
    page_cache = getattr(_thread_caches, 'page_cache', None)
    if page_cache is None:
        # We are caching bytes so len(value) is perfectly accurate
        page_cache = lru_cache.LRUSizeCache(_PAGE_CACHE_SIZE)
        _thread_caches.page_cache = page_cache
    return page_cache
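# A small usage sketch, mirroring the call sites later in this module: the
# cache maps a node key to that node's serialised bytes, so a hit is
# bytes = _get_cache()[key], a store is _get_cache()[key] = bytes, and a
# miss raises KeyError.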
# If a ChildNode falls below this many bytes, we check for a remap
_INTERESTING_NEW_SIZE = 50
# If a ChildNode shrinks by more than this amount, we check for a remap
_INTERESTING_SHRINKAGE_LIMIT = 20
# If we delete more than this many nodes applying a delta, we check for a remap
_INTERESTING_DELETES_LIMIT = 5
def _search_key_plain(key):
    """Map the key tuple into a search string that just uses the key bytes."""
    return '\x00'.join(key)
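# For example, _search_key_plain(('foo', 'bar')) yields 'foo\x00bar'.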
class CHKMap(object):
    """A persistent map from string to string backed by a CHK store."""

    __slots__ = ('_store', '_root_node', '_search_key_func')
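    # A usage sketch (hedged; from_dict is the standard constructor in this
    # module): build a map from a dict, then reopen it by its root key:
    #   root_key = CHKMap.from_dict(store, {('name',): 'value'})
    #   chkmap = CHKMap(store, root_key)
    #   items = list(chkmap.iteritems())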
    def __init__(self, store, root_key, search_key_func=None):
        """Create a CHKMap object.
            into the map; if old_key is not None, then the old mapping
            of old_key is removed.
        # Check preconditions first.
        as_st = StaticTuple.from_sequence
        new_items = set([as_st(key) for (old, key, value) in delta
                         if key is not None and old is None])
        existing_new = list(self.iteritems(key_filter=new_items))
        if existing_new:
            raise errors.InconsistentDeltaDelta(delta,
        delete_count = 0
        for old, new, value in delta:
            if old is not None and old != new:
                self.unmap(old, check_remap=False)
                delete_count += 1
        for old, new, value in delta:
            if new is not None:
                self.map(new, value)
        if delete_count > _INTERESTING_DELETES_LIMIT:
            trace.mutter("checking remap as %d deletions", delete_count)
            self._check_remap()
        return self._save()
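    # A sketch of a delta for the method above (assumed to be apply_delta):
    # move ('a',) to ('b',) and insert a brand-new ('c',):
    #   chkmap.apply_delta([(('a',), ('b',), 'value-b'),
    #                       (None, ('c',), 'value-c')])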
    def _ensure_root(self):
        """Ensure that the root node is an object not a key."""
        if type(self._root_node) is StaticTuple:
            # Demand-load the root
            self._root_node = self._get_node(self._root_node)
        :param node: A tuple key or node object.
        :return: A node object.
        """
        if type(node) is StaticTuple:
            bytes = self._read_bytes(node)
            return _deserialise(bytes, node,
                search_key_func=self._search_key_func)
        else:
            return node
    def _read_bytes(self, key):
        try:
            return _get_cache()[key]
        except KeyError:
            stream = self._store.get_record_stream([key], 'unordered', True)
            bytes = stream.next().get_bytes_as('fulltext')
            _get_cache()[key] = bytes
            return bytes
    def _dump_tree(self, include_keys=False):

            for key, value in sorted(node._items.iteritems()):
                # Don't use prefix nor indent here to line up when used in
                # tests in conjunction with assertEqualDiff
                result.append('      %r %r' % (tuple(key), value))
        root_key = klass._create_directly(store, initial_value,
            maximum_size=maximum_size, key_width=key_width,
            search_key_func=search_key_func)
        if type(root_key) is not StaticTuple:
            raise AssertionError('we got a %s instead of a StaticTuple'
                                 % (type(root_key),))
        node = LeafNode(search_key_func=search_key_func)
        node.set_maximum_size(maximum_size)
        node._key_width = key_width
        as_st = StaticTuple.from_sequence
        node._items = dict([(as_st(key), val) for key, val
                            in initial_value.iteritems()])
        node._raw_size = sum([node._key_value_len(key, value)
                              for key, value in node._items.iteritems()])
        node._len = len(node._items)
        node._compute_search_prefix()
        node._compute_serialised_prefix()
    def iteritems(self, key_filter=None):
        """Iterate over the entire CHKMap's contents."""
        self._ensure_root()
        if key_filter is not None:
            as_st = StaticTuple.from_sequence
            key_filter = [as_st(key) for key in key_filter]
        return self._root_node.iteritems(self._store, key_filter=key_filter)
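    # For example, iteritems(key_filter=[('name',)]) yields only the
    # (key, value) pairs whose key appears in the filter; plain tuple keys
    # are coerced to StaticTuple first.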
    def key(self):
        """Return the key for this map."""
        if type(self._root_node) is StaticTuple:
            return self._root_node
        else:
            return self._root_node._key
        :param key: A key to map.
        :param value: The value to assign to key.
        """
        key = StaticTuple.from_sequence(key)
        # Need a root object.
        self._ensure_root()
        prefix, node_details = self._root_node.map(self._store, key, value)
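        # e.g. chkmap.map(('name',), 'value'); plain tuple keys are accepted
        # and coerced to StaticTuple before the root node is updated.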
    def _node_key(self, node):
        """Get the key for a node whether it's a tuple or node."""
        if type(node) is tuple:
            node = StaticTuple.from_sequence(node)
        if type(node) is StaticTuple:
            return node
        else:
            return node._key
    def unmap(self, key, check_remap=True):
        """Remove key from the map."""
        key = StaticTuple.from_sequence(key)
        self._ensure_root()
        if type(self._root_node) is InternalNode:
            unmapped = self._root_node.unmap(self._store, key,
                check_remap=check_remap)
    def _check_remap(self):
        """Check if nodes can be collapsed."""
        self._ensure_root()
        if type(self._root_node) is InternalNode:
            self._root_node = self._root_node._check_remap(self._store)
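    # Note that the reassignment above matters: _check_remap returns the
    # (possibly collapsed) root node rather than mutating it in place.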
    def _save(self):
        """Save the map completely.

        :return: The key of the root node.
        """
        if type(self._root_node) is StaticTuple:
            # Already saved.
            return self._root_node
        keys = list(self._root_node.serialise(self._store))
        return keys[-1]
        adding the header bytes, and without prefix compression.
    """

    __slots__ = ('_key', '_len', '_maximum_size', '_key_width',
                 '_raw_size', '_items', '_search_prefix', '_search_key_func'
                )
def __init__(self, key_width=1):
560
601
"""Create a node.
650
691
the key/value pairs.
    __slots__ = ('_common_serialised_prefix',)

    def __init__(self, search_key_func=None):
        Node.__init__(self)
        # All of the keys in this leaf node share this common prefix
        self._common_serialised_prefix = None
        if search_key_func is None:
            self._search_key_func = _search_key_plain
        else:
            self._search_key_func = search_key_func
        :param bytes: The bytes of the node.
        :param key: The key that the serialised node has.
        """
        key = static_tuple.expect_static_tuple(key)
        return _deserialise_leaf_node(bytes, key,
                                      search_key_func=search_key_func)
                raise AssertionError('%r must be known' % self._search_prefix)
            return self._search_prefix, [("", self)]

    _serialise_key = '\x00'.join
    def serialise(self, store):
        """Serialise the LeafNode to store.

            lines.append(serialized[prefix_len:])
            lines.extend(value_lines)
        sha1, _, _ = store.add_lines((None,), (), lines)
        self._key = StaticTuple("sha1:" + sha1,).intern()
        bytes = ''.join(lines)
        if len(bytes) != self._current_size():
            raise AssertionError('Invalid _current_size')
        _get_cache()[self._key] = bytes
        return [self._key]
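    # The key returned above is a StaticTuple of the form ('sha1:<hexdigest>',);
    # _check_key at the bottom of this module validates exactly this shape.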
        LeafNode or InternalNode.
    """

    __slots__ = ('_node_width',)

    def __init__(self, prefix='', search_key_func=None):
        Node.__init__(self)
        # The size of an internalnode with default values and no children.
        :param key: The key that the serialised node has.
        :return: An InternalNode instance.
        """
        key = static_tuple.expect_static_tuple(key)
        return _deserialise_internal_node(bytes, key,
                                          search_key_func=search_key_func)
            # for whatever we are missing
            shortcut = True
            for prefix, node in self._items.iteritems():
                if node.__class__ is StaticTuple:
                    keys[node] = (prefix, None)
                else:
                    yield node, None
                        # A given key can only match 1 child node, if it isn't
                        # there, then we can just return nothing
                        return
                    if node.__class__ is StaticTuple:
                        keys[node] = (search_prefix, [key])
                    else:
                        # This is loaded, and the only thing that can match,
                        # We can ignore this one
                        continue
                    node_key_filter = prefix_to_keys[search_prefix]
                    if node.__class__ is StaticTuple:
                        keys[node] = (search_prefix, node_key_filter)
                    else:
                        yield node, node_key_filter
                        if sub_prefix in length_filter:
                            node_key_filter.extend(prefix_to_keys[sub_prefix])
                    if node_key_filter: # this key matched something, yield it
                        if node.__class__ is StaticTuple:
                            keys[node] = (prefix, node_key_filter)
                        else:
                            yield node, node_key_filter
                prefix, node_key_filter = keys[record.key]
                node_and_filters.append((node, node_key_filter))
                self._items[prefix] = node
                _get_cache()[record.key] = bytes
            for info in node_and_filters:
                yield info
        lines.append('%s\n' % (self._search_prefix,))
        prefix_len = len(self._search_prefix)
        for prefix, node in sorted(self._items.items()):
            if type(node) is StaticTuple:
                key = node[0]
            else:
                key = node._key[0]
            serialised = "%s\x00%s\n" % (prefix, key)
            if not serialised.startswith(self._search_prefix):
                raise AssertionError("prefixes mismatch: %s must start with %s"
                    % (serialised, self._search_prefix))
            lines.append(serialised[prefix_len:])
        sha1, _, _ = store.add_lines((None,), (), lines)
        self._key = StaticTuple("sha1:" + sha1,).intern()
        _get_cache()[self._key] = ''.join(lines)
        yield self._key
    def _search_key(self, key):

        return self._search_prefix
    def unmap(self, store, key, check_remap=True):
        """Remove key from this node and its children."""
        if not len(self._items):
            raise AssertionError("can't unmap in an empty InternalNode.")
        children = [node for node, _
                          in self._iter_nodes(store, key_filter=[key])]
    def __init__(self, store, new_root_keys, old_root_keys,
                 search_key_func, pb=None):
        # TODO: Should we add a StaticTuple barrier here? It would be nice to
        #       force callers to use StaticTuple, because there will often be
        #       lots of keys passed in here. And even if we cast it locally,
        #       that just means that we will have *both* a StaticTuple and a
        #       tuple() in memory, referring to the same object. (so a net
        #       increase in memory, not a decrease.)
        self._store = store
        self._new_root_keys = new_root_keys
        self._old_root_keys = old_root_keys
        # All uninteresting chks that we have seen. By the time they are added
        # here, they should be either fully ignored, or queued up for
        # processing
        # TODO: This might grow to a large size if there are lots of merge
        #       parents, etc. However, it probably doesn't scale to O(history)
        #       like _processed_new_refs does.
        self._all_old_chks = set(self._old_root_keys)
        # All items that we have seen from the old_root_keys
        self._all_old_items = set()
        # These are interesting items which were either read, or already in the
        # interesting queue (so we don't need to walk them again)
        # TODO: processed_new_refs becomes O(all_chks), consider switching to
        #       SimpleSet here.
        self._processed_new_refs = set()
        self._search_key_func = search_key_func

        self._state = None
    def _read_nodes_from_store(self, keys):
        # We chose not to use _get_cache(), because we think in
        # terms of records to be yielded. Also, we expect to touch each page
        # only 1 time during this code. (We may want to evaluate saving the
        # raw bytes into the page cache, which would allow a working tree
        # update after the fetch to not have to read the bytes again.)
        as_st = StaticTuple.from_sequence
        stream = self._store.get_record_stream(keys, 'unordered', True)
        for record in stream:
            if self._pb is not None:
                self._pb.tick()
            bytes = record.get_bytes_as('fulltext')
            node = _deserialise(bytes, record.key,
                                search_key_func=self._search_key_func)
            if type(node) is InternalNode:
                # Note we don't have to do node.refs() because we know that
                # there are no children that have been pushed into this node
                # Note: Using as_st() here seemed to save 1.2MB, which would
                #       indicate that we keep 100k prefix_refs around while
                #       processing. They *should* be shorter lived than that...
                #       It does cost us ~10s of processing time
                #prefix_refs = [as_st(item) for item in node._items.iteritems()]
                prefix_refs = node._items.items()
                items = []
            else:
                prefix_refs = []
                # Note: We don't use a StaticTuple here. Profiling showed a
                #       minor memory improvement (0.8MB out of 335MB peak 0.2%)
                #       But a significant slowdown (15s / 145s, or 10%)
                items = node._items.items()
            yield record, node, prefix_refs, items
            prefix_refs = [p_r for p_r in prefix_refs
                           if p_r[1] not in all_old_chks]
            new_refs = [p_r[1] for p_r in prefix_refs]
            all_old_chks.update(new_refs)
            # TODO: This might be a good time to turn items into StaticTuple
            #       instances and possibly intern them. However, this does not
            #       impact 'initial branch' performance, so I'm not worrying
            #       about it here
            self._all_old_items.update(items)
            # Queue up the uninteresting references
            # Don't actually put them in the 'to-read' queue until we have
            # finished checking the interesting references
            #       current design allows for this, as callers will do the work
            #       to make the results unique. We might profile whether we
            #       gain anything by ensuring unique return values for items
            # TODO: This might be a good time to cast to StaticTuple, as
            #       self._new_item_queue will hold the contents of multiple
            #       records for an extended lifetime
            new_items = [item for item in items
                         if item not in self._all_old_items]
            self._new_item_queue.extend(new_items)
            yield None, new_items
        refs = refs.difference(all_old_chks)
        processed_new_refs.update(refs)
        while refs:
            # TODO: Using a SimpleSet for self._processed_new_refs saved as
            #       much as 10MB of peak memory. However, it requires
            #       implementing a non-pyrex version.
            next_refs = set()
            next_refs_update = next_refs.update
            # Inlining _read_nodes_from_store improves 'bzr branch bzr.dev'
            # from 1m54s to 1m51s. Consider it.
            for record, _, p_refs, items in self._read_nodes_from_store(refs):
                if all_old_items:
                    # using the 'if' check saves about 145s => 141s, when
                    # streaming initial branch of Launchpad data.
                    items = [item for item in items
                             if item not in all_old_items]
                yield record, items
                next_refs_update([p_r[1] for p_r in p_refs])
            # set1.difference(set/dict) walks all of set1, and checks if it
            # exists in 'other'.
            # set1.difference(iterable) walks all of iterable, and does a
            # 'difference_update' on a clone of set1. Pick wisely based on the
            # expected sizes of objects.
            # in our case it is expected that 'new_refs' will always be quite
            # small.
            next_refs = next_refs.difference(all_old_chks)
            next_refs = next_refs.difference(processed_new_refs)
            processed_new_refs.update(next_refs)
            refs = next_refs
        self._old_queue = []
        all_old_chks = self._all_old_chks
        for record, _, prefix_refs, items in self._read_nodes_from_store(refs):
            # TODO: Use StaticTuple here?
            self._all_old_items.update(items)
            refs = [r for _, r in prefix_refs if r not in all_old_chks]
            self._old_queue.extend(refs)
            all_old_chks.update(refs)
try:
    from bzrlib._chk_map_pyx import (
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
except ImportError, e:
    osutils.failed_to_load_extension(e)
    from bzrlib._chk_map_py import (
        _search_key_16,
        _search_key_255,
        _deserialise_leaf_node,
        _deserialise_internal_node,
        )
search_key_registry.register('hash-16-way', _search_key_16)
search_key_registry.register('hash-255-way', _search_key_255)
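# A lookup sketch (assuming bzrlib.registry.Registry semantics):
#   search_key_func = search_key_registry.get('hash-16-way')
# returns the _search_key_16 implementation registered above.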
def _check_key(key):
    """Helper function to assert that a key is properly formatted.

    This generally shouldn't be used in production code, but it can be helpful
    to debug problems in other code.
    """
    if type(key) is not StaticTuple:
        raise TypeError('key %r is not StaticTuple but %s' % (key, type(key)))
    if len(key) != 1:
        raise ValueError('key %r should have length 1, not %d' % (key, len(key),))
    if type(key[0]) is not str:
        raise TypeError('key %r should hold a str, not %r'
                        % (key, type(key[0])))
    if not key[0].startswith('sha1:'):
        raise ValueError('key %r should point to a sha1:' % (key,))
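# For example, _check_key(StaticTuple('sha1:1234')) passes silently, while a
# plain tuple ('sha1:1234',) raises TypeError and StaticTuple('foo') raises
# ValueError.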