~bzr-pqm/bzr/bzr.dev

Viewing changes to bzrlib/repofmt/knitpack_repo.py

  • Committer: Jelmer Vernooij
  • Date: 2011-04-05 17:45:29 UTC
  • mto: (5757.8.6 knitpackrepo-7)
  • mto: This revision was merged to the branch mainline in revision 5800.
  • Revision ID: jelmer@samba.org-20110405174529-np94wmbaiteemm35
Move Packer implementation to knitpack_repo.

@@ -18,9 +18,17 @@
 
 from bzrlib.lazy_import import lazy_import
 lazy_import(globals(), """
+from itertools import izip
+import time
+
 from bzrlib import (
     bzrdir,
+    debug,
+    errors,
+    pack,
     revision as _mod_revision,
+    trace,
+    ui,
     xml5,
     xml6,
     xml7,
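
For context on the hunk above: bzrlib's lazy_import installs placeholder
objects in the module namespace and only performs the real imports on first
attribute access, which keeps module import cheap. A minimal sketch of the
pattern as used here (slow_path is a hypothetical caller, not part of this
change):

from bzrlib.lazy_import import lazy_import
# Nothing in the block below is imported yet; each name becomes a
# proxy that triggers the real import the first time it is touched.
lazy_import(globals(), """
import time

from bzrlib import (
    errors,
    ui,
    )
""")

def slow_path():
    # First use of ui/time here performs the actual imports.
    pb = ui.ui_factory.nested_progress_bar()
    try:
        return time.time()
    finally:
        pb.finished()
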
@@ -552,6 +560,307 @@
                                           revision_ids=revision_ids,
                                           reload_func=reload_func)
 
+    def _copy_nodes(self, nodes, index_map, writer, write_index,
+        output_lines=None):
+        """Copy knit nodes between packs with no graph references.
+
+        :param output_lines: Output full texts of copied items.
+        """
+        pb = ui.ui_factory.nested_progress_bar()
+        try:
+            return self._do_copy_nodes(nodes, index_map, writer,
+                write_index, pb, output_lines=output_lines)
+        finally:
+            pb.finished()
+
+    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb,
+        output_lines=None):
+        # for record verification
+        knit = KnitVersionedFiles(None, None)
+        # plan a readv on each source pack:
+        # group by pack
+        nodes = sorted(nodes)
+        # how to map this into knit.py - or knit.py into this?
+        # we don't want the typical knit logic, we want grouping by pack
+        # at this point - perhaps a helper library for the following code
+        # duplication points?
+        request_groups = {}
+        for index, key, value in nodes:
+            if index not in request_groups:
+                request_groups[index] = []
+            request_groups[index].append((key, value))
+        record_index = 0
+        pb.update("Copied record", record_index, len(nodes))
+        for index, items in request_groups.iteritems():
+            pack_readv_requests = []
+            for key, value in items:
+                # ---- KnitGraphIndex.get_position
+                bits = value[1:].split(' ')
+                offset, length = int(bits[0]), int(bits[1])
+                pack_readv_requests.append((offset, length, (key, value[0])))
+            # linear scan up the pack
+            pack_readv_requests.sort()
+            # copy the data
+            pack_obj = index_map[index]
+            transport, path = pack_obj.access_tuple()
+            try:
+                reader = pack.make_readv_reader(transport, path,
+                    [offset[0:2] for offset in pack_readv_requests])
+            except errors.NoSuchFile:
+                if self._reload_func is not None:
+                    self._reload_func()
+                raise
+            for (names, read_func), (_1, _2, (key, eol_flag)) in \
+                izip(reader.iter_records(), pack_readv_requests):
+                raw_data = read_func(None)
+                # check the header only
+                if output_lines is not None:
+                    output_lines(knit._parse_record(key[-1], raw_data)[0])
+                else:
+                    df, _ = knit._parse_record_header(key, raw_data)
+                    df.close()
+                pos, size = writer.add_bytes_record(raw_data, names)
+                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
+                pb.update("Copied record", record_index)
+                record_index += 1
+
+    def _copy_nodes_graph(self, index_map, writer, write_index,
+        readv_group_iter, total_items, output_lines=False):
+        """Copy knit nodes between packs.
+
+        :param output_lines: Return lines present in the copied data as
+            an iterator of line,version_id.
+        """
+        pb = ui.ui_factory.nested_progress_bar()
+        try:
+            for result in self._do_copy_nodes_graph(index_map, writer,
+                write_index, output_lines, pb, readv_group_iter, total_items):
+                yield result
+        except Exception:
+            # Python 2.4 does not permit try:finally: in a generator.
+            pb.finished()
+            raise
+        else:
+            pb.finished()
+
+    def _do_copy_nodes_graph(self, index_map, writer, write_index,
+        output_lines, pb, readv_group_iter, total_items):
+        # for record verification
+        knit = KnitVersionedFiles(None, None)
+        # for line extraction when requested (inventories only)
+        if output_lines:
+            factory = KnitPlainFactory()
+        record_index = 0
+        pb.update("Copied record", record_index, total_items)
+        for index, readv_vector, node_vector in readv_group_iter:
+            # copy the data
+            pack_obj = index_map[index]
+            transport, path = pack_obj.access_tuple()
+            try:
+                reader = pack.make_readv_reader(transport, path, readv_vector)
+            except errors.NoSuchFile:
+                if self._reload_func is not None:
+                    self._reload_func()
+                raise
+            for (names, read_func), (key, eol_flag, references) in \
+                izip(reader.iter_records(), node_vector):
+                raw_data = read_func(None)
+                if output_lines:
+                    # read the entire thing
+                    content, _ = knit._parse_record(key[-1], raw_data)
+                    if len(references[-1]) == 0:
+                        line_iterator = factory.get_fulltext_content(content)
+                    else:
+                        line_iterator = factory.get_linedelta_content(content)
+                    for line in line_iterator:
+                        yield line, key
+                else:
+                    # check the header only
+                    df, _ = knit._parse_record_header(key, raw_data)
+                    df.close()
+                pos, size = writer.add_bytes_record(raw_data, names)
+                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
+                pb.update("Copied record", record_index)
+                record_index += 1
+
+    def _process_inventory_lines(self, inv_lines):
+        """Use up the inv_lines generator and setup a text key filter."""
+        repo = self._pack_collection.repo
+        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
+            inv_lines, self.revision_keys)
+        text_filter = []
+        for fileid, file_revids in fileid_revisions.iteritems():
+            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
+        self._text_filter = text_filter
+
+    def _copy_inventory_texts(self):
+        # select inventory keys
+        inv_keys = self._revision_keys # currently the same keyspace, and note that
+        # querying for keys here could introduce a bug where an inventory item
+        # is missed, so do not change it to query separately without cross
+        # checking like the text key check below.
+        inventory_index_map, inventory_indices = self._pack_map_and_index_list(
+            'inventory_index')
+        inv_nodes = self._index_contents(inventory_indices, inv_keys)
+        # copy inventory keys and adjust values
+        # XXX: Should be a helper function to allow different inv representation
+        # at this point.
+        self.pb.update("Copying inventory texts", 2)
+        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
+        # Only grab the output lines if we will be processing them
+        output_lines = bool(self.revision_ids)
+        inv_lines = self._copy_nodes_graph(inventory_index_map,
+            self.new_pack._writer, self.new_pack.inventory_index,
+            readv_group_iter, total_items, output_lines=output_lines)
+        if self.revision_ids:
+            self._process_inventory_lines(inv_lines)
+        else:
+            # eat the iterator to cause it to execute.
+            list(inv_lines)
+            self._text_filter = None
+        if 'pack' in debug.debug_flags:
+            trace.mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
+                time.ctime(), self._pack_collection._upload_transport.base,
+                self.new_pack.random_name,
+                self.new_pack.inventory_index.key_count(),
+                time.time() - self.new_pack.start_time)
+
+    def _copy_revision_texts(self):
+        # select revisions
+        if self.revision_ids:
+            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
+        else:
+            revision_keys = None
+        # select revision keys
+        revision_index_map, revision_indices = self._pack_map_and_index_list(
+            'revision_index')
+        revision_nodes = self._index_contents(revision_indices, revision_keys)
+        revision_nodes = list(revision_nodes)
+        self._update_pack_order(revision_nodes, revision_index_map)
+        # copy revision keys and adjust values
+        self.pb.update("Copying revision texts", 1)
+        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
+        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
+            self.new_pack.revision_index, readv_group_iter, total_items))
+        if 'pack' in debug.debug_flags:
+            trace.mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
+                time.ctime(), self._pack_collection._upload_transport.base,
+                self.new_pack.random_name,
+                self.new_pack.revision_index.key_count(),
+                time.time() - self.new_pack.start_time)
+        self._revision_keys = revision_keys
+
+    def _get_text_nodes(self):
+        text_index_map, text_indices = self._pack_map_and_index_list(
+            'text_index')
+        return text_index_map, self._index_contents(text_indices,
+            self._text_filter)
+
+    def _copy_text_texts(self):
+        # select text keys
+        text_index_map, text_nodes = self._get_text_nodes()
+        if self._text_filter is not None:
+            # We could return the keys copied as part of the return value from
+            # _copy_nodes_graph but this doesn't work all that well with the
+            # need to get line output too, so we check separately, and as we're
+            # going to buffer everything anyway, we check beforehand, which
+            # saves reading knit data over the wire when we know there are
+            # missing records.
+            text_nodes = set(text_nodes)
+            present_text_keys = set(_node[1] for _node in text_nodes)
+            missing_text_keys = set(self._text_filter) - present_text_keys
+            if missing_text_keys:
+                # TODO: raise a specific error that can handle many missing
+                # keys.
+                trace.mutter("missing keys during fetch: %r", missing_text_keys)
+                a_missing_key = missing_text_keys.pop()
+                raise errors.RevisionNotPresent(a_missing_key[1],
+                    a_missing_key[0])
+        # copy text keys and adjust values
+        self.pb.update("Copying content texts", 3)
+        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
+        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
+            self.new_pack.text_index, readv_group_iter, total_items))
+        self._log_copied_texts()
+
+    def _copy_chks(self, refs=None):
+        # XXX: Todo, recursive follow-pointers facility when fetching some
+        # revisions only.
+        chk_index_map, chk_indices = self._pack_map_and_index_list(
+            'chk_index')
+        chk_nodes = self._index_contents(chk_indices, refs)
+        new_refs = set()
+        # TODO: This isn't strictly tasteful as we are accessing some private
+        #       variables (_serializer). Perhaps a better way would be to have
+        #       Repository._deserialise_chk_node()
+        search_key_func = chk_map.search_key_registry.get(
+            self._pack_collection.repo._serializer.search_key_name)
+        def accumlate_refs(lines):
+            # XXX: move to a generic location
+            # Yay mismatch:
+            bytes = ''.join(lines)
+            node = chk_map._deserialise(bytes, ("unknown",), search_key_func)
+            new_refs.update(node.refs())
+        self._copy_nodes(chk_nodes, chk_index_map, self.new_pack._writer,
+            self.new_pack.chk_index, output_lines=accumlate_refs)
+        return new_refs
+
+    def _create_pack_from_packs(self):
+        self.pb.update("Opening pack", 0, 5)
+        self.new_pack = self.open_pack()
+        new_pack = self.new_pack
+        # buffer data - we won't be reading-back during the pack creation and
+        # this makes a significant difference on sftp pushes.
+        new_pack.set_write_cache_size(1024*1024)
+        if 'pack' in debug.debug_flags:
+            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
+                for a_pack in self.packs]
+            if self.revision_ids is not None:
+                rev_count = len(self.revision_ids)
+            else:
+                rev_count = 'all'
+            trace.mutter('%s: create_pack: creating pack from source packs: '
+                '%s%s %s revisions wanted %s t=0',
+                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
+                plain_pack_list, rev_count)
+        self._copy_revision_texts()
+        self._copy_inventory_texts()
+        self._copy_text_texts()
+        # select signature keys
+        signature_filter = self._revision_keys # same keyspace
+        signature_index_map, signature_indices = self._pack_map_and_index_list(
+            'signature_index')
+        signature_nodes = self._index_contents(signature_indices,
+            signature_filter)
+        # copy signature keys and adjust values
+        self.pb.update("Copying signature texts", 4)
+        self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
+            new_pack.signature_index)
+        if 'pack' in debug.debug_flags:
+            trace.mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
+                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
+                new_pack.signature_index.key_count(),
+                time.time() - new_pack.start_time)
+        # copy chk contents
+        # NB XXX: how to check CHK references are present? perhaps by yielding
+        # the items? How should that interact with stacked repos?
+        if new_pack.chk_index is not None:
+            self._copy_chks()
+            if 'pack' in debug.debug_flags:
+                trace.mutter('%s: create_pack: chk content copied: %s%s %d items t+%6.3fs',
+                    time.ctime(), self._pack_collection._upload_transport.base,
+                    new_pack.random_name,
+                    new_pack.chk_index.key_count(),
+                    time.time() - new_pack.start_time)
+        new_pack._check_references()
+        if not self._use_pack(new_pack):
+            new_pack.abort()
+            return None
+        self.pb.update("Finishing pack", 5)
+        new_pack.finish()
+        self._pack_collection.allocate(new_pack)
+        return new_pack
+
 
 class KnitReconcilePacker(KnitPacker):
     """A packer which regenerates indices etc as it copies.