2350
2360
:param revision_ids: The expected revision ids of the inventories.
2361
:param ordering: optional ordering, e.g. 'topological'. If not
2362
specified, the order of revision_ids will be preserved (by
2363
buffering if necessary).
2351
2364
:return: An iterator of inventories.
2353
2366
if ((None in revision_ids)
2354
2367
or (_mod_revision.NULL_REVISION in revision_ids)):
2355
2368
raise ValueError('cannot get null revision inventory')
2356
return self._iter_inventories(revision_ids)
2369
return self._iter_inventories(revision_ids, ordering)
2358
def _iter_inventories(self, revision_ids):
2371
def _iter_inventories(self, revision_ids, ordering):
2359
2372
"""single-document based inventory iteration."""
2360
for text, revision_id in self._iter_inventory_xmls(revision_ids):
2373
inv_xmls = self._iter_inventory_xmls(revision_ids, ordering)
2374
for text, revision_id in inv_xmls:
2361
2375
yield self.deserialise_inventory(revision_id, text)
2363
def _iter_inventory_xmls(self, revision_ids):
2377
def _iter_inventory_xmls(self, revision_ids, ordering):
2378
if ordering is None:
2379
order_as_requested = True
2380
ordering = 'unordered'
2382
order_as_requested = False
2364
2383
keys = [(revision_id,) for revision_id in revision_ids]
2365
stream = self.inventories.get_record_stream(keys, 'unordered', True)
2386
if order_as_requested:
2387
key_iter = iter(keys)
2388
next_key = key_iter.next()
2389
stream = self.inventories.get_record_stream(keys, ordering, True)
2366
2390
text_chunks = {}
2367
2391
for record in stream:
2368
2392
if record.storage_kind != 'absent':
2369
text_chunks[record.key] = record.get_bytes_as('chunked')
2393
chunks = record.get_bytes_as('chunked')
2394
if order_as_requested:
2395
text_chunks[record.key] = chunks
2397
yield ''.join(chunks), record.key[-1]
2371
2399
raise errors.NoSuchRevision(self, record.key)
2373
chunks = text_chunks.pop(key)
2374
yield ''.join(chunks), key[-1]
2400
if order_as_requested:
2401
# Yield as many results as we can while preserving order.
2402
while next_key in text_chunks:
2403
chunks = text_chunks.pop(next_key)
2404
yield ''.join(chunks), next_key[-1]
2406
next_key = key_iter.next()
2407
except StopIteration:
2408
# We still want to fully consume the get_record_stream,
2409
# just in case it is not actually finished at this point
2376
2413
def deserialise_inventory(self, revision_id, xml):
2377
2414
"""Transform the xml into an inventory object.
3691
3742
return deltas[0][1:]
3693
def _get_parent_keys(self, root_key, parent_map):
3694
"""Get the parent keys for a given root id."""
3695
root_id, rev_id = root_key
3696
# Include direct parents of the revision, but only if they used
3697
# the same root_id and are heads.
3699
for parent_id in parent_map[rev_id]:
3700
if parent_id == _mod_revision.NULL_REVISION:
3702
if parent_id not in self._revision_id_to_root_id:
3703
# We probably didn't read this revision, go spend the
3704
# extra effort to actually check
3706
tree = self.source.revision_tree(parent_id)
3707
except errors.NoSuchRevision:
3708
# Ghost, fill out _revision_id_to_root_id in case we
3709
# encounter this again.
3710
# But set parent_root_id to None since we don't really know
3711
parent_root_id = None
3713
parent_root_id = tree.get_root_id()
3714
self._revision_id_to_root_id[parent_id] = None
3716
parent_root_id = self._revision_id_to_root_id[parent_id]
3717
if root_id == parent_root_id:
3718
# With stacking we _might_ want to refer to a non-local
3719
# revision, but this code path only applies when we have the
3720
# full content available, so ghosts really are ghosts, not just
3721
# the edge of local data.
3722
parent_keys.append((parent_id,))
3724
# root_id may be in the parent anyway.
3726
tree = self.source.revision_tree(parent_id)
3727
except errors.NoSuchRevision:
3728
# ghost, can't refer to it.
3732
parent_keys.append((tree.inventory[root_id].revision,))
3733
except errors.NoSuchId:
3736
g = graph.Graph(self.source.revisions)
3737
heads = g.heads(parent_keys)
3739
for key in parent_keys:
3740
if key in heads and key not in selected_keys:
3741
selected_keys.append(key)
3742
return tuple([(root_id,)+ key for key in selected_keys])
3744
def _new_root_data_stream(self, root_keys_to_create, parent_map):
3745
for root_key in root_keys_to_create:
3746
parent_keys = self._get_parent_keys(root_key, parent_map)
3747
yield versionedfile.FulltextContentFactory(root_key,
3748
parent_keys, None, '')
3750
3744
def _fetch_batch(self, revision_ids, basis_id, cache):
3751
3745
"""Fetch across a few revisions.
4237
4234
self.target_repo.pack(hint=hint)
4238
4235
return [], set()
4240
def _extract_and_insert_inventories(self, substream, serializer):
4237
def _extract_and_insert_inventory_deltas(self, substream, serializer):
4238
target_rich_root = self.target_repo._format.rich_root_data
4239
target_tree_refs = self.target_repo._format.supports_tree_reference
4240
for record in substream:
4241
# Insert the delta directly
4242
inventory_delta_bytes = record.get_bytes_as('fulltext')
4243
deserialiser = inventory_delta.InventoryDeltaDeserializer()
4245
parse_result = deserialiser.parse_text_bytes(
4246
inventory_delta_bytes)
4247
except inventory_delta.IncompatibleInventoryDelta, err:
4248
trace.mutter("Incompatible delta: %s", err.msg)
4249
raise errors.IncompatibleRevision(self.target_repo._format)
4250
basis_id, new_id, rich_root, tree_refs, inv_delta = parse_result
4251
revision_id = new_id
4252
parents = [key[0] for key in record.parents]
4253
self.target_repo.add_inventory_by_delta(
4254
basis_id, inv_delta, revision_id, parents)
4256
def _extract_and_insert_inventories(self, substream, serializer,
4241
4258
"""Generate a new inventory versionedfile in target, converting data.
4243
4260
The inventory is retrieved from the source, (deserializing it), and
4244
4261
stored in the target (reserializing it in a different format).
4263
target_rich_root = self.target_repo._format.rich_root_data
4264
target_tree_refs = self.target_repo._format.supports_tree_reference
4246
4265
for record in substream:
4266
# It's not a delta, so it must be a fulltext in the source
4267
# serializer's format.
4247
4268
bytes = record.get_bytes_as('fulltext')
4248
4269
revision_id = record.key[0]
4249
4270
inv = serializer.read_inventory_from_string(bytes, revision_id)
4250
4271
parents = [key[0] for key in record.parents]
4251
4272
self.target_repo.add_inventory(revision_id, inv, parents)
4273
# No need to keep holding this full inv in memory when the rest of
4274
# the substream is likely to be all deltas.
4253
4277
def _extract_and_insert_revisions(self, substream, serializer):
4254
4278
for record in substream:
4412
4442
return (not self.from_repository._format.rich_root_data and
4413
4443
self.to_format.rich_root_data)
4415
def _get_inventory_stream(self, revision_ids):
4445
def _get_inventory_stream(self, revision_ids, missing=False):
4416
4446
from_format = self.from_repository._format
4417
if (from_format.supports_chks and self.to_format.supports_chks
4418
and (from_format._serializer == self.to_format._serializer)):
4419
# Both sides support chks, and they use the same serializer, so it
4420
# is safe to transmit the chk pages and inventory pages across
4422
return self._get_chk_inventory_stream(revision_ids)
4423
elif (not from_format.supports_chks):
4424
# Source repository doesn't support chks. So we can transmit the
4425
# inventories 'as-is' and either they are just accepted on the
4426
# target, or the Sink will properly convert it.
4427
return self._get_simple_inventory_stream(revision_ids)
4447
if (from_format.supports_chks and self.to_format.supports_chks and
4448
from_format.network_name() == self.to_format.network_name()):
4449
raise AssertionError(
4450
"this case should be handled by GroupCHKStreamSource")
4451
elif 'forceinvdeltas' in debug.debug_flags:
4452
return self._get_convertable_inventory_stream(revision_ids,
4453
delta_versus_null=missing)
4454
elif from_format.network_name() == self.to_format.network_name():
4456
return self._get_simple_inventory_stream(revision_ids,
4458
elif (not from_format.supports_chks and not self.to_format.supports_chks
4459
and from_format._serializer == self.to_format._serializer):
4460
# Essentially the same format.
4461
return self._get_simple_inventory_stream(revision_ids,
4429
# XXX: Hack to make not-chk->chk fetch: copy the inventories as
4430
# inventories. Note that this should probably be done somehow
4431
# as part of bzrlib.repository.StreamSink. Except JAM couldn't
4432
# figure out how a non-chk repository could possibly handle
4433
# deserializing an inventory stream from a chk repo, as it
4434
# doesn't have a way to understand individual pages.
4435
return self._get_convertable_inventory_stream(revision_ids)
4464
# Any time we switch serializations, we want to use an
4465
# inventory-delta based approach.
4466
return self._get_convertable_inventory_stream(revision_ids,
4467
delta_versus_null=missing)
4437
def _get_simple_inventory_stream(self, revision_ids):
4469
def _get_simple_inventory_stream(self, revision_ids, missing=False):
4470
# NB: This currently reopens the inventory weave in source;
4471
# using a single stream interface instead would avoid this.
4438
4472
from_weave = self.from_repository.inventories
4474
delta_closure = True
4476
delta_closure = not self.delta_on_metadata()
4439
4477
yield ('inventories', from_weave.get_record_stream(
4440
4478
[(rev_id,) for rev_id in revision_ids],
4441
self.inventory_fetch_order(),
4442
not self.delta_on_metadata()))
4444
def _get_chk_inventory_stream(self, revision_ids):
4445
"""Fetch the inventory texts, along with the associated chk maps."""
4446
# We want an inventory outside of the search set, so that we can filter
4447
# out uninteresting chk pages. For now we use
4448
# _find_revision_outside_set, but if we had a Search with cut_revs, we
4449
# could use that instead.
4450
start_rev_id = self.from_repository._find_revision_outside_set(
4452
start_rev_key = (start_rev_id,)
4453
inv_keys_to_fetch = [(rev_id,) for rev_id in revision_ids]
4454
if start_rev_id != _mod_revision.NULL_REVISION:
4455
inv_keys_to_fetch.append((start_rev_id,))
4456
# Any repo that supports chk_bytes must also support out-of-order
4457
# insertion. At least, that is how we expect it to work
4458
# We use get_record_stream instead of iter_inventories because we want
4459
# to be able to insert the stream as well. We could instead fetch
4460
# allowing deltas, and then iter_inventories, but we don't know whether
4461
# source or target is more 'local' anway.
4462
inv_stream = self.from_repository.inventories.get_record_stream(
4463
inv_keys_to_fetch, 'unordered',
4464
True) # We need them as full-texts so we can find their references
4465
uninteresting_chk_roots = set()
4466
interesting_chk_roots = set()
4467
def filter_inv_stream(inv_stream):
4468
for idx, record in enumerate(inv_stream):
4469
### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
4470
bytes = record.get_bytes_as('fulltext')
4471
chk_inv = inventory.CHKInventory.deserialise(
4472
self.from_repository.chk_bytes, bytes, record.key)
4473
if record.key == start_rev_key:
4474
uninteresting_chk_roots.add(chk_inv.id_to_entry.key())
4475
p_id_map = chk_inv.parent_id_basename_to_file_id
4476
if p_id_map is not None:
4477
uninteresting_chk_roots.add(p_id_map.key())
4480
interesting_chk_roots.add(chk_inv.id_to_entry.key())
4481
p_id_map = chk_inv.parent_id_basename_to_file_id
4482
if p_id_map is not None:
4483
interesting_chk_roots.add(p_id_map.key())
4484
### pb.update('fetch inventory', 0, 2)
4485
yield ('inventories', filter_inv_stream(inv_stream))
4486
# Now that we have worked out all of the interesting root nodes, grab
4487
# all of the interesting pages and insert them
4488
### pb.update('fetch inventory', 1, 2)
4489
interesting = chk_map.iter_interesting_nodes(
4490
self.from_repository.chk_bytes, interesting_chk_roots,
4491
uninteresting_chk_roots)
4492
def to_stream_adapter():
4493
"""Adapt the iter_interesting_nodes result to a single stream.
4495
iter_interesting_nodes returns records as it processes them, along
4496
with keys. However, we only want to return the records themselves.
4498
for record, items in interesting:
4499
if record is not None:
4501
# XXX: We could instead call get_record_stream(records.keys())
4502
# ATM, this will always insert the records as fulltexts, and
4503
# requires that you can hang on to records once you have gone
4504
# on to the next one. Further, it causes the target to
4505
# recompress the data. Testing shows it to be faster than
4506
# requesting the records again, though.
4507
yield ('chk_bytes', to_stream_adapter())
4508
### pb.update('fetch inventory', 2, 2)
4510
def _get_convertable_inventory_stream(self, revision_ids):
4511
# XXX: One of source or target is using chks, and they don't have
4512
# compatible serializations. The StreamSink code expects to be
4513
# able to convert on the target, so we need to put
4514
# bytes-on-the-wire that can be converted
4515
yield ('inventories', self._stream_invs_as_fulltexts(revision_ids))
4517
def _stream_invs_as_fulltexts(self, revision_ids):
4479
self.inventory_fetch_order(), delta_closure))
4481
def _get_convertable_inventory_stream(self, revision_ids,
4482
delta_versus_null=False):
4483
# The source is using CHKs, but the target either doesn't or it has a
4484
# different serializer. The StreamSink code expects to be able to
4485
# convert on the target, so we need to put bytes-on-the-wire that can
4486
# be converted. That means inventory deltas (if the remote is <1.19,
4487
# RemoteStreamSink will fallback to VFS to insert the deltas).
4488
yield ('inventory-deltas',
4489
self._stream_invs_as_deltas(revision_ids,
4490
delta_versus_null=delta_versus_null))
4492
def _stream_invs_as_deltas(self, revision_ids, delta_versus_null=False):
4493
"""Return a stream of inventory-deltas for the given rev ids.
4495
:param revision_ids: The list of inventories to transmit
4496
:param delta_versus_null: Don't try to find a minimal delta for this
4497
entry, instead compute the delta versus the NULL_REVISION. This
4498
effectively streams a complete inventory. Used for stuff like
4499
filling in missing parents, etc.
4518
4501
from_repo = self.from_repository
4519
from_serializer = from_repo._format._serializer
4520
4502
revision_keys = [(rev_id,) for rev_id in revision_ids]
4521
4503
parent_map = from_repo.inventories.get_parent_map(revision_keys)
4522
for inv in self.from_repository.iter_inventories(revision_ids):
4523
# XXX: This is a bit hackish, but it works. Basically,
4524
# CHKSerializer 'accidentally' supports
4525
# read/write_inventory_to_string, even though that is never
4526
# the format that is stored on disk. It *does* give us a
4527
# single string representation for an inventory, so live with
4529
# This would be far better if we had a 'serialized inventory
4530
# delta' form. Then we could use 'inventory._make_delta', and
4531
# transmit that. This would both be faster to generate, and
4532
# result in fewer bytes-on-the-wire.
4533
as_bytes = from_serializer.write_inventory_to_string(inv)
4504
# XXX: possibly repos could implement a more efficient iter_inv_deltas
4506
inventories = self.from_repository.iter_inventories(
4507
revision_ids, 'topological')
4508
format = from_repo._format
4509
invs_sent_so_far = set([_mod_revision.NULL_REVISION])
4510
inventory_cache = lru_cache.LRUCache(50)
4511
null_inventory = from_repo.revision_tree(
4512
_mod_revision.NULL_REVISION).inventory
4513
# XXX: ideally the rich-root/tree-refs flags would be per-revision, not
4514
# per-repo (e.g. streaming a non-rich-root revision out of a rich-root
4515
# repo back into a non-rich-root repo ought to be allowed)
4516
serializer = inventory_delta.InventoryDeltaSerializer(
4517
versioned_root=format.rich_root_data,
4518
tree_references=format.supports_tree_reference)
4519
for inv in inventories:
4534
4520
key = (inv.revision_id,)
4535
4521
parent_keys = parent_map.get(key, ())
4523
if not delta_versus_null and parent_keys:
4524
# The caller did not ask for complete inventories and we have
4525
# some parents that we can delta against. Make a delta against
4526
# each parent so that we can find the smallest.
4527
parent_ids = [parent_key[0] for parent_key in parent_keys]
4528
for parent_id in parent_ids:
4529
if parent_id not in invs_sent_so_far:
4530
# We don't know that the remote side has this basis, so
4533
if parent_id == _mod_revision.NULL_REVISION:
4534
parent_inv = null_inventory
4536
parent_inv = inventory_cache.get(parent_id, None)
4537
if parent_inv is None:
4538
parent_inv = from_repo.get_inventory(parent_id)
4539
candidate_delta = inv._make_delta(parent_inv)
4540
if (delta is None or
4541
len(delta) > len(candidate_delta)):
4542
delta = candidate_delta
4543
basis_id = parent_id
4545
# Either none of the parents ended up being suitable, or we
4546
# were asked to delta against NULL
4547
basis_id = _mod_revision.NULL_REVISION
4548
delta = inv._make_delta(null_inventory)
4549
invs_sent_so_far.add(inv.revision_id)
4550
inventory_cache[inv.revision_id] = inv
4551
delta_serialized = ''.join(
4552
serializer.delta_to_lines(basis_id, key[-1], delta))
4536
4553
yield versionedfile.FulltextContentFactory(
4537
key, parent_keys, None, as_bytes)
4554
key, parent_keys, None, delta_serialized)
4540
4557
def _iter_for_revno(repo, partial_history_cache, stop_index=None,