584
589
:returns: format_signature, list of (version, options, length, parents),
587
if not isinstance(required_versions, set):
588
required_versions = set(required_versions)
589
# we don't care about inclusions, the caller cares.
590
# but we need to setup a list of records to visit.
592
required_version_set = frozenset(required_versions)
594
# list of revisions that can just be sent without waiting for their
597
# map from revision to the children based on it
599
# first, read all relevant index data, enough to sort into the right
591
601
for version_id in required_versions:
592
602
if not self.has_version(version_id):
593
603
raise RevisionNotPresent(version_id, self.filename)
594
# Pick the desired versions out of the index in oldest-to-newest order
596
for version_id in self.versions():
597
if version_id in required_versions:
598
version_list.append(version_id)
600
# create the list of version information for the result
601
copy_queue_records = []
603
result_version_list = []
604
for version_id in version_list:
605
604
options = self._index.get_options(version_id)
606
605
parents = self._index.get_parents_with_ghosts(version_id)
607
606
index_memo = self._index.get_position(version_id)
607
version_index[version_id] = (index_memo, options, parents)
608
if parents and parents[0] in required_version_set:
609
# must wait until the parent has been sent
610
deferred.setdefault(parents[0], []). \
613
# either a fulltext, or a delta whose parent the client did
614
# not ask for and presumably already has
615
ready_to_send.append(version_id)
616
# build a list of results to return, plus instructions for data to
618
copy_queue_records = []
619
temp_version_list = []
621
# XXX: pushing and popping lists may be a bit inefficient
622
version_id = ready_to_send.pop(0)
623
(index_memo, options, parents) = version_index[version_id]
608
624
copy_queue_records.append((version_id, index_memo))
609
625
none, data_pos, data_size = index_memo
610
copy_set.add(version_id)
611
# version, options, length, parents
612
result_version_list.append((version_id, options, data_size,
626
temp_version_list.append((version_id, options, data_size,
615
# Read the compressed record data.
617
# From here down to the return should really be logic in the returned
618
# callable -- in a class that adapts read_records_iter_raw to read
628
if version_id in deferred:
629
# now we can send all the children of this revision - we could
630
# put them in anywhere, but we hope that sending them soon
631
# after the fulltext will give good locality in the receiver
632
ready_to_send[:0] = deferred.pop(version_id)
633
assert len(deferred) == 0, \
634
"Still have compressed child versions waiting to be sent"
635
# XXX: The stream format is such that we cannot stream it - we have to
636
# know the length of all the data a-priori.
638
result_version_list = []
621
639
for (version_id, raw_data), \
622
640
(version_id2, options, _, parents) in \
623
641
izip(self._data.read_records_iter_raw(copy_queue_records),
624
result_version_list):
625
assert version_id == version_id2, 'logic error, inconsistent results'
643
assert version_id == version_id2, \
644
'logic error, inconsistent results'
626
645
raw_datum.append(raw_data)
646
result_version_list.append(
647
(version_id, options, len(raw_data), parents))
648
# provide a callback to get data incrementally.
627
649
pseudo_file = StringIO(''.join(raw_datum))
628
650
def read(length):
629
651
if length is None:
2221
2246
except AttributeError:
2249
def _copy_texts(self, pb, msg, version_ids, ignore_missing=False):
    """Copy texts to the target by extracting and adding them one by one.

    See join() for the parameter definitions.

    The ``pb`` argument is not consulted: the name is rebound to a fresh
    nested progress bar taken from ``ui.ui_factory``, which is always
    released again before returning. ``msg`` is accepted for signature
    compatibility and is not used here.

    :returns: the number of versions selected for copying, i.e. the
        entries of the sorted source graph that were not already in
        ``self.target``.
    """
    version_ids = self._get_source_version_ids(version_ids, ignore_missing)
    graph = self.source.get_graph(version_ids)
    # NOTE(review): presumably this orders parents before children so that
    # each parent text is already cached when its child is added -- confirm
    # against topo_sort.
    order = topo_sort(graph.items())

    def size_of_content(content):
        # Weigh a cache entry by the summed length of its text lines.
        return sum(len(line) for line in content.text())

    # Cache at most 10MB of parent texts
    parent_cache = lru_cache.LRUSizeCache(max_size=10*1024*1024,
                                          compute_size=size_of_content)
    # TODO: jam 20071116 It would be nice to have a streaming interface to
    #       get multiple texts from a source. The source could be smarter
    #       about how it handled intermediate stages.
    #       get_line_list() or make_mpdiffs() seem like a possibility, but
    #       at the moment they extract all full texts into memory, which
    #       causes us to store more than our 3x fulltext goal.
    #       Repository.iter_files_bytes() may be another possibility
    to_process = [version for version in order
                  if version not in self.target]
    total = len(to_process)
    pb = ui.ui_factory.nested_progress_bar()
    try:
        for index, version in enumerate(to_process):
            pb.update('Converting versioned data', index, total)
            # Only the third element of add_lines()'s result is needed: it
            # is kept so later versions can be added against it via
            # parent_texts.
            _sha1, _num_bytes, parent_text = self.target.add_lines(
                version,
                self.source.get_parents(version),
                self.source.get_lines(version),
                parent_texts=parent_cache)
            parent_cache[version] = parent_text
    finally:
        # Release the nested progress bar even if add_lines() raised.
        pb.finished()
    # join() returns this value directly to its caller.
    return total
2224
2286
def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
2225
2287
"""See InterVersionedFile.join."""
2226
2288
assert isinstance(self.source, KnitVersionedFile)
2233
2295
elif self.source.factory.annotated:
2234
2296
converter = self._anno_to_plain_converter
2236
# We're converting from a plain to an annotated knit. This requires
2237
# building the annotations from scratch. The generic join code
2238
# handles this implicitly so we delegate to it.
2239
return super(InterKnit, self).join(pb, msg, version_ids,
2298
# We're converting from a plain to an annotated knit. Copy them
2299
# across by full texts.
2300
return self._copy_texts(pb, msg, version_ids, ignore_missing)
2242
2302
version_ids = self._get_source_version_ids(version_ids, ignore_missing)
2243
2303
if not version_ids: