            revision_ids=revision_ids,
            reload_func=reload_func)

    def _copy_nodes(self, nodes, index_map, writer, write_index,
            output_lines=None):
        """Copy knit nodes between packs with no graph references.

        :param output_lines: Output full texts of copied items.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            return self._do_copy_nodes(nodes, index_map, writer,
                write_index, pb, output_lines=output_lines)
        finally:
            pb.finished()

    def _do_copy_nodes(self, nodes, index_map, writer, write_index, pb,
            output_lines=None):
        # for record verification
        knit = KnitVersionedFiles(None, None)
        # plan a readv on each source pack:
        # group by pack
        nodes = sorted(nodes)
        # how to map this into knit.py - or knit.py into this?
        # we don't want the typical knit logic, we want grouping by pack
        # at this point - perhaps a helper library for the following code
        # duplication points?
        request_groups = {}
        for index, key, value in nodes:
            if index not in request_groups:
                request_groups[index] = []
            request_groups[index].append((key, value))
        record_index = 0
        pb.update("Copied record", record_index, len(nodes))
        for index, items in request_groups.iteritems():
            pack_readv_requests = []
            for key, value in items:
                # ---- KnitGraphIndex.get_position
                bits = value[1:].split(' ')
                offset, length = int(bits[0]), int(bits[1])
                pack_readv_requests.append((offset, length, (key, value[0])))
            # linear scan up the pack
            pack_readv_requests.sort()
            # copy the data
            pack_obj = index_map[index]
            transport, path = pack_obj.access_tuple()
            try:
                reader = pack.make_readv_reader(transport, path,
                    [offset[0:2] for offset in pack_readv_requests])
            except errors.NoSuchFile:
                if self._reload_func is not None:
                    self._reload_func()
                raise
            for (names, read_func), (_1, _2, (key, eol_flag)) in \
                izip(reader.iter_records(), pack_readv_requests):
                raw_data = read_func(None)
                if output_lines is not None:
                    output_lines(knit._parse_record(key[-1], raw_data)[0])
                else:
                    # check the header only
                    df, _ = knit._parse_record_header(key, raw_data)
                    df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size))
                pb.update("Copied record", record_index)
                record_index += 1

    def _copy_nodes_graph(self, index_map, writer, write_index,
            readv_group_iter, total_items, output_lines=False):
        """Copy knit nodes between packs.

        :param output_lines: Return lines present in the copied data as
            an iterator of line,version_id.
        """
        pb = ui.ui_factory.nested_progress_bar()
        try:
            for result in self._do_copy_nodes_graph(index_map, writer,
                write_index, output_lines, pb, readv_group_iter, total_items):
                yield result
        except Exception:
            # Python 2.4 does not permit try:finally: in a generator.
            pb.finished()
            raise
        else:
            pb.finished()
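
    # Because _copy_nodes_graph is a generator, the progress bar cannot be
    # closed in a finally: clause on this Python version, so the except and
    # else arms above between them call pb.finished() exactly once, whether
    # iteration completes or an error propagates.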

    def _do_copy_nodes_graph(self, index_map, writer, write_index,
            output_lines, pb, readv_group_iter, total_items):
        # for record verification
        knit = KnitVersionedFiles(None, None)
        # for line extraction when requested (inventories only)
        if output_lines:
            factory = KnitPlainFactory()
        record_index = 0
        pb.update("Copied record", record_index, total_items)
        for index, readv_vector, node_vector in readv_group_iter:
            # copy the data
            pack_obj = index_map[index]
            transport, path = pack_obj.access_tuple()
            try:
                reader = pack.make_readv_reader(transport, path, readv_vector)
            except errors.NoSuchFile:
                if self._reload_func is not None:
                    self._reload_func()
                raise
            for (names, read_func), (key, eol_flag, references) in \
                izip(reader.iter_records(), node_vector):
                raw_data = read_func(None)
                if output_lines:
                    # read the entire thing
                    content, _ = knit._parse_record(key[-1], raw_data)
                    if len(references[-1]) == 0:
                        line_iterator = factory.get_fulltext_content(content)
                    else:
                        line_iterator = factory.get_linedelta_content(content)
                    for line in line_iterator:
                        yield line, key
                else:
                    # check the header only
                    df, _ = knit._parse_record_header(key, raw_data)
                    df.close()
                pos, size = writer.add_bytes_record(raw_data, names)
                write_index.add_node(key, eol_flag + "%d %d" % (pos, size), references)
                pb.update("Copied record", record_index)
                record_index += 1

    def _process_inventory_lines(self, inv_lines):
        """Use up the inv_lines generator and set up a text key filter."""
        repo = self._pack_collection.repo
        fileid_revisions = repo._find_file_ids_from_xml_inventory_lines(
            inv_lines, self.revision_keys)
        text_filter = []
        for fileid, file_revids in fileid_revisions.iteritems():
            text_filter.extend([(fileid, file_revid) for file_revid in file_revids])
        self._text_filter = text_filter
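
    # The text key filter built here is a flat list of (file_id, revision_id)
    # tuples naming exactly the file texts referenced by the copied
    # inventories; _copy_text_texts later uses it both to select nodes and to
    # cross-check that no referenced text is missing.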

    def _copy_inventory_texts(self):
        # select inventory keys
        inv_keys = self._revision_keys # currently the same keyspace, and note that
        # querying for keys here could introduce a bug where an inventory item
        # is missed, so do not change it to query separately without cross
        # checking like the text key check below.
        inventory_index_map, inventory_indices = self._pack_map_and_index_list(
            'inventory_index')
        inv_nodes = self._index_contents(inventory_indices, inv_keys)
        # copy inventory keys and adjust values
        # XXX: Should be a helper function to allow different inv representation
        # at this point.
        self.pb.update("Copying inventory texts", 2)
        total_items, readv_group_iter = self._least_readv_node_readv(inv_nodes)
        # Only grab the output lines if we will be processing them
        output_lines = bool(self.revision_ids)
        inv_lines = self._copy_nodes_graph(inventory_index_map,
            self.new_pack._writer, self.new_pack.inventory_index,
            readv_group_iter, total_items, output_lines=output_lines)
        if self.revision_ids:
            self._process_inventory_lines(inv_lines)
        else:
            # eat the iterator to cause it to execute.
            list(inv_lines)
            self._text_filter = None
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: inventories copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.inventory_index.key_count(),
                time.time() - self.new_pack.start_time)
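
    # Inventory copying doubles as text-key discovery: when only selected
    # revisions are being packed, the copied inventory lines are parsed to
    # find which (file_id, revision_id) texts those inventories reference;
    # otherwise the line iterator is drained purely for its copying side
    # effect and no filter is kept.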

    def _copy_revision_texts(self):
        # select revisions
        if self.revision_ids:
            revision_keys = [(revision_id,) for revision_id in self.revision_ids]
        else:
            revision_keys = None
        # select revision keys
        revision_index_map, revision_indices = self._pack_map_and_index_list(
            'revision_index')
        revision_nodes = self._index_contents(revision_indices, revision_keys)
        revision_nodes = list(revision_nodes)
        self._update_pack_order(revision_nodes, revision_index_map)
        # copy revision keys and adjust values
        self.pb.update("Copying revision texts", 1)
        total_items, readv_group_iter = self._revision_node_readv(revision_nodes)
        list(self._copy_nodes_graph(revision_index_map, self.new_pack._writer,
            self.new_pack.revision_index, readv_group_iter, total_items))
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: revisions copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base,
                self.new_pack.random_name,
                self.new_pack.revision_index.key_count(),
                time.time() - self.new_pack.start_time)
        self._revision_keys = revision_keys
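
    # revision_keys stays None when every revision is being packed, which
    # makes _index_contents return all index nodes; the nodes are listed
    # eagerly so _update_pack_order can reorder the source packs (based on
    # where the wanted revisions were actually found) before any data copying
    # starts.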

    def _get_text_nodes(self):
        text_index_map, text_indices = self._pack_map_and_index_list(
            'text_index')
        return text_index_map, self._index_contents(text_indices,
            self._text_filter)

    def _copy_text_texts(self):
        # select text keys
        text_index_map, text_nodes = self._get_text_nodes()
        if self._text_filter is not None:
            # We could return the keys copied as part of the return value from
            # _copy_nodes_graph but this doesn't work all that well with the
            # need to get line output too, so we check separately, and as we're
            # going to buffer everything anyway, we check beforehand, which
            # saves reading knit data over the wire when we know there are
            # missing records.
            text_nodes = set(text_nodes)
            present_text_keys = set(_node[1] for _node in text_nodes)
            missing_text_keys = set(self._text_filter) - present_text_keys
            if missing_text_keys:
                # TODO: raise a specific error that can handle many missing
                # keys.
                trace.mutter("missing keys during fetch: %r", missing_text_keys)
                a_missing_key = missing_text_keys.pop()
                raise errors.RevisionNotPresent(a_missing_key[1],
                    a_missing_key[0])
        # copy text keys and adjust values
        self.pb.update("Copying content texts", 3)
        total_items, readv_group_iter = self._least_readv_node_readv(text_nodes)
        list(self._copy_nodes_graph(text_index_map, self.new_pack._writer,
            self.new_pack.text_index, readv_group_iter, total_items))
        self._log_copied_texts()

    def _copy_chks(self, refs=None):
        # XXX: Todo, recursive follow-pointers facility when fetching some
        # revisions only.
        chk_index_map, chk_indices = self._pack_map_and_index_list(
            'chk_index')
        chk_nodes = self._index_contents(chk_indices, refs)
        new_refs = set()
        # TODO: This isn't strictly tasteful as we are accessing some private
        #       variables (_serializer). Perhaps a better way would be to have
        #       Repository._deserialise_chk_node()
        search_key_func = chk_map.search_key_registry.get(
            self._pack_collection.repo._serializer.search_key_name)
        def accumulate_refs(lines):
            # XXX: move to a generic location
            bytes = ''.join(lines)
            node = chk_map._deserialise(bytes, ("unknown",), search_key_func)
            new_refs.update(node.refs())
        self._copy_nodes(chk_nodes, chk_index_map, self.new_pack._writer,
            self.new_pack.chk_index, output_lines=accumulate_refs)
        return new_refs
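
    # _copy_chks reuses the non-graph _copy_nodes path by passing a callback
    # as output_lines: each copied CHK record is deserialised and its child
    # references collected into new_refs, handing the caller the next layer
    # of pages to fetch.  The recursive follow-pointers facility flagged XXX
    # above is not implemented; a hypothetical driver (which would also need
    # to track pages already copied) might look like:
    #
    #   refs = packer._copy_chks(root_keys)  # root_keys: hypothetical seed set
    #   while refs:
    #       refs = packer._copy_chks(refs)   # copy one layer, collect the next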

    def _create_pack_from_packs(self):
        self.pb.update("Opening pack", 0, 5)
        self.new_pack = self.open_pack()
        new_pack = self.new_pack
        # buffer data - we won't be reading-back during the pack creation and
        # this makes a significant difference on sftp pushes.
        new_pack.set_write_cache_size(1024*1024)
        if 'pack' in debug.debug_flags:
            plain_pack_list = ['%s%s' % (a_pack.pack_transport.base, a_pack.name)
                for a_pack in self.packs]
            if self.revision_ids is not None:
                rev_count = len(self.revision_ids)
            else:
                rev_count = 'all'
            trace.mutter('%s: create_pack: creating pack from source packs: '
                '%s%s %s revisions wanted %s t=0',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                plain_pack_list, rev_count)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_text_texts()
        # select signature keys
        signature_filter = self._revision_keys # same keyspace
        signature_index_map, signature_indices = self._pack_map_and_index_list(
            'signature_index')
        signature_nodes = self._index_contents(signature_indices,
            signature_filter)
        # copy signature keys and adjust values
        self.pb.update("Copying signature texts", 4)
        self._copy_nodes(signature_nodes, signature_index_map, new_pack._writer,
            new_pack.signature_index)
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: revision signatures copied: %s%s %d items t+%6.3fs',
                time.ctime(), self._pack_collection._upload_transport.base, new_pack.random_name,
                new_pack.signature_index.key_count(),
                time.time() - new_pack.start_time)
        # NB XXX: how to check CHK references are present? perhaps by yielding
        # the items? How should that interact with stacked repos?
        if new_pack.chk_index is not None:
            self._copy_chks()
            if 'pack' in debug.debug_flags:
                trace.mutter('%s: create_pack: chk content copied: %s%s %d items t+%6.3fs',
                    time.ctime(), self._pack_collection._upload_transport.base,
                    new_pack.random_name,
                    new_pack.chk_index.key_count(),
                    time.time() - new_pack.start_time)
        new_pack._check_references()
        if not self._use_pack(new_pack):
            new_pack.abort()
            return None
        self.pb.update("Finishing pack", 5)
        new_pack.finish()
        self._pack_collection.allocate(new_pack)
        return new_pack
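
    # The overall copy runs revisions -> inventories -> texts -> signatures
    # (-> chk content when present), matching the five progress phases
    # reported on self.pb.  Revisions go first because the revision keys seed
    # both the inventory selection and the signature filter, and the copied
    # inventories in turn build the text filter used for the text copy.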


class KnitReconcilePacker(KnitPacker):
    """A packer which regenerates indices etc as it copies.