2393
class KnitPackStreamSource(StreamSource):
    """A StreamSource used to transfer data between same-format KnitPack repos.

    This source assumes:
        1) Same serialization format for all objects
        2) Same root information
        3) XML format inventories
        4) Atomic inserts (so we can stream inventory texts before text
           content)

    The substreams produced by get_stream() must be consumed in the order
    they are yielded: the set of text keys to send is only computed once the
    inventory substream has been iterated to completion.
    """
    # NOTE(review): the tail of the docstring above was truncated in the text
    # under review; the end of item 4 is reconstructed. Confirm against the
    # upstream file in case further assumptions were listed there.

    def __init__(self, from_repository, to_format):
        """Create a stream source.

        :param from_repository: Forwarded to StreamSource.__init__; the other
            methods read it back as self.from_repository.
        :param to_format: Forwarded to StreamSource.__init__.
        """
        super(KnitPackStreamSource, self).__init__(from_repository, to_format)
        # Populated as a side effect of exhausting the inventory stream built
        # by _get_filtered_inv_stream(); stays None until then.
        self._text_keys = None
        self._text_fetch_order = 'unordered'

    def _get_filtered_inv_stream(self, revision_ids):
        """Return the ('inventories', record_iterator) pair for revision_ids.

        While the returned iterator is consumed, every inventory record is
        scanned for the text keys it references. When the iterator is
        exhausted, self._text_keys is set to the text keys referenced by
        these inventories minus the text keys referenced by the inventories
        looked up for the ids returned by _find_parent_ids_of_revisions().

        :param revision_ids: Revision ids whose inventories are streamed.
        :return: A 2-tuple ('inventories', generator of records).
        :raises errors.NoSuchRevision: While iterating, if a requested
            inventory record is absent from the source repository.
        :raises ValueError: While iterating, if a record's storage kind is
            neither 'knit-delta-gz' nor 'knit-ft-gz'.
        """
        from_repo = self.from_repository
        parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids)
        parent_keys = [(p,) for p in parent_ids]
        find_text_keys = from_repo._find_text_key_references_from_xml_inventory_lines
        # Text keys referenced by the parent inventories; these are
        # subtracted from the result at the end of the stream.
        parent_text_keys = set(find_text_keys(
            from_repo._inventory_xml_lines_for_keys(parent_keys)))
        content_text_keys = set()
        # A KnitVersionedFiles with no index/access objects: it is used only
        # for its _parse_record() helper.
        knit = KnitVersionedFiles(None, None)
        factory = KnitPlainFactory()

        def find_text_keys_from_content(record):
            """Add the text keys referenced by one inventory record."""
            if record.storage_kind not in ('knit-delta-gz', 'knit-ft-gz'):
                raise ValueError("Unknown content storage kind for"
                    " inventory text: %s" % (record.storage_kind,))
            # It's a knit record, it has a _raw_record field (even if it was
            # reconstituted from a network stream).
            raw_data = record._raw_record
            # read the entire thing
            revision_id = record.key[-1]
            content, _ = knit._parse_record(revision_id, raw_data)
            if record.storage_kind == 'knit-delta-gz':
                line_iterator = factory.get_linedelta_content(content)
            elif record.storage_kind == 'knit-ft-gz':
                line_iterator = factory.get_fulltext_content(content)
            content_text_keys.update(find_text_keys(
                [(line, revision_id) for line in line_iterator]))

        revision_keys = [(r,) for r in revision_ids]

        def _filtered_inv_stream():
            """Yield inventory records, collecting text keys as they pass."""
            source_vf = from_repo.inventories
            # NOTE(review): the continuation line of this call was missing
            # from the text under review. The arguments mirror the
            # (keys, ordering, include_delta_closure) shape used by
            # _get_text_stream() -- confirm the ordering value upstream.
            stream = source_vf.get_record_stream(revision_keys,
                                                 'unordered', False)
            for record in stream:
                if record.storage_kind == 'absent':
                    raise errors.NoSuchRevision(from_repo, record.key)
                find_text_keys_from_content(record)
                # NOTE(review): reconstructed line; without it the
                # 'inventories' substream would carry no records.
                yield record
            # Only reached once every inventory record has been yielded, so
            # content_text_keys is complete at this point.
            self._text_keys = content_text_keys - parent_text_keys

        return ('inventories', _filtered_inv_stream())

    def _get_text_stream(self):
        """Return the ('texts', record_iterator) pair for self._text_keys.

        self._text_keys is only set once the iterator returned by
        _get_filtered_inv_stream() has been exhausted, so this must be
        called after that has happened.

        :return: A 2-tuple ('texts', record stream from the source texts).
        """
        # Note: We know we don't have to handle adding root keys, because both
        # the source and target are the identical network name.
        text_stream = self.from_repository.texts.get_record_stream(
            self._text_keys, self._text_fetch_order, False)
        return ('texts', text_stream)

    def get_stream(self, search):
        """Generate the substreams to send for the revisions in search.

        Yields, in order: everything produced by
        self._fetch_revision_texts(), then the filtered inventory substream,
        then the texts substream. Because this is a generator, the texts
        substream is only built when the caller asks for it; the caller must
        have fully consumed the inventory substream by then.

        :param search: An object whose get_keys() returns the revision ids
            to transfer.
        """
        revision_ids = search.get_keys()
        for stream_info in self._fetch_revision_texts(revision_ids):
            # NOTE(review): reconstructed loop body; the original line was
            # missing from the text under review.
            yield stream_info
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
        yield self._get_filtered_inv_stream(revision_ids)
        yield self._get_text_stream()
2466
2387
class RepositoryFormatPack(MetaDirRepositoryFormat):
2467
2388
"""Format logic for pack structured repositories.