~bzr-pqm/bzr/bzr.dev

Viewing changes to repofmt.py

Merge trunk

=== modified file 'repofmt.py'
@@ -20,7 +20,15 @@
 import md5
 import time
 
-from bzrlib import debug, errors, pack, repository
+from bzrlib import (
+    debug,
+    errors,
+    knit,
+    inventory,
+    pack,
+    repository,
+    ui,
+    )
 from bzrlib.btree_index import (
     BTreeBuilder,
     BTreeGraphIndex,
@@ -49,13 +57,19 @@
 try:
     from bzrlib.repofmt.pack_repo import (
     CHKInventoryRepository,
-    RepositoryFormatPackDevelopment4,
-    RepositoryFormatPackDevelopment4Subtree,
+    RepositoryFormatPackDevelopment5,
+    RepositoryFormatPackDevelopment5Hash16,
+##    RepositoryFormatPackDevelopment5Hash16b,
+##    RepositoryFormatPackDevelopment5Hash63,
+##    RepositoryFormatPackDevelopment5Hash127a,
+##    RepositoryFormatPackDevelopment5Hash127b,
+    RepositoryFormatPackDevelopment5Hash255,
     )
+    from bzrlib import chk_map
     chk_support = True
 except ImportError:
     chk_support = False
-from bzrlib import ui
+from bzrlib.trace import mutter, note
 
 
 def open_pack(self):
@@ -72,13 +86,8 @@
     def __init__(self, pack_collection, upload_suffix='', file_mode=None):
         """Create a NewPack instance.
 
-        :param upload_transport: A writable transport for the pack to be
-            incrementally uploaded to.
-        :param index_transport: A writable transport for the pack's indices to
-            be written to when the pack is finished.
-        :param pack_transport: A writable transport for the pack to be renamed
-            to when the upload is complete. This *must* be the same as
-            upload_transport.clone('../packs').
+        :param pack_collection: A PackCollection into which this is being
+            inserted.
         :param upload_suffix: An optional suffix to be given to any temporary
             files created during the pack creation. e.g '.autopack'
         :param file_mode: An optional file mode to create the new files with.
@@ -228,11 +237,211 @@
         self.repo.signatures._index._add_callback = self.signature_index.add_callback
         self.repo.texts._index._add_callback = self.text_index.add_callback
 
+    def _get_filtered_inv_stream(self, source_vf, keys):
+        """Filter the texts of inventories, to find the chk pages."""
+        id_roots = []
+        p_id_roots = []
+        id_roots_set = set()
+        p_id_roots_set = set()
+        def _filter_inv_stream(stream):
+            for idx, record in enumerate(stream):
+                ### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
+                bytes = record.get_bytes_as('fulltext')
+                chk_inv = inventory.CHKInventory.deserialise(None, bytes, record.key)
+                key = chk_inv.id_to_entry.key()
+                if key not in id_roots_set:
+                    id_roots.append(key)
+                    id_roots_set.add(key)
+                p_id_map = chk_inv.parent_id_basename_to_file_id
+                if p_id_map is not None:
+                    key = p_id_map.key()
+                    if key not in p_id_roots_set:
+                        p_id_roots_set.add(key)
+                        p_id_roots.append(key)
+                yield record
+        stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
+        return _filter_inv_stream(stream), id_roots, p_id_roots
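
The helper above wraps the inventory record stream in a generator: each
record is yielded through unchanged, while its two CHK root keys
(id_to_entry and parent_id_basename_to_file_id) are collected as a side
effect, so the later chk_bytes pass knows where to start walking. Note
that the root lists are only complete once the stream has been consumed.
A minimal standalone sketch of the same pass-through pattern (Record and
root_key are illustrative stand-ins, not bzrlib API)::

    def filtered_stream(records, id_roots, id_roots_set):
        """Yield records unchanged while recording each new root key."""
        for record in records:
            key = record.root_key  # stand-in for chk_inv.id_to_entry.key()
            if key not in id_roots_set:
                id_roots.append(key)   # preserve first-seen order
                id_roots_set.add(key)  # O(1) membership test
            yield record

    class Record(object):
        def __init__(self, root_key):
            self.root_key = root_key

    id_roots, id_roots_set = [], set()
    stream = filtered_stream([Record('a'), Record('b'), Record('a')],
                             id_roots, id_roots_set)
    for record in stream:
        pass  # consume; the roots are only complete once drained
    print(id_roots)  # ['a', 'b']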
 
+
+    def _get_chk_stream(self, source_vf, keys, id_roots, p_id_roots, pb=None):
+        # We want to stream the keys from 'id_roots', and things they
+        # reference, and then stream things from p_id_roots and things they
+        # reference, and then any remaining keys that we didn't get to.
+
+        # We also group referenced texts together, so if one root references a
+        # text with prefix 'a', and another root references a node with prefix
+        # 'a', we want to yield those nodes before we yield the nodes for 'b'
+        # This keeps 'similar' nodes together.
+
+        # Note: We probably actually want multiple streams here, to help the
+        #       client understand that the different levels won't compress well
+        #       against each other.
+        #       Test the difference between using one Group per level, and
+        #       using 1 Group per prefix. (so '' (root) would get a group, then
+        #       all the references to search-key 'a' would get a group, etc.)
+        remaining_keys = set(keys)
+        counter = [0]
+        def _get_referenced_stream(root_keys):
+            cur_keys = root_keys
+            while cur_keys:
+                keys_by_search_prefix = {}
+                remaining_keys.difference_update(cur_keys)
+                next_keys = set()
+                stream = source_vf.get_record_stream(cur_keys, 'as-requested',
+                                                     True)
+                def next_stream():
+                    for record in stream:
+                        bytes = record.get_bytes_as('fulltext')
+                        # We don't care about search_key_func for this code,
+                        # because we only care about external references.
+                        node = chk_map._deserialise(bytes, record.key,
+                                                    search_key_func=None)
+                        common_base = node._search_prefix
+                        if isinstance(node, chk_map.InternalNode):
+                            for prefix, value in node._items.iteritems():
+                                if not isinstance(value, tuple):
+                                    raise AssertionError("value is %s when"
+                                        " tuple expected" % (value.__class__))
+                                if value not in next_keys:
+                                    keys_by_search_prefix.setdefault(prefix,
+                                        []).append(value)
+                                    next_keys.add(value)
+                        counter[0] += 1
+                        if pb is not None:
+                            pb.update('chk node', counter[0])
+                        yield record
+                yield next_stream()
+                # Double check that we won't be emitting any keys twice
+                next_keys = next_keys.intersection(remaining_keys)
+                cur_keys = []
+                for prefix in sorted(keys_by_search_prefix):
+                    cur_keys.extend(keys_by_search_prefix[prefix])
+        for stream in _get_referenced_stream(id_roots):
+            yield stream
+        for stream in _get_referenced_stream(p_id_roots):
+            yield stream
+        if remaining_keys:
+            note('There were %d keys in the chk index, which were not'
+                ' referenced from inventories', len(remaining_keys))
+            stream = source_vf.get_record_stream(remaining_keys, 'unordered',
+                                                 True)
+            yield stream
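
The traversal above is a breadth-first walk of the CHK map: it streams
the current level's pages, buckets each InternalNode's children by their
search prefix, then sorts the prefixes to build the next level, so pages
with a common prefix (which tend to share content) are emitted next to
each other. A self-contained sketch of that level-by-level,
prefix-grouped walk (the children dict stands in for deserialised
internal nodes)::

    # Toy CHK-like tree: each internal key maps to {prefix: child_key}.
    children = {
        'root': {'a': 'a-node', 'b': 'b-node'},
        'a-node': {'aa': 'leaf1', 'ab': 'leaf2'},
        'b-node': {'ba': 'leaf3'},
    }

    def walk_grouped(root_keys):
        """Yield keys level by level, each level sorted by search prefix."""
        cur_keys = list(root_keys)
        seen = set(cur_keys)
        while cur_keys:
            keys_by_prefix = {}
            for key in cur_keys:
                yield key
                for prefix, child in sorted(children.get(key, {}).items()):
                    if child not in seen:
                        keys_by_prefix.setdefault(prefix, []).append(child)
                        seen.add(child)
            # Similar prefixes end up adjacent in the next level.
            cur_keys = []
            for prefix in sorted(keys_by_prefix):
                cur_keys.extend(keys_by_prefix[prefix])

    print(list(walk_grouped(['root'])))
    # ['root', 'a-node', 'b-node', 'leaf1', 'leaf2', 'leaf3']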
 
+
+    def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
+                                 reload_func=None):
+        """Execute a series of pack operations.
+
+        :param pack_operations: A list of [revision_count, packs_to_combine].
+        :param _packer_class: The class of packer to use (default: Packer).
+        :return: None.
+        """
+        for revision_count, packs in pack_operations:
+            # we may have no-ops from the setup logic
+            if len(packs) == 0:
+                continue
+            # Create a new temp VersionedFile instance based on these packs,
+            # and then just fetch everything into the target
+
+            to_copy = [('revision_index', 'revisions'),
+                       ('inventory_index', 'inventories'),
+                       ('text_index', 'texts'),
+                       ('signature_index', 'signatures'),
+                      ]
+            # TODO: This is a very non-optimal ordering for chk_bytes. The
+            #       issue is that pages that are similar are not transmitted
+            #       together. Perhaps get_record_stream('gc-optimal') should be
+            #       taught about how to group chk pages?
+            has_chk = False
+            if getattr(self, 'chk_index', None) is not None:
+                has_chk = True
+                to_copy.insert(2, ('chk_index', 'chk_bytes'))
+
+            # Shouldn't we start_write_group around this?
+            if self._new_pack is not None:
+                raise errors.BzrError('call to %s.pack() while another pack is'
+                                      ' being written.'
+                                      % (self.__class__.__name__,))
+            new_pack = self.pack_factory(self, '.autopack',
+                file_mode=self.repo.bzrdir._get_file_mode())
+            new_pack.set_write_cache_size(1024*1024)
+            # TODO: A better alternative is to probably use Packer.open_pack(), and
+            #       then create a GroupCompressVersionedFiles() around the
+            #       target pack to insert into.
+            pb = ui.ui_factory.nested_progress_bar()
+            try:
+                for idx, (index_name, vf_name) in enumerate(to_copy):
+                    pb.update('repacking %s' % (vf_name,), idx + 1, len(to_copy))
+                    keys = set()
+                    new_index = getattr(new_pack, index_name)
+                    new_index.set_optimize(for_size=True)
+                    for pack in packs:
+                        source_index = getattr(pack, index_name)
+                        keys.update(e[1] for e in source_index.iter_all_entries())
+                    source_vf = getattr(self.repo, vf_name)
+                    target_access = knit._DirectPackAccess({})
+                    target_access.set_writer(new_pack._writer, new_index,
+                                             new_pack.access_tuple())
+                    target_vf = GroupCompressVersionedFiles(
+                        _GCGraphIndex(new_index,
+                                      add_callback=new_index.add_nodes,
+                                      parents=source_vf._index._parents,
+                                      is_locked=self.repo.is_locked),
+                        access=target_access,
+                        delta=source_vf._delta)
+                    stream = None
+                    child_pb = ui.ui_factory.nested_progress_bar()
+                    try:
+                        if has_chk:
+                            if vf_name == 'inventories':
+                                stream, id_roots, p_id_roots = self._get_filtered_inv_stream(
+                                    source_vf, keys)
+                            elif vf_name == 'chk_bytes':
+                                for stream in self._get_chk_stream(source_vf, keys,
+                                                    id_roots, p_id_roots,
+                                                    pb=child_pb):
+                                    target_vf.insert_record_stream(stream)
+                                # No more to copy
+                                stream = []
+                        if stream is None:
+                            def pb_stream():
+                                substream = source_vf.get_record_stream(keys, 'gc-optimal', True)
+                                for idx, record in enumerate(substream):
+                                    child_pb.update(vf_name, idx + 1, len(keys))
+                                    yield record
+                            stream = pb_stream()
+                        target_vf.insert_record_stream(stream)
+                    finally:
+                        child_pb.finished()
+                new_pack._check_references() # shouldn't be needed
+            except:
+                pb.finished()
+                new_pack.abort()
+                raise
+            else:
+                pb.finished()
+                if not new_pack.data_inserted():
+                    raise AssertionError('We copied from pack files,'
+                                         ' but had no data copied')
+                    # we need to abort somehow, because we don't want to remove
+                    # the other packs
+                new_pack.finish()
+                self.allocate(new_pack)
+            for pack in packs:
+                self._remove_pack_from_memory(pack)
+        # record the newly available packs and stop advertising the old
+        # packs
+        self._save_pack_names(clear_obsolete_packs=True)
+        # Move the old packs out of the way now they are no longer referenced.
+        for revision_count, packs in pack_operations:
+            self._obsolete_packs(packs)
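
_execute_pack_operations repacks by brute force: it opens one new pack,
and for each versioned-files name unions every key found in the source
packs' indices, then inserts the whole record stream into a GroupCompress
writer layered over the new pack, aborting the new pack on any error and
only retiring the old packs once everything has landed. A schematic of
that all-or-nothing control flow with placeholder pack objects (none of
these names are bzrlib API)::

    class SketchPack(object):
        """Placeholder pack: maps a vf name to a set of keys."""
        def __init__(self, contents=None):
            self.contents = dict(contents or {})
        def all_keys(self, vf_name):
            return self.contents.get(vf_name, set())
        def insert_stream(self, vf_name, keys):
            self.contents.setdefault(vf_name, set()).update(keys)
        def abort(self):
            self.contents.clear()
        def finish(self):
            pass  # the real code renames upload/NAME into packs/NAME

    def repack(source_packs, vf_names):
        """Copy every key of every vf into one new pack, all or nothing."""
        new_pack = SketchPack()
        try:
            for vf_name in vf_names:
                keys = set()
                for pack in source_packs:
                    keys.update(pack.all_keys(vf_name))  # union of indices
                new_pack.insert_stream(vf_name, keys)    # one bulk stream
        except Exception:
            new_pack.abort()  # the old packs stay valid on failure
            raise
        new_pack.finish()
        return new_pack

    old = [SketchPack({'texts': set([('t1',)])}),
           SketchPack({'texts': set([('t2',)])})]
    print(sorted(repack(old, ['texts']).contents['texts']))
    # [('t1',), ('t2',)]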
 
+
 
 
 class GCPackRepository(KnitPackRepository):
     """GC customisation of KnitPackRepository."""
 
+    # Note: I think the CHK support can be dropped from this class as it's
+    # implemented via the GCCHKPackRepository class defined next. IGC 20090301
+
     def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
         _serializer):
         """Overridden to change pack collection class."""
@@ -300,6 +509,15 @@
         self._reconcile_does_inventory_gc = True
         self._reconcile_fixes_text_parents = True
         self._reconcile_backsup_inventory = False
+        # Note: We cannot unpack a delta that references a text we haven't seen yet.
+        #       There are 2 options, work in fulltexts, or require topological
+        #       sorting. Using fulltexts is more optimal for local operations,
+        #       because the source can be smart about extracting multiple
+        #       in-a-row (and sharing strings). Topological is better for
+        #       remote, because we access less data.
+        self._fetch_order = 'unordered'
+        self._fetch_gc_optimal = True
+        self._fetch_uses_deltas = False
 
 
 if chk_support:
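
The tradeoff in that comment is concrete: a delta can only be expanded
once its basis text has already been seen, so a stream that sends deltas
must be topologically sorted (parents before children), while a
fulltext-only stream can be sent in any order. A small illustration of
emitting keys parents-first from a parent map (a plain dict here, not
the repository's graph API)::

    def topo_order(parent_map):
        """Yield keys so every key appears after all of its parents."""
        emitted = set()
        pending = sorted(parent_map)  # deterministic tie-breaking
        while pending:
            remaining = []
            for key in pending:
                # Parents outside the map (ghosts) count as satisfied.
                if all(p in emitted or p not in parent_map
                       for p in parent_map[key]):
                    emitted.add(key)
                    yield key  # its delta basis has already been emitted
                else:
                    remaining.append(key)
            if len(remaining) == len(pending):
                raise ValueError('cycle in parent_map')
            pending = remaining

    parents = {'rev1': (), 'rev2': ('rev1',), 'rev3': ('rev1',),
               'rev4': ('rev2', 'rev3')}
    print(list(topo_order(parents)))
    # ['rev1', 'rev2', 'rev3', 'rev4']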
@@ -313,22 +531,14 @@
                 _commit_builder_class, _serializer)
             # and now replace everything it did :)
             index_transport = self._transport.clone('indices')
-            if chk_support:
-                self._pack_collection = GCRepositoryPackCollection(self,
-                    self._transport, index_transport,
-                    self._transport.clone('upload'),
-                    self._transport.clone('packs'),
-                    _format.index_builder_class,
-                    _format.index_class,
-                    use_chk_index=self._format.supports_chks,
-                    )
-            else:
-                self._pack_collection = GCRepositoryPackCollection(self,
-                    self._transport, index_transport,
-                    self._transport.clone('upload'),
-                    self._transport.clone('packs'),
-                    _format.index_builder_class,
-                    _format.index_class)
+            self._pack_collection = GCRepositoryPackCollection(self,
+                self._transport, index_transport,
+                self._transport.clone('upload'),
+                self._transport.clone('packs'),
+                _format.index_builder_class,
+                _format.index_class,
+                use_chk_index=self._format.supports_chks,
+                )
             self.inventories = GroupCompressVersionedFiles(
                 _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
                     add_callback=self._pack_collection.inventory_index.add_callback,
@@ -351,19 +561,15 @@
                     add_callback=self._pack_collection.text_index.add_callback,
                     parents=True, is_locked=self.is_locked),
                 access=self._pack_collection.text_index.data_access)
-            if chk_support and _format.supports_chks:
-                # No graph, no compression:- references from chks are between
-                # different objects not temporal versions of the same; and without
-                # some sort of temporal structure knit compression will just fail.
-                self.chk_bytes = GroupCompressVersionedFiles(
-                    _GCGraphIndex(self._pack_collection.chk_index.combined_index,
-                        add_callback=self._pack_collection.chk_index.add_callback,
-                        parents=False, is_locked=self.is_locked),
-                    access=self._pack_collection.chk_index.data_access)
-            else:
-                self.chk_bytes = None
+            assert _format.supports_chks
+            # No parents, individual CHK pages don't have specific ancestry
+            self.chk_bytes = GroupCompressVersionedFiles(
+                _GCGraphIndex(self._pack_collection.chk_index.combined_index,
+                    add_callback=self._pack_collection.chk_index.add_callback,
+                    parents=False, is_locked=self.is_locked),
+                access=self._pack_collection.chk_index.data_access)
             # True when the repository object is 'write locked' (as opposed to the
-            # physical lock only taken out around changes to the pack-names list.) 
+            # physical lock only taken out around changes to the pack-names list.)
             # Another way to represent this would be a decorator around the control
             # files object that presents logical locks as physical ones - if this
             # gets ugly consider that alternative design. RBC 20071011
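
parents=False matches how CHK pages are keyed: a page's key is a Content
Hash Key, the SHA-1 of its serialised bytes, so identical pages collapse
to one key and there is no per-page ancestry graph to record. A toy
content-addressed store showing that property (hand-rolled, not the real
chk_map serialisation)::

    from hashlib import sha1

    class CHKStore(object):
        """Toy content-addressed store: the key is the hash of the bytes."""
        def __init__(self):
            self._pages = {}
        def add(self, data):
            key = ('sha1:' + sha1(data).hexdigest(),)
            self._pages[key] = data  # idempotent: same bytes, same key
            return key
        def get(self, key):
            return self._pages[key]

    store = CHKStore()
    k1 = store.add(b'chkleaf:\n...')
    k2 = store.add(b'chkleaf:\n...')
    print(k1 == k2)  # True: deduplicated, and no ancestry between pages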
@@ -373,6 +579,15 @@
             self._reconcile_does_inventory_gc = True
             self._reconcile_fixes_text_parents = True
             self._reconcile_backsup_inventory = False
+            # Note: We cannot unpack a delta that references a text we haven't
+            # seen yet. There are 2 options, work in fulltexts, or require
+            # topological sorting. Using fulltexts is more optimal for local
+            # operations, because the source can be smart about extracting
+            # multiple in-a-row (and sharing strings). Topological is better
+            # for remote, because we access less data.
+            self._fetch_order = 'unordered'
+            self._fetch_gc_optimal = True
+            self._fetch_uses_deltas = False
 
 
 class RepositoryFormatPackGCPlain(RepositoryFormatPackDevelopment2):
@@ -423,24 +638,109 @@
             ", interoperates with pack-0.92-subtrees\n")
 
 if chk_support:
-    'Bazaar development format - 1.9+gc (needs bzr.dev from 1.9)\n',
-    class RepositoryFormatPackGCPlainCHK(RepositoryFormatPackDevelopment4):
+    class RepositoryFormatPackGCPlainCHK(RepositoryFormatPackDevelopment5):
         """A CHK+group compress pack repository."""
 
         repository_class = GCCHKPackRepository
 
         def get_format_string(self):
             """See RepositoryFormat.get_format_string()."""
-            return ('Bazaar development format - chk+gc '
-                '(needs bzr.dev from 1.12)\n')
-
-        def get_format_description(self):
-            """See RepositoryFormat.get_format_description()."""
-            return ("Development repository format - chk+groupcompress "
-                ", interoperates with pack-0.92\n")
-
-
-
+            return ('Bazaar development format - chk+gc'
+                    ' (needs bzr.dev from 1.13)\n')
+
+        def get_format_description(self):
+            """See RepositoryFormat.get_format_description()."""
+            return ("Development repository format - chk+groupcompress")
+
+
+    class RepositoryFormatPackGCPlainCHK16(RepositoryFormatPackDevelopment5Hash16):
+        """A hashed CHK+group compress pack repository."""
+
+        repository_class = GCCHKPackRepository
+
+        def get_format_string(self):
+            """See RepositoryFormat.get_format_string()."""
+            return ('Bazaar development format - hash16chk+gc'
+                    ' (needs bzr.dev from 1.13)\n')
+
+        def get_format_description(self):
+            """See RepositoryFormat.get_format_description()."""
+            return ("Development repository format - hash16chk+groupcompress")
+
+
+##    class RepositoryFormatPackGCPlainCHK16b(RepositoryFormatPackDevelopment5Hash16b):
+##        """A hashed CHK+group compress pack repository."""
+##
+##        repository_class = GCCHKPackRepository
+##
+##        def get_format_string(self):
+##            """See RepositoryFormat.get_format_string()."""
+##            return ('Bazaar development format - hash16bchk+gc'
+##                    ' (needs bzr.dev from 1.13)\n')
+##
+##        def get_format_description(self):
+##            """See RepositoryFormat.get_format_description()."""
+##            return ("Development repository format - hash16bchk+groupcompress")
+##
+##
+##    class RepositoryFormatPackGCPlainCHK63(RepositoryFormatPackDevelopment5Hash63):
+##        """A hashed CHK+group compress pack repository."""
+##
+##        repository_class = GCCHKPackRepository
+##
+##        def get_format_string(self):
+##            """See RepositoryFormat.get_format_string()."""
+##            return ('Bazaar development format - hash63+gc'
+##                    ' (needs bzr.dev from 1.13)\n')
+##
+##        def get_format_description(self):
+##            """See RepositoryFormat.get_format_description()."""
+##            return ("Development repository format - hash63+groupcompress")
+##
+##
+##    class RepositoryFormatPackGCPlainCHK127a(RepositoryFormatPackDevelopment5Hash127a):
+##        """A hashed CHK+group compress pack repository."""
+##
+##        repository_class = GCCHKPackRepository
+##
+##        def get_format_string(self):
+##            """See RepositoryFormat.get_format_string()."""
+##            return ('Bazaar development format - hash127a+gc'
+##                    ' (needs bzr.dev from 1.13)\n')
+##
+##        def get_format_description(self):
+##            """See RepositoryFormat.get_format_description()."""
+##            return ("Development repository format - hash127a+groupcompress")
+##
+##
+##    class RepositoryFormatPackGCPlainCHK127b(RepositoryFormatPackDevelopment5Hash127b):
+##        """A hashed CHK+group compress pack repository."""
+##
+##        repository_class = GCCHKPackRepository
+##
+##        def get_format_string(self):
+##            """See RepositoryFormat.get_format_string()."""
+##            return ('Bazaar development format - hash127b+gc'
+##                    ' (needs bzr.dev from 1.13)\n')
+##
+##        def get_format_description(self):
+##            """See RepositoryFormat.get_format_description()."""
+##            return ("Development repository format - hash127b+groupcompress")
+
+
+    class RepositoryFormatPackGCPlainCHK255(RepositoryFormatPackDevelopment5Hash255):
+        """A hashed CHK+group compress pack repository."""
+
+        repository_class = GCCHKPackRepository
+
+        def get_format_string(self):
+            """See RepositoryFormat.get_format_string()."""
+            return ('Bazaar development format - hash255chk+gc'
+                    ' (needs bzr.dev from 1.13)\n')
+
+        def get_format_description(self):
+            """See RepositoryFormat.get_format_description()."""
+            return ("Development repository format - hash255chk+groupcompress")
 
 
 def pack_incompatible(source, target, orig_method=InterPackRepo.is_compatible):
@@ -448,7 +748,13 @@
     formats = (RepositoryFormatPackGCPlain, RepositoryFormatPackGCRichRoot,
         RepositoryFormatPackGCSubtrees)
     if chk_support:
-        formats = formats + (RepositoryFormatPackGCPlain,)
+        formats = formats + (RepositoryFormatPackGCPlainCHK,
+                             RepositoryFormatPackGCPlainCHK16,
+                             ## RepositoryFormatPackGCPlainCHK16b,
+                             ## RepositoryFormatPackGCPlainCHK63,
+                             ## RepositoryFormatPackGCPlainCHK127a,
+                             ## RepositoryFormatPackGCPlainCHK127b,
+                             RepositoryFormatPackGCPlainCHK255)
     if isinstance(source._format, formats) or isinstance(target._format, formats):
         return False
     else:
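
pack_incompatible is the usual default-argument monkeypatch: the original
InterPackRepo.is_compatible is captured as orig_method when the function
is defined, the wrapper vetoes pairs involving the GC formats, and
otherwise defers to the captured original. The same pattern in isolation
(InterRepo and the gc_format attribute are stand-ins)::

    class InterRepo(object):
        @staticmethod
        def is_compatible(source, target):
            return True

    def incompatible(source, target, orig_method=InterRepo.is_compatible):
        # orig_method bound the pre-patch implementation at def time.
        if getattr(source, 'gc_format', False) or \
           getattr(target, 'gc_format', False):
            return False
        return orig_method(source, target)

    InterRepo.is_compatible = staticmethod(incompatible)  # install the hook

    class Repo(object):
        def __init__(self, gc_format):
            self.gc_format = gc_format

    print(InterRepo.is_compatible(Repo(True), Repo(False)))   # False
    print(InterRepo.is_compatible(Repo(False), Repo(False)))  # True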