@@ -2346 +2366 @@
         :param revision_ids: The expected revision ids of the inventories.
+        :param ordering: optional ordering, e.g. 'topological'. If not
+            specified, the order of revision_ids will be preserved (by
+            buffering if necessary).
         :return: An iterator of inventories.
         """
         if ((None in revision_ids)
             or (_mod_revision.NULL_REVISION in revision_ids)):
             raise ValueError('cannot get null revision inventory')
-        return self._iter_inventories(revision_ids)
+        return self._iter_inventories(revision_ids, ordering)
 
-    def _iter_inventories(self, revision_ids):
+    def _iter_inventories(self, revision_ids, ordering):
         """single-document based inventory iteration."""
-        for text, revision_id in self._iter_inventory_xmls(revision_ids):
+        inv_xmls = self._iter_inventory_xmls(revision_ids, ordering)
+        for text, revision_id in inv_xmls:
             yield self.deserialise_inventory(revision_id, text)
 
-    def _iter_inventory_xmls(self, revision_ids):
+    def _iter_inventory_xmls(self, revision_ids, ordering):
+        if ordering is None:
+            order_as_requested = True
+            ordering = 'unordered'
+        else:
+            order_as_requested = False
         keys = [(revision_id,) for revision_id in revision_ids]
-        stream = self.inventories.get_record_stream(keys, 'unordered', True)
+        if not keys:
+            return
+        if order_as_requested:
+            key_iter = iter(keys)
+            next_key = key_iter.next()
+        stream = self.inventories.get_record_stream(keys, ordering, True)
         text_chunks = {}
         for record in stream:
             if record.storage_kind != 'absent':
-                text_chunks[record.key] = record.get_bytes_as('chunked')
+                chunks = record.get_bytes_as('chunked')
+                if order_as_requested:
+                    text_chunks[record.key] = chunks
+                else:
+                    yield ''.join(chunks), record.key[-1]
             else:
                 raise errors.NoSuchRevision(self, record.key)
-        for key in keys:
-            chunks = text_chunks.pop(key)
-            yield ''.join(chunks), key[-1]
+            if order_as_requested:
+                # Yield as many results as we can while preserving order.
+                while next_key in text_chunks:
+                    chunks = text_chunks.pop(next_key)
+                    yield ''.join(chunks), next_key[-1]
+                    try:
+                        next_key = key_iter.next()
+                    except StopIteration:
+                        # We still want to fully consume the get_record_stream,
+                        # just in case it is not actually finished at this point
+                        next_key = None
+                        break
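
To make the ordering contract concrete, here is a minimal, self-contained
sketch of the buffering technique used by _iter_inventory_xmls above (the
names are invented for illustration; this is not bzrlib API). Records arrive
in whatever order the storage layer prefers, are parked in a dict, and are
yielded as soon as the next requested key becomes available:

    def iter_in_requested_order(keys, unordered_records):
        """Yield (key, value) in the order of `keys`, consuming (key, value)
        pairs that may arrive in any order."""
        if not keys:
            return
        buffered = {}
        key_iter = iter(keys)
        next_key = next(key_iter)
        for key, value in unordered_records:
            buffered[key] = value
            # Flush as much of the requested order as we can.
            while next_key in buffered:
                yield next_key, buffered.pop(next_key)
                try:
                    next_key = next(key_iter)
                except StopIteration:
                    # Keep consuming the source so it can finish cleanly.
                    next_key = None
                    break

    # Records arrive out of order; results come back in requested order.
    records = [('b', 2), ('c', 3), ('a', 1)]
    assert list(iter_in_requested_order(['a', 'b', 'c'], records)) == \
        [('a', 1), ('b', 2), ('c', 3)]
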
 
     def deserialise_inventory(self, revision_id, xml):
         """Transform the xml into an inventory object.
@@ -3660 +3717 @@
         # This is redundant with format.check_conversion_target(), however that
         # raises an exception, and we just want to say "False" as in we won't
         # support converting between these formats.
+        if 'IDS_never' in debug.debug_flags:
+            return False
         if source.supports_rich_root() and not target.supports_rich_root():
             return False
         if (source._format.supports_tree_reference
             and not target._format.supports_tree_reference):
             return False
+        if target._fallback_repositories and target._format.supports_chks:
+            # IDS doesn't know how to copy CHKs for the parent inventories it
+            # adds to stacked repos.
+            return False
+        if 'IDS_always' in debug.debug_flags:
+            return True
+        # Only use this code path for local source and target.  IDS does far
+        # too much IO (both bandwidth and roundtrips) over a network.
+        if not source.bzrdir.transport.base.startswith('file:///'):
+            return False
+        if not target.bzrdir.transport.base.startswith('file:///'):
+            return False
         return True
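
The method above is essentially a chain of cheap short-circuit tests with
debug-flag escape hatches at either end. A condensed, standalone sketch of
that shape (invented names, not the bzrlib API):

    def should_use_converter(flags, src_rich, tgt_rich, src_local, tgt_local):
        # Hard overrides come first so developers can force either outcome.
        if 'never' in flags:
            return False
        # Feature check: refuse combinations the converter can't handle.
        if src_rich and not tgt_rich:
            return False
        if 'always' in flags:
            return True
        # Cost check: this path is too chatty to run over a network.
        return src_local and tgt_local

    assert should_use_converter(set(), False, True, True, True)
    assert not should_use_converter(set(['never']), False, True, True, True)
    assert should_use_converter(set(['always']), False, True, False, False)
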
@@ -3670 +3741 @@
-    def _get_delta_for_revision(self, tree, parent_ids, basis_id, cache):
+    def _get_trees(self, revision_ids, cache):
+        possible_trees = []
+        for rev_id in revision_ids:
+            if rev_id in cache:
+                possible_trees.append((rev_id, cache[rev_id]))
+            else:
+                # Not cached, but inventory might be present anyway.
+                try:
+                    tree = self.source.revision_tree(rev_id)
+                except errors.NoSuchRevision:
+                    # Nope, parent is ghost.
+                    pass
+                else:
+                    cache[rev_id] = tree
+                    possible_trees.append((rev_id, tree))
+        return possible_trees
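
_get_trees is a cache-or-fetch-or-skip lookup: cached trees are reused,
missing ones are fetched, and ghosts are silently dropped. A hedged,
standalone sketch of the same pattern (KeyError stands in for
NoSuchRevision; none of these names are bzrlib API):

    def get_cached_or_fetch(ids, cache, fetch):
        """Return [(id, value)] for the ids we can obtain, skipping ghosts."""
        found = []
        for an_id in ids:
            if an_id in cache:
                found.append((an_id, cache[an_id]))
                continue
            try:
                value = fetch(an_id)
            except KeyError:
                continue  # A ghost: referenced but not present.
            cache[an_id] = value
            found.append((an_id, value))
        return found

    store = {'r1': 'tree1'}
    cache = {}
    assert get_cached_or_fetch(['r1', 'ghost'], cache, store.__getitem__) == \
        [('r1', 'tree1')]
    assert cache == {'r1': 'tree1'}
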
+
+    def _get_delta_for_revision(self, tree, parent_ids, possible_trees):
         """Get the best delta and base for this revision.
 
         :return: (basis_id, delta)
         """
-        possible_trees = [(parent_id, cache[parent_id])
-                          for parent_id in parent_ids
-                          if parent_id in cache]
-        if len(possible_trees) == 0:
-            # There either aren't any parents, or the parents aren't in the
-            # cache, so just use the last converted tree
-            possible_trees.append((basis_id, cache[basis_id]))
         deltas = []
+        # Generate deltas against each tree, to find the shortest.
+        texts_possibly_new_in_tree = set()
         for basis_id, basis_tree in possible_trees:
             delta = tree.inventory._make_delta(basis_tree.inventory)
+            for old_path, new_path, file_id, new_entry in delta:
+                if new_path is None:
+                    # This file_id isn't present in the new rev, so we don't
+                    # care about it.
+                    continue
+                if not new_path:
+                    # Rich roots are handled elsewhere...
+                    continue
+                kind = new_entry.kind
+                if kind != 'directory' and kind != 'file':
+                    # No text record associated with this inventory entry.
+                    continue
+                # This is a directory or file that has changed somehow.
+                texts_possibly_new_in_tree.add((file_id, new_entry.revision))
             deltas.append((len(delta), basis_id, delta))
         deltas.sort()
         return deltas[0][1:]
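
The basis selection works by generating a delta against every candidate and
sorting on delta length. A self-contained sketch of that selection, using
plain dicts in place of inventories and _make_delta (illustrative only):

    def best_basis(new_items, candidate_bases):
        """Return (basis_id, delta) with the fewest changed entries."""
        deltas = []
        for basis_id, items in candidate_bases:
            delta = dict((k, v) for k, v in new_items.items()
                         if items.get(k) != v)
            deltas.append((len(delta), basis_id, delta))
        deltas.sort()
        return deltas[0][1:]

    new = {'a': 1, 'b': 2, 'c': 3}
    bases = [('far', {}), ('near', {'a': 1, 'b': 2})]
    basis_id, delta = best_basis(new, bases)
    assert basis_id == 'near' and delta == {'c': 3}
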
 
-    def _get_parent_keys(self, root_key, parent_map):
-        """Get the parent keys for a given root id."""
-        root_id, rev_id = root_key
-        # Include direct parents of the revision, but only if they used
-        # the same root_id and are heads.
-        parent_keys = []
-        for parent_id in parent_map[rev_id]:
-            if parent_id == _mod_revision.NULL_REVISION:
-                continue
-            if parent_id not in self._revision_id_to_root_id:
-                # We probably didn't read this revision, go spend the
-                # extra effort to actually check
-                try:
-                    tree = self.source.revision_tree(parent_id)
-                except errors.NoSuchRevision:
-                    # Ghost, fill out _revision_id_to_root_id in case we
-                    # encounter this again.
-                    # But set parent_root_id to None since we don't really know
-                    parent_root_id = None
-                else:
-                    parent_root_id = tree.get_root_id()
-                self._revision_id_to_root_id[parent_id] = None
-            else:
-                parent_root_id = self._revision_id_to_root_id[parent_id]
-            if root_id == parent_root_id:
-                # With stacking we _might_ want to refer to a non-local
-                # revision, but this code path only applies when we have the
-                # full content available, so ghosts really are ghosts, not just
-                # the edge of local data.
-                parent_keys.append((parent_id,))
-            else:
-                # root_id may be in the parent anyway.
-                try:
-                    tree = self.source.revision_tree(parent_id)
-                except errors.NoSuchRevision:
-                    # ghost, can't refer to it.
-                    pass
-                else:
-                    try:
-                        parent_keys.append((tree.inventory[root_id].revision,))
-                    except errors.NoSuchId:
-                        # not in the tree
-                        pass
-        g = graph.Graph(self.source.revisions)
-        heads = g.heads(parent_keys)
-        selected_keys = []
-        for key in parent_keys:
-            if key in heads and key not in selected_keys:
-                selected_keys.append(key)
-        return tuple([(root_id,) + key for key in selected_keys])
-
-    def _new_root_data_stream(self, root_keys_to_create, parent_map):
-        for root_key in root_keys_to_create:
-            parent_keys = self._get_parent_keys(root_key, parent_map)
-            yield versionedfile.FulltextContentFactory(root_key,
-                parent_keys, None, '')
+    def _fetch_parent_invs_for_stacking(self, parent_map, cache):
+        """Find all parent revisions that are absent, but for which the
+        inventory is present, and copy those inventories.
+
+        This is necessary to preserve correctness when the source is stacked
+        without fallbacks configured.  (Note that in cases like upgrade the
+        source may not have _fallback_repositories even though it is
+        stacked.)
+        """
+        parent_revs = set()
+        for parents in parent_map.values():
+            parent_revs.update(parents)
+        present_parents = self.source.get_parent_map(parent_revs)
+        absent_parents = set(parent_revs).difference(present_parents)
+        parent_invs_keys_for_stacking = self.source.inventories.get_parent_map(
+            (rev_id,) for rev_id in absent_parents)
+        parent_inv_ids = [key[-1] for key in parent_invs_keys_for_stacking]
+        for parent_tree in self.source.revision_trees(parent_inv_ids):
+            current_revision_id = parent_tree.get_revision_id()
+            parents_parents_keys = parent_invs_keys_for_stacking[
+                (current_revision_id,)]
+            parents_parents = [key[-1] for key in parents_parents_keys]
+            basis_id = _mod_revision.NULL_REVISION
+            basis_tree = self.source.revision_tree(basis_id)
+            delta = parent_tree.inventory._make_delta(basis_tree.inventory)
+            self.target.add_inventory_by_delta(
+                basis_id, delta, current_revision_id, parents_parents)
+            cache[current_revision_id] = parent_tree
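
The stacking fix above reduces to set arithmetic: find revisions that are
referenced as parents, absent from the revision store, yet still have an
inventory present. A sketch with plain sets and dicts standing in for the
repository APIs (illustrative, not bzrlib code):

    def parents_needing_inventory_copy(parent_map, present_revisions,
                                       inventories_present):
        parent_revs = set()
        for parents in parent_map.values():
            parent_revs.update(parents)
        absent = parent_revs - present_revisions
        return sorted(absent & inventories_present)

    parent_map = {'r2': ('r1',), 'r3': ('r2', 'ghost')}
    assert parents_needing_inventory_copy(
        parent_map, present_revisions=set(['r2']),
        inventories_present=set(['r1', 'r2'])) == ['r1']
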
 
     def _fetch_batch(self, revision_ids, basis_id, cache):
         """Fetch across a few revisions.
@@ -3761 +3830 @@
         pending_deltas = []
         pending_revisions = []
         parent_map = self.source.get_parent_map(revision_ids)
+        self._fetch_parent_invs_for_stacking(parent_map, cache)
         for tree in self.source.revision_trees(revision_ids):
+            # Find an inventory delta for this revision.
+            # Find text entries that need to be copied, too.
             current_revision_id = tree.get_revision_id()
             parent_ids = parent_map.get(current_revision_id, ())
+            parent_trees = self._get_trees(parent_ids, cache)
+            possible_trees = list(parent_trees)
+            if len(possible_trees) == 0:
+                # There either aren't any parents, or the parents are ghosts,
+                # so just use the last converted tree.
+                possible_trees.append((basis_id, cache[basis_id]))
             basis_id, delta = self._get_delta_for_revision(tree, parent_ids,
-                                                           basis_id, cache)
+                                                           possible_trees)
+            revision = self.source.get_revision(current_revision_id)
+            pending_deltas.append((basis_id, delta,
+                                   current_revision_id, revision.parent_ids))
             if self._converting_to_rich_root:
                 self._revision_id_to_root_id[current_revision_id] = \
                     tree.get_root_id()
-            # Find text entries that need to be copied
+            # Determine which texts are present in this revision but not in
+            # any of the available parents.
+            texts_possibly_new_in_tree = set()
             for old_path, new_path, file_id, entry in delta:
-                if new_path is not None:
-                    if not new_path:
-                        # This is the root
-                        if not self.target.supports_rich_root():
-                            # The target doesn't support rich root, so we don't
-                            # copy
-                            continue
-                        if self._converting_to_rich_root:
-                            # This can't be copied normally, we have to insert
-                            # it specially
-                            root_keys_to_create.add((file_id, entry.revision))
-                            continue
-                    text_keys.add((file_id, entry.revision))
-            revision = self.source.get_revision(current_revision_id)
-            pending_deltas.append((basis_id, delta,
-                                   current_revision_id, revision.parent_ids))
+                if new_path is None:
+                    # This file_id isn't present in the new rev
+                    continue
+                if not new_path:
+                    # This is the root
+                    if not self.target.supports_rich_root():
+                        # The target doesn't support rich root, so we don't
+                        # copy
+                        continue
+                    if self._converting_to_rich_root:
+                        # This can't be copied normally, we have to insert
+                        # it specially
+                        root_keys_to_create.add((file_id, entry.revision))
+                        continue
+                kind = entry.kind
+                texts_possibly_new_in_tree.add((file_id, entry.revision))
+            for basis_id, basis_tree in possible_trees:
+                basis_inv = basis_tree.inventory
+                for file_key in list(texts_possibly_new_in_tree):
+                    file_id, file_revision = file_key
+                    try:
+                        entry = basis_inv[file_id]
+                    except errors.NoSuchId:
+                        continue
+                    if entry.revision == file_revision:
+                        texts_possibly_new_in_tree.remove(file_key)
+            text_keys.update(texts_possibly_new_in_tree)
             pending_revisions.append(revision)
             cache[current_revision_id] = tree
             basis_id = current_revision_id
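
The text-selection step above first collects every (file_id, revision) pair
that might be new, then discards any pair some basis inventory already
carries. The same winnowing, sketched with dicts in place of basis
inventories (names invented for illustration):

    def texts_new_in_tree(candidates, basis_inventories):
        """Drop (file_id, revision) pairs already present in a basis."""
        remaining = set(candidates)
        for basis in basis_inventories:
            for file_key in list(remaining):
                file_id, file_revision = file_key
                if basis.get(file_id) == file_revision:
                    remaining.remove(file_key)
        return remaining

    candidates = [('f1', 'rev-a'), ('f2', 'rev-b')]
    bases = [{'f1': 'rev-a'}, {'f2': 'rev-old'}]
    assert texts_new_in_tree(candidates, bases) == set([('f2', 'rev-b')])
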
@@ -4228 +4335 @@
             self.target_repo.pack(hint=hint)
         return [], set()
 
-    def _extract_and_insert_inventories(self, substream, serializer):
+    def _extract_and_insert_inventory_deltas(self, substream, serializer):
+        target_rich_root = self.target_repo._format.rich_root_data
+        target_tree_refs = self.target_repo._format.supports_tree_reference
+        for record in substream:
+            # Insert the delta directly
+            inventory_delta_bytes = record.get_bytes_as('fulltext')
+            deserialiser = inventory_delta.InventoryDeltaDeserializer()
+            try:
+                parse_result = deserialiser.parse_text_bytes(
+                    inventory_delta_bytes)
+            except inventory_delta.IncompatibleInventoryDelta, err:
+                trace.mutter("Incompatible delta: %s", err.msg)
+                raise errors.IncompatibleRevision(self.target_repo._format)
+            basis_id, new_id, rich_root, tree_refs, inv_delta = parse_result
+            revision_id = new_id
+            parents = [key[0] for key in record.parents]
+            self.target_repo.add_inventory_by_delta(
+                basis_id, inv_delta, revision_id, parents)
+
+    def _extract_and_insert_inventories(self, substream, serializer,
+            parse_delta=None):
         """Generate a new inventory versionedfile in target, converting data.
 
         The inventory is retrieved from the source, (deserializing it), and
         stored in the target (reserializing it in a different format).
         """
+        target_rich_root = self.target_repo._format.rich_root_data
+        target_tree_refs = self.target_repo._format.supports_tree_reference
         for record in substream:
+            # It's not a delta, so it must be a fulltext in the source
+            # serializer's format.
             bytes = record.get_bytes_as('fulltext')
             revision_id = record.key[0]
             inv = serializer.read_inventory_from_string(bytes, revision_id)
             parents = [key[0] for key in record.parents]
             self.target_repo.add_inventory(revision_id, inv, parents)
+            # No need to keep holding this full inv in memory when the rest of
+            # the substream is likely to be all deltas.
+            del inv
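
The delta-insertion path logs the parser's failure and re-raises it as the
error type the repository API promises. A generic, standalone sketch of that
translation pattern (the exception names here are invented):

    class IncompatibleDelta(Exception):
        pass

    class IncompatibleTarget(Exception):
        pass

    def parse_or_reject(parse, payload, log):
        """Parse payload, converting a low-level parse failure into the
        caller-facing error without losing the detail."""
        try:
            return parse(payload)
        except IncompatibleDelta as err:
            log("Incompatible delta: %s" % err)
            raise IncompatibleTarget(str(err))

    messages = []
    def bad_parse(payload):
        raise IncompatibleDelta("tree references not supported")
    try:
        parse_or_reject(bad_parse, "...", messages.append)
    except IncompatibleTarget:
        pass
    assert messages == ["Incompatible delta: tree references not supported"]
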
 
     def _extract_and_insert_revisions(self, substream, serializer):
         for record in substream:
@@ -4403 +4543 @@
         return (not self.from_repository._format.rich_root_data and
             self.to_format.rich_root_data)
 
-    def _get_inventory_stream(self, revision_ids):
+    def _get_inventory_stream(self, revision_ids, missing=False):
         from_format = self.from_repository._format
-        if (from_format.supports_chks and self.to_format.supports_chks
-            and (from_format._serializer == self.to_format._serializer)):
-            # Both sides support chks, and they use the same serializer, so it
-            # is safe to transmit the chk pages and inventory pages across
-            # the wire.
-            return self._get_chk_inventory_stream(revision_ids)
-        elif (not from_format.supports_chks):
-            # Source repository doesn't support chks. So we can transmit the
-            # inventories 'as-is' and either they are just accepted on the
-            # target, or the Sink will properly convert it.
-            return self._get_simple_inventory_stream(revision_ids)
-        else:
-            # XXX: Hack to make not-chk->chk fetch: copy the inventories as
-            # inventories. Note that this should probably be done somehow
-            # as part of bzrlib.repository.StreamSink. Except JAM couldn't
-            # figure out how a non-chk repository could possibly handle
-            # deserializing an inventory stream from a chk repo, as it
-            # doesn't have a way to understand individual pages.
-            return self._get_convertable_inventory_stream(revision_ids)
+        if (from_format.supports_chks and self.to_format.supports_chks and
+            from_format.network_name() == self.to_format.network_name()):
+            raise AssertionError(
+                "this case should be handled by GroupCHKStreamSource")
+        elif 'forceinvdeltas' in debug.debug_flags:
+            return self._get_convertable_inventory_stream(revision_ids,
+                delta_versus_null=missing)
+        elif from_format.network_name() == self.to_format.network_name():
+            # Same format.
+            return self._get_simple_inventory_stream(revision_ids,
+                missing=missing)
+        elif (not from_format.supports_chks and not self.to_format.supports_chks
+            and from_format._serializer == self.to_format._serializer):
+            # Essentially the same format.
+            return self._get_simple_inventory_stream(revision_ids,
+                missing=missing)
+        else:
+            # Any time we switch serializations, we want to use an
+            # inventory-delta based approach.
+            return self._get_convertable_inventory_stream(revision_ids,
+                delta_versus_null=missing)
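
The routing above is a pure function of a few format facts plus a debug
override. Restated as a standalone decision function that mirrors the
branches (invented names, for illustration only):

    def choose_inventory_stream(src_chks, tgt_chks, same_network_name,
                                same_serializer, force_deltas):
        if src_chks and tgt_chks and same_network_name:
            raise AssertionError("handled by a specialised stream source")
        if force_deltas:
            return 'inventory-deltas'
        if same_network_name:
            return 'as-is'
        if not src_chks and not tgt_chks and same_serializer:
            return 'as-is'  # Essentially the same format.
        # Any serialization switch goes via inventory deltas.
        return 'inventory-deltas'

    assert choose_inventory_stream(False, False, False, True, False) == 'as-is'
    assert choose_inventory_stream(True, False, False, False,
                                   False) == 'inventory-deltas'
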
 
-    def _get_simple_inventory_stream(self, revision_ids):
+    def _get_simple_inventory_stream(self, revision_ids, missing=False):
+        # NB: This currently reopens the inventory weave in source;
+        # using a single stream interface instead would avoid this.
         from_weave = self.from_repository.inventories
+        if missing:
+            delta_closure = True
+        else:
+            delta_closure = not self.delta_on_metadata()
         yield ('inventories', from_weave.get_record_stream(
             [(rev_id,) for rev_id in revision_ids],
-            self.inventory_fetch_order(),
-            not self.delta_on_metadata()))
+            self.inventory_fetch_order(), delta_closure))
 
-    def _get_chk_inventory_stream(self, revision_ids):
-        """Fetch the inventory texts, along with the associated chk maps."""
-        # We want an inventory outside of the search set, so that we can filter
-        # out uninteresting chk pages. For now we use
-        # _find_revision_outside_set, but if we had a Search with cut_revs, we
-        # could use that instead.
-        start_rev_id = self.from_repository._find_revision_outside_set(
-            revision_ids)
-        start_rev_key = (start_rev_id,)
-        inv_keys_to_fetch = [(rev_id,) for rev_id in revision_ids]
-        if start_rev_id != _mod_revision.NULL_REVISION:
-            inv_keys_to_fetch.append((start_rev_id,))
-        # Any repo that supports chk_bytes must also support out-of-order
-        # insertion. At least, that is how we expect it to work
-        # We use get_record_stream instead of iter_inventories because we want
-        # to be able to insert the stream as well. We could instead fetch
-        # allowing deltas, and then iter_inventories, but we don't know whether
-        # source or target is more 'local' anway.
-        inv_stream = self.from_repository.inventories.get_record_stream(
-            inv_keys_to_fetch, 'unordered',
-            True) # We need them as full-texts so we can find their references
-        uninteresting_chk_roots = set()
-        interesting_chk_roots = set()
-        def filter_inv_stream(inv_stream):
-            for idx, record in enumerate(inv_stream):
-                ### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
-                bytes = record.get_bytes_as('fulltext')
-                chk_inv = inventory.CHKInventory.deserialise(
-                    self.from_repository.chk_bytes, bytes, record.key)
-                if record.key == start_rev_key:
-                    uninteresting_chk_roots.add(chk_inv.id_to_entry.key())
-                    p_id_map = chk_inv.parent_id_basename_to_file_id
-                    if p_id_map is not None:
-                        uninteresting_chk_roots.add(p_id_map.key())
-                else:
-                    yield record
-                    interesting_chk_roots.add(chk_inv.id_to_entry.key())
-                    p_id_map = chk_inv.parent_id_basename_to_file_id
-                    if p_id_map is not None:
-                        interesting_chk_roots.add(p_id_map.key())
-        ### pb.update('fetch inventory', 0, 2)
-        yield ('inventories', filter_inv_stream(inv_stream))
-        # Now that we have worked out all of the interesting root nodes, grab
-        # all of the interesting pages and insert them
-        ### pb.update('fetch inventory', 1, 2)
-        interesting = chk_map.iter_interesting_nodes(
-            self.from_repository.chk_bytes, interesting_chk_roots,
-            uninteresting_chk_roots)
-        def to_stream_adapter():
-            """Adapt the iter_interesting_nodes result to a single stream.
-
-            iter_interesting_nodes returns records as it processes them, along
-            with keys. However, we only want to return the records themselves.
-            """
-            for record, items in interesting:
-                if record is not None:
-                    yield record
-        # XXX: We could instead call get_record_stream(records.keys())
-        #      ATM, this will always insert the records as fulltexts, and
-        #      requires that you can hang on to records once you have gone
-        #      on to the next one. Further, it causes the target to
-        #      recompress the data. Testing shows it to be faster than
-        #      requesting the records again, though.
-        yield ('chk_bytes', to_stream_adapter())
-        ### pb.update('fetch inventory', 2, 2)
 
-    def _get_convertable_inventory_stream(self, revision_ids):
-        # XXX: One of source or target is using chks, and they don't have
-        # compatible serializations. The StreamSink code expects to be
-        # able to convert on the target, so we need to put
-        # bytes-on-the-wire that can be converted
-        yield ('inventories', self._stream_invs_as_fulltexts(revision_ids))
+    def _get_convertable_inventory_stream(self, revision_ids,
+                                          delta_versus_null=False):
+        # The source is using CHKs, but the target either doesn't or it has a
+        # different serializer.  The StreamSink code expects to be able to
+        # convert on the target, so we need to put bytes-on-the-wire that can
+        # be converted.  That means inventory deltas (if the remote is <1.19,
+        # RemoteStreamSink will fallback to VFS to insert the deltas).
+        yield ('inventory-deltas',
+            self._stream_invs_as_deltas(revision_ids,
+                delta_versus_null=delta_versus_null))
 
-    def _stream_invs_as_fulltexts(self, revision_ids):
+    def _stream_invs_as_deltas(self, revision_ids, delta_versus_null=False):
+        """Return a stream of inventory-deltas for the given rev ids.
+
+        :param revision_ids: The list of inventories to transmit
+        :param delta_versus_null: Don't try to find a minimal delta for this
+            entry, instead compute the delta versus the NULL_REVISION. This
+            effectively streams a complete inventory. Used for stuff like
+            filling in missing parents, etc.
+        """
         from_repo = self.from_repository
-        from_serializer = from_repo._format._serializer
         revision_keys = [(rev_id,) for rev_id in revision_ids]
         parent_map = from_repo.inventories.get_parent_map(revision_keys)
-        for inv in self.from_repository.iter_inventories(revision_ids):
-            # XXX: This is a bit hackish, but it works. Basically,
-            # CHKSerializer 'accidentally' supports
-            # read/write_inventory_to_string, even though that is never
-            # the format that is stored on disk. It *does* give us a
-            # single string representation for an inventory, so live with
-            # it for now.
-            # This would be far better if we had a 'serialized inventory
-            # delta' form. Then we could use 'inventory._make_delta', and
-            # transmit that. This would both be faster to generate, and
-            # result in fewer bytes-on-the-wire.
-            as_bytes = from_serializer.write_inventory_to_string(inv)
+        # XXX: possibly repos could implement a more efficient iter_inv_deltas
+        # method...
+        inventories = self.from_repository.iter_inventories(
+            revision_ids, 'topological')
+        format = from_repo._format
+        invs_sent_so_far = set([_mod_revision.NULL_REVISION])
+        inventory_cache = lru_cache.LRUCache(50)
+        null_inventory = from_repo.revision_tree(
+            _mod_revision.NULL_REVISION).inventory
+        # XXX: ideally the rich-root/tree-refs flags would be per-revision, not
+        # per-repo (e.g. streaming a non-rich-root revision out of a rich-root
+        # repo back into a non-rich-root repo ought to be allowed)
+        serializer = inventory_delta.InventoryDeltaSerializer(
+            versioned_root=format.rich_root_data,
+            tree_references=format.supports_tree_reference)
+        for inv in inventories:
             key = (inv.revision_id,)
             parent_keys = parent_map.get(key, ())
+            delta = None
+            if not delta_versus_null and parent_keys:
+                # The caller did not ask for complete inventories and we have
+                # some parents that we can delta against.  Make a delta against
+                # each parent so that we can find the smallest.
+                parent_ids = [parent_key[0] for parent_key in parent_keys]
+                for parent_id in parent_ids:
+                    if parent_id not in invs_sent_so_far:
+                        # We don't know that the remote side has this basis, so
+                        # we can't use it.
+                        continue
+                    if parent_id == _mod_revision.NULL_REVISION:
+                        parent_inv = null_inventory
+                    else:
+                        parent_inv = inventory_cache.get(parent_id, None)
+                        if parent_inv is None:
+                            parent_inv = from_repo.get_inventory(parent_id)
+                    candidate_delta = inv._make_delta(parent_inv)
+                    if (delta is None or
+                        len(delta) > len(candidate_delta)):
+                        delta = candidate_delta
+                        basis_id = parent_id
+            if delta is None:
+                # Either none of the parents ended up being suitable, or we
+                # were asked to delta against NULL
+                basis_id = _mod_revision.NULL_REVISION
+                delta = inv._make_delta(null_inventory)
+            invs_sent_so_far.add(inv.revision_id)
+            inventory_cache[inv.revision_id] = inv
+            delta_serialized = ''.join(
+                serializer.delta_to_lines(basis_id, key[-1], delta))
             yield versionedfile.FulltextContentFactory(
-                key, parent_keys, None, as_bytes)
+                key, parent_keys, None, delta_serialized)
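
A condensed sketch of the basis-selection loop above: only bases the
receiver is known to have are eligible, a small cache avoids refetching
them, and if no parent qualifies the delta is computed against the empty
(null) inventory. Dicts stand in for inventories and _make_delta; the names
are invented:

    def pick_basis_and_delta(inv, parent_ids, sent_so_far, cache, fetch,
                             null_inv, make_delta):
        """Return (basis_id, delta), preferring the smallest delta among
        parents already sent; fall back to a delta versus null."""
        best = None
        for parent_id in parent_ids:
            if parent_id not in sent_so_far:
                # The remote side may not have this basis; skip it.
                continue
            parent_inv = cache.get(parent_id)
            if parent_inv is None:
                parent_inv = cache[parent_id] = fetch(parent_id)
            candidate = make_delta(inv, parent_inv)
            if best is None or len(candidate) < len(best[1]):
                best = (parent_id, candidate)
        if best is None:
            return 'null:', make_delta(inv, null_inv)
        return best

    def dict_delta(new, old):
        return dict((k, v) for k, v in new.items() if old.get(k) != v)

    inv = {'a': 1, 'b': 2}
    store = {'p1': {'a': 1}, 'p2': {}}
    basis, delta = pick_basis_and_delta(
        inv, ['p1', 'p2'], sent_so_far=set(['p1', 'p2']), cache={},
        fetch=store.__getitem__, null_inv={}, make_delta=dict_delta)
    assert basis == 'p1' and delta == {'b': 2}
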
 
 def _iter_for_revno(repo, partial_history_cache, stop_index=None,