~bzr-pqm/bzr/bzr.dev

Viewing changes to bzrlib/repofmt/pack_repo.py

  • Committer: Jelmer Vernooij
  • Date: 2011-04-05 17:45:29 UTC
  • mto: (5757.8.6 knitpackrepo-7)
  • mto: This revision was merged to the branch mainline in revision 5800.
  • Revision ID: jelmer@samba.org-20110405174529-np94wmbaiteemm35
Move Packer implementation to knitpack_repo.

@@ -38,8 +38,6 @@
     GraphIndexPrefixAdapter,
     )
 from bzrlib.knit import (
-    KnitPlainFactory,
-    KnitVersionedFiles,
     _DirectPackAccess,
     )
 """)
@@ -781,29 +779,7 @@
 
     def _copy_revision_texts(self):
         """Copy revision data to the new pack."""
-        # select revisions
-        if self.revision_ids:
-            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
-        else:
-            revision_keys = None
-        # select revision keys
-        revision_index_map, revision_indices = self._pack_map_and_index_list(
-            'revision_index')
-        revision_nodes = self._index_contents(revision_indices, revision_keys)
-        revision_nodes = list(revision_nodes)
-        self._update_pack_order(revision_nodes, revision_index_map)
-        # copy revision keys and adjust values
-        self.pb.update("Copying revision texts", 1)
-        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
-        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
-            self.new_pack.revision_index, readv_group_iter, total_items))
-        if 'pack' in debug.debug_flags:
-            mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
-                time.ctime(), self._pack_collection._upload_transport.base,
-                self.new_pack.random_name,
-                self.new_pack.revision_index.key_count(),
-                time.time() - self.new_pack.start_time)
-        self._revision_keys = revision_keys
+        raise NotImplementedError(self._copy_revision_texts)
 
     def _copy_inventory_texts(self):
         """Copy the inventory texts to the new pack.
@@ -812,270 +788,13 @@
 
         Sets self._text_filter appropriately.
         """
-        # select inventory keys
-        inv_keys = self._revision_keys # currently the same keyspace, and note that
-        # querying for keys here could introduce a bug where an inventory item
-        # is missed, so do not change it to query separately without cross
-        # checking like the text key check below.
-        inventory_index_map, inventory_indices = self._pack_map_and_index_list(
-            'inventory_index')
-        inv_nodes = self._index_contents(inventory_indices, inv_keys)
-        # copy inventory keys and adjust values
-        # XXX: Should be a helper function to allow different inv representation
-        # at this point.
-        self.pb.update("Copying inventory texts", 2)
-        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
-        # Only grab the output lines if we will be processing them
-        output_lines = bool(self.revision_ids)
-        inv_lines = self._copy_nodes_graph(inventory_index_map,
-            self.new_pack._writer, self.new_pack.inventory_index,
-            readv_group_iter, total_items, output_lines=output_lines)
-        if self.revision_ids:
-            self._process_inventory_lines(inv_lines)
-        else:
-            # eat the iterator to cause it to execute.
-            list(inv_lines)
-            self._text_filter = None
-        if 'pack' in debug.debug_flags:
-            mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
-                time.ctime(), self._pack_collection._upload_transport.base,
-                self.new_pack.random_name,
-                self.new_pack.inventory_index.key_count(),
-                time.time() - self.new_pack.start_time)
+        raise NotImplementedError(self._copy_inventory_texts)
 
     def _copy_text_texts(self):
-        # select text keys
-        text_index_map, text_nodes = self._get_text_nodes()
-        if self._text_filter is not None:
-            # We could return the keys copied as part of the return value from
-            # _copy_nodes_graph but this doesn't work all that well with the
-            # need to get line output too, so we check separately, and as we're
-            # going to buffer everything anyway, we check beforehand, which
-            # saves reading knit data over the wire when we know there are
-            # mising records.
-            text_nodes = set(text_nodes)
-            present_text_keys = set(_node[1] for _node in text_nodes)
-            missing_text_keys = set(self._text_filter) - present_text_keys
-            if missing_text_keys:
-                # TODO: raise a specific error that can handle many missing
-                # keys.
-                mutter("missing keys during fetch: %r", missing_text_keys)
-                a_missing_key = missing_text_keys.pop()
-                raise errors.RevisionNotPresent(a_missing_key[1],
-                    a_missing_key[0])
-        # copy text keys and adjust values
-        self.pb.update("Copying content texts", 3)
-        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
-        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
-            self.new_pack.text_index, readv_group_iter, total_items))
-        self._log_copied_texts()
+        raise NotImplementedError(self._copy_text_texts)
 
     def _create_pack_from_packs(self):
-        self.pb.update("Opening pack", 0, 5)
-        self.new_pack = self.open_pack()
-        new_pack = self.new_pack
-        # buffer data - we won't be reading-back during the pack creation and
-        # this makes a significant difference on sftp pushes.
-        new_pack.set_write_cache_size(1024*1024)
-        if 'pack' in debug.debug_flags:
-            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
-                for a_pack in self.packs]
-            if self.revision_ids is not None:
-                rev_count = len(self.revision_ids)
-            else:
-                rev_count = 'all'
-            mutter('%s: create_pack: creating pack from source packs: '
-                '%s%s %s revisions wanted %s t=0',
-                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
-                plain_pack_list, rev_count)
-        self._copy_revision_texts()
-        self._copy_inventory_texts()
-        self._copy_text_texts()
-        # select signature keys
-        signature_filter = self._revision_keys # same keyspace
-        signature_index_map, signature_indices = self._pack_map_and_index_list(
-            'signature_index')
-        signature_nodes = self._index_contents(signature_indices,
-            signature_filter)
-        # copy signature keys and adjust values
-        self.pb.update("Copying signature texts", 4)
-        self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
-            new_pack.signature_index)
-        if 'pack' in debug.debug_flags:
-            mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
-                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
-                new_pack.signature_index.key_count(),
-                time.time() - new_pack.start_time)
-        # copy chk contents
-        # NB XXX: how to check CHK references are present? perhaps by yielding
-        # the items? How should that interact with stacked repos?
-        if new_pack.chk_index is not None:
-            self._copy_chks()
-            if 'pack' in debug.debug_flags:
-                mutter('%s: create_pack: chk content copied: %s%s %d items t+%6.3fs',
-                    time.ctime(), self._pack_collection._upload_transport.base,
-                    new_pack.random_name,
-                    new_pack.chk_index.key_count(),
-                    time.time() - new_pack.start_time)
-        new_pack._check_references()
-        if not self._use_pack(new_pack):
-            new_pack.abort()
-            return None
-        self.pb.update("Finishing pack", 5)
-        new_pack.finish()
-        self._pack_collection.allocate(new_pack)
-        return new_pack
-
-    def _copy_chks(self, refs=None):
-        # XXX: Todo, recursive follow-pointers facility when fetching some
-        # revisions only.
-        chk_index_map, chk_indices = self._pack_map_and_index_list(
-            'chk_index')
-        chk_nodes = self._index_contents(chk_indices, refs)
-        new_refs = set()
-        # TODO: This isn't strictly tasteful as we are accessing some private
-        #       variables (_serializer). Perhaps a better way would be to have
-        #       Repository._deserialise_chk_node()
-        search_key_func = chk_map.search_key_registry.get(
-            self._pack_collection.repo._serializer.search_key_name)
-        def accumlate_refs(lines):
-            # XXX: move to a generic location
-            # Yay mismatch:
-            bytes = ''.join(lines)
-            node = chk_map._deserialise(bytes, ("unknown",), search_key_func)
-            new_refs.update(node.refs())
-        self._copy_nodes(chk_nodes, chk_index_map, self.new_pack._writer,
-            self.new_pack.chk_index, output_lines=accumlate_refs)
-        return new_refs
-
-    def _copy_nodes(self, nodes, index_map, writer, write_index,
-        output_lines=None):
-        """Copy knit nodes between packs with no graph references.
-
-        :param output_lines: Output full texts of copied items.
-        """
-        pb = ui.ui_factory.nested_progress_bar()
-        try:
-            return self._do_copy_nodes(nodes, index_map, writer,
-                write_index, pb, output_lines=output_lines)
-        finally:
-            pb.finished()
-
-    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb,
-        output_lines=None):
-        # for record verification
-        knit = KnitVersionedFiles(None, None)
-        # plan a readv on each source pack:
-        # group by pack
-        nodes = sorted(nodes)
-        # how to map this into knit.py - or knit.py into this?
-        # we don't want the typical knit logic, we want grouping by pack
-        # at this point - perhaps a helper library for the following code
-        # duplication points?
-        request_groups = {}
-        for index, key, value in nodes:
-            if index not in request_groups:
-                request_groups[index] = []
-            request_groups[index].append((key, value))
-        record_index = 0
-        pb.update("Copied record", record_index, len(nodes))
-        for index, items in request_groups.iteritems():
-            pack_readv_requests = []
-            for key, value in items:
-                # ---- KnitGraphIndex.get_position
-                bits = value[1:].split(' ')
-                offset, length = int(bits[0]), int(bits[1])
-                pack_readv_requests.append((offset, length, (key, value[0])))
-            # linear scan up the pack
-            pack_readv_requests.sort()
-            # copy the data
-            pack_obj = index_map[index]
-            transport, path = pack_obj.access_tuple()
-            try:
-                reader = pack.make_readv_reader(transport, path,
-                    [offset[0:2] for offset in pack_readv_requests])
-            except errors.NoSuchFile:
-                if self._reload_func is not None:
-                    self._reload_func()
-                raise
-            for (names, read_func), (_1, _2, (key, eol_flag)) in \
-                izip(reader.iter_records(), pack_readv_requests):
-                raw_data = read_func(None)
-                # check the header only
-                if output_lines is not None:
-                    output_lines(knit._parse_record(key[-1], raw_data)[0])
-                else:
-                    df, _ = knit._parse_record_header(key, raw_data)
-                    df.close()
-                pos, size = writer.add_bytes_record(raw_data, names)
-                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
-                pb.update("Copied record", record_index)
-                record_index += 1
-
-    def _copy_nodes_graph(self, index_map, writer, write_index,
-        readv_group_iter, total_items, output_lines=False):
-        """Copy knit nodes between packs.
-
-        :param output_lines: Return lines present in the copied data as
-            an iterator of line,version_id.
-        """
-        pb = ui.ui_factory.nested_progress_bar()
-        try:
-            for result in self._do_copy_nodes_graph(index_map, writer,
-                write_index, output_lines, pb, readv_group_iter, total_items):
-                yield result
-        except Exception:
-            # Python 2.4 does not permit try:finally: in a generator.
-            pb.finished()
-            raise
-        else:
-            pb.finished()
-
-    def _do_copy_nodes_graph(self, index_map, writer, write_index,
-        output_lines, pb, readv_group_iter, total_items):
-        # for record verification
-        knit = KnitVersionedFiles(None, None)
-        # for line extraction when requested (inventories only)
-        if output_lines:
-            factory = KnitPlainFactory()
-        record_index = 0
-        pb.update("Copied record", record_index, total_items)
-        for index, readv_vector, node_vector in readv_group_iter:
-            # copy the data
-            pack_obj = index_map[index]
-            transport, path = pack_obj.access_tuple()
-            try:
-                reader = pack.make_readv_reader(transport, path, readv_vector)
-            except errors.NoSuchFile:
-                if self._reload_func is not None:
-                    self._reload_func()
-                raise
-            for (names, read_func), (key, eol_flag, references) in \
-                izip(reader.iter_records(), node_vector):
-                raw_data = read_func(None)
-                if output_lines:
-                    # read the entire thing
-                    content, _ = knit._parse_record(key[-1], raw_data)
-                    if len(references[-1]) == 0:
-                        line_iterator = factory.get_fulltext_content(content)
-                    else:
-                        line_iterator = factory.get_linedelta_content(content)
-                    for line in line_iterator:
-                        yield line, key
-                else:
-                    # check the header only
-                    df, _ = knit._parse_record_header(key, raw_data)
-                    df.close()
-                pos, size = writer.add_bytes_record(raw_data, names)
-                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
-                pb.update("Copied record", record_index)
-                record_index += 1
-
-    def _get_text_nodes(self):
-        text_index_map, text_indices = self._pack_map_and_index_list(
-            'text_index')
-        return text_index_map, self._index_contents(text_indices,
-            self._text_filter)
+        raise NotImplementedError(self._create_pack_from_packs)
 
     def _least_readv_node_readv(self, nodes):
         """Generate request groups for nodes using the least readv's.
@@ -1122,15 +841,5 @@
                 self.new_pack.text_index.key_count(),
                 time.time() - self.new_pack.start_time)
 
-    def _process_inventory_lines(self, inv_lines):
-        """Use up the inv_lines generator and setup a text key filter."""
-        repo = self._pack_collection.repo
-        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
-            inv_lines, self.revision_keys)
-        text_filter = []
-        for fileid, file_revids in fileid_revisions.iteritems():
-            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
-        self._text_filter = text_filter
-
     def _revision_node_readv(self, revision_nodes):
         """Return the total revisions and the readv's to issue.
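
Likewise for the removed _process_inventory_lines above: it flattens a file-id to revision-ids mapping into a flat list of (file_id, revision_id) text keys. A minimal illustration of the filter's shape, with hypothetical data:

    # Shape of the text key filter built by the removed code (sample data).
    fileid_revisions = {'file-a': ['rev-1', 'rev-2'], 'file-b': ['rev-1']}
    text_filter = []
    for fileid, file_revids in sorted(fileid_revisions.items()):
        text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
    # text_filter == [('file-a', 'rev-1'), ('file-a', 'rev-2'), ('file-b', 'rev-1')]
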