~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to repofmt.py

Merge in the latest updates to the gc trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
    debug,
25
25
    errors,
26
26
    knit,
 
27
    inventory,
27
28
    pack,
28
29
    repository,
29
30
    ui,
58
59
    CHKInventoryRepository,
59
60
    RepositoryFormatPackDevelopment5,
60
61
    RepositoryFormatPackDevelopment5Hash16,
 
62
##    RepositoryFormatPackDevelopment5Hash16b,
 
63
##    RepositoryFormatPackDevelopment5Hash63,
 
64
##    RepositoryFormatPackDevelopment5Hash127a,
 
65
##    RepositoryFormatPackDevelopment5Hash127b,
61
66
    RepositoryFormatPackDevelopment5Hash255,
62
67
    )
 
68
    from bzrlib import chk_map
63
69
    chk_support = True
64
70
except ImportError:
65
71
    chk_support = False
236
242
        self.repo.signatures._index._add_callback = self.signature_index.add_callback
237
243
        self.repo.texts._index._add_callback = self.text_index.add_callback
238
244
 
 
245
    def _get_filtered_inv_stream(self, source_vf, keys):
 
246
        """Filter the texts of inventories, to find the chk pages."""
 
247
        id_roots = []
 
248
        p_id_roots = []
 
249
        id_roots_set = set()
 
250
        p_id_roots_set = set()
 
251
        def _filter_inv_stream(stream):
 
252
            for idx, record in enumerate(stream):
 
253
                ### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
 
254
                bytes = record.get_bytes_as('fulltext')
 
255
                chk_inv = inventory.CHKInventory.deserialise(None, bytes, record.key)
 
256
                key = chk_inv.id_to_entry.key()
 
257
                if key not in id_roots_set:
 
258
                    id_roots.append(key)
 
259
                    id_roots_set.add(key)
 
260
                p_id_map = chk_inv.parent_id_basename_to_file_id
 
261
                if p_id_map is not None:
 
262
                    key = p_id_map.key()
 
263
                    if key not in p_id_roots_set:
 
264
                        p_id_roots_set.add(key)
 
265
                        p_id_roots.append(key)
 
266
                yield record
 
267
        stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
 
268
        return _filter_inv_stream(stream), id_roots, p_id_roots
 
269
 
 
270
    def _get_chk_stream(self, source_vf, keys, id_roots, p_id_roots, pb=None):
 
271
        # We want to stream the keys from 'id_roots', and things they
 
272
        # reference, and then stream things from p_id_roots and things they
 
273
        # reference, and then any remaining keys that we didn't get to.
 
274
 
 
275
        # We also group referenced texts together, so if one root references a
 
276
        # text with prefix 'a', and another root references a node with prefix
 
277
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
 
278
        # This keeps 'similar' nodes together
 
279
 
 
280
        # Note: We probably actually want multiple streams here, to help the
 
281
        #       client understand that the different levels won't compress well
 
282
        #       against eachother
 
283
        #       Test the difference between using one Group per level, and
 
284
        #       using 1 Group per prefix. (so '' (root) would get a group, then
 
285
        #       all the references to search-key 'a' would get a group, etc.)
 
286
        remaining_keys = set(keys)
 
287
        counter = [0]
 
288
        def _get_referenced_stream(root_keys):
 
289
            cur_keys = root_keys
 
290
            while cur_keys:
 
291
                keys_by_search_prefix = {}
 
292
                remaining_keys.difference_update(cur_keys)
 
293
                next_keys = set()
 
294
                stream = source_vf.get_record_stream(cur_keys, 'as-requested',
 
295
                                                     True)
 
296
                def next_stream():
 
297
                    for record in stream:
 
298
                        bytes = record.get_bytes_as('fulltext')
 
299
                        # We don't care about search_key_func for this code,
 
300
                        # because we only care about external references.
 
301
                        node = chk_map._deserialise(bytes, record.key,
 
302
                                                    search_key_func=None)
 
303
                        common_base = node._search_prefix
 
304
                        if isinstance(node, chk_map.InternalNode):
 
305
                            for prefix, value in node._items.iteritems():
 
306
                                assert isinstance(value, tuple)
 
307
                                if value not in next_keys:
 
308
                                    keys_by_search_prefix.setdefault(prefix,
 
309
                                        []).append(value)
 
310
                                    next_keys.add(value)
 
311
                        counter[0] += 1
 
312
                        if pb is not None:
 
313
                            pb.update('chk node', counter[0])
 
314
                        yield record
 
315
                yield next_stream()
 
316
                # Double check that we won't be emitting any keys twice
 
317
                next_keys = next_keys.intersection(remaining_keys)
 
318
                cur_keys = []
 
319
                for prefix in sorted(keys_by_search_prefix):
 
320
                    cur_keys.extend(keys_by_search_prefix[prefix])
 
321
        for stream in _get_referenced_stream(id_roots):
 
322
            yield stream
 
323
        for stream in _get_referenced_stream(p_id_roots):
 
324
            yield stream
 
325
        if remaining_keys:
 
326
            trace.note('There were %d keys in the chk index, which'
 
327
                       ' were not referenced from inventories',
 
328
                       len(remaining_keys))
 
329
            stream = source_vf.get_record_stream(remaining_keys, 'unordered',
 
330
                                                 True)
 
331
            yield stream
 
332
 
239
333
    def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
240
334
                                 reload_func=None):
241
335
        """Execute a series of pack operations.
267
361
                       ('text_index', 'texts'),
268
362
                       ('signature_index', 'signatures'),
269
363
                      ]
 
364
            # TODO: This is a very non-optimal ordering for chk_bytes. The
 
365
            #       issue is that pages that are similar are not transmitted
 
366
            #       together. Perhaps get_record_stream('gc-optimal') should be
 
367
            #       taught about how to group chk pages?
 
368
            has_chk = False
270
369
            if getattr(self, 'chk_index', None) is not None:
 
370
                has_chk = True
271
371
                to_copy.insert(2, ('chk_index', 'chk_bytes'))
272
372
 
273
373
            # Shouldn't we start_write_group around this?
302
402
                                      is_locked=self.repo.is_locked),
303
403
                        access=target_access,
304
404
                        delta=source_vf._delta)
305
 
                    stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
306
 
                    target_vf.insert_record_stream(stream)
 
405
                    stream = None
 
406
                    child_pb = ui.ui_factory.nested_progress_bar()
 
407
                    try:
 
408
                        if has_chk:
 
409
                            if vf_name == 'inventories':
 
410
                                stream, id_roots, p_id_roots = self._get_filtered_inv_stream(
 
411
                                    source_vf, keys)
 
412
                            elif vf_name == 'chk_bytes':
 
413
                                for stream in self._get_chk_stream(source_vf, keys,
 
414
                                                    id_roots, p_id_roots,
 
415
                                                    pb=child_pb):
 
416
                                    target_vf.insert_record_stream(stream)
 
417
                                # No more to copy
 
418
                                stream = []
 
419
                        if stream is None:
 
420
                            def pb_stream():
 
421
                                substream = source_vf.get_record_stream(keys, 'gc-optimal', True)
 
422
                                for idx, record in enumerate(substream):
 
423
                                    child_pb.update(vf_name, idx, len(keys))
 
424
                                    yield record
 
425
                            stream = pb_stream()
 
426
                        target_vf.insert_record_stream(stream)
 
427
                    finally:
 
428
                        child_pb.finished()
307
429
                new_pack._check_references() # shouldn't be needed
308
430
            except:
309
431
                pb.finished()