~bzr-pqm/bzr/bzr.dev

Viewing changes to bzrlib/repofmt/knitpack_repo.py

  • Committer: Jelmer Vernooij
  • Date: 2011-04-09 22:14:24 UTC
  • mfrom: (5777.4.2 mutableinventorytree)
  • mto: This revision was merged to the branch mainline in revision 5787.
  • Revision ID: jelmer@samba.org-20110409221424-a2air35exbz50hi8
Merge mutableinventorytree.

=== modified file 'bzrlib/repofmt/knitpack_repo.py'
@@ -18,20 +18,12 @@
 
 from bzrlib.lazy_import import lazy_import
 lazy_import(globals(), """
-from itertools import izip
-import time
-
 from bzrlib import (
     bzrdir,
-    debug,
-    errors,
     knit,
     osutils,
-    pack,
     revision as _mod_revision,
-    trace,
     tsort,
-    ui,
     xml5,
     xml6,
     xml7,
@@ -47,7 +39,6 @@
     btree_index,
     )
 from bzrlib.index import (
-    CombinedGraphIndex,
     GraphIndex,
     GraphIndexPrefixAdapter,
     InMemoryGraphIndex,
@@ -56,38 +47,34 @@
     KnitRepository,
     )
 from bzrlib.repofmt.pack_repo import (
-    _DirectPackAccess,
-    NewPack,
     RepositoryFormatPack,
-    ResumedPack,
     Packer,
     PackCommitBuilder,
     PackRepository,
     PackRootCommitBuilder,
     RepositoryPackCollection,
     )
-from bzrlib.vf_repository import (
+from bzrlib.repository import (
     StreamSource,
     )
 
 
-class KnitPackRepository(PackRepository, KnitRepository):
+class KnitPackRepository(PackRepository):
 
     def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
         _serializer):
-        PackRepository.__init__(self, _format, a_bzrdir, control_files,
+        KnitRepository.__init__(self, _format, a_bzrdir, control_files,
             _commit_builder_class, _serializer)
         if self._format.supports_chks:
             raise AssertionError("chk not supported")
         index_transport = self._transport.clone('indices')
-        self._pack_collection = KnitRepositoryPackCollection(self,
-            self._transport,
+        self._pack_collection = RepositoryPackCollection(self, self._transport,
             index_transport,
             self._transport.clone('upload'),
             self._transport.clone('packs'),
             _format.index_builder_class,
             _format.index_class,
-            use_chk_index=False,
+            use_chk_index=self._format.supports_chks,
             )
         self.inventories = KnitVersionedFiles(
             _KnitGraphIndex(self._pack_collection.inventory_index.combined_index,
@@ -569,383 +556,6 @@
                                           revision_ids=revision_ids,
                                           reload_func=reload_func)
 
-    def _pack_map_and_index_list(self, index_attribute):
-        """Convert a list of packs to an index pack map and index list.
-
-        :param index_attribute: The attribute that the desired index is found
-            on.
-        :return: A tuple (map, list) where map contains the dict from
-            index:pack_tuple, and list contains the indices in the preferred
-            access order.
-        """
-        indices = []
-        pack_map = {}
-        for pack_obj in self.packs:
-            index = getattr(pack_obj, index_attribute)
-            indices.append(index)
-            pack_map[index] = pack_obj
-        return pack_map, indices
-
-    def _index_contents(self, indices, key_filter=None):
-        """Get an iterable of the index contents from a pack_map.
-
-        :param indices: The list of indices to query
-        :param key_filter: An optional filter to limit the keys returned.
-        """
-        all_index = CombinedGraphIndex(indices)
-        if key_filter is None:
-            return all_index.iter_all_entries()
-        else:
-            return all_index.iter_entries(key_filter)
-
-    def _copy_nodes(self, nodes, index_map, writer, write_index,
-        output_lines=None):
-        """Copy knit nodes between packs with no graph references.
-
-        :param output_lines: Output full texts of copied items.
-        """
-        pb = ui.ui_factory.nested_progress_bar()
-        try:
-            return self._do_copy_nodes(nodes, index_map, writer,
-                write_index, pb, output_lines=output_lines)
-        finally:
-            pb.finished()
-
-    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb,
-        output_lines=None):
-        # for record verification
-        knit = KnitVersionedFiles(None, None)
-        # plan a readv on each source pack:
-        # group by pack
-        nodes = sorted(nodes)
-        # how to map this into knit.py - or knit.py into this?
-        # we don't want the typical knit logic, we want grouping by pack
-        # at this point - perhaps a helper library for the following code
-        # duplication points?
-        request_groups = {}
-        for index, key, value in nodes:
-            if index not in request_groups:
-                request_groups[index] = []
-            request_groups[index].append((key, value))
-        record_index = 0
-        pb.update("Copied record", record_index, len(nodes))
-        for index, items in request_groups.iteritems():
-            pack_readv_requests = []
-            for key, value in items:
-                # ---- KnitGraphIndex.get_position
-                bits = value[1:].split(' ')
-                offset, length = int(bits[0]), int(bits[1])
-                pack_readv_requests.append((offset, length, (key, value[0])))
-            # linear scan up the pack
-            pack_readv_requests.sort()
-            # copy the data
-            pack_obj = index_map[index]
-            transport, path = pack_obj.access_tuple()
-            try:
-                reader = pack.make_readv_reader(transport, path,
-                    [offset[0:2] for offset in pack_readv_requests])
-            except errors.NoSuchFile:
-                if self._reload_func is not None:
-                    self._reload_func()
-                raise
-            for (names, read_func), (_1, _2, (key, eol_flag)) in \
-                izip(reader.iter_records(), pack_readv_requests):
-                raw_data = read_func(None)
-                # check the header only
-                if output_lines is not None:
-                    output_lines(knit._parse_record(key[-1], raw_data)[0])
-                else:
-                    df, _ = knit._parse_record_header(key, raw_data)
-                    df.close()
-                pos, size = writer.add_bytes_record(raw_data, names)
-                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
-                pb.update("Copied record", record_index)
-                record_index += 1
-
-    def _copy_nodes_graph(self, index_map, writer, write_index,
-        readv_group_iter, total_items, output_lines=False):
-        """Copy knit nodes between packs.
-
-        :param output_lines: Return lines present in the copied data as
-            an iterator of line,version_id.
-        """
-        pb = ui.ui_factory.nested_progress_bar()
-        try:
-            for result in self._do_copy_nodes_graph(index_map, writer,
-                write_index, output_lines, pb, readv_group_iter, total_items):
-                yield result
-        except Exception:
-            # Python 2.4 does not permit try:finally: in a generator.
-            pb.finished()
-            raise
-        else:
-            pb.finished()
-
-    def _do_copy_nodes_graph(self, index_map, writer, write_index,
-        output_lines, pb, readv_group_iter, total_items):
-        # for record verification
-        knit = KnitVersionedFiles(None, None)
-        # for line extraction when requested (inventories only)
-        if output_lines:
-            factory = KnitPlainFactory()
-        record_index = 0
-        pb.update("Copied record", record_index, total_items)
-        for index, readv_vector, node_vector in readv_group_iter:
-            # copy the data
-            pack_obj = index_map[index]
-            transport, path = pack_obj.access_tuple()
-            try:
-                reader = pack.make_readv_reader(transport, path, readv_vector)
-            except errors.NoSuchFile:
-                if self._reload_func is not None:
-                    self._reload_func()
-                raise
-            for (names, read_func), (key, eol_flag, references) in \
-                izip(reader.iter_records(), node_vector):
-                raw_data = read_func(None)
-                if output_lines:
-                    # read the entire thing
-                    content, _ = knit._parse_record(key[-1], raw_data)
-                    if len(references[-1]) == 0:
-                        line_iterator = factory.get_fulltext_content(content)
-                    else:
-                        line_iterator = factory.get_linedelta_content(content)
-                    for line in line_iterator:
-                        yield line, key
-                else:
-                    # check the header only
-                    df, _ = knit._parse_record_header(key, raw_data)
-                    df.close()
-                pos, size = writer.add_bytes_record(raw_data, names)
-                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
-                pb.update("Copied record", record_index)
-                record_index += 1
-
-    def _process_inventory_lines(self, inv_lines):
-        """Use up the inv_lines generator and setup a text key filter."""
-        repo = self._pack_collection.repo
-        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
-            inv_lines, self.revision_keys)
-        text_filter = []
-        for fileid, file_revids in fileid_revisions.iteritems():
-            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
-        self._text_filter = text_filter
-
-    def _copy_inventory_texts(self):
-        # select inventory keys
-        inv_keys = self._revision_keys # currently the same keyspace, and note that
-        # querying for keys here could introduce a bug where an inventory item
-        # is missed, so do not change it to query separately without cross
-        # checking like the text key check below.
-        inventory_index_map, inventory_indices = self._pack_map_and_index_list(
-            'inventory_index')
-        inv_nodes = self._index_contents(inventory_indices, inv_keys)
-        # copy inventory keys and adjust values
-        # XXX: Should be a helper function to allow different inv representation
-        # at this point.
-        self.pb.update("Copying inventory texts", 2)
-        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
-        # Only grab the output lines if we will be processing them
-        output_lines = bool(self.revision_ids)
-        inv_lines = self._copy_nodes_graph(inventory_index_map,
-            self.new_pack._writer, self.new_pack.inventory_index,
-            readv_group_iter, total_items, output_lines=output_lines)
-        if self.revision_ids:
-            self._process_inventory_lines(inv_lines)
-        else:
-            # eat the iterator to cause it to execute.
-            list(inv_lines)
-            self._text_filter = None
-        if 'pack' in debug.debug_flags:
-            trace.mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
-                time.ctime(), self._pack_collection._upload_transport.base,
-                self.new_pack.random_name,
-                self.new_pack.inventory_index.key_count(),
-                time.time() - self.new_pack.start_time)
-
-    def _update_pack_order(self, entries, index_to_pack_map):
-        """Determine how we want our packs to be ordered.
-
-        This changes the sort order of the self.packs list so that packs unused
-        by 'entries' will be at the end of the list, so that future requests
-        can avoid probing them.  Used packs will be at the front of the
-        self.packs list, in the order of their first use in 'entries'.
-
-        :param entries: A list of (index, ...) tuples
-        :param index_to_pack_map: A mapping from index objects to pack objects.
-        """
-        packs = []
-        seen_indexes = set()
-        for entry in entries:
-            index = entry[0]
-            if index not in seen_indexes:
-                packs.append(index_to_pack_map[index])
-                seen_indexes.add(index)
-        if len(packs) == len(self.packs):
-            if 'pack' in debug.debug_flags:
-                trace.mutter('Not changing pack list, all packs used.')
-            return
-        seen_packs = set(packs)
-        for pack in self.packs:
-            if pack not in seen_packs:
-                packs.append(pack)
-                seen_packs.add(pack)
-        if 'pack' in debug.debug_flags:
-            old_names = [p.access_tuple()[1] for p in self.packs]
-            new_names = [p.access_tuple()[1] for p in packs]
-            trace.mutter('Reordering packs\nfrom: %s\n  to: %s',
-                   old_names, new_names)
-        self.packs = packs
-
-    def _copy_revision_texts(self):
-        # select revisions
-        if self.revision_ids:
-            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
-        else:
-            revision_keys = None
-        # select revision keys
-        revision_index_map, revision_indices = self._pack_map_and_index_list(
-            'revision_index')
-        revision_nodes = self._index_contents(revision_indices, revision_keys)
-        revision_nodes = list(revision_nodes)
-        self._update_pack_order(revision_nodes, revision_index_map)
-        # copy revision keys and adjust values
-        self.pb.update("Copying revision texts", 1)
-        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
-        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
-            self.new_pack.revision_index, readv_group_iter, total_items))
-        if 'pack' in debug.debug_flags:
-            trace.mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
-                time.ctime(), self._pack_collection._upload_transport.base,
-                self.new_pack.random_name,
-                self.new_pack.revision_index.key_count(),
-                time.time() - self.new_pack.start_time)
-        self._revision_keys = revision_keys
-
-    def _get_text_nodes(self):
-        text_index_map, text_indices = self._pack_map_and_index_list(
-            'text_index')
-        return text_index_map, self._index_contents(text_indices,
-            self._text_filter)
-
-    def _copy_text_texts(self):
-        # select text keys
-        text_index_map, text_nodes = self._get_text_nodes()
-        if self._text_filter is not None:
-            # We could return the keys copied as part of the return value from
-            # _copy_nodes_graph but this doesn't work all that well with the
-            # need to get line output too, so we check separately, and as we're
-            # going to buffer everything anyway, we check beforehand, which
-            # saves reading knit data over the wire when we know there are
-            # missing records.
-            text_nodes = set(text_nodes)
-            present_text_keys = set(_node[1] for _node in text_nodes)
-            missing_text_keys = set(self._text_filter) - present_text_keys
-            if missing_text_keys:
-                # TODO: raise a specific error that can handle many missing
-                # keys.
-                trace.mutter("missing keys during fetch: %r", missing_text_keys)
-                a_missing_key = missing_text_keys.pop()
-                raise errors.RevisionNotPresent(a_missing_key[1],
-                    a_missing_key[0])
-        # copy text keys and adjust values
-        self.pb.update("Copying content texts", 3)
-        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
-        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
-            self.new_pack.text_index, readv_group_iter, total_items))
-        self._log_copied_texts()
-
-    def _create_pack_from_packs(self):
-        self.pb.update("Opening pack", 0, 5)
-        self.new_pack = self.open_pack()
-        new_pack = self.new_pack
-        # buffer data - we won't be reading-back during the pack creation and
-        # this makes a significant difference on sftp pushes.
-        new_pack.set_write_cache_size(1024*1024)
-        if 'pack' in debug.debug_flags:
-            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
-                for a_pack in self.packs]
-            if self.revision_ids is not None:
-                rev_count = len(self.revision_ids)
-            else:
-                rev_count = 'all'
-            trace.mutter('%s: create_pack: creating pack from source packs: '
-                '%s%s %s revisions wanted %s t=0',
-                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
-                plain_pack_list, rev_count)
-        self._copy_revision_texts()
-        self._copy_inventory_texts()
-        self._copy_text_texts()
-        # select signature keys
-        signature_filter = self._revision_keys # same keyspace
-        signature_index_map, signature_indices = self._pack_map_and_index_list(
-            'signature_index')
-        signature_nodes = self._index_contents(signature_indices,
-            signature_filter)
-        # copy signature keys and adjust values
-        self.pb.update("Copying signature texts", 4)
-        self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
-            new_pack.signature_index)
-        if 'pack' in debug.debug_flags:
-            trace.mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
-                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
-                new_pack.signature_index.key_count(),
-                time.time() - new_pack.start_time)
-        new_pack._check_references()
-        if not self._use_pack(new_pack):
-            new_pack.abort()
-            return None
-        self.pb.update("Finishing pack", 5)
-        new_pack.finish()
-        self._pack_collection.allocate(new_pack)
-        return new_pack
-
-    def _least_readv_node_readv(self, nodes):
-        """Generate request groups for nodes using the least readv's.
-
-        :param nodes: An iterable of graph index nodes.
-        :return: Total node count and an iterator of the data needed to perform
-            readvs to obtain the data for nodes. Each item yielded by the
-            iterator is a tuple with:
-            index, readv_vector, node_vector. readv_vector is a list ready to
-            hand to the transport readv method, and node_vector is a list of
-            (key, eol_flag, references) for the node retrieved by the
-            matching readv_vector.
-        """
-        # group by pack so we do one readv per pack
-        nodes = sorted(nodes)
-        total = len(nodes)
-        request_groups = {}
-        for index, key, value, references in nodes:
-            if index not in request_groups:
-                request_groups[index] = []
-            request_groups[index].append((key, value, references))
-        result = []
-        for index, items in request_groups.iteritems():
-            pack_readv_requests = []
-            for key, value, references in items:
-                # ---- KnitGraphIndex.get_position
-                bits = value[1:].split(' ')
-                offset, length = int(bits[0]), int(bits[1])
-                pack_readv_requests.append(
-                    ((offset, length), (key, value[0], references)))
-            # linear scan up the pack to maximum range combining.
-            pack_readv_requests.sort()
-            # split out the readv and the node data.
-            pack_readv = [readv for readv, node in pack_readv_requests]
-            node_vector = [node for readv, node in pack_readv_requests]
-            result.append((index, pack_readv, node_vector))
-        return total, result
-
-    def _revision_node_readv(self, revision_nodes):
-        """Return the total revisions and the readv's to issue.
-
-        :param revision_nodes: The revision index contents for the packs being
-            incorporated into the new pack.
-        :return: As per _least_readv_node_readv.
-        """
-        return self._least_readv_node_readv(revision_nodes)
-
 
 class KnitReconcilePacker(KnitPacker):
     """A packer which regenerates indices etc as it copies.
@@ -1042,7 +652,7 @@
             self.new_pack.text_index,
             ('blank', ), 1,
             add_nodes_callback=self.new_pack.text_index.add_nodes)
-        data_access = _DirectPackAccess(
+        data_access = knit._DirectPackAccess(
                 {self.new_pack.text_index:self.new_pack.access_tuple()})
        data_access.set_writer(self.new_pack._writer, self.new_pack.text_index,
             self.new_pack.access_tuple())
@@ -1089,56 +699,3 @@
         return new_pack.data_inserted() and self._data_changed
 
 
-class OptimisingKnitPacker(KnitPacker):
-    """A packer which spends more time to create better disk layouts."""
-
-    def _revision_node_readv(self, revision_nodes):
-        """Return the total revisions and the readv's to issue.
-
-        This sort places revisions in topological order with the ancestors
-        after the children.
-
-        :param revision_nodes: The revision index contents for the packs being
-            incorporated into the new pack.
-        :return: As per _least_readv_node_readv.
-        """
-        # build an ancestors dict
-        ancestors = {}
-        by_key = {}
-        for index, key, value, references in revision_nodes:
-            ancestors[key] = references[0]
-            by_key[key] = (index, value, references)
-        order = tsort.topo_sort(ancestors)
-        total = len(order)
-        # Single IO is pathological, but it will work as a starting point.
-        requests = []
-        for key in reversed(order):
-            index, value, references = by_key[key]
-            # ---- KnitGraphIndex.get_position
-            bits = value[1:].split(' ')
-            offset, length = int(bits[0]), int(bits[1])
-            requests.append(
-                (index, [(offset, length)], [(key, value[0], references)]))
-        # TODO: combine requests in the same index that are in ascending order.
-        return total, requests
-
-    def open_pack(self):
-        """Open a pack for the pack we are creating."""
-        new_pack = super(OptimisingKnitPacker, self).open_pack()
-        # Turn on the optimization flags for all the index builders.
-        new_pack.revision_index.set_optimize(for_size=True)
-        new_pack.inventory_index.set_optimize(for_size=True)
-        new_pack.text_index.set_optimize(for_size=True)
-        new_pack.signature_index.set_optimize(for_size=True)
-        return new_pack
-
-
-class KnitRepositoryPackCollection(RepositoryPackCollection):
-    """A knit pack collection."""
-
-    pack_factory = NewPack
-    resumed_pack_factory = ResumedPack
-    normal_packer_class = KnitPacker
-    optimising_packer_class = OptimisingKnitPacker
-
-
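
_least_readv_node_readv, shown in the diff above, is the heart of the knit
packer's I/O strategy: decode each index node's value string into a byte
range, group the ranges by source pack, and sort each group so every pack
file is swept once from front to back. A minimal standalone sketch of that
grouping (not bzrlib code; the eol-flag/offset/length value layout is taken
from the KnitGraphIndex.get_position comment in the diff, and group_readvs
is a hypothetical name — everything else is illustrative):

    def group_readvs(nodes):
        """Group (index, key, value, references) nodes into per-pack readvs.

        Illustrative only; mirrors the grouping in the diff above.
        """
        request_groups = {}
        for index, key, value, references in nodes:
            # Per the get_position comment: one eol-flag byte, then
            # 'offset length', e.g. 'F123 45'.
            bits = value[1:].split(' ')
            offset, length = int(bits[0]), int(bits[1])
            request_groups.setdefault(index, []).append(
                ((offset, length), (key, value[0], references)))
        result = []
        for index, items in request_groups.items():
            items.sort()  # ascending offsets: one linear sweep up each pack
            readv_vector = [readv for readv, node in items]
            node_vector = [node for readv, node in items]
            result.append((index, readv_vector, node_vector))
        return result

Issuing one sorted readv per pack keeps round-trips to a minimum, which
matters most on high-latency transports; the diff's own write-cache comment
("a significant difference on sftp pushes") points at the same concern.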
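
OptimisingKnitPacker._revision_node_readv, also in the diff above, trades
that batching for layout: it topologically sorts the revision graph and
emits one read request per revision with children ahead of their ancestors,
so readers walking back from a branch tip stay near the front of the pack. A
sketch of the ordering under the same assumptions (optimising_order is a
hypothetical name; topo_sort stands in for bzrlib.tsort.topo_sort, which
returns ancestors before children):

    def optimising_order(revision_nodes, topo_sort):
        """One read request per revision, children before ancestors."""
        ancestors = {}
        by_key = {}
        for index, key, value, references in revision_nodes:
            ancestors[key] = references[0]  # first reference list = parents
            by_key[key] = (index, value, references)
        requests = []
        for key in reversed(topo_sort(ancestors)):  # children first on disk
            index, value, references = by_key[key]
            bits = value[1:].split(' ')
            offset, length = int(bits[0]), int(bits[1])
            requests.append(
                (index, [(offset, length)], [(key, value[0], references)]))
        return len(requests), requests

As the diff's own comment notes, single-record reads are pathological as
I/O; the layout win is taken first and request coalescing is left as a TODO.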