                ie.revision = parent_entry.revision
                return self._get_delta(ie, basis_inv, path), False, None
            ie.reference_revision = content_summary[3]
            if ie.reference_revision is None:
                raise AssertionError("invalid content_summary for nested tree: %r"
                    % (content_summary,))
            self._add_text_to_weave(ie.file_id, '', heads, None)
        else:
            raise NotImplementedError('unknown kind')
        ie.revision = self._new_revision_id
        self._any_changes = True
        return self._get_delta(ie, basis_inv, path), True, fingerprint

    def record_iter_changes(self, tree, basis_revision_id, iter_changes,
        _entry_factory=entry_factory):
        """Record a new tree via iter_changes.

        :param tree: The tree to obtain text contents from for changed objects.
        :param basis_revision_id: The revision id of the tree the iter_changes
            has been generated against. Currently assumed to be the same
            as self.parents[0] - if it is not, errors may occur.
        :param iter_changes: An iter_changes iterator with the changes to apply
            to basis_revision_id. The iterator must not include any items with
            a current kind of None - missing items must be either filtered out
            or errored-on before record_iter_changes sees the item.
        :param _entry_factory: Private method to bind entry_factory locally for
            performance.
        :return: A generator of (file_id, relpath, fs_hash) tuples for use with
            tree._observed_sha1.
        """
        # Create an inventory delta based on deltas between all the parents and
        # deltas between all the parent inventories. We use inventory deltas
        # between the inventory objects because iter_changes masks
        # last-changed-field only changes.
        # file_id -> change map, change is fileid, paths, changed, versioneds,
        # parents, names, kinds, executables
        merged_ids = {}
        # {file_id -> revision_id -> inventory entry, for entries in parent
        # trees that are not parents[0]
        parent_entries = {}
        ghost_basis = False
        try:
            revtrees = list(self.repository.revision_trees(self.parents))
        except errors.NoSuchRevision:
            # one or more ghosts, slow path.
            revtrees = []
            for revision_id in self.parents:
                try:
                    revtrees.append(self.repository.revision_tree(revision_id))
                except errors.NoSuchRevision:
                    if not revtrees:
                        basis_revision_id = _mod_revision.NULL_REVISION
                        ghost_basis = True
                    revtrees.append(self.repository.revision_tree(
                        _mod_revision.NULL_REVISION))
        # The basis inventory from a repository
        if revtrees:
            basis_inv = revtrees[0].inventory
        else:
            basis_inv = self.repository.revision_tree(
                _mod_revision.NULL_REVISION).inventory
        if len(self.parents) > 0:
            if basis_revision_id != self.parents[0] and not ghost_basis:
                raise Exception(
                    "arbitrary basis parents not yet supported with merges")
            for revtree in revtrees[1:]:
                for change in revtree.inventory._make_delta(basis_inv):
                    if change[1] is None:
                        # Not present in this parent.
                        continue
                    if change[2] not in merged_ids:
                        if change[0] is not None:
                            basis_entry = basis_inv[change[2]]
                            merged_ids[change[2]] = [
                                basis_entry.revision,
                                change[3].revision]
                            parent_entries[change[2]] = {
                                basis_entry.revision:basis_entry,
                                change[3].revision:change[3],
                                }
                        else:
                            merged_ids[change[2]] = [change[3].revision]
                            parent_entries[change[2]] = {change[3].revision:change[3]}
                    else:
                        merged_ids[change[2]].append(change[3].revision)
                        parent_entries[change[2]][change[3].revision] = change[3]
        # Setup the changes from the tree:
        # changes maps file_id -> (change, [parent revision_ids])
        changes = {}
        for change in iter_changes:
            # This probably looks up in basis_inv way too much.
            if change[1][0] is not None:
                head_candidate = [basis_inv[change[0]].revision]
            else:
                head_candidate = []
            changes[change[0]] = change, merged_ids.get(change[0],
                head_candidate)
        unchanged_merged = set(merged_ids) - set(changes)
        # Extend the changes dict with synthetic changes to record merges of
        # texts.
        for file_id in unchanged_merged:
            # Record a merged version of these items that did not change vs the
            # basis. This can be either identical parallel changes, or a revert
            # of a specific file after a merge. The recorded content will be
            # that of the current tree (which is the same as the basis), but
            # the per-file graph will reflect a merge.
            # NB:XXX: We are reconstructing path information we had, this
            # should be preserved instead.
            # inv delta change: (file_id, (path_in_source, path_in_target),
            #                   changed_content, versioned, parent, name, kind,
            #                   executable)
            try:
                basis_entry = basis_inv[file_id]
            except errors.NoSuchId:
                # a change from basis->some_parents but file_id isn't in basis
                # so was new in the merge, which means it must have changed
                # from basis -> current, and as it hasn't the add was reverted
                # by the user. So we discard this change.
                pass
            else:
                change = (file_id,
                    (basis_inv.id2path(file_id), tree.id2path(file_id)),
                    False, (True, True),
                    (basis_entry.parent_id, basis_entry.parent_id),
                    (basis_entry.name, basis_entry.name),
                    (basis_entry.kind, basis_entry.kind),
                    (basis_entry.executable, basis_entry.executable))
                changes[file_id] = (change, merged_ids[file_id])
        # changes contains tuples with the change and a set of inventory
        # candidates for the file.
        # inv delta is:
        # old_path, new_path, file_id, new_inventory_entry
        seen_root = False # Is the root in the basis delta?
        inv_delta = self._basis_delta
        modified_rev = self._new_revision_id
        for change, head_candidates in changes.values():
            if change[3][1]: # versioned in target.
                # Several things may be happening here:
                # We may have a fork in the per-file graph
                #  - record a change with the content from tree
                # We may have a change against < all trees
                #  - carry over the tree that hasn't changed
                # We may have a change against all trees
                #  - record the change with the content from tree
                kind = change[6][1]
                file_id = change[0]
                entry = _entry_factory[kind](file_id, change[5][1],
                    change[4][1])
                heads = []
                head_set = self._heads(change[0], set(head_candidates))
                for head_candidate in head_candidates:
                    if head_candidate in head_set:
                        heads.append(head_candidate)
                        head_set.remove(head_candidate)
                carried_over = False
                if len(heads) == 1:
                    # Could be a carry-over situation:
                    parent_entry_revs = parent_entries.get(file_id, None)
                    if parent_entry_revs:
                        parent_entry = parent_entry_revs.get(heads[0], None)
                    else:
                        parent_entry = None
                    if parent_entry is None:
                        # The parent iter_changes was called against is the one
                        # that is the per-file head, so any change is relevant
                        # iter_changes is valid.
                        carry_over_possible = False
                    else:
                        # could be a carry over situation
                        # A change against the basis may just indicate a merge,
                        # we need to check the content against the source of the
                        # merge to determine if it was changed after the merge
                        # or carried over.
                        if (parent_entry.kind != entry.kind or
                            parent_entry.parent_id != entry.parent_id or
                            parent_entry.name != entry.name):
                            # Metadata common to all entries has changed
                            # against per-file parent
                            carry_over_possible = False
                        else:
                            carry_over_possible = True
                        # per-type checks for changes against the parent_entry
                        # are done below.
                else:
                    # Cannot be a carry-over situation
                    carry_over_possible = False
                # Populate the entry in the delta
                if kind == 'file':
                    # XXX: There is still a small race here: If someone reverts the content of a file
                    # after iter_changes examines and decides it has changed,
                    # we will unconditionally record a new version even if some
                    # other process reverts it while commit is running (with
                    # the revert happening after iter_changes did its
                    # examination).
                    if change[7][1]:
                        entry.executable = True
                    else:
                        entry.executable = False
                    if (carry_over_possible and
                        parent_entry.executable == entry.executable):
                        # Check the file length, content hash after reading
                        # the file.
                        nostore_sha = parent_entry.text_sha1
                    else:
                        nostore_sha = None
                    file_obj, stat_value = tree.get_file_with_stat(file_id, change[1][1])
                    try:
                        text = file_obj.read()
                    finally:
                        file_obj.close()
                    try:
                        entry.text_sha1, entry.text_size = self._add_text_to_weave(
                            file_id, text, heads, nostore_sha)
                        yield file_id, change[1][1], (entry.text_sha1, stat_value)
                    except errors.ExistingContent:
                        # No content change against a carry_over parent
                        # Perhaps this should also yield a fs hash update?
                        carried_over = True
                        entry.text_size = parent_entry.text_size
                        entry.text_sha1 = parent_entry.text_sha1
                elif kind == 'symlink':
                    entry.symlink_target = tree.get_symlink_target(file_id)
                    if (carry_over_possible and
                        parent_entry.symlink_target == entry.symlink_target):
                        carried_over = True
                    else:
                        self._add_text_to_weave(change[0], '', heads, None)
                elif kind == 'directory':
                    if carry_over_possible:
                        carried_over = True
                    else:
                        # Nothing to set on the entry.
                        # XXX: split into the Root and nonRoot versions.
                        if change[1][1] != '' or self.repository.supports_rich_root():
                            self._add_text_to_weave(change[0], '', heads, None)
                elif kind == 'tree-reference':
                    if not self.repository._format.supports_tree_reference:
                        # This isn't quite sane as an error, but we shouldn't
                        # ever see this code path in practice: trees don't
                        # permit references when the repo doesn't support tree
                        # references.
                        raise errors.UnsupportedOperation(tree.add_reference,
                            self.repository)
                    reference_revision = tree.get_reference_revision(change[0])
                    entry.reference_revision = reference_revision
                    if (carry_over_possible and
                        parent_entry.reference_revision == reference_revision):
                        carried_over = True
                    else:
                        self._add_text_to_weave(change[0], '', heads, None)
                else:
                    raise AssertionError('unknown kind %r' % kind)
                if not carried_over:
                    entry.revision = modified_rev
                else:
                    entry.revision = parent_entry.revision
            else:
                entry = None
            new_path = change[1][1]
            inv_delta.append((change[1][0], new_path, change[0], entry))
            if new_path == '':
                seen_root = True
        self.new_inventory = None
        if len(inv_delta):
            # This should perhaps be guarded by a check that the basis we
            # commit against is the basis for the commit and if not do a delta
            # against the basis.
            self._any_changes = True
        if not seen_root:
            # housekeeping root entry changes do not affect no-change commits.
            self._require_root_change(tree)
        self.basis_delta_revision = basis_revision_id
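
    # Illustrative sketch, not part of the original change: a caller would
    # typically drive record_iter_changes roughly like this (names such as
    # `repo`, `branch`, `basis_tree` and `work_tree` are assumptions supplied
    # by the caller, not defined here):
    #
    #   builder = repo.get_commit_builder(branch, parents, config)
    #   for file_id, path, fs_hash in builder.record_iter_changes(
    #           work_tree, basis_revision_id,
    #           work_tree.iter_changes(basis_tree)):
    #       # fs_hash is (sha1, stat_value) for newly recorded file texts.
    #       pass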

    def _add_text_to_weave(self, file_id, new_text, parents, nostore_sha):
        parent_keys = tuple([(file_id, parent) for parent in parents])
        return self.repository.texts._add_text(
            (file_id, self._new_revision_id), parent_keys, new_text,
            nostore_sha=nostore_sha, random_id=self.random_revid)[0:2]


class RootCommitBuilder(CommitBuilder):
    """This commitbuilder actually records the root id"""

    # the root entry gets versioned properly by this builder.
    _versioned_root = True
        # The old API returned a list, should this actually be a set?
        return parent_map.keys()

    def _check_inventories(self, checker):
        """Check the inventories found from the revision scan.

        This is responsible for verifying the sha1 of inventories and
        creating a pending_keys set that covers data referenced by inventories.
        """
        bar = ui.ui_factory.nested_progress_bar()
        try:
            self._do_check_inventories(checker, bar)
        finally:
            bar.finished()

    def _do_check_inventories(self, checker, bar):
        """Helper for _check_inventories."""
        keys = {'chk_bytes':set(), 'inventories':set(), 'texts':set()}
        kinds = ['chk_bytes', 'texts']
        count = len(checker.pending_keys)
        bar.update("inventories", 0, 2)
        current_keys = checker.pending_keys
        checker.pending_keys = {}
        # Accumulate current checks.
        for key in current_keys:
            if key[0] != 'inventories' and key[0] not in kinds:
                checker._report_items.append('unknown key type %r' % (key,))
            keys[key[0]].add(key[1:])
        if keys['inventories']:
            # NB: output order *should* be roughly sorted - topo or
            # inverse topo depending on repository - either way decent
            # to just delta against. However, pre-CHK formats didn't
            # try to optimise inventory layout on disk. As such the
            # pre-CHK code path does not use inventory deltas.
            last_object = None
            for record in self.inventories.check(keys=keys['inventories']):
                if record.storage_kind == 'absent':
                    checker._report_items.append(
                        'Missing inventory {%s}' % (record.key,))
                else:
                    last_object = self._check_record('inventories', record,
                        checker, last_object,
                        current_keys[('inventories',) + record.key])
            del keys['inventories']
        else:
            return
        bar.update("texts", 1)
        while (checker.pending_keys or keys['chk_bytes']
            or keys['texts']):
            # Something to check.
            current_keys = checker.pending_keys
            checker.pending_keys = {}
            # Accumulate current checks.
            for key in current_keys:
                if key[0] not in kinds:
                    checker._report_items.append('unknown key type %r' % (key,))
                keys[key[0]].add(key[1:])
            # Check the outermost kind only - inventories || chk_bytes || texts
            for kind in kinds:
                if keys[kind]:
                    last_object = None
                    for record in getattr(self, kind).check(keys=keys[kind]):
                        if record.storage_kind == 'absent':
                            checker._report_items.append(
                                'Missing %s {%s}' % (kind, record.key,))
                        else:
                            last_object = self._check_record(kind, record,
                                checker, last_object, current_keys[(kind,) + record.key])
                    keys[kind] = set()
                    break

    def _check_record(self, kind, record, checker, last_object, item_data):
        """Check a single text from this repository."""
        if kind == 'inventories':
            rev_id = record.key[0]
            inv = self.deserialise_inventory(rev_id,
                record.get_bytes_as('fulltext'))
            if last_object is not None:
                delta = inv._make_delta(last_object)
                for old_path, path, file_id, ie in delta:
                    if ie is None:
                        continue
                    ie.check(checker, rev_id, inv)
            else:
                for path, ie in inv.iter_entries():
                    ie.check(checker, rev_id, inv)
            if self._format.fast_deltas:
                return inv
        elif kind == 'chk_bytes':
            # No code written to check chk_bytes for this repo format.
            checker._report_items.append(
                'unsupported key type chk_bytes for %s' % (record.key,))
        elif kind == 'texts':
            self._check_text(record, checker, item_data)
        else:
            checker._report_items.append(
                'unknown key type %s for %s' % (kind, record.key))

    def _check_text(self, record, checker, item_data):
        """Check a single text."""
        # Check it is extractable.
        # TODO: check length.
        if record.storage_kind == 'chunked':
            chunks = record.get_bytes_as(record.storage_kind)
            sha1 = osutils.sha_strings(chunks)
            length = sum(map(len, chunks))
        else:
            content = record.get_bytes_as('fulltext')
            sha1 = osutils.sha_string(content)
            length = len(content)
        if item_data and sha1 != item_data[1]:
            checker._report_items.append(
                'sha1 mismatch: %s has sha1 %s expected %s referenced by %s' %
                (record.key, sha1, item_data[1], item_data[2]))
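
    # Illustrative sketch, not part of the original change: the two hashing
    # branches above are equivalent, because hashing a list of chunks is
    # defined as hashing their concatenation:
    #
    #   from bzrlib import osutils
    #   chunks = ['hello ', 'world\n']
    #   assert osutils.sha_strings(chunks) == osutils.sha_string(''.join(chunks))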

    @staticmethod
    def create(a_bzrdir):
        """Construct the current default format repository in a_bzrdir."""

    def commit_write_group(self):
        """Commit the contents accrued within the current write group.

        :seealso: start_write_group.

        :return: it may return an opaque hint that can be passed to 'pack'.
        """
        if self._write_group is not self.get_transaction():
            # has an unlock or relock occurred ?
            raise errors.BzrError('mismatched lock context %r and '
                'write group %r.' %
                (self.get_transaction(), self._write_group))
        result = self._commit_write_group()
        self._write_group = None
        return result
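
    # Illustrative sketch, not part of the original change: the expected
    # write-group lifecycle around commit_write_group, assuming `repo` is a
    # write-locked repository:
    #
    #   repo.start_write_group()
    #   try:
    #       ...  # add texts, inventories and revisions
    #   except:
    #       repo.abort_write_group()
    #       raise
    #   else:
    #       hint = repo.commit_write_group()  # hint may be passed to pack()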

    def _commit_write_group(self):
        """Template method for per-repository write group cleanup.

        This is called before the write group is considered to be
        finished and should ensure that all data handed to the repository
        for writing during the write group is safely committed (to the
        extent possible considering file system caching etc).
        """

    def suspend_write_group(self):
        raise errors.UnsuspendableWriteGroup(self)

    def get_missing_parent_inventories(self, check_for_missing_texts=True):
        """Return the keys of missing inventory parents for revisions added in
        this write group.

        A revision is not complete if the inventory delta for that revision
        cannot be calculated. Therefore if the parent inventories of a
        revision are not present, the revision is incomplete, and e.g. cannot
        be streamed by a smart server. This method finds missing inventory
        parents for revisions added in this write group.
        """
        if not self._format.supports_external_lookups:
            # This is only an issue for stacked repositories
            return set()
        if not self.is_in_write_group():
            raise AssertionError('not in a write group')
        # XXX: We assume that every added revision already has its
        # corresponding inventory, so we only check for parent inventories that
        # might be missing, rather than all inventories.
        parents = set(self.revisions._index.get_missing_parents())
        parents.discard(_mod_revision.NULL_REVISION)
        unstacked_inventories = self.inventories._index
        present_inventories = unstacked_inventories.get_parent_map(
            key[-1:] for key in parents)
        parents.difference_update(present_inventories)
        if len(parents) == 0:
            # No missing parent inventories.
            return set()
        if not check_for_missing_texts:
            return set(('inventories', rev_id) for (rev_id,) in parents)
        # Ok, now we have a list of missing inventories. But these only matter
        # if the inventories that reference them are missing some texts they
        # appear to introduce.
        # XXX: Texts referenced by all added inventories need to be present,
        # but at the moment we're only checking for texts referenced by
        # inventories at the graph's edge.
        key_deps = self.revisions._index._key_dependencies
        key_deps.add_keys(present_inventories)
        referrers = frozenset(r[0] for r in key_deps.get_referrers())
        file_ids = self.fileids_altered_by_revision_ids(referrers)
        missing_texts = set()
        for file_id, version_ids in file_ids.iteritems():
            missing_texts.update(
                (file_id, version_id) for version_id in version_ids)
        present_texts = self.texts.get_parent_map(missing_texts)
        missing_texts.difference_update(present_texts)
        if not missing_texts:
            # No texts are missing, so all revisions and their deltas are
            # reconstructable.
            return set()
        # Alternatively the text versions could be returned as the missing
        # keys, but this is likely to be less data.
        missing_keys = set(('inventories', rev_id) for (rev_id,) in parents)
        return missing_keys
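
    # Illustrative sketch, not part of the original change: a streaming sink
    # can use this to decide whether to suspend rather than commit (compare
    # StreamSink._locked_insert_stream later in this file); `repo` is an
    # assumed write-locked repository inside a write group:
    #
    #   missing = repo.get_missing_parent_inventories()
    #   if missing:
    #       tokens = repo.suspend_write_group()
    #       # resume later, once the missing keys have been fetched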

    def refresh_data(self):
        """Re-read any data needed to synchronise with disk.

        This method is intended to be called after another repository instance
        (such as one used by a smart server) has inserted data into the
        repository. It may not be called during a write group, but may be
        called at any other time.
        """
        if self.is_in_write_group():
            raise errors.InternalBzrError(
                "May not refresh_data while in a write group.")
        self._refresh_data()

    def resume_write_group(self, tokens):
        if not self.is_write_locked():
            raise errors.NotWriteLocked(self)
        if self._write_group:
            raise errors.BzrError('already in a write group')
        self._resume_write_group(tokens)
        # so we can detect unlock/relock - the write group is now entered.
        self._write_group = self.get_transaction()

    def _resume_write_group(self, tokens):
        raise errors.UnsuspendableWriteGroup(self)

    def fetch(self, source, revision_id=None, pb=None, find_ghosts=False,
            fetch_spec=None):
        """Fetch the content required to construct revision_id from source.

        If revision_id is None and fetch_spec is None, then all content is
        copied.

        fetch() may not be used when the repository is in a write group -
        either finish the current write group before using fetch, or use
        fetch before starting the write group.

        :param find_ghosts: Find and copy revisions in the source that are
            ghosts in the target (and not reachable directly by walking out to
            the first-present revision in target from revision_id).
        :param revision_id: If specified, all the content needed for this
            revision ID will be copied to the target. Fetch will determine for
            itself which content needs to be copied.
        :param fetch_spec: If specified, a SearchResult or
            PendingAncestryResult that describes which revisions to copy. This
            allows copying multiple heads at once. Mutually exclusive with
            revision_id.
        """
        if fetch_spec is not None and revision_id is not None:
            raise AssertionError(
                "fetch_spec and revision_id are mutually exclusive.")
        if self.is_in_write_group():
            raise errors.InternalBzrError(
                "May not fetch while in a write group.")
        # fast path same-url fetch operations
        # TODO: lift out to somewhere common with RemoteRepository
        # <https://bugs.edge.launchpad.net/bzr/+bug/401646>
        if (self.has_same_location(source)
            and fetch_spec is None
            and self._has_same_fallbacks(source)):
            # check that last_revision is in 'from' and then return a
            # no-operation.
            if (revision_id is not None and
                not _mod_revision.is_null(revision_id)):
                self.get_revision(revision_id)
            return 0, []
        return self.source.revision_ids_to_search_result(result_set)

class InterPackRepo(InterSameDataRepository):
    """Optimised code paths between Pack based repositories."""

    @classmethod
    def _get_repo_format_to_test(self):
        from bzrlib.repofmt import pack_repo
        return pack_repo.RepositoryFormatKnitPack1()

    @staticmethod
    def is_compatible(source, target):
        """Be compatible with known Pack formats.

        We don't test for the stores being of specific types because that
        could lead to confusing results, and there is no need to be
        overly general.
        """
        from bzrlib.repofmt.pack_repo import RepositoryFormatPack
        try:
            are_packs = (isinstance(source._format, RepositoryFormatPack) and
                isinstance(target._format, RepositoryFormatPack))
        except AttributeError:
            return False
        return are_packs and InterRepository._same_model(source, target)

    @needs_write_lock
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
        """See InterRepository.fetch()."""
        if (len(self.source._fallback_repositories) > 0 or
            len(self.target._fallback_repositories) > 0):
            # The pack layer is not aware of fallback repositories, so when
            # fetching from a stacked repository or into a stacked repository
            # we use the generic fetch logic which uses the VersionedFiles
            # attributes on repository.
            from bzrlib.fetch import RepoFetcher
            fetcher = RepoFetcher(self.target, self.source, revision_id,
                pb, find_ghosts)
            return fetcher.count_copied, fetcher.failed_revisions
        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
               self.source, self.source._format, self.target, self.target._format)
        self.count_copied = 0
        if revision_id is None:
            # TODO:
            # everything to do - use pack logic
            # to fetch from all packs to one without
            # inventory parsing etc, IFF nothing to be copied is in the target.
            # till then:
            source_revision_ids = frozenset(self.source.all_revision_ids())
            revision_ids = source_revision_ids - \
                frozenset(self.target_get_parent_map(source_revision_ids))
            revision_keys = [(revid,) for revid in revision_ids]
            target_pack_collection = self._get_target_pack_collection()
            index = target_pack_collection.revision_index.combined_index
            present_revision_ids = set(item[1][0] for item in
                index.iter_entries(revision_keys))
            revision_ids = set(revision_ids) - present_revision_ids
            # implementing the TODO will involve:
            # - detecting when all of a pack is selected
            # - avoiding as much as possible pre-selection, so the
            # more-core routines such as create_pack_from_packs can filter in
            # a just-in-time fashion. (though having a HEADS list on a
            # repository might make this a lot easier, because we could
            # sensibly detect 'new revisions' without doing a full index scan.
        elif _mod_revision.is_null(revision_id):
            # nothing to do:
            return (0, [])
        else:
            try:
                revision_ids = self.search_missing_revision_ids(revision_id,
                    find_ghosts=find_ghosts).get_keys()
            except errors.NoSuchRevision:
                raise errors.InstallFailed([revision_id])
            if len(revision_ids) == 0:
                return (0, [])
        return self._pack(self.source, self.target, revision_ids)

    def _pack(self, source, target, revision_ids):
        from bzrlib.repofmt.pack_repo import Packer
        target_pack_collection = self._get_target_pack_collection()
        packs = source._pack_collection.all_packs()
        pack = Packer(target_pack_collection, packs, '.fetch',
            revision_ids).pack()
        if pack is not None:
            target_pack_collection._save_pack_names()
            copied_revs = pack.get_revision_count()
            # Trigger an autopack. This may duplicate effort as we've just done
            # a pack creation, but for now it is simpler to think about as
            # 'upload data, then repack if needed'.
            self._autopack()
            return (copied_revs, [])
        else:
            return (0, [])

    def _autopack(self):
        self.target._pack_collection.autopack()

    def _get_target_pack_collection(self):
        return self.target._pack_collection

    @needs_read_lock
    def search_missing_revision_ids(self, revision_id=None, find_ghosts=True):
        """See InterRepository.missing_revision_ids().

        :param find_ghosts: Find ghosts throughout the ancestry of
            revision_id.
        """
        if not find_ghosts and revision_id is not None:
            return self._walk_to_common_revisions([revision_id])
        elif revision_id is not None:
            # Find ghosts: search for revisions pointing from one repository to
            # the other, and vice versa, anywhere in the history of revision_id.
            graph = self.target_get_graph(other_repository=self.source)
            searcher = graph._make_breadth_first_searcher([revision_id])
            found_ids = set()
            while True:
                try:
                    next_revs, ghosts = searcher.next_with_ghosts()
                except StopIteration:
                    break
                if revision_id in ghosts:
                    raise errors.NoSuchRevision(self.source, revision_id)
                found_ids.update(next_revs)
                found_ids.update(ghosts)
            found_ids = frozenset(found_ids)
            # Double query here: should be able to avoid this by changing the
            # graph api further.
            result_set = found_ids - frozenset(
                self.target_get_parent_map(found_ids))
        else:
            source_ids = self.source.all_revision_ids()
            # source_ids is the worst possible case we may need to pull.
            # now we want to filter source_ids against what we actually
            # have in target, but don't try to check for existence where we know
            # we do not have a revision as that would be pointless.
            target_ids = set(self.target.all_revision_ids())
            result_set = set(source_ids).difference(target_ids)
        return self.source.revision_ids_to_search_result(result_set)


class InterModel1and2(InterRepository):

    @classmethod
    def _get_repo_format_to_test(self):
        return None

    @staticmethod
    def is_compatible(source, target):
        if not source.supports_rich_root() and target.supports_rich_root():
            return True
        else:
            return False

    @needs_write_lock
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
        """See InterRepository.fetch()."""
        from bzrlib.fetch import Model1toKnit2Fetcher
        f = Model1toKnit2Fetcher(to_repository=self.target,
                                 from_repository=self.source,
                                 last_revision=revision_id,
                                 pb=pb, find_ghosts=find_ghosts)
        return f.count_copied, f.failed_revisions

    @needs_write_lock
    def copy_content(self, revision_id=None):
        """Make a complete copy of the content in self into destination.

        This is a destructive operation! Do not use it on existing
        repositories.

        :param revision_id: Only copy the content needed to construct
                            revision_id and its parents.
        """
        try:
            self.target.set_make_working_trees(self.source.make_working_trees())
        except NotImplementedError:
            pass
        # but don't bother fetching if we have the needed data now.
        if (revision_id not in (None, _mod_revision.NULL_REVISION) and
            self.target.has_revision(revision_id)):
            return
        self.target.fetch(self.source, revision_id=revision_id)


class InterKnit1and2(InterKnitRepo):

    @classmethod
    def _get_repo_format_to_test(self):
        return None

    @staticmethod
    def is_compatible(source, target):
        """Be compatible with Knit1 source and Knit3 target"""
        try:
            from bzrlib.repofmt.knitrepo import (
                RepositoryFormatKnit1,
                RepositoryFormatKnit3,
                )
            from bzrlib.repofmt.pack_repo import (
                RepositoryFormatKnitPack1,
                RepositoryFormatKnitPack3,
                RepositoryFormatKnitPack4,
                RepositoryFormatKnitPack5,
                RepositoryFormatKnitPack5RichRoot,
                RepositoryFormatKnitPack6,
                RepositoryFormatKnitPack6RichRoot,
                RepositoryFormatPackDevelopment2,
                RepositoryFormatPackDevelopment2Subtree,
                )
            norichroot = (
                RepositoryFormatKnit1,            # no rr, no subtree
                RepositoryFormatKnitPack1,        # no rr, no subtree
                RepositoryFormatPackDevelopment2, # no rr, no subtree
                RepositoryFormatKnitPack5,        # no rr, no subtree
                RepositoryFormatKnitPack6,        # no rr, no subtree
                )
            richroot = (
                RepositoryFormatKnit3,            # rr, subtree
                RepositoryFormatKnitPack3,        # rr, subtree
                RepositoryFormatKnitPack4,        # rr, no subtree
                RepositoryFormatKnitPack5RichRoot,# rr, no subtree
                RepositoryFormatKnitPack6RichRoot,# rr, no subtree
                RepositoryFormatPackDevelopment2Subtree, # rr, subtree
                )
            for format in norichroot:
                if format.rich_root_data:
                    raise AssertionError('Format %s is a rich-root format'
                        ' but is included in the non-rich-root list'
                        % (format,))
            for format in richroot:
                if not format.rich_root_data:
                    raise AssertionError('Format %s is not a rich-root format'
                        ' but is included in the rich-root list'
                        % (format,))
            # TODO: One alternative is to just check format.rich_root_data,
            #       instead of keeping membership lists. However, the formats
            #       *also* have to use the same 'Knit' style of storage
            #       (line-deltas, fulltexts, etc.)
            return (isinstance(source._format, norichroot) and
                isinstance(target._format, richroot))
        except AttributeError:
            return False

    @needs_write_lock
    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
        """See InterRepository.fetch()."""
        from bzrlib.fetch import Knit1to2Fetcher
        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
               self.source, self.source._format, self.target,
               self.target._format)
        f = Knit1to2Fetcher(to_repository=self.target,
                            from_repository=self.source,
                            last_revision=revision_id,
                            pb=pb, find_ghosts=find_ghosts)
        return f.count_copied, f.failed_revisions


class InterDifferingSerializer(InterRepository):

    @classmethod
    def _get_repo_format_to_test(self):
        return None

    @staticmethod
    def is_compatible(source, target):
        """Be compatible with Knit2 source and Knit3 target"""
        # This is redundant with format.check_conversion_target(), however that
        # raises an exception, and we just want to say "False" as in we won't
        # support converting between these formats.
        if 'IDS_never' in debug.debug_flags:
            return False
        if source.supports_rich_root() and not target.supports_rich_root():
            return False
        if (source._format.supports_tree_reference
            and not target._format.supports_tree_reference):
            return False
        if target._fallback_repositories and target._format.supports_chks:
            # IDS doesn't know how to copy CHKs for the parent inventories it
            # adds to stacked repos.
            return False
        if 'IDS_always' in debug.debug_flags:
            return True
        # Only use this code path for local source and target. IDS does far
        # too much IO (both bandwidth and roundtrips) over a network.
        if not source.bzrdir.transport.base.startswith('file:///'):
            return False
        if not target.bzrdir.transport.base.startswith('file:///'):
            return False
        return True

    def _get_trees(self, revision_ids, cache):
        possible_trees = []
        for rev_id in revision_ids:
            if rev_id in cache:
                possible_trees.append((rev_id, cache[rev_id]))
            else:
                # Not cached, but inventory might be present anyway.
                try:
                    tree = self.source.revision_tree(rev_id)
                except errors.NoSuchRevision:
                    # Nope, parent is ghost.
                    pass
                else:
                    cache[rev_id] = tree
                    possible_trees.append((rev_id, tree))
        return possible_trees

    def _get_delta_for_revision(self, tree, parent_ids, possible_trees):
        """Get the best delta and base for this revision.

        :return: (basis_id, delta)
        """
        deltas = []
        # Generate deltas against each tree, to find the shortest.
        texts_possibly_new_in_tree = set()
        for basis_id, basis_tree in possible_trees:
            delta = tree.inventory._make_delta(basis_tree.inventory)
            for old_path, new_path, file_id, new_entry in delta:
                if new_path is None:
                    # This file_id isn't present in the new rev, so we don't
                    # care about it.
                    continue
                if not new_path:
                    # Rich roots are handled elsewhere...
                    continue
                kind = new_entry.kind
                if kind != 'directory' and kind != 'file':
                    # No text record associated with this inventory entry.
                    continue
                # This is a directory or file that has changed somehow.
                texts_possibly_new_in_tree.add((file_id, new_entry.revision))
            deltas.append((len(delta), basis_id, delta))
        deltas.sort()
        return deltas[0][1:]
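
    # Illustrative sketch, not part of the original change: deltas.sort()
    # orders the (length, basis_id, delta) tuples by length first, so
    # deltas[0][1:] is the (basis_id, delta) pair with the fewest changes:
    #
    #   deltas = [(3, 'rev-a', ['d1', 'd2', 'd3']), (1, 'rev-b', ['d1'])]
    #   deltas.sort()
    #   basis_id, delta = deltas[0][1:]   # picks 'rev-b', the shortest delta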

    def _fetch_parent_invs_for_stacking(self, parent_map, cache):
        """Find all parent revisions that are absent, but for which the
        inventory is present, and copy those inventories.

        This is necessary to preserve correctness when the source is stacked
        without fallbacks configured. (Note that in cases like upgrade the
        source may not have _fallback_repositories even though it is
        stacked.)
        """
        parent_revs = set()
        for parents in parent_map.values():
            parent_revs.update(parents)
        present_parents = self.source.get_parent_map(parent_revs)
        absent_parents = set(parent_revs).difference(present_parents)
        parent_invs_keys_for_stacking = self.source.inventories.get_parent_map(
            (rev_id,) for rev_id in absent_parents)
        parent_inv_ids = [key[-1] for key in parent_invs_keys_for_stacking]
        for parent_tree in self.source.revision_trees(parent_inv_ids):
            current_revision_id = parent_tree.get_revision_id()
            parents_parents_keys = parent_invs_keys_for_stacking[
                (current_revision_id,)]
            parents_parents = [key[-1] for key in parents_parents_keys]
            basis_id = _mod_revision.NULL_REVISION
            basis_tree = self.source.revision_tree(basis_id)
            delta = parent_tree.inventory._make_delta(basis_tree.inventory)
            self.target.add_inventory_by_delta(
                basis_id, delta, current_revision_id, parents_parents)
            cache[current_revision_id] = parent_tree

    def _fetch_batch(self, revision_ids, basis_id, cache):
        """Fetch across a few revisions.

        :param revision_ids: The revisions to copy
        :param basis_id: The revision_id of a tree that must be in cache, used
            as a basis for delta when no other base is available
        :param cache: A cache of RevisionTrees that we can use.
        :return: The revision_id of the last converted tree. The RevisionTree
            for it will be in cache
        """
        # Walk though all revisions; get inventory deltas, copy referenced
        # texts that delta references, insert the delta, revision and
        # signature.
        root_keys_to_create = set()
        text_keys = set()
        pending_deltas = []
        pending_revisions = []
        parent_map = self.source.get_parent_map(revision_ids)
        self._fetch_parent_invs_for_stacking(parent_map, cache)
        for tree in self.source.revision_trees(revision_ids):
            # Find an inventory delta for this revision.
            # Find text entries that need to be copied, too.
            current_revision_id = tree.get_revision_id()
            parent_ids = parent_map.get(current_revision_id, ())
            parent_trees = self._get_trees(parent_ids, cache)
            possible_trees = list(parent_trees)
            if len(possible_trees) == 0:
                # There either aren't any parents, or the parents are ghosts,
                # so just use the last converted tree.
                possible_trees.append((basis_id, cache[basis_id]))
            basis_id, delta = self._get_delta_for_revision(tree, parent_ids,
                possible_trees)
            revision = self.source.get_revision(current_revision_id)
            pending_deltas.append((basis_id, delta,
                current_revision_id, revision.parent_ids))
            if self._converting_to_rich_root:
                self._revision_id_to_root_id[current_revision_id] = \
                    tree.get_root_id()
            # Determine which texts are present in this revision but not in
            # any of the available parents.
            texts_possibly_new_in_tree = set()
            for old_path, new_path, file_id, entry in delta:
                if new_path is None:
                    # This file_id isn't present in the new rev
                    continue
                if not new_path:
                    # This is the root
                    if not self.target.supports_rich_root():
                        # The target doesn't support rich root, so we don't
                        # copy
                        continue
                    if self._converting_to_rich_root:
                        # This can't be copied normally, we have to insert
                        # it specially
                        root_keys_to_create.add((file_id, entry.revision))
                        continue
                texts_possibly_new_in_tree.add((file_id, entry.revision))
            for basis_id, basis_tree in possible_trees:
                basis_inv = basis_tree.inventory
                for file_key in list(texts_possibly_new_in_tree):
                    file_id, file_revision = file_key
                    try:
                        entry = basis_inv[file_id]
                    except errors.NoSuchId:
                        continue
                    if entry.revision == file_revision:
                        texts_possibly_new_in_tree.remove(file_key)
            text_keys.update(texts_possibly_new_in_tree)
            pending_revisions.append(revision)
            cache[current_revision_id] = tree
            basis_id = current_revision_id
        # Copy file texts
        from_texts = self.source.texts
        to_texts = self.target.texts
        if root_keys_to_create:
            from bzrlib.fetch import _new_root_data_stream
            root_stream = _new_root_data_stream(
                root_keys_to_create, self._revision_id_to_root_id, parent_map,
                self.source)
            to_texts.insert_record_stream(root_stream)
        to_texts.insert_record_stream(from_texts.get_record_stream(
            text_keys, self.target._format._fetch_order,
            not self.target._format._fetch_uses_deltas))
        # insert inventory deltas
        for delta in pending_deltas:
            self.target.add_inventory_by_delta(*delta)
        if self.target._fallback_repositories:
            # Make sure this stacked repository has all the parent inventories
            # for the new revisions that we are about to insert. We do this
            # before adding the revisions so that no revision is added until
            # all the inventories it may depend on are added.
            # Note that this is overzealous, as we may have fetched these in an
            # earlier batch.
            parent_ids = set()
            revision_ids = set()
            for revision in pending_revisions:
                revision_ids.add(revision.revision_id)
                parent_ids.update(revision.parent_ids)
            parent_ids.difference_update(revision_ids)
            parent_ids.discard(_mod_revision.NULL_REVISION)
            parent_map = self.source.get_parent_map(parent_ids)
            # we iterate over parent_map and not parent_ids because we don't
            # want to try copying any revision which is a ghost
            for parent_tree in self.source.revision_trees(parent_map):
                current_revision_id = parent_tree.get_revision_id()
                parents_parents = parent_map[current_revision_id]
                possible_trees = self._get_trees(parents_parents, cache)
                if len(possible_trees) == 0:
                    # There either aren't any parents, or the parents are
                    # ghosts, so just use the last converted tree.
                    possible_trees.append((basis_id, cache[basis_id]))
                basis_id, delta = self._get_delta_for_revision(parent_tree,
                    parents_parents, possible_trees)
                self.target.add_inventory_by_delta(
                    basis_id, delta, current_revision_id, parents_parents)
        # insert signatures and revisions
        for revision in pending_revisions:
            try:
                signature = self.source.get_signature_text(
                    revision.revision_id)
                self.target.add_signature_text(revision.revision_id,
                    signature)
            except errors.NoSuchRevision:
                pass
            self.target.add_revision(revision.revision_id, revision)
        return basis_id


class InterOtherToRemote(InterRepository):
    """An InterRepository that simply delegates to the 'real' InterRepository
    calculated for (source, target._real_repository).
    """

    _walk_to_common_revisions_batch_size = 50

    def __init__(self, source, target):
        InterRepository.__init__(self, source, target)
        self._real_inter = None

    @staticmethod
    def is_compatible(source, target):
        if isinstance(target, remote.RemoteRepository):
            return True
        return False

    def _ensure_real_inter(self):
        if self._real_inter is None:
            self.target._ensure_real()
            real_target = self.target._real_repository
            self._real_inter = InterRepository.get(self.source, real_target)
            # Make _real_inter use the RemoteRepository for get_parent_map
            self._real_inter.target_get_graph = self.target.get_graph
            self._real_inter.target_get_parent_map = self.target.get_parent_map

    def copy_content(self, revision_id=None):
        self._ensure_real_inter()
        self._real_inter.copy_content(revision_id=revision_id)

    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
        self._ensure_real_inter()
        return self._real_inter.fetch(revision_id=revision_id, pb=pb,
            find_ghosts=find_ghosts)

    @classmethod
    def _get_repo_format_to_test(self):
        return None


class InterRemoteToOther(InterRepository):

    def __init__(self, source, target):
        InterRepository.__init__(self, source, target)
        self._real_inter = None

    @staticmethod
    def is_compatible(source, target):
        if not isinstance(source, remote.RemoteRepository):
            return False
        # Is source's model compatible with target's model?
        source._ensure_real()
        real_source = source._real_repository
        if isinstance(real_source, remote.RemoteRepository):
            raise NotImplementedError(
                "We don't support remote repos backed by remote repos yet.")
        return InterRepository._same_model(real_source, target)

    def _ensure_real_inter(self):
        if self._real_inter is None:
            self.source._ensure_real()
            real_source = self.source._real_repository
            self._real_inter = InterRepository.get(real_source, self.target)

    def fetch(self, revision_id=None, pb=None, find_ghosts=False):
        self._ensure_real_inter()
        return self._real_inter.fetch(revision_id=revision_id, pb=pb,
            find_ghosts=find_ghosts)

    def copy_content(self, revision_id=None):
        self._ensure_real_inter()
        self._real_inter.copy_content(revision_id=revision_id)

    @classmethod
    def _get_repo_format_to_test(self):
        return None


class InterPackToRemotePack(InterPackRepo):
    """A specialisation of InterPackRepo for a target that is a
    RemoteRepository.

    This will use the get_parent_map RPC rather than plain readvs, and also
    uses an RPC for autopacking.
    """

    _walk_to_common_revisions_batch_size = 50

    @staticmethod
    def is_compatible(source, target):
        from bzrlib.repofmt.pack_repo import RepositoryFormatPack
        if isinstance(source._format, RepositoryFormatPack):
            if isinstance(target, remote.RemoteRepository):
                target._ensure_real()
                if isinstance(target._real_repository._format,
                              RepositoryFormatPack):
                    if InterRepository._same_model(source, target):
                        return True
        return False

    def _autopack(self):
        self.target.autopack()

    def _get_target_pack_collection(self):
        return self.target._real_repository._pack_collection

    @classmethod
    def _get_repo_format_to_test(self):
        return None


InterRepository.register_optimiser(InterDifferingSerializer)
InterRepository.register_optimiser(InterSameDataRepository)
InterRepository.register_optimiser(InterWeaveRepo)
InterRepository.register_optimiser(InterKnitRepo)
InterRepository.register_optimiser(InterModel1and2)
InterRepository.register_optimiser(InterKnit1and2)
InterRepository.register_optimiser(InterPackRepo)
InterRepository.register_optimiser(InterOtherToRemote)
InterRepository.register_optimiser(InterRemoteToOther)
InterRepository.register_optimiser(InterPackToRemotePack)


class CopyConverter(object):
    """A repository conversion tool which just performs a copy of the content.

    This is slow but quite reliable.
    """
            revision_graph[key] = tuple(parent for parent in parents if parent
                in revision_graph)
        return revision_graph


class StreamSink(object):
    """An object that can insert a stream into a repository.

    This interface handles the complexity of reserialising inventories and
    revisions from different formats, and allows unidirectional insertion into
    stacked repositories without looking for the missing basis parents
    beforehand.
    """

    def __init__(self, target_repo):
        self.target_repo = target_repo

    def insert_stream(self, stream, src_format, resume_tokens):
        """Insert a stream's content into the target repository.

        :param src_format: a bzr repository format.

        :return: a list of resume tokens and an iterable of keys additional
            items required before the insertion can be completed.
        """
        self.target_repo.lock_write()
        try:
            if resume_tokens:
                self.target_repo.resume_write_group(resume_tokens)
                is_resume = True
            else:
                self.target_repo.start_write_group()
                is_resume = False
            try:
                # locked_insert_stream performs a commit|suspend.
                return self._locked_insert_stream(stream, src_format, is_resume)
            except:
                self.target_repo.abort_write_group(suppress_errors=True)
                raise
        finally:
            self.target_repo.unlock()

    def _locked_insert_stream(self, stream, src_format, is_resume):
        to_serializer = self.target_repo._format._serializer
        src_serializer = src_format._serializer
        new_pack = None
        if to_serializer == src_serializer:
            # If serializers match and the target is a pack repository, set the
            # write cache size on the new pack. This avoids poor performance
            # on transports where append is unbuffered (such as
            # RemoteTransport). This is safe to do because nothing should read
            # back from the target repository while a stream with matching
            # serialization is being inserted.
            # The exception is that a delta record from the source that should
            # be a fulltext may need to be expanded by the target (see
            # test_fetch_revisions_with_deltas_into_pack); but we take care to
            # explicitly flush any buffered writes first in that rare case.
            try:
                new_pack = self.target_repo._pack_collection._new_pack
            except AttributeError:
                # Not a pack repository
                pass
            else:
                new_pack.set_write_cache_size(1024*1024)
        for substream_type, substream in stream:
            if 'stream' in debug.debug_flags:
                mutter('inserting substream: %s', substream_type)
            if substream_type == 'texts':
                self.target_repo.texts.insert_record_stream(substream)
            elif substream_type == 'inventories':
                if src_serializer == to_serializer:
                    self.target_repo.inventories.insert_record_stream(
                        substream)
                else:
                    self._extract_and_insert_inventories(
                        substream, src_serializer)
            elif substream_type == 'inventory-deltas':
                self._extract_and_insert_inventory_deltas(
                    substream, src_serializer)
            elif substream_type == 'chk_bytes':
                # XXX: This doesn't support conversions, as it assumes the
                #      conversion was done in the fetch code.
                self.target_repo.chk_bytes.insert_record_stream(substream)
            elif substream_type == 'revisions':
                # This may fallback to extract-and-insert more often than
                # required if the serializers are different only in terms of
                # the inventory.
                if src_serializer == to_serializer:
                    self.target_repo.revisions.insert_record_stream(
                        substream)
                else:
                    self._extract_and_insert_revisions(substream,
                        src_serializer)
            elif substream_type == 'signatures':
                self.target_repo.signatures.insert_record_stream(substream)
            else:
                raise AssertionError('kaboom! %s' % (substream_type,))
        # Done inserting data, and the missing_keys calculations will try to
        # read back from the inserted data, so flush the writes to the new pack
        # (if this is pack format).
        if new_pack is not None:
            new_pack._write_data('', flush=True)
        # Find all the new revisions (including ones from resume_tokens)
        missing_keys = self.target_repo.get_missing_parent_inventories(
            check_for_missing_texts=is_resume)
        try:
            for prefix, versioned_file in (
                    ('texts', self.target_repo.texts),
                    ('inventories', self.target_repo.inventories),
                    ('revisions', self.target_repo.revisions),
                    ('signatures', self.target_repo.signatures),
                    ('chk_bytes', self.target_repo.chk_bytes),
                    ):
                if versioned_file is None:
                    continue
                missing_keys.update((prefix,) + key for key in
                    versioned_file.get_missing_compression_parent_keys())
        except NotImplementedError:
            # cannot even attempt suspending, and missing would have failed
            # during stream insertion.
            missing_keys = set()
        else:
            if missing_keys:
                # suspend the write group and tell the caller what is
                # missing. We know we can suspend or else we would not have
                # entered this code path. (All repositories that can handle
                # missing keys can handle suspending a write group).
                write_group_tokens = self.target_repo.suspend_write_group()
                return write_group_tokens, missing_keys
        hint = self.target_repo.commit_write_group()
        if (to_serializer != src_serializer and
            self.target_repo._format.pack_compresses):
            self.target_repo.pack(hint=hint)
        return [], set()
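
    # Illustrative sketch, not part of the original change: the resume-token
    # protocol a caller of insert_stream follows (`sink`, `stream` and
    # `new_stream` are assumed names for the caller's objects):
    #
    #   tokens, missing = sink.insert_stream(stream, src_format, [])
    #   while tokens:
    #       # fetch the records named by `missing` from the source, then:
    #       tokens, missing = sink.insert_stream(new_stream, src_format,
    #                                            tokens)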

    def _extract_and_insert_inventory_deltas(self, substream, serializer):
        target_rich_root = self.target_repo._format.rich_root_data
        target_tree_refs = self.target_repo._format.supports_tree_reference
        for record in substream:
            # Insert the delta directly
            inventory_delta_bytes = record.get_bytes_as('fulltext')
            deserialiser = inventory_delta.InventoryDeltaDeserializer()
            try:
                parse_result = deserialiser.parse_text_bytes(
                    inventory_delta_bytes)
            except inventory_delta.IncompatibleInventoryDelta, err:
                trace.mutter("Incompatible delta: %s", err.msg)
                raise errors.IncompatibleRevision(self.target_repo._format)
            basis_id, new_id, rich_root, tree_refs, inv_delta = parse_result
            revision_id = new_id
            parents = [key[0] for key in record.parents]
            self.target_repo.add_inventory_by_delta(
                basis_id, inv_delta, revision_id, parents)

    def _extract_and_insert_inventories(self, substream, serializer,
            parse_delta=None):
        """Generate a new inventory versionedfile in target, converting data.

        The inventory is retrieved from the source, (deserializing it), and
        stored in the target (reserializing it in a different format).
        """
        target_rich_root = self.target_repo._format.rich_root_data
        target_tree_refs = self.target_repo._format.supports_tree_reference
        for record in substream:
            # It's not a delta, so it must be a fulltext in the source
            # serializer's format.
            bytes = record.get_bytes_as('fulltext')
            revision_id = record.key[0]
            inv = serializer.read_inventory_from_string(bytes, revision_id)
            parents = [key[0] for key in record.parents]
            self.target_repo.add_inventory(revision_id, inv, parents)
            # No need to keep holding this full inv in memory when the rest of
            # the substream is likely to be all deltas.
            del inv

    def _extract_and_insert_revisions(self, substream, serializer):
        for record in substream:
            bytes = record.get_bytes_as('fulltext')
            revision_id = record.key[0]
            rev = serializer.read_revision_from_string(bytes)
            if rev.revision_id != revision_id:
                raise AssertionError('wtf: %s != %s' % (rev, revision_id))
            self.target_repo.add_revision(revision_id, rev)

    def finished(self):
        if self.target_repo._format._fetch_reconcile:
            self.target_repo.reconcile()


class StreamSource(object):
    """A source of a stream for fetching between repositories."""

    def __init__(self, from_repository, to_format):
        """Create a StreamSource streaming from from_repository."""
        self.from_repository = from_repository
        self.to_format = to_format

    def delta_on_metadata(self):
        """Return True if deltas are permitted on metadata streams.

        That is on revisions and signatures.
        """
        src_serializer = self.from_repository._format._serializer
        target_serializer = self.to_format._serializer
        return (self.to_format._fetch_uses_deltas and
            src_serializer == target_serializer)

    def _fetch_revision_texts(self, revs):
        # fetch signatures first and then the revision texts
        # may need to be a InterRevisionStore call here.
        from_sf = self.from_repository.signatures
        # A missing signature is just skipped.
        keys = [(rev_id,) for rev_id in revs]
        signatures = versionedfile.filter_absent(from_sf.get_record_stream(
            keys,
            self.to_format._fetch_order,
            not self.to_format._fetch_uses_deltas))
        # If a revision has a delta, this is actually expanded inside the
        # insert_record_stream code now, which is an alternate fix for
        from_rf = self.from_repository.revisions
        revisions = from_rf.get_record_stream(
            keys,
            self.to_format._fetch_order,
            not self.delta_on_metadata())
        return [('signatures', signatures), ('revisions', revisions)]

    def _generate_root_texts(self, revs):
        """This will be called by get_stream between fetching weave texts and
        fetching the inventory weave.
        """
        if self._rich_root_upgrade():
            import bzrlib.fetch
            return bzrlib.fetch.Inter1and2Helper(
                self.from_repository).generate_root_texts(revs)
        else:
            return []

    def get_stream(self, search):
        phase = 'file'
        revs = search.get_keys()
        graph = self.from_repository.get_graph()
        revs = tsort.topo_sort(graph.get_parent_map(revs))
        data_to_fetch = self.from_repository.item_keys_introduced_by(revs)
        text_keys = []
        for knit_kind, file_id, revisions in data_to_fetch:
            if knit_kind != phase:
                phase = knit_kind
                # Make a new progress bar for this phase
            if knit_kind == "file":
                # Accumulate file texts
                text_keys.extend([(file_id, revision) for revision in
                    revisions])
            elif knit_kind == "inventory":
                # Now copy the file texts.
                from_texts = self.from_repository.texts
                yield ('texts', from_texts.get_record_stream(
                    text_keys, self.to_format._fetch_order,
                    not self.to_format._fetch_uses_deltas))
                # Cause an error if a text occurs after we have done the
                # copy.
                text_keys = None
                # Before we process the inventory we generate the root
                # texts (if necessary) so that the inventories references
                # them.
                for _ in self._generate_root_texts(revs):
                    yield _
                # we fetch only the referenced inventories because we do not
                # know for unselected inventories whether all their required
                # texts are present in the other repository - it could be
                # corrupt.
                for info in self._get_inventory_stream(revs):
                    yield info
            elif knit_kind == "signatures":
                # Nothing to do here; this will be taken care of when
                # _fetch_revision_texts happens.
                pass
            elif knit_kind == "revisions":
                for record in self._fetch_revision_texts(revs):
                    yield record
            else:
                raise AssertionError("Unknown knit kind %r" % knit_kind)

    def get_stream_for_missing_keys(self, missing_keys):
        # missing keys can only occur when we are byte copying and not
        # translating (because translation means we don't send
        # unreconstructable deltas ever).
        keys = {}
        keys['texts'] = set()
        keys['revisions'] = set()
        keys['inventories'] = set()
        keys['chk_bytes'] = set()
        keys['signatures'] = set()
        for key in missing_keys:
            keys[key[0]].add(key[1:])
        if len(keys['revisions']):
            # If we allowed copying revisions at this point, we could end up
            # copying a revision without copying its required texts: a
            # violation of the requirements for repository integrity.
            raise AssertionError(
                'cannot copy revisions to fill in missing deltas %s' % (
                    keys['revisions'],))
        for substream_kind, keys in keys.iteritems():
            vf = getattr(self.from_repository, substream_kind)
            if vf is None and keys:
                raise AssertionError(
                    "cannot fill in keys for a versioned file we don't"
                    " have: %s needs %s" % (substream_kind, keys))
            if not keys:
                # No need to stream something we don't have
                continue
            if substream_kind == 'inventories':
                # Some missing keys are genuinely ghosts, filter those out.
                present = self.from_repository.inventories.get_parent_map(keys)
                revs = [key[0] for key in present]
                # Get the inventory stream more-or-less as we do for the
                # original stream; there's no reason to assume that records
                # direct from the source will be suitable for the sink. (Think
                # e.g. 2a -> 1.9-rich-root).
                for info in self._get_inventory_stream(revs, missing=True):
                    yield info
                continue
            # Ask for full texts always so that we don't need more round trips
            # after this stream.
            # Some of the missing keys are genuinely ghosts, so filter absent
            # records. The Sink is responsible for doing another check to
            # ensure that ghosts don't introduce missing data for future
            # fetches.
            stream = versionedfile.filter_absent(vf.get_record_stream(keys,
                self.to_format._fetch_order, True))
            yield substream_kind, stream

    def inventory_fetch_order(self):
        if self._rich_root_upgrade():
            return 'topological'
        else:
            return self.to_format._fetch_order

    def _rich_root_upgrade(self):
        return (not self.from_repository._format.rich_root_data and
            self.to_format.rich_root_data)

    def _get_inventory_stream(self, revision_ids, missing=False):
        from_format = self.from_repository._format
        if (from_format.supports_chks and self.to_format.supports_chks and
            from_format.network_name() == self.to_format.network_name()):
            raise AssertionError(
                "this case should be handled by GroupCHKStreamSource")
        elif 'forceinvdeltas' in debug.debug_flags:
            return self._get_convertable_inventory_stream(revision_ids,
                    delta_versus_null=missing)
        elif from_format.network_name() == self.to_format.network_name():
            # Same format.
            return self._get_simple_inventory_stream(revision_ids,
                    missing=missing)
        elif (not from_format.supports_chks and not self.to_format.supports_chks
                and from_format._serializer == self.to_format._serializer):
            # Essentially the same format.
            return self._get_simple_inventory_stream(revision_ids,
                    missing=missing)
        else:
            # Any time we switch serializations, we want to use an
            # inventory-delta based approach.
            return self._get_convertable_inventory_stream(revision_ids,
                    delta_versus_null=missing)

    def _get_simple_inventory_stream(self, revision_ids, missing=False):
        # NB: This currently reopens the inventory weave in source;
        # using a single stream interface instead would avoid this.
        from_weave = self.from_repository.inventories
        if missing:
            delta_closure = True
        else:
            delta_closure = not self.delta_on_metadata()
        yield ('inventories', from_weave.get_record_stream(
            [(rev_id,) for rev_id in revision_ids],
            self.inventory_fetch_order(), delta_closure))

    def _get_convertable_inventory_stream(self, revision_ids,
                                          delta_versus_null=False):
        # The source is using CHKs, but the target either doesn't or it has a
        # different serializer. The StreamSink code expects to be able to
        # convert on the target, so we need to put bytes-on-the-wire that can
        # be converted. That means inventory deltas (if the remote is <1.19,
        # RemoteStreamSink will fallback to VFS to insert the deltas).
        yield ('inventory-deltas',
           self._stream_invs_as_deltas(revision_ids,
                                       delta_versus_null=delta_versus_null))

    def _stream_invs_as_deltas(self, revision_ids, delta_versus_null=False):
        """Return a stream of inventory-deltas for the given rev ids.

        :param revision_ids: The list of inventories to transmit
        :param delta_versus_null: Don't try to find a minimal delta for this
            entry, instead compute the delta versus the NULL_REVISION. This
            effectively streams a complete inventory. Used for stuff like
            filling in missing parents, etc.
        """
        from_repo = self.from_repository
        revision_keys = [(rev_id,) for rev_id in revision_ids]
        parent_map = from_repo.inventories.get_parent_map(revision_keys)
        # XXX: possibly repos could implement a more efficient iter_inv_deltas
        inventories = self.from_repository.iter_inventories(
            revision_ids, 'topological')
        format = from_repo._format
        invs_sent_so_far = set([_mod_revision.NULL_REVISION])
        inventory_cache = lru_cache.LRUCache(50)
        null_inventory = from_repo.revision_tree(
            _mod_revision.NULL_REVISION).inventory
        # XXX: ideally the rich-root/tree-refs flags would be per-revision, not
        # per-repo (e.g. streaming a non-rich-root revision out of a rich-root
        # repo back into a non-rich-root repo ought to be allowed)
        serializer = inventory_delta.InventoryDeltaSerializer(
            versioned_root=format.rich_root_data,
            tree_references=format.supports_tree_reference)
        for inv in inventories:
            key = (inv.revision_id,)
            parent_keys = parent_map.get(key, ())
            delta = None
            if not delta_versus_null and parent_keys:
                # The caller did not ask for complete inventories and we have
                # some parents that we can delta against. Make a delta against
                # each parent so that we can find the smallest.
                parent_ids = [parent_key[0] for parent_key in parent_keys]
                for parent_id in parent_ids:
                    if parent_id not in invs_sent_so_far:
                        # We don't know that the remote side has this basis, so
                        # we can't use it.
                        continue
                    if parent_id == _mod_revision.NULL_REVISION:
                        parent_inv = null_inventory
                    else:
                        parent_inv = inventory_cache.get(parent_id, None)
                        if parent_inv is None:
                            parent_inv = from_repo.get_inventory(parent_id)
                    candidate_delta = inv._make_delta(parent_inv)
                    if (delta is None or
                        len(delta) > len(candidate_delta)):
                        delta = candidate_delta
                        basis_id = parent_id
            if delta is None:
                # Either none of the parents ended up being suitable, or we
                # were asked to delta against NULL
                basis_id = _mod_revision.NULL_REVISION
                delta = inv._make_delta(null_inventory)
            invs_sent_so_far.add(inv.revision_id)
            inventory_cache[inv.revision_id] = inv
            delta_serialized = ''.join(
                serializer.delta_to_lines(basis_id, key[-1], delta))
            yield versionedfile.FulltextContentFactory(
                key, parent_keys, None, delta_serialized)


def _iter_for_revno(repo, partial_history_cache, stop_index=None,
                    stop_revision=None):
    """Extend the partial history to include a given index

    If a stop_index is supplied, stop when that index has been reached.
    If a stop_revision is supplied, stop when that revision is
    encountered. Otherwise, stop when the beginning of history is
    reached.

    :param stop_index: The index which should be present. When it is
        present, history extension will stop.
    :param stop_revision: The revision id which should be present. When
        it is encountered, history extension will stop.
    """
    start_revision = partial_history_cache[-1]
    iterator = repo.iter_reverse_revision_history(start_revision)
    try:
        # skip the last revision in the list
        iterator.next()
        while True:
            if (stop_index is not None and
                len(partial_history_cache) > stop_index):
                break
            if partial_history_cache[-1] == stop_revision:
                break
            revision_id = iterator.next()
            partial_history_cache.append(revision_id)
    except StopIteration:
        # No more history
        return
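
# Illustrative sketch, not part of the original change: extending a partial
# history cache with _iter_for_revno (`repo` is an assumed locked repository
# and `tip` an assumed revision id):
#
#   history = [tip]
#   _iter_for_revno(repo, history, stop_index=10)
#   # history now holds up to 11 revisions walking back from tip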