~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/groupcompress_repo.py

  • Committer: Jelmer Vernooij
  • Date: 2011-04-05 01:12:15 UTC
  • mto: This revision was merged to the branch mainline in revision 5757.
  • Revision ID: jelmer@samba.org-20110405011215-8g6izwf3uz8v4174
Remove some unnecessary imports, clean up lazy imports.

Show diffs side-by-side

added added

removed removed

Lines of Context:
26
26
    errors,
27
27
    index as _mod_index,
28
28
    inventory,
 
29
    knit,
29
30
    osutils,
30
31
    pack,
31
32
    revision as _mod_revision,
43
44
    GroupCompressVersionedFiles,
44
45
    )
45
46
from bzrlib.repofmt.pack_repo import (
46
 
    _DirectPackAccess,
47
47
    Pack,
48
48
    NewPack,
49
 
    PackRepository,
 
49
    KnitPackRepository,
 
50
    KnitPackStreamSource,
50
51
    PackRootCommitBuilder,
51
52
    RepositoryPackCollection,
52
53
    RepositoryFormatPack,
53
54
    ResumedPack,
54
55
    Packer,
55
56
    )
56
 
from bzrlib.repository import (
57
 
    StreamSource,
58
 
    )
59
57
from bzrlib.static_tuple import StaticTuple
60
58
 
61
59
 
354
352
        """Build a VersionedFiles instance on top of this group of packs."""
355
353
        index_name = index_name + '_index'
356
354
        index_to_pack = {}
357
 
        access = _DirectPackAccess(index_to_pack,
358
 
                                   reload_func=self._reload_func)
 
355
        access = knit._DirectPackAccess(index_to_pack,
 
356
                                        reload_func=self._reload_func)
359
357
        if for_write:
360
358
            # Use new_pack
361
359
            if self.new_pack is None:
605
603
    def __init__(self, *args, **kwargs):
606
604
        super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
607
605
        self._data_changed = False
608
 
 
 
606
    
609
607
    def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
610
608
        """Create and exhaust a stream, but don't insert it.
611
 
 
 
609
        
612
610
        This is useful to get the side-effects of generating a stream.
613
611
        """
614
612
        self.pb.update('scanning %s' % (message,), pb_offset)
703
701
 
704
702
    pack_factory = GCPack
705
703
    resumed_pack_factory = ResumedGCPack
706
 
    normal_packer_class = GCCHKPacker
707
 
    optimising_packer_class = GCCHKPacker
708
704
 
709
705
    def _check_new_inventories(self):
710
706
        """Detect missing inventories or chk root entries for the new revisions
792
788
                % (sorted(missing_text_keys),))
793
789
        return problems
794
790
 
795
 
 
796
 
class CHKInventoryRepository(PackRepository):
797
 
    """subclass of PackRepository that uses CHK based inventories."""
 
791
    def _execute_pack_operations(self, pack_operations,
 
792
                                 _packer_class=GCCHKPacker,
 
793
                                 reload_func=None):
 
794
        """Execute a series of pack operations.
 
795
 
 
796
        :param pack_operations: A list of [revision_count, packs_to_combine].
 
797
        :param _packer_class: The class of packer to use (default: Packer).
 
798
        :return: None.
 
799
        """
 
800
        # XXX: Copied across from RepositoryPackCollection simply because we
 
801
        #      want to override the _packer_class ... :(
 
802
        for revision_count, packs in pack_operations:
 
803
            # we may have no-ops from the setup logic
 
804
            if len(packs) == 0:
 
805
                continue
 
806
            packer = GCCHKPacker(self, packs, '.autopack',
 
807
                                 reload_func=reload_func)
 
808
            try:
 
809
                result = packer.pack()
 
810
            except errors.RetryWithNewPacks:
 
811
                # An exception is propagating out of this context, make sure
 
812
                # this packer has cleaned up. Packer() doesn't set its new_pack
 
813
                # state into the RepositoryPackCollection object, so we only
 
814
                # have access to it directly here.
 
815
                if packer.new_pack is not None:
 
816
                    packer.new_pack.abort()
 
817
                raise
 
818
            if result is None:
 
819
                return
 
820
            for pack in packs:
 
821
                self._remove_pack_from_memory(pack)
 
822
        # record the newly available packs and stop advertising the old
 
823
        # packs
 
824
        to_be_obsoleted = []
 
825
        for _, packs in pack_operations:
 
826
            to_be_obsoleted.extend(packs)
 
827
        result = self._save_pack_names(clear_obsolete_packs=True,
 
828
                                       obsolete_packs=to_be_obsoleted)
 
829
        return result
 
830
 
 
831
 
 
832
class CHKInventoryRepository(KnitPackRepository):
 
833
    """subclass of KnitPackRepository that uses CHK based inventories."""
798
834
 
799
835
    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
800
836
        _serializer):
801
837
        """Overridden to change pack collection class."""
802
 
        super(CHKInventoryRepository, self).__init__(_format, a_bzrdir,
803
 
            control_files, _commit_builder_class, _serializer)
 
838
        KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
 
839
            _commit_builder_class, _serializer)
 
840
        # and now replace everything it did :)
804
841
        index_transport = self._transport.clone('indices')
805
842
        self._pack_collection = GCRepositoryPackCollection(self,
806
843
            self._transport, index_transport,
1109
1146
            return GroupCHKStreamSource(self, to_format)
1110
1147
        return super(CHKInventoryRepository, self)._get_source(to_format)
1111
1148
 
1112
 
    def _find_inconsistent_revision_parents(self, revisions_iterator=None):
1113
 
        """Find revisions with different parent lists in the revision object
1114
 
        and in the index graph.
1115
 
 
1116
 
        :param revisions_iterator: None, or an iterator of (revid,
1117
 
            Revision-or-None). This iterator controls the revisions checked.
1118
 
        :returns: an iterator yielding tuples of (revison-id, parents-in-index,
1119
 
            parents-in-revision).
1120
 
        """
1121
 
        if not self.is_locked():
1122
 
            raise AssertionError()
1123
 
        vf = self.revisions
1124
 
        if revisions_iterator is None:
1125
 
            revisions_iterator = self._iter_revisions(None)
1126
 
        for revid, revision in revisions_iterator:
1127
 
            if revision is None:
1128
 
                pass
1129
 
            parent_map = vf.get_parent_map([(revid,)])
1130
 
            parents_according_to_index = tuple(parent[-1] for parent in
1131
 
                parent_map[(revid,)])
1132
 
            parents_according_to_revision = tuple(revision.parent_ids)
1133
 
            if parents_according_to_index != parents_according_to_revision:
1134
 
                yield (revid, parents_according_to_index,
1135
 
                    parents_according_to_revision)
1136
 
 
1137
 
    def _check_for_inconsistent_revision_parents(self):
1138
 
        inconsistencies = list(self._find_inconsistent_revision_parents())
1139
 
        if inconsistencies:
1140
 
            raise errors.BzrCheckError(
1141
 
                "Revision index has inconsistent parents.")
1142
 
 
1143
 
 
1144
 
class GroupCHKStreamSource(StreamSource):
 
1149
 
 
1150
class GroupCHKStreamSource(KnitPackStreamSource):
1145
1151
    """Used when both the source and target repo are GroupCHK repos."""
1146
1152
 
1147
1153
    def __init__(self, from_repository, to_format):
1234
1240
            self._chk_p_id_roots = None
1235
1241
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
1236
1242
 
1237
 
    def _get_text_stream(self):
1238
 
        # Note: We know we don't have to handle adding root keys, because both
1239
 
        # the source and target are the identical network name.
1240
 
        text_stream = self.from_repository.texts.get_record_stream(
1241
 
                        self._text_keys, self._text_fetch_order, False)
1242
 
        return ('texts', text_stream)
1243
 
 
1244
1243
    def get_stream(self, search):
1245
1244
        def wrap_and_count(pb, rc, stream):
1246
1245
            """Yield records from stream while showing progress."""