~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repofmt/knitpack_repo.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-04-19 01:07:44 UTC
  • mfrom: (5757.7.11 knitpackrepo-6)
  • Revision ID: pqm@pqm.ubuntu.com-20110419010744-ns5qnlw97wrrva7s
(jelmer) Split KnitPackRepository-specific bits out of Packer class into
 KnitPacker. (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
from bzrlib.lazy_import import lazy_import
20
20
lazy_import(globals(), """
 
21
from itertools import izip
 
22
import time
 
23
 
21
24
from bzrlib import (
22
25
    bzrdir,
 
26
    debug,
 
27
    errors,
23
28
    knit,
24
29
    osutils,
 
30
    pack,
25
31
    revision as _mod_revision,
 
32
    trace,
26
33
    tsort,
 
34
    ui,
27
35
    xml5,
28
36
    xml6,
29
37
    xml7,
39
47
    btree_index,
40
48
    )
41
49
from bzrlib.index import (
 
50
    CombinedGraphIndex,
42
51
    GraphIndex,
43
52
    GraphIndexPrefixAdapter,
44
53
    InMemoryGraphIndex,
47
56
    KnitRepository,
48
57
    )
49
58
from bzrlib.repofmt.pack_repo import (
 
59
    NewPack,
50
60
    RepositoryFormatPack,
 
61
    ResumedPack,
51
62
    Packer,
52
63
    PackCommitBuilder,
53
64
    PackRepository,
68
79
        if self._format.supports_chks:
69
80
            raise AssertionError("chk not supported")
70
81
        index_transport = self._transport.clone('indices')
71
 
        self._pack_collection = RepositoryPackCollection(self, self._transport,
 
82
        self._pack_collection = KnitRepositoryPackCollection(self,
 
83
            self._transport,
72
84
            index_transport,
73
85
            self._transport.clone('upload'),
74
86
            self._transport.clone('packs'),
75
87
            _format.index_builder_class,
76
88
            _format.index_class,
77
 
            use_chk_index=self._format.supports_chks,
 
89
            use_chk_index=False,
78
90
            )
79
91
        self.inventories = KnitVersionedFiles(
80
92
            _KnitGraphIndex(self._pack_collection.inventory_index.combined_index,
556
568
                                          revision_ids=revision_ids,
557
569
                                          reload_func=reload_func)
558
570
 
 
571
    def _pack_map_and_index_list(self, index_attribute):
 
572
        """Convert a list of packs to an index pack map and index list.
 
573
 
 
574
        :param index_attribute: The attribute that the desired index is found
 
575
            on.
 
576
        :return: A tuple (map, list) where map contains the dict from
 
577
            index:pack_tuple, and list contains the indices in the preferred
 
578
            access order.
 
579
        """
 
580
        indices = []
 
581
        pack_map = {}
 
582
        for pack_obj in self.packs:
 
583
            index = getattr(pack_obj, index_attribute)
 
584
            indices.append(index)
 
585
            pack_map[index] = pack_obj
 
586
        return pack_map, indices
 
587
 
 
588
    def _index_contents(self, indices, key_filter=None):
 
589
        """Get an iterable of the index contents from a pack_map.
 
590
 
 
591
        :param indices: The list of indices to query
 
592
        :param key_filter: An optional filter to limit the keys returned.
 
593
        """
 
594
        all_index = CombinedGraphIndex(indices)
 
595
        if key_filter is None:
 
596
            return all_index.iter_all_entries()
 
597
        else:
 
598
            return all_index.iter_entries(key_filter)
 
599
 
 
600
    def _copy_nodes(self, nodes, index_map, writer, write_index,
 
601
        output_lines=None):
 
602
        """Copy knit nodes between packs with no graph references.
 
603
 
 
604
        :param output_lines: Output full texts of copied items.
 
605
        """
 
606
        pb = ui.ui_factory.nested_progress_bar()
 
607
        try:
 
608
            return self._do_copy_nodes(nodes, index_map, writer,
 
609
                write_index, pb, output_lines=output_lines)
 
610
        finally:
 
611
            pb.finished()
 
612
 
 
613
    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb,
 
614
        output_lines=None):
 
615
        # for record verification
 
616
        knit = KnitVersionedFiles(None, None)
 
617
        # plan a readv on each source pack:
 
618
        # group by pack
 
619
        nodes = sorted(nodes)
 
620
        # how to map this into knit.py - or knit.py into this?
 
621
        # we don't want the typical knit logic, we want grouping by pack
 
622
        # at this point - perhaps a helper library for the following code
 
623
        # duplication points?
 
624
        request_groups = {}
 
625
        for index, key, value in nodes:
 
626
            if index not in request_groups:
 
627
                request_groups[index] = []
 
628
            request_groups[index].append((key, value))
 
629
        record_index = 0
 
630
        pb.update("Copied record", record_index, len(nodes))
 
631
        for index, items in request_groups.iteritems():
 
632
            pack_readv_requests = []
 
633
            for key, value in items:
 
634
                # ---- KnitGraphIndex.get_position
 
635
                bits = value[1:].split(' ')
 
636
                offset, length = int(bits[0]), int(bits[1])
 
637
                pack_readv_requests.append((offset, length, (key, value[0])))
 
638
            # linear scan up the pack
 
639
            pack_readv_requests.sort()
 
640
            # copy the data
 
641
            pack_obj = index_map[index]
 
642
            transport, path = pack_obj.access_tuple()
 
643
            try:
 
644
                reader = pack.make_readv_reader(transport, path,
 
645
                    [offset[0:2] for offset in pack_readv_requests])
 
646
            except errors.NoSuchFile:
 
647
                if self._reload_func is not None:
 
648
                    self._reload_func()
 
649
                raise
 
650
            for (names, read_func), (_1, _2, (key, eol_flag)) in \
 
651
                izip(reader.iter_records(), pack_readv_requests):
 
652
                raw_data = read_func(None)
 
653
                # check the header only
 
654
                if output_lines is not None:
 
655
                    output_lines(knit._parse_record(key[-1], raw_data)[0])
 
656
                else:
 
657
                    df, _ = knit._parse_record_header(key, raw_data)
 
658
                    df.close()
 
659
                pos, size = writer.add_bytes_record(raw_data, names)
 
660
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
 
661
                pb.update("Copied record", record_index)
 
662
                record_index += 1
 
663
 
 
664
    def _copy_nodes_graph(self, index_map, writer, write_index,
 
665
        readv_group_iter, total_items, output_lines=False):
 
666
        """Copy knit nodes between packs.
 
667
 
 
668
        :param output_lines: Return lines present in the copied data as
 
669
            an iterator of line,version_id.
 
670
        """
 
671
        pb = ui.ui_factory.nested_progress_bar()
 
672
        try:
 
673
            for result in self._do_copy_nodes_graph(index_map, writer,
 
674
                write_index, output_lines, pb, readv_group_iter, total_items):
 
675
                yield result
 
676
        except Exception:
 
677
            # Python 2.4 does not permit try:finally: in a generator.
 
678
            pb.finished()
 
679
            raise
 
680
        else:
 
681
            pb.finished()
 
682
 
 
683
    def _do_copy_nodes_graph(self, index_map, writer, write_index,
 
684
        output_lines, pb, readv_group_iter, total_items):
 
685
        # for record verification
 
686
        knit = KnitVersionedFiles(None, None)
 
687
        # for line extraction when requested (inventories only)
 
688
        if output_lines:
 
689
            factory = KnitPlainFactory()
 
690
        record_index = 0
 
691
        pb.update("Copied record", record_index, total_items)
 
692
        for index, readv_vector, node_vector in readv_group_iter:
 
693
            # copy the data
 
694
            pack_obj = index_map[index]
 
695
            transport, path = pack_obj.access_tuple()
 
696
            try:
 
697
                reader = pack.make_readv_reader(transport, path, readv_vector)
 
698
            except errors.NoSuchFile:
 
699
                if self._reload_func is not None:
 
700
                    self._reload_func()
 
701
                raise
 
702
            for (names, read_func), (key, eol_flag, references) in \
 
703
                izip(reader.iter_records(), node_vector):
 
704
                raw_data = read_func(None)
 
705
                if output_lines:
 
706
                    # read the entire thing
 
707
                    content, _ = knit._parse_record(key[-1], raw_data)
 
708
                    if len(references[-1]) == 0:
 
709
                        line_iterator = factory.get_fulltext_content(content)
 
710
                    else:
 
711
                        line_iterator = factory.get_linedelta_content(content)
 
712
                    for line in line_iterator:
 
713
                        yield line, key
 
714
                else:
 
715
                    # check the header only
 
716
                    df, _ = knit._parse_record_header(key, raw_data)
 
717
                    df.close()
 
718
                pos, size = writer.add_bytes_record(raw_data, names)
 
719
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
 
720
                pb.update("Copied record", record_index)
 
721
                record_index += 1
 
722
 
 
723
    def _process_inventory_lines(self, inv_lines):
 
724
        """Use up the inv_lines generator and setup a text key filter."""
 
725
        repo = self._pack_collection.repo
 
726
        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
 
727
            inv_lines, self.revision_keys)
 
728
        text_filter = []
 
729
        for fileid, file_revids in fileid_revisions.iteritems():
 
730
            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
 
731
        self._text_filter = text_filter
 
732
 
 
733
    def _copy_inventory_texts(self):
 
734
        # select inventory keys
 
735
        inv_keys = self._revision_keys # currently the same keyspace, and note that
 
736
        # querying for keys here could introduce a bug where an inventory item
 
737
        # is missed, so do not change it to query separately without cross
 
738
        # checking like the text key check below.
 
739
        inventory_index_map, inventory_indices = self._pack_map_and_index_list(
 
740
            'inventory_index')
 
741
        inv_nodes = self._index_contents(inventory_indices, inv_keys)
 
742
        # copy inventory keys and adjust values
 
743
        # XXX: Should be a helper function to allow different inv representation
 
744
        # at this point.
 
745
        self.pb.update("Copying inventory texts", 2)
 
746
        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
 
747
        # Only grab the output lines if we will be processing them
 
748
        output_lines = bool(self.revision_ids)
 
749
        inv_lines = self._copy_nodes_graph(inventory_index_map,
 
750
            self.new_pack._writer, self.new_pack.inventory_index,
 
751
            readv_group_iter, total_items, output_lines=output_lines)
 
752
        if self.revision_ids:
 
753
            self._process_inventory_lines(inv_lines)
 
754
        else:
 
755
            # eat the iterator to cause it to execute.
 
756
            list(inv_lines)
 
757
            self._text_filter = None
 
758
        if 'pack' in debug.debug_flags:
 
759
            trace.mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
 
760
                time.ctime(), self._pack_collection._upload_transport.base,
 
761
                self.new_pack.random_name,
 
762
                self.new_pack.inventory_index.key_count(),
 
763
                time.time() - self.new_pack.start_time)
 
764
 
 
765
    def _update_pack_order(self, entries, index_to_pack_map):
 
766
        """Determine how we want our packs to be ordered.
 
767
 
 
768
        This changes the sort order of the self.packs list so that packs unused
 
769
        by 'entries' will be at the end of the list, so that future requests
 
770
        can avoid probing them.  Used packs will be at the front of the
 
771
        self.packs list, in the order of their first use in 'entries'.
 
772
 
 
773
        :param entries: A list of (index, ...) tuples
 
774
        :param index_to_pack_map: A mapping from index objects to pack objects.
 
775
        """
 
776
        packs = []
 
777
        seen_indexes = set()
 
778
        for entry in entries:
 
779
            index = entry[0]
 
780
            if index not in seen_indexes:
 
781
                packs.append(index_to_pack_map[index])
 
782
                seen_indexes.add(index)
 
783
        if len(packs) == len(self.packs):
 
784
            if 'pack' in debug.debug_flags:
 
785
                trace.mutter('Not changing pack list, all packs used.')
 
786
            return
 
787
        seen_packs = set(packs)
 
788
        for pack in self.packs:
 
789
            if pack not in seen_packs:
 
790
                packs.append(pack)
 
791
                seen_packs.add(pack)
 
792
        if 'pack' in debug.debug_flags:
 
793
            old_names = [p.access_tuple()[1] for p in self.packs]
 
794
            new_names = [p.access_tuple()[1] for p in packs]
 
795
            trace.mutter('Reordering packs\nfrom: %s\n  to: %s',
 
796
                   old_names, new_names)
 
797
        self.packs = packs
 
798
 
 
799
    def _copy_revision_texts(self):
 
800
        # select revisions
 
801
        if self.revision_ids:
 
802
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
 
803
        else:
 
804
            revision_keys = None
 
805
        # select revision keys
 
806
        revision_index_map, revision_indices = self._pack_map_and_index_list(
 
807
            'revision_index')
 
808
        revision_nodes = self._index_contents(revision_indices, revision_keys)
 
809
        revision_nodes = list(revision_nodes)
 
810
        self._update_pack_order(revision_nodes, revision_index_map)
 
811
        # copy revision keys and adjust values
 
812
        self.pb.update("Copying revision texts", 1)
 
813
        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
 
814
        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
 
815
            self.new_pack.revision_index, readv_group_iter, total_items))
 
816
        if 'pack' in debug.debug_flags:
 
817
            trace.mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
 
818
                time.ctime(), self._pack_collection._upload_transport.base,
 
819
                self.new_pack.random_name,
 
820
                self.new_pack.revision_index.key_count(),
 
821
                time.time() - self.new_pack.start_time)
 
822
        self._revision_keys = revision_keys
 
823
 
 
824
    def _get_text_nodes(self):
 
825
        text_index_map, text_indices = self._pack_map_and_index_list(
 
826
            'text_index')
 
827
        return text_index_map, self._index_contents(text_indices,
 
828
            self._text_filter)
 
829
 
 
830
    def _copy_text_texts(self):
 
831
        # select text keys
 
832
        text_index_map, text_nodes = self._get_text_nodes()
 
833
        if self._text_filter is not None:
 
834
            # We could return the keys copied as part of the return value from
 
835
            # _copy_nodes_graph but this doesn't work all that well with the
 
836
            # need to get line output too, so we check separately, and as we're
 
837
            # going to buffer everything anyway, we check beforehand, which
 
838
            # saves reading knit data over the wire when we know there are
 
839
            # mising records.
 
840
            text_nodes = set(text_nodes)
 
841
            present_text_keys = set(_node[1] for _node in text_nodes)
 
842
            missing_text_keys = set(self._text_filter) - present_text_keys
 
843
            if missing_text_keys:
 
844
                # TODO: raise a specific error that can handle many missing
 
845
                # keys.
 
846
                trace.mutter("missing keys during fetch: %r", missing_text_keys)
 
847
                a_missing_key = missing_text_keys.pop()
 
848
                raise errors.RevisionNotPresent(a_missing_key[1],
 
849
                    a_missing_key[0])
 
850
        # copy text keys and adjust values
 
851
        self.pb.update("Copying content texts", 3)
 
852
        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
 
853
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
 
854
            self.new_pack.text_index, readv_group_iter, total_items))
 
855
        self._log_copied_texts()
 
856
 
 
857
    def _create_pack_from_packs(self):
 
858
        self.pb.update("Opening pack", 0, 5)
 
859
        self.new_pack = self.open_pack()
 
860
        new_pack = self.new_pack
 
861
        # buffer data - we won't be reading-back during the pack creation and
 
862
        # this makes a significant difference on sftp pushes.
 
863
        new_pack.set_write_cache_size(1024*1024)
 
864
        if 'pack' in debug.debug_flags:
 
865
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
 
866
                for a_pack in self.packs]
 
867
            if self.revision_ids is not None:
 
868
                rev_count = len(self.revision_ids)
 
869
            else:
 
870
                rev_count = 'all'
 
871
            trace.mutter('%s: create_pack: creating pack from source packs: '
 
872
                '%s%s %s revisions wanted %s t=0',
 
873
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
 
874
                plain_pack_list, rev_count)
 
875
        self._copy_revision_texts()
 
876
        self._copy_inventory_texts()
 
877
        self._copy_text_texts()
 
878
        # select signature keys
 
879
        signature_filter = self._revision_keys # same keyspace
 
880
        signature_index_map, signature_indices = self._pack_map_and_index_list(
 
881
            'signature_index')
 
882
        signature_nodes = self._index_contents(signature_indices,
 
883
            signature_filter)
 
884
        # copy signature keys and adjust values
 
885
        self.pb.update("Copying signature texts", 4)
 
886
        self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
 
887
            new_pack.signature_index)
 
888
        if 'pack' in debug.debug_flags:
 
889
            trace.mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
 
890
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
 
891
                new_pack.signature_index.key_count(),
 
892
                time.time() - new_pack.start_time)
 
893
        new_pack._check_references()
 
894
        if not self._use_pack(new_pack):
 
895
            new_pack.abort()
 
896
            return None
 
897
        self.pb.update("Finishing pack", 5)
 
898
        new_pack.finish()
 
899
        self._pack_collection.allocate(new_pack)
 
900
        return new_pack
 
901
 
 
902
    def _least_readv_node_readv(self, nodes):
 
903
        """Generate request groups for nodes using the least readv's.
 
904
 
 
905
        :param nodes: An iterable of graph index nodes.
 
906
        :return: Total node count and an iterator of the data needed to perform
 
907
            readvs to obtain the data for nodes. Each item yielded by the
 
908
            iterator is a tuple with:
 
909
            index, readv_vector, node_vector. readv_vector is a list ready to
 
910
            hand to the transport readv method, and node_vector is a list of
 
911
            (key, eol_flag, references) for the node retrieved by the
 
912
            matching readv_vector.
 
913
        """
 
914
        # group by pack so we do one readv per pack
 
915
        nodes = sorted(nodes)
 
916
        total = len(nodes)
 
917
        request_groups = {}
 
918
        for index, key, value, references in nodes:
 
919
            if index not in request_groups:
 
920
                request_groups[index] = []
 
921
            request_groups[index].append((key, value, references))
 
922
        result = []
 
923
        for index, items in request_groups.iteritems():
 
924
            pack_readv_requests = []
 
925
            for key, value, references in items:
 
926
                # ---- KnitGraphIndex.get_position
 
927
                bits = value[1:].split(' ')
 
928
                offset, length = int(bits[0]), int(bits[1])
 
929
                pack_readv_requests.append(
 
930
                    ((offset, length), (key, value[0], references)))
 
931
            # linear scan up the pack to maximum range combining.
 
932
            pack_readv_requests.sort()
 
933
            # split out the readv and the node data.
 
934
            pack_readv = [readv for readv, node in pack_readv_requests]
 
935
            node_vector = [node for readv, node in pack_readv_requests]
 
936
            result.append((index, pack_readv, node_vector))
 
937
        return total, result
 
938
 
 
939
    def _revision_node_readv(self, revision_nodes):
 
940
        """Return the total revisions and the readv's to issue.
 
941
 
 
942
        :param revision_nodes: The revision index contents for the packs being
 
943
            incorporated into the new pack.
 
944
        :return: As per _least_readv_node_readv.
 
945
        """
 
946
        return self._least_readv_node_readv(revision_nodes)
 
947
 
559
948
 
560
949
class KnitReconcilePacker(KnitPacker):
561
950
    """A packer which regenerates indices etc as it copies.
699
1088
        return new_pack.data_inserted() and self._data_changed
700
1089
 
701
1090
 
 
1091
class OptimisingKnitPacker(KnitPacker):
 
1092
    """A packer which spends more time to create better disk layouts."""
 
1093
 
 
1094
    def _revision_node_readv(self, revision_nodes):
 
1095
        """Return the total revisions and the readv's to issue.
 
1096
 
 
1097
        This sort places revisions in topological order with the ancestors
 
1098
        after the children.
 
1099
 
 
1100
        :param revision_nodes: The revision index contents for the packs being
 
1101
            incorporated into the new pack.
 
1102
        :return: As per _least_readv_node_readv.
 
1103
        """
 
1104
        # build an ancestors dict
 
1105
        ancestors = {}
 
1106
        by_key = {}
 
1107
        for index, key, value, references in revision_nodes:
 
1108
            ancestors[key] = references[0]
 
1109
            by_key[key] = (index, value, references)
 
1110
        order = tsort.topo_sort(ancestors)
 
1111
        total = len(order)
 
1112
        # Single IO is pathological, but it will work as a starting point.
 
1113
        requests = []
 
1114
        for key in reversed(order):
 
1115
            index, value, references = by_key[key]
 
1116
            # ---- KnitGraphIndex.get_position
 
1117
            bits = value[1:].split(' ')
 
1118
            offset, length = int(bits[0]), int(bits[1])
 
1119
            requests.append(
 
1120
                (index, [(offset, length)], [(key, value[0], references)]))
 
1121
        # TODO: combine requests in the same index that are in ascending order.
 
1122
        return total, requests
 
1123
 
 
1124
    def open_pack(self):
 
1125
        """Open a pack for the pack we are creating."""
 
1126
        new_pack = super(OptimisingKnitPacker, self).open_pack()
 
1127
        # Turn on the optimization flags for all the index builders.
 
1128
        new_pack.revision_index.set_optimize(for_size=True)
 
1129
        new_pack.inventory_index.set_optimize(for_size=True)
 
1130
        new_pack.text_index.set_optimize(for_size=True)
 
1131
        new_pack.signature_index.set_optimize(for_size=True)
 
1132
        return new_pack
 
1133
 
 
1134
 
 
1135
class KnitRepositoryPackCollection(RepositoryPackCollection):
 
1136
    """A knit pack collection."""
 
1137
 
 
1138
    pack_factory = NewPack
 
1139
    resumed_pack_factory = ResumedPack
 
1140
    normal_packer_class = KnitPacker
 
1141
    optimising_packer_class = OptimisingKnitPacker
 
1142
 
 
1143
 
702
1144