~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/btree_index.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-09-03 22:30:56 UTC
  • mfrom: (3644.2.13 index_builder_cleanup)
  • Revision ID: pqm@pqm.ubuntu.com-20080903223056-b108iytb38xkznci
(jam) Streamline BTreeBuilder.add_node et al to make btree creation
        faster.

Show diffs side-by-side

added added

removed removed

Lines of Context:
37
37
    trace,
38
38
    )
39
39
from bzrlib.index import _OPTION_NODE_REFS, _OPTION_KEY_ELEMENTS, _OPTION_LEN
40
 
from bzrlib.osutils import basename, dirname
41
40
from bzrlib.transport import get_transport
42
41
 
43
42
 
78
77
            del byte_lines[-1]
79
78
            skipped_bytes = padding
80
79
        self.spool.writelines(byte_lines)
81
 
        if (self.spool.tell() + skipped_bytes) % _PAGE_SIZE != 0:
82
 
            raise AssertionError("incorrect node length")
 
80
        remainder = (self.spool.tell() + skipped_bytes) % _PAGE_SIZE
 
81
        if remainder != 0:
 
82
            raise AssertionError("incorrect node length: %d, %d"
 
83
                                 % (self.spool.tell(), remainder))
83
84
        self.nodes += 1
84
85
        self.writer = None
85
86
 
135
136
            key_elements=key_elements)
136
137
        self._spill_at = spill_at
137
138
        self._backing_indices = []
 
139
        # A map of {key: (node_refs, value)}
 
140
        self._nodes = {}
 
141
        # Indicate it hasn't been built yet
 
142
        self._nodes_by_key = None
138
143
 
139
144
    def add_node(self, key, value, references=()):
140
145
        """Add a node to the index.
150
155
        :param value: The value to associate with the key. It may be any
151
156
            bytes as long as it does not contain \0 or \n.
152
157
        """
153
 
        index.GraphIndexBuilder.add_node(self, key, value, references=references)
 
158
        # we don't care about absent_references
 
159
        node_refs, _ = self._check_key_ref_value(key, references, value)
 
160
        if key in self._nodes:
 
161
            raise errors.BadIndexDuplicateKey(key, self)
 
162
        self._nodes[key] = (node_refs, value)
 
163
        self._keys.add(key)
 
164
        if self._nodes_by_key is not None and self._key_length > 1:
 
165
            self._update_nodes_by_key(key, value, node_refs)
154
166
        if len(self._keys) < self._spill_at:
155
167
            return
156
 
        iterators_to_combine = [iter(sorted(self._iter_mem_nodes()))]
 
168
        self._spill_mem_keys_to_disk()
 
169
 
 
170
    def _spill_mem_keys_to_disk(self):
 
171
        """Write the in memory keys down to disk to cap memory consumption.
 
172
 
 
173
        If we already have some keys written to disk, we will combine them so
 
174
        as to preserve the sorted order.  The algorithm for combining uses
 
175
        powers of two.  So on the first spill, write all mem nodes into a
 
176
        single index. On the second spill, combine the mem nodes with the nodes
 
177
        on disk to create a 2x sized disk index and get rid of the first index.
 
178
        On the third spill, create a single new disk index, which will contain
 
179
        the mem nodes, and preserve the existing 2x sized index.  On the fourth,
 
180
        combine mem with the first and second indexes, creating a new one of
 
181
        size 4x. On the fifth create a single new one, etc.
 
182
        """
 
183
        iterators_to_combine = [self._iter_mem_nodes()]
157
184
        pos = -1
158
185
        for pos, backing in enumerate(self._backing_indices):
159
186
            if backing is None:
163
190
        backing_pos = pos + 1
164
191
        new_backing_file, size = \
165
192
            self._write_nodes(self._iter_smallest(iterators_to_combine))
166
 
        new_backing = BTreeGraphIndex(
167
 
            get_transport(dirname(new_backing_file.name)),
168
 
            basename(new_backing_file.name), size)
 
193
        dir_path, base_name = osutils.split(new_backing_file.name)
 
194
        # Note: The transport here isn't strictly needed, because we will use
 
195
        #       direct access to the new_backing._file object
 
196
        new_backing = BTreeGraphIndex(get_transport(dir_path),
 
197
                                      base_name, size)
169
198
        # GC will clean up the file
170
199
        new_backing._file = new_backing_file
171
200
        if len(self._backing_indices) == backing_pos:
175
204
            self._backing_indices[pos] = None
176
205
        self._keys = set()
177
206
        self._nodes = {}
178
 
        self._nodes_by_key = {}
 
207
        self._nodes_by_key = None
179
208
 
180
209
    def add_nodes(self, nodes):
181
210
        """Add nodes to the index.
191
220
 
192
221
    def _iter_mem_nodes(self):
193
222
        """Iterate over the nodes held in memory."""
 
223
        nodes = self._nodes
194
224
        if self.reference_lists:
195
 
            for key, (absent, references, value) in self._nodes.iteritems():
196
 
                if not absent:
197
 
                    yield self, key, value, references
 
225
            for key in sorted(nodes):
 
226
                references, value = nodes[key]
 
227
                yield self, key, value, references
198
228
        else:
199
 
            for key, (absent, references, value) in self._nodes.iteritems():
200
 
                if not absent:
201
 
                    yield self, key, value
 
229
            for key in sorted(nodes):
 
230
                references, value = nodes[key]
 
231
                yield self, key, value
202
232
 
203
233
    def _iter_smallest(self, iterators_to_combine):
204
234
        if len(iterators_to_combine) == 1:
358
388
            copied_len = osutils.pumpfile(row.spool, result)
359
389
            if copied_len != (row.nodes - 1) * _PAGE_SIZE:
360
390
                if type(row) != _LeafBuilderRow:
361
 
                    raise AssertionError("Not enough data copied")
 
391
                    raise AssertionError("Incorrect amount of data copied"
 
392
                        " expected: %d, got: %d"
 
393
                        % ((row.nodes - 1) * _PAGE_SIZE,
 
394
                           copied_len))
362
395
        result.flush()
363
396
        size = result.tell()
364
397
        result.seek(0)
384
417
                "iter_all_entries scales with size of history.")
385
418
        # Doing serial rather than ordered would be faster; but this shouldn't
386
419
        # be getting called routinely anyway.
387
 
        iterators = [iter(sorted(self._iter_mem_nodes()))]
 
420
        iterators = [self._iter_mem_nodes()]
388
421
        for backing in self._backing_indices:
389
422
            if backing is not None:
390
423
                iterators.append(backing.iter_all_entries())
404
437
        if self.reference_lists:
405
438
            for key in keys.intersection(self._keys):
406
439
                node = self._nodes[key]
407
 
                if not node[0]:
408
 
                    yield self, key, node[2], node[1]
 
440
                yield self, key, node[1], node[0]
409
441
        else:
410
442
            for key in keys.intersection(self._keys):
411
443
                node = self._nodes[key]
412
 
                if not node[0]:
413
 
                    yield self, key, node[2]
 
444
                yield self, key, node[1]
414
445
        keys.difference_update(self._keys)
415
446
        for backing in self._backing_indices:
416
447
            if backing is None:
459
490
                    node = self._nodes[key]
460
491
                except KeyError:
461
492
                    continue
462
 
                if node[0]:
463
 
                    continue
464
493
                if self.reference_lists:
465
 
                    yield self, key, node[2], node[1]
 
494
                    yield self, key, node[1], node[0]
466
495
                else:
467
 
                    yield self, key, node[2]
 
496
                    yield self, key, node[1]
468
497
            return
469
498
        for key in keys:
470
499
            # sanity check
473
502
            if len(key) != self._key_length:
474
503
                raise errors.BadIndexKey(key)
475
504
            # find what it refers to:
476
 
            key_dict = self._nodes_by_key
 
505
            key_dict = self._get_nodes_by_key()
477
506
            elements = list(key)
478
507
            # find the subdict to return
479
508
            try:
499
528
            else:
500
529
                yield (self, ) + key_dict
501
530
 
 
531
    def _get_nodes_by_key(self):
 
532
        if self._nodes_by_key is None:
 
533
            nodes_by_key = {}
 
534
            if self.reference_lists:
 
535
                for key, (references, value) in self._nodes.iteritems():
 
536
                    key_dict = nodes_by_key
 
537
                    for subkey in key[:-1]:
 
538
                        key_dict = key_dict.setdefault(subkey, {})
 
539
                    key_dict[key[-1]] = key, value, references
 
540
            else:
 
541
                for key, (references, value) in self._nodes.iteritems():
 
542
                    key_dict = nodes_by_key
 
543
                    for subkey in key[:-1]:
 
544
                        key_dict = key_dict.setdefault(subkey, {})
 
545
                    key_dict[key[-1]] = key, value
 
546
            self._nodes_by_key = nodes_by_key
 
547
        return self._nodes_by_key
 
548
 
502
549
    def key_count(self):
503
550
        """Return an estimate of the number of keys in this index.
504
551