# unchanged, carry over.
ie.reference_revision = parent_entry.reference_revision
ie.revision = parent_entry.revision
return self._get_delta(ie, basis_inv, path), False, None
return self._get_delta(ie, basis_inv, path), False
ie.reference_revision = content_summary[3]
self._add_text_to_weave(ie.file_id, '', heads, None)
self._add_text_to_weave(ie.file_id, lines, heads, None)
raise NotImplementedError('unknown kind')
ie.revision = self._new_revision_id
self._any_changes = True
return self._get_delta(ie, basis_inv, path), True, fingerprint
def record_iter_changes(self, tree, basis_revision_id, iter_changes,
_entry_factory=entry_factory):
"""Record a new tree via iter_changes.

:param tree: The tree to obtain text contents from for changed objects.
:param basis_revision_id: The revision id of the tree the iter_changes
has been generated against. Currently assumed to be the same
as self.parents[0] - if it is not, errors may occur.
:param iter_changes: An iter_changes iterator with the changes to apply
to basis_revision_id. The iterator must not include any items with
a current kind of None - missing items must be either filtered out
or errored-on before record_iter_changes sees the item.
:param _entry_factory: Private method to bind entry_factory locally for
:return: A generator of (file_id, relpath, fs_hash) tuples for use with
# Create an inventory delta based on deltas between all the parents and
# deltas between all the parent inventories. We use inventory deltas
# between the inventory objects because iter_changes masks
# last-changed-field only changes.
# file_id -> change map, change is fileid, paths, changed, versioneds,
# parents, names, kinds, executables
# {file_id -> revision_id -> inventory entry, for entries in parent
# trees that are not parents[0]
revtrees = list(self.repository.revision_trees(self.parents))
except errors.NoSuchRevision:
# one or more ghosts, slow path.
for revision_id in self.parents:
revtrees.append(self.repository.revision_tree(revision_id))
except errors.NoSuchRevision:
basis_revision_id = _mod_revision.NULL_REVISION
revtrees.append(self.repository.revision_tree(
_mod_revision.NULL_REVISION))
# The basis inventory from a repository
basis_inv = revtrees[0].inventory
basis_inv = self.repository.revision_tree(
_mod_revision.NULL_REVISION).inventory
if len(self.parents) > 0:
if basis_revision_id != self.parents[0] and not ghost_basis:
"arbitrary basis parents not yet supported with merges")
for revtree in revtrees[1:]:
for change in revtree.inventory._make_delta(basis_inv):
if change[1] is None:
# Not present in this parent.
if change[2] not in merged_ids:
if change[0] is not None:
basis_entry = basis_inv[change[2]]
merged_ids[change[2]] = [
basis_entry.revision,
parent_entries[change[2]] = {
basis_entry.revision:basis_entry,
change[3].revision:change[3],
merged_ids[change[2]] = [change[3].revision]
parent_entries[change[2]] = {change[3].revision:change[3]}
merged_ids[change[2]].append(change[3].revision)
parent_entries[change[2]][change[3].revision] = change[3]
# Setup the changes from the tree:
# changes maps file_id -> (change, [parent revision_ids])
for change in iter_changes:
# This probably looks up in basis_inv way too much.
if change[1][0] is not None:
head_candidate = [basis_inv[change[0]].revision]
changes[change[0]] = change, merged_ids.get(change[0],
unchanged_merged = set(merged_ids) - set(changes)
# Extend the changes dict with synthetic changes to record merges of
for file_id in unchanged_merged:
# Record a merged version of these items that did not change vs the
# basis. This can be either identical parallel changes, or a revert
# of a specific file after a merge. The recorded content will be
# that of the current tree (which is the same as the basis), but
# the per-file graph will reflect a merge.
# NB:XXX: We are reconstructing path information we already had; it
# should be preserved instead.
# inv delta change: (file_id, (path_in_source, path_in_target),
# changed_content, versioned, parent, name, kind,
basis_entry = basis_inv[file_id]
except errors.NoSuchId:
# a change from basis->some_parents but file_id isn't in basis
# so was new in the merge, which means it must have changed
# from basis -> current, and as it hasn't the add was reverted
# by the user. So we discard this change.
(basis_inv.id2path(file_id), tree.id2path(file_id)),
(basis_entry.parent_id, basis_entry.parent_id),
(basis_entry.name, basis_entry.name),
(basis_entry.kind, basis_entry.kind),
(basis_entry.executable, basis_entry.executable))
changes[file_id] = (change, merged_ids[file_id])
# changes contains tuples with the change and a set of inventory
# candidates for the file.
# old_path, new_path, file_id, new_inventory_entry
seen_root = False # Is the root in the basis delta?
inv_delta = self._basis_delta
modified_rev = self._new_revision_id
for change, head_candidates in changes.values():
if change[3][1]: # versioned in target.
# Several things may be happening here:
# We may have a fork in the per-file graph
# - record a change with the content from tree
# We may have a change against < all trees
# - carry over the tree that hasn't changed
# We may have a change against all trees
# - record the change with the content from tree
entry = _entry_factory[kind](file_id, change[5][1],
head_set = self._heads(change[0], set(head_candidates))
for head_candidate in head_candidates:
if head_candidate in head_set:
heads.append(head_candidate)
head_set.remove(head_candidate)
# Could be a carry-over situation:
parent_entry_revs = parent_entries.get(file_id, None)
if parent_entry_revs:
parent_entry = parent_entry_revs.get(heads[0], None)
if parent_entry is None:
# The parent that iter_changes was called against is the
# per-file head, so any change reported by iter_changes is
# relevant and valid.
carry_over_possible = False
# could be a carry over situation
# A change against the basis may just indicate a merge,
# we need to check the content against the source of the
# merge to determine if it was changed after the merge
if (parent_entry.kind != entry.kind or
parent_entry.parent_id != entry.parent_id or
parent_entry.name != entry.name):
# Metadata common to all entries has changed
# against per-file parent
carry_over_possible = False
carry_over_possible = True
# per-type checks for changes against the parent_entry
# Cannot be a carry-over situation
carry_over_possible = False
# Populate the entry in the delta
# XXX: There is still a small race here: If someone reverts the content of a file
# after iter_changes examines and decides it has changed,
# we will unconditionally record a new version even if some
# other process reverts it while commit is running (with
# the revert happening after iter_changes did its
entry.executable = True
entry.executable = False
if (carry_over_possible and
parent_entry.executable == entry.executable):
# Check the file length, content hash after reading
nostore_sha = parent_entry.text_sha1
file_obj, stat_value = tree.get_file_with_stat(file_id, change[1][1])
text = file_obj.read()
entry.text_sha1, entry.text_size = self._add_text_to_weave(
file_id, text, heads, nostore_sha)
yield file_id, change[1][1], (entry.text_sha1, stat_value)
except errors.ExistingContent:
# No content change against a carry_over parent
# Perhaps this should also yield a fs hash update?
entry.text_size = parent_entry.text_size
entry.text_sha1 = parent_entry.text_sha1
elif kind == 'symlink':
entry.symlink_target = tree.get_symlink_target(file_id)
if (carry_over_possible and
parent_entry.symlink_target == entry.symlink_target):
self._add_text_to_weave(change[0], '', heads, None)
elif kind == 'directory':
if carry_over_possible:
# Nothing to set on the entry.
# XXX: split into the Root and nonRoot versions.
if change[1][1] != '' or self.repository.supports_rich_root():
self._add_text_to_weave(change[0], '', heads, None)
elif kind == 'tree-reference':
if not self.repository._format.supports_tree_reference:
# This isn't quite sane as an error, but we shouldn't
# ever see this code path in practice: trees don't
# permit references when the repo doesn't support tree
raise errors.UnsupportedOperation(tree.add_reference,
reference_revision = tree.get_reference_revision(change[0])
entry.reference_revision = reference_revision
if (carry_over_possible and
parent_entry.reference_revision == reference_revision):
self._add_text_to_weave(change[0], '', heads, None)
raise AssertionError('unknown kind %r' % kind)
entry.revision = modified_rev
entry.revision = parent_entry.revision
new_path = change[1][1]
inv_delta.append((change[1][0], new_path, change[0], entry))
self.new_inventory = None
# This should perhaps be guarded by a check that the basis we
# commit against is the basis for the commit and if not do a delta
self._any_changes = True
# housekeeping root entry changes do not affect no-change commits.
self._require_root_change(tree)
self.basis_delta_revision = basis_revision_id
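
# --- Illustrative sketch (not part of bzrlib) ----------------------------
# A minimal sketch of how a caller might drain record_iter_changes and
# collect the (file_id, relpath, fs_hash) tuples it yields; the builder,
# tree and iter_changes arguments are assumed to come from the normal
# commit machinery.
def _example_record_changes(builder, tree, basis_revision_id, iter_changes):
    observed = []
    for file_id, relpath, fs_hash in builder.record_iter_changes(
            tree, basis_revision_id, iter_changes):
        # Each tuple describes a file whose on-disk hash was observed
        # while recording the commit.
        observed.append((file_id, relpath, fs_hash))
    return observed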
def _add_text_to_weave(self, file_id, new_text, parents, nostore_sha):
parent_keys = tuple([(file_id, parent) for parent in parents])
return self.repository.texts._add_text(
(file_id, self._new_revision_id), parent_keys, new_text,
nostore_sha=nostore_sha, random_id=self.random_revid)[0:2]
return self._get_delta(ie, basis_inv, path), True

def _add_text_to_weave(self, file_id, new_lines, parents, nostore_sha):
# Note: as we read the content directly from the tree, we know it's not
# been turned into unicode or badly split - but a broken tree
# implementation could give us bad output from readlines() so this is
# not a guarantee of safety. What would be better is always checking
# the content during test suite execution. RBC 20070912
parent_keys = tuple((file_id, parent) for parent in parents)
return self.repository.texts.add_lines(
(file_id, self._new_revision_id), parent_keys, new_lines,
nostore_sha=nostore_sha, random_id=self.random_revid,
check_content=False)[0:2]
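
# --- Illustrative sketch (not part of bzrlib) ----------------------------
# File texts are keyed by (file_id, revision_id); a per-file parent list of
# revision ids therefore maps onto keys in the same space, which is what
# both _add_text_to_weave variants above construct.
def _example_text_keys(file_id, new_revision_id, parent_revision_ids):
    parent_keys = tuple((file_id, parent) for parent in parent_revision_ids)
    text_key = (file_id, new_revision_id)
    return text_key, parent_keys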
class RootCommitBuilder(CommitBuilder):
"""This commitbuilder actually records the root id"""
# the root entry gets versioned properly by this builder.
_versioned_root = True

# The old API returned a list, should this actually be a set?
return parent_map.keys()
def _check_inventories(self, checker):
"""Check the inventories found from the revision scan.

This is responsible for verifying the sha1 of inventories and
creating a pending_keys set that covers data referenced by inventories.
bar = ui.ui_factory.nested_progress_bar()
self._do_check_inventories(checker, bar)

def _do_check_inventories(self, checker, bar):
"""Helper for _check_inventories."""
keys = {'chk_bytes':set(), 'inventories':set(), 'texts':set()}
kinds = ['chk_bytes', 'texts']
count = len(checker.pending_keys)
bar.update("inventories", 0, 2)
current_keys = checker.pending_keys
checker.pending_keys = {}
# Accumulate current checks.
for key in current_keys:
if key[0] != 'inventories' and key[0] not in kinds:
checker._report_items.append('unknown key type %r' % (key,))
keys[key[0]].add(key[1:])
if keys['inventories']:
# NB: output order *should* be roughly sorted - topo or
# inverse topo depending on repository - either way decent
# to just delta against. However, pre-CHK formats didn't
# try to optimise inventory layout on disk. As such the
# pre-CHK code path does not use inventory deltas.
for record in self.inventories.check(keys=keys['inventories']):
if record.storage_kind == 'absent':
checker._report_items.append(
'Missing inventory {%s}' % (record.key,))
last_object = self._check_record('inventories', record,
checker, last_object,
current_keys[('inventories',) + record.key])
del keys['inventories']
bar.update("texts", 1)
while (checker.pending_keys or keys['chk_bytes']
# Something to check.
current_keys = checker.pending_keys
checker.pending_keys = {}
# Accumulate current checks.
for key in current_keys:
if key[0] not in kinds:
checker._report_items.append('unknown key type %r' % (key,))
keys[key[0]].add(key[1:])
# Check the outermost kind only - inventories || chk_bytes || texts
for record in getattr(self, kind).check(keys=keys[kind]):
if record.storage_kind == 'absent':
checker._report_items.append(
'Missing inventory {%s}' % (record.key,))
last_object = self._check_record(kind, record,
checker, last_object, current_keys[(kind,) + record.key])

def _check_record(self, kind, record, checker, last_object, item_data):
"""Check a single text from this repository."""
if kind == 'inventories':
rev_id = record.key[0]
inv = self.deserialise_inventory(rev_id,
record.get_bytes_as('fulltext'))
if last_object is not None:
delta = inv._make_delta(last_object)
for old_path, path, file_id, ie in delta:
ie.check(checker, rev_id, inv)
for path, ie in inv.iter_entries():
ie.check(checker, rev_id, inv)
if self._format.fast_deltas:
elif kind == 'chk_bytes':
# No code written to check chk_bytes for this repo format.
checker._report_items.append(
'unsupported key type chk_bytes for %s' % (record.key,))
elif kind == 'texts':
self._check_text(record, checker, item_data)
checker._report_items.append(
'unknown key type %s for %s' % (kind, record.key))

def _check_text(self, record, checker, item_data):
"""Check a single text."""
# Check it is extractable.
# TODO: check length.
if record.storage_kind == 'chunked':
chunks = record.get_bytes_as(record.storage_kind)
sha1 = osutils.sha_strings(chunks)
length = sum(map(len, chunks))
content = record.get_bytes_as('fulltext')
sha1 = osutils.sha_string(content)
length = len(content)
if item_data and sha1 != item_data[1]:
checker._report_items.append(
'sha1 mismatch: %s has sha1 %s expected %s referenced by %s' %
(record.key, sha1, item_data[1], item_data[2]))
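
# --- Illustrative sketch (not part of bzrlib) ----------------------------
# The sha1/length computation above, in plain hashlib terms: a 'chunked'
# record is hashed incrementally, a 'fulltext' record in one call.
def _example_sha1_and_length(storage_kind, data):
    import hashlib
    if storage_kind == 'chunked':
        s = hashlib.sha1()
        length = 0
        for chunk in data:
            s.update(chunk)
            length += len(chunk)
        return s.hexdigest(), length
    return hashlib.sha1(data).hexdigest(), len(data)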
def create(a_bzrdir):
"""Construct the current default format repository in a_bzrdir."""

"""Commit the contents accrued within the current write group.

:seealso: start_write_group.
:return: it may return an opaque hint that can be passed to 'pack'.
if self._write_group is not self.get_transaction():
# has an unlock or relock occurred ?
raise errors.BzrError('mismatched lock context %r and '
'write group %r.' %
(self.get_transaction(), self._write_group))
result = self._commit_write_group()
self._commit_write_group()
self._write_group = None
def _commit_write_group(self):
"""Template method for per-repository write group cleanup.

This is called before the write group is considered to be
finished and should ensure that all data handed to the repository
for writing during the write group is safely committed (to the
extent possible considering file system caching etc).

def suspend_write_group(self):
raise errors.UnsuspendableWriteGroup(self)

def get_missing_parent_inventories(self, check_for_missing_texts=True):
"""Return the keys of missing inventory parents for revisions added in

A revision is not complete if the inventory delta for that revision
cannot be calculated. Therefore if the parent inventories of a
revision are not present, the revision is incomplete, and e.g. cannot
be streamed by a smart server. This method finds missing inventory
parents for revisions added in this write group.
if not self._format.supports_external_lookups:
# This is only an issue for stacked repositories
if not self.is_in_write_group():
raise AssertionError('not in a write group')
# XXX: We assume that every added revision already has its
# corresponding inventory, so we only check for parent inventories that
# might be missing, rather than all inventories.
parents = set(self.revisions._index.get_missing_parents())
parents.discard(_mod_revision.NULL_REVISION)
unstacked_inventories = self.inventories._index
present_inventories = unstacked_inventories.get_parent_map(
key[-1:] for key in parents)
parents.difference_update(present_inventories)
if len(parents) == 0:
# No missing parent inventories.
if not check_for_missing_texts:
return set(('inventories', rev_id) for (rev_id,) in parents)
# Ok, now we have a list of missing inventories. But these only matter
# if the inventories that reference them are missing some texts they
# appear to introduce.
# XXX: Texts referenced by all added inventories need to be present,
# but at the moment we're only checking for texts referenced by
# inventories at the graph's edge.
key_deps = self.revisions._index._key_dependencies
key_deps.add_keys(present_inventories)
referrers = frozenset(r[0] for r in key_deps.get_referrers())
file_ids = self.fileids_altered_by_revision_ids(referrers)
missing_texts = set()
for file_id, version_ids in file_ids.iteritems():
missing_texts.update(
(file_id, version_id) for version_id in version_ids)
present_texts = self.texts.get_parent_map(missing_texts)
missing_texts.difference_update(present_texts)
if not missing_texts:
# No texts are missing, so all revisions and their deltas are
# Alternatively the text versions could be returned as the missing
# keys, but this is likely to be less data.
missing_keys = set(('inventories', rev_id) for (rev_id,) in parents)
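
# --- Illustrative sketch (not part of bzrlib) ----------------------------
# A caller-side view of the API above: the returned keys are
# ('inventories', revision_id) tuples, and an empty set means the write
# group's new revisions are complete enough to be streamed.
def _example_write_group_complete(repo):
    return len(repo.get_missing_parent_inventories()) == 0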
def refresh_data(self):
"""Re-read any data needed to synchronise with disk.

This method is intended to be called after another repository instance
(such as one used by a smart server) has inserted data into the
repository. It may not be called during a write group, but may be
called at any other time.
if self.is_in_write_group():
raise errors.InternalBzrError(
"May not refresh_data while in a write group.")
self._refresh_data()

def resume_write_group(self, tokens):
if not self.is_write_locked():
raise errors.NotWriteLocked(self)
if self._write_group:
raise errors.BzrError('already in a write group')
self._resume_write_group(tokens)
# so we can detect unlock/relock - the write group is now entered.
self._write_group = self.get_transaction()

def _resume_write_group(self, tokens):
raise errors.UnsuspendableWriteGroup(self)

def fetch(self, source, revision_id=None, pb=None, find_ghosts=False,
def fetch(self, source, revision_id=None, pb=None, find_ghosts=False):
"""Fetch the content required to construct revision_id from source.

If revision_id is None and fetch_spec is None, then all content is
fetch() may not be used when the repository is in a write group -
either finish the current write group before using fetch, or use
fetch before starting the write group.
If revision_id is None all content is copied.
:param find_ghosts: Find and copy revisions in the source that are
ghosts in the target (and not reachable directly by walking out to
the first-present revision in target from revision_id).
:param revision_id: If specified, all the content needed for this
revision ID will be copied to the target. Fetch will determine for
itself which content needs to be copied.
:param fetch_spec: If specified, a SearchResult or
PendingAncestryResult that describes which revisions to copy. This
allows copying multiple heads at once. Mutually exclusive with
if fetch_spec is not None and revision_id is not None:
raise AssertionError(
"fetch_spec and revision_id are mutually exclusive.")
if self.is_in_write_group():
raise errors.InternalBzrError(
"May not fetch while in a write group.")
# fast path same-url fetch operations
# TODO: lift out to somewhere common with RemoteRepository
# <https://bugs.edge.launchpad.net/bzr/+bug/401646>
if (self.has_same_location(source)
and fetch_spec is None
and self._has_same_fallbacks(source)):
if self.has_same_location(source):
# check that last_revision is in 'from' and then return a
if (revision_id is not None and
@needs_read_lock
def get_revisions(self, revision_ids):
"""Get many revisions at once.

Repositories that need to check data on every revision read should
subclass this method.
"""Get many revisions at once."""
return self._get_revisions(revision_ids)

@needs_read_lock
def _get_revisions(self, revision_ids):
"""Core work logic to get many revisions without sanity checks."""
for rev_id in revision_ids:
if not rev_id or not isinstance(rev_id, basestring):
raise errors.InvalidRevisionId(revision_id=rev_id, branch=self)
keys = [(key,) for key in revision_ids]
stream = self.revisions.get_record_stream(keys, 'unordered', True)
for revid, rev in self._iter_revisions(revision_ids):
raise errors.NoSuchRevision(self, revid)
for record in stream:
if record.storage_kind == 'absent':
raise errors.NoSuchRevision(self, record.key[0])
text = record.get_bytes_as('fulltext')
rev = self._serializer.read_revision_from_string(text)
revs[record.key[0]] = rev
return [revs[revid] for revid in revision_ids]

def _iter_revisions(self, revision_ids):
"""Iterate over revision objects.

:param revision_ids: An iterable of revisions to examine. None may be
passed to request all revisions known to the repository. Note that
not all repositories can find unreferenced revisions; for those
repositories only referenced ones will be returned.
:return: An iterator of (revid, revision) tuples. Absent revisions (
those asked for but not available) are returned as (revid, None).
if revision_ids is None:
revision_ids = self.all_revision_ids()
for rev_id in revision_ids:
if not rev_id or not isinstance(rev_id, basestring):
raise errors.InvalidRevisionId(revision_id=rev_id, branch=self)
keys = [(key,) for key in revision_ids]
stream = self.revisions.get_record_stream(keys, 'unordered', True)
for record in stream:
revid = record.key[0]
if record.storage_kind == 'absent':
text = record.get_bytes_as('fulltext')
rev = self._serializer.read_revision_from_string(text)
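
# --- Illustrative sketch (not part of bzrlib) ----------------------------
# Consuming _iter_revisions as get_revisions does above: absent revisions
# come back as (revid, None) and are treated as errors by strict callers.
def _example_require_revisions(repo, revision_ids):
    revs = {}
    for revid, rev in repo._iter_revisions(revision_ids):
        if rev is None:
            raise KeyError('revision %s is not present' % (revid,))
        revs[revid] = rev
    return [revs[revid] for revid in revision_ids]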
@needs_read_lock
def get_revision_xml(self, revision_id):
# TODO: jam 20070210 This shouldn't be necessary since get_revision
# would have already done it.
# TODO: jam 20070210 Just use _serializer.write_revision_to_string()
# TODO: this can't just be replaced by:
# return self._serializer.write_revision_to_string(
# self.get_revision(revision_id))
# as cStringIO preserves the encoding unlike write_revision_to_string
# or some other call down the path.
rev = self.get_revision(revision_id)
rev_tmp = cStringIO.StringIO()
rev_tmp = StringIO()
# the current serializer..
self._serializer.write_revision(rev, rev_tmp)
rev_tmp.seek(0)
return rev_tmp.getvalue()

def get_deltas_for_revisions(self, revisions, specific_fileids=None):
def get_deltas_for_revisions(self, revisions):
"""Produce a generator of revision deltas.

Note that the input is a sequence of REVISIONS, not revision_ids.
Trees will be held in memory until the generator exits.
Each delta is relative to the revision's lefthand predecessor.
:param specific_fileids: if not None, the result is filtered
so that only those file-ids, their parents and their
children are included.
# Get the revision-ids of interest
required_trees = set()
for revision in revisions:
required_trees.add(revision.revision_id)
required_trees.update(revision.parent_ids[:1])
# Get the matching filtered trees. Note that it's more
# efficient to pass filtered trees to changes_from() rather
# than doing the filtering afterwards. changes_from() could
# arguably do the filtering itself but it's path-based, not
# file-id based, so filtering before or afterwards is
if specific_fileids is None:
trees = dict((t.get_revision_id(), t) for
t in self.revision_trees(required_trees))
trees = dict((t.get_revision_id(), t) for
t in self._filtered_revision_trees(required_trees,
# Calculate the deltas
trees = dict((t.get_revision_id(), t) for
t in self.revision_trees(required_trees))
for revision in revisions:
if not revision.parent_ids:
old_tree = self.revision_tree(_mod_revision.NULL_REVISION)
return self.source.revision_ids_to_search_result(result_set)

class InterDifferingSerializer(InterRepository):

class InterPackRepo(InterSameDataRepository):
"""Optimised code paths between Pack based repositories."""

def _get_repo_format_to_test(self):
from bzrlib.repofmt import pack_repo
return pack_repo.RepositoryFormatKnitPack1()

def is_compatible(source, target):
"""Be compatible with known Pack formats.

We don't test for the stores being of specific types because that
could lead to confusing results, and there is no need to be
from bzrlib.repofmt.pack_repo import RepositoryFormatPack
are_packs = (isinstance(source._format, RepositoryFormatPack) and
isinstance(target._format, RepositoryFormatPack))
except AttributeError:
return are_packs and InterRepository._same_model(source, target)

def fetch(self, revision_id=None, pb=None, find_ghosts=False):
"""See InterRepository.fetch()."""
if (len(self.source._fallback_repositories) > 0 or
len(self.target._fallback_repositories) > 0):
# The pack layer is not aware of fallback repositories, so when
# fetching from a stacked repository or into a stacked repository
# we use the generic fetch logic which uses the VersionedFiles
# attributes on repository.
from bzrlib.fetch import RepoFetcher
fetcher = RepoFetcher(self.target, self.source, revision_id,
return fetcher.count_copied, fetcher.failed_revisions
from bzrlib.repofmt.pack_repo import Packer
mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
self.source, self.source._format, self.target, self.target._format)
self.count_copied = 0
if revision_id is None:
# everything to do - use pack logic
# to fetch from all packs to one without
# inventory parsing etc, IFF nothing to be copied is in the target.
source_revision_ids = frozenset(self.source.all_revision_ids())
revision_ids = source_revision_ids - \
frozenset(self.target.get_parent_map(source_revision_ids))
revision_keys = [(revid,) for revid in revision_ids]
index = self.target._pack_collection.revision_index.combined_index
present_revision_ids = set(item[1][0] for item in
index.iter_entries(revision_keys))
revision_ids = set(revision_ids) - present_revision_ids
# implementing the TODO will involve:
# - detecting when all of a pack is selected
# - avoiding as much as possible pre-selection, so the
# more-core routines such as create_pack_from_packs can filter in
# a just-in-time fashion. (though having a HEADS list on a
# repository might make this a lot easier, because we could
# sensibly detect 'new revisions' without doing a full index scan.
elif _mod_revision.is_null(revision_id):
revision_ids = self.search_missing_revision_ids(revision_id,
find_ghosts=find_ghosts).get_keys()
except errors.NoSuchRevision:
raise errors.InstallFailed([revision_id])
if len(revision_ids) == 0:
packs = self.source._pack_collection.all_packs()
pack = Packer(self.target._pack_collection, packs, '.fetch',
revision_ids).pack()
if pack is not None:
self.target._pack_collection._save_pack_names()
# Trigger an autopack. This may duplicate effort as we've just done
# a pack creation, but for now it is simpler to think about as
# 'upload data, then repack if needed'.
self.target._pack_collection.autopack()
return (pack.get_revision_count(), [])

def search_missing_revision_ids(self, revision_id=None, find_ghosts=True):
"""See InterRepository.missing_revision_ids().

:param find_ghosts: Find ghosts throughout the ancestry of
if not find_ghosts and revision_id is not None:
return self._walk_to_common_revisions([revision_id])
elif revision_id is not None:
# Find ghosts: search for revisions pointing from one repository to
# the other, and vice versa, anywhere in the history of revision_id.
graph = self.target.get_graph(other_repository=self.source)
searcher = graph._make_breadth_first_searcher([revision_id])
next_revs, ghosts = searcher.next_with_ghosts()
except StopIteration:
if revision_id in ghosts:
raise errors.NoSuchRevision(self.source, revision_id)
found_ids.update(next_revs)
found_ids.update(ghosts)
found_ids = frozenset(found_ids)
# Double query here: should be able to avoid this by changing the
# graph api further.
result_set = found_ids - frozenset(
self.target.get_parent_map(found_ids))
source_ids = self.source.all_revision_ids()
# source_ids is the worst possible case we may need to pull.
# now we want to filter source_ids against what we actually
# have in target, but don't try to check for existence where we know
# we do not have a revision as that would be pointless.
target_ids = set(self.target.all_revision_ids())
result_set = set(source_ids).difference(target_ids)
return self.source.revision_ids_to_search_result(result_set)
class InterModel1and2(InterRepository):

def _get_repo_format_to_test(self):

def is_compatible(source, target):
if not source.supports_rich_root() and target.supports_rich_root():

def fetch(self, revision_id=None, pb=None, find_ghosts=False):
"""See InterRepository.fetch()."""
from bzrlib.fetch import Model1toKnit2Fetcher
f = Model1toKnit2Fetcher(to_repository=self.target,
from_repository=self.source,
last_revision=revision_id,
pb=pb, find_ghosts=find_ghosts)
return f.count_copied, f.failed_revisions

def copy_content(self, revision_id=None):
"""Make a complete copy of the content in self into destination.

This is a destructive operation! Do not use it on existing
:param revision_id: Only copy the content needed to construct
revision_id and its parents.
self.target.set_make_working_trees(self.source.make_working_trees())
except NotImplementedError:
# but don't bother fetching if we have the needed data now.
if (revision_id not in (None, _mod_revision.NULL_REVISION) and
self.target.has_revision(revision_id)):
self.target.fetch(self.source, revision_id=revision_id)

class InterKnit1and2(InterKnitRepo):

def _get_repo_format_to_test(self):

def is_compatible(source, target):
"""Be compatible with Knit1 source and Knit3 target"""
from bzrlib.repofmt.knitrepo import (
RepositoryFormatKnit1,
RepositoryFormatKnit3,
from bzrlib.repofmt.pack_repo import (
RepositoryFormatKnitPack1,
RepositoryFormatKnitPack3,
RepositoryFormatKnitPack4,
RepositoryFormatKnitPack5,
RepositoryFormatKnitPack5RichRoot,
RepositoryFormatPackDevelopment1,
RepositoryFormatPackDevelopment1Subtree,
RepositoryFormatKnit1, # no rr, no subtree
RepositoryFormatKnitPack1, # no rr, no subtree
RepositoryFormatPackDevelopment1, # no rr, no subtree
RepositoryFormatKnitPack5, # no rr, no subtree
RepositoryFormatKnit3, # rr, subtree
RepositoryFormatKnitPack3, # rr, subtree
RepositoryFormatKnitPack4, # rr, no subtree
RepositoryFormatKnitPack5RichRoot, # rr, no subtree
RepositoryFormatPackDevelopment1Subtree, # rr, subtree
for format in norichroot:
if format.rich_root_data:
raise AssertionError('Format %s is a rich-root format'
' but is included in the non-rich-root list'
for format in richroot:
if not format.rich_root_data:
raise AssertionError('Format %s is not a rich-root format'
' but is included in the rich-root list'
# TODO: One alternative is to just check format.rich_root_data,
# instead of keeping membership lists. However, the formats
# *also* have to use the same 'Knit' style of storage
# (line-deltas, fulltexts, etc.)
return (isinstance(source._format, norichroot) and
isinstance(target._format, richroot))
except AttributeError:

def fetch(self, revision_id=None, pb=None, find_ghosts=False):
"""See InterRepository.fetch()."""
from bzrlib.fetch import Knit1to2Fetcher
mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
self.source, self.source._format, self.target,
self.target._format)
f = Knit1to2Fetcher(to_repository=self.target,
from_repository=self.source,
last_revision=revision_id,
pb=pb, find_ghosts=find_ghosts)
return f.count_copied, f.failed_revisions
class InterDifferingSerializer(InterKnitRepo):

def _get_repo_format_to_test(self):

def is_compatible(source, target):
"""Be compatible with Knit2 source and Knit3 target"""
# This is redundant with format.check_conversion_target(), however that
# raises an exception, and we just want to say "False" as in we won't
# support converting between these formats.
if 'IDS_never' in debug.debug_flags:
if source.supports_rich_root() and not target.supports_rich_root():
if (source._format.supports_tree_reference
and not target._format.supports_tree_reference):
if target._fallback_repositories and target._format.supports_chks:
# IDS doesn't know how to copy CHKs for the parent inventories it
# adds to stacked repos.
if 'IDS_always' in debug.debug_flags:
# Only use this code path for local source and target. IDS does far
# too much IO (both bandwidth and roundtrips) over a network.
if not source.bzrdir.transport.base.startswith('file:///'):
if not target.bzrdir.transport.base.startswith('file:///'):
if source.supports_rich_root() != target.supports_rich_root():
# Ideally, we'd support fetching if the source had no tree references
# even if it supported them...
if (getattr(source, '_format.supports_tree_reference', False) and
not getattr(target, '_format.supports_tree_reference', False)):

def _get_trees(self, revision_ids, cache):
for rev_id in revision_ids:
possible_trees.append((rev_id, cache[rev_id]))
# Not cached, but inventory might be present anyway.
tree = self.source.revision_tree(rev_id)
except errors.NoSuchRevision:
# Nope, parent is ghost.
cache[rev_id] = tree
possible_trees.append((rev_id, tree))
return possible_trees

def _get_delta_for_revision(self, tree, parent_ids, possible_trees):
"""Get the best delta and base for this revision.

:return: (basis_id, delta)
# Generate deltas against each tree, to find the shortest.
texts_possibly_new_in_tree = set()
for basis_id, basis_tree in possible_trees:
delta = tree.inventory._make_delta(basis_tree.inventory)
for old_path, new_path, file_id, new_entry in delta:
if new_path is None:
# This file_id isn't present in the new rev, so we don't
# Rich roots are handled elsewhere...
kind = new_entry.kind
if kind != 'directory' and kind != 'file':
# No text record associated with this inventory entry.
# This is a directory or file that has changed somehow.
texts_possibly_new_in_tree.add((file_id, new_entry.revision))
deltas.append((len(delta), basis_id, delta))
return deltas[0][1:]
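
# --- Illustrative sketch (not part of bzrlib) ----------------------------
# The basis-selection idea above, stripped to its core: score each candidate
# basis by the length of the delta it produces and keep the shortest one.
# `candidates` is assumed to be a non-empty list of (basis_id, delta) pairs
# where each delta is a list of change tuples.
def _example_shortest_delta(candidates):
    best = None
    for basis_id, delta in candidates:
        if best is None or len(delta) < best[0]:
            best = (len(delta), basis_id, delta)
    return best[1], best[2]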
def _fetch_parent_invs_for_stacking(self, parent_map, cache):
"""Find all parent revisions that are absent, but for which the
inventory is present, and copy those inventories.

This is necessary to preserve correctness when the source is stacked
without fallbacks configured. (Note that in cases like upgrade the
source may not have _fallback_repositories even though it is
for parents in parent_map.values():
parent_revs.update(parents)
present_parents = self.source.get_parent_map(parent_revs)
absent_parents = set(parent_revs).difference(present_parents)
parent_invs_keys_for_stacking = self.source.inventories.get_parent_map(
(rev_id,) for rev_id in absent_parents)
parent_inv_ids = [key[-1] for key in parent_invs_keys_for_stacking]
for parent_tree in self.source.revision_trees(parent_inv_ids):
current_revision_id = parent_tree.get_revision_id()
parents_parents_keys = parent_invs_keys_for_stacking[
(current_revision_id,)]
parents_parents = [key[-1] for key in parents_parents_keys]
basis_id = _mod_revision.NULL_REVISION
basis_tree = self.source.revision_tree(basis_id)
delta = parent_tree.inventory._make_delta(basis_tree.inventory)
self.target.add_inventory_by_delta(
basis_id, delta, current_revision_id, parents_parents)
cache[current_revision_id] = parent_tree
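
# --- Illustrative sketch (not part of bzrlib) ----------------------------
# The first step of the stacking fix-up above, in isolation: collect every
# parent named in a parent_map and keep only those whose revision is not
# present in the repository.
def _example_absent_parents(repo, parent_map):
    parent_revs = set()
    for parents in parent_map.values():
        parent_revs.update(parents)
    return parent_revs.difference(repo.get_parent_map(parent_revs))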
def _fetch_batch(self, revision_ids, basis_id, cache):
"""Fetch across a few revisions.

:param revision_ids: The revisions to copy
:param basis_id: The revision_id of a tree that must be in cache, used
as a basis for delta when no other base is available
:param cache: A cache of RevisionTrees that we can use.
:return: The revision_id of the last converted tree. The RevisionTree
for it will be in cache
# Walk through all revisions; get inventory deltas, copy referenced
# texts that delta references, insert the delta, revision and
root_keys_to_create = set()
pending_revisions = []
parent_map = self.source.get_parent_map(revision_ids)
self._fetch_parent_invs_for_stacking(parent_map, cache)
for tree in self.source.revision_trees(revision_ids):
# Find an inventory delta for this revision.
# Find text entries that need to be copied, too.
current_revision_id = tree.get_revision_id()
parent_ids = parent_map.get(current_revision_id, ())
parent_trees = self._get_trees(parent_ids, cache)
possible_trees = list(parent_trees)
if len(possible_trees) == 0:
# There either aren't any parents, or the parents are ghosts,
# so just use the last converted tree.
possible_trees.append((basis_id, cache[basis_id]))
basis_id, delta = self._get_delta_for_revision(tree, parent_ids,
if self._converting_to_rich_root:
self._revision_id_to_root_id[current_revision_id] = \
# Determine which texts are present in this revision but not in
# any of the available parents.
texts_possibly_new_in_tree = set()
for old_path, new_path, file_id, entry in delta:
if new_path is None:
# This file_id isn't present in the new rev
if not self.target.supports_rich_root():
# The target doesn't support rich root, so we don't
if self._converting_to_rich_root:
# This can't be copied normally, we have to insert
root_keys_to_create.add((file_id, entry.revision))
texts_possibly_new_in_tree.add((file_id, entry.revision))
for basis_id, basis_tree in possible_trees:
basis_inv = basis_tree.inventory
for file_key in list(texts_possibly_new_in_tree):
file_id, file_revision = file_key
entry = basis_inv[file_id]
except errors.NoSuchId:
if entry.revision == file_revision:
texts_possibly_new_in_tree.remove(file_key)
text_keys.update(texts_possibly_new_in_tree)
revision = self.source.get_revision(current_revision_id)
pending_deltas.append((basis_id, delta,
current_revision_id, revision.parent_ids))
pending_revisions.append(revision)
cache[current_revision_id] = tree
basis_id = current_revision_id
from_texts = self.source.texts
to_texts = self.target.texts
if root_keys_to_create:
from bzrlib.fetch import _new_root_data_stream
root_stream = _new_root_data_stream(
root_keys_to_create, self._revision_id_to_root_id, parent_map,
to_texts.insert_record_stream(root_stream)
to_texts.insert_record_stream(from_texts.get_record_stream(
text_keys, self.target._format._fetch_order,
not self.target._format._fetch_uses_deltas))
# insert inventory deltas
for delta in pending_deltas:
self.target.add_inventory_by_delta(*delta)
if self.target._fallback_repositories:
# Make sure this stacked repository has all the parent inventories
# for the new revisions that we are about to insert. We do this
# before adding the revisions so that no revision is added until
# all the inventories it may depend on are added.
# Note that this is overzealous, as we may have fetched these in an
revision_ids = set()
for revision in pending_revisions:
revision_ids.add(revision.revision_id)
parent_ids.update(revision.parent_ids)
parent_ids.difference_update(revision_ids)
parent_ids.discard(_mod_revision.NULL_REVISION)
parent_map = self.source.get_parent_map(parent_ids)
# we iterate over parent_map and not parent_ids because we don't
# want to try copying any revision which is a ghost
for parent_tree in self.source.revision_trees(parent_map):
current_revision_id = parent_tree.get_revision_id()
parents_parents = parent_map[current_revision_id]
possible_trees = self._get_trees(parents_parents, cache)
if len(possible_trees) == 0:
# There either aren't any parents, or the parents are
# ghosts, so just use the last converted tree.
possible_trees.append((basis_id, cache[basis_id]))
basis_id, delta = self._get_delta_for_revision(parent_tree,
parents_parents, possible_trees)
self.target.add_inventory_by_delta(
basis_id, delta, current_revision_id, parents_parents)
# insert signatures and revisions
for revision in pending_revisions:
signature = self.source.get_signature_text(
revision.revision_id)
self.target.add_signature_text(revision.revision_id,
except errors.NoSuchRevision:
self.target.add_revision(revision.revision_id, revision)
def _fetch_all_revisions(self, revision_ids, pb):
"""Fetch everything for the list of revisions.

:param revision_ids: The list of revisions to fetch. Must be in
:param pb: A ProgressTask
basis_id, basis_tree = self._get_basis(revision_ids[0])
cache = lru_cache.LRUCache(100)
cache[basis_id] = basis_tree
del basis_tree # We don't want to hang on to it here
for offset in range(0, len(revision_ids), batch_size):
self.target.start_write_group()
pb.update('Transferring revisions', offset,
batch = revision_ids[offset:offset+batch_size]
basis_id = self._fetch_batch(batch, basis_id, cache)
self.target.abort_write_group()
hint = self.target.commit_write_group()
if hints and self.target._format.pack_compresses:
self.target.pack(hint=hints)
pb.update('Transferring revisions', len(revision_ids),

@needs_write_lock
def fetch(self, revision_id=None, pb=None, find_ghosts=False,
def fetch(self, revision_id=None, pb=None, find_ghosts=False):
"""See InterRepository.fetch()."""
if fetch_spec is not None:
raise AssertionError("Not implemented yet...")
if (not self.source.supports_rich_root()
and self.target.supports_rich_root()):
self._converting_to_rich_root = True
self._revision_id_to_root_id = {}
self._converting_to_rich_root = False
revision_ids = self.target.search_missing_revision_ids(self.source,
revision_id, find_ghosts=find_ghosts).get_keys()
if not revision_ids:
revision_ids = tsort.topo_sort(
self.source.get_graph().get_parent_map(revision_ids))
if not revision_ids:
# Walk through all revisions; get inventory deltas, copy referenced
# texts that delta references, insert the delta, revision and
def revisions_iterator():
for current_revision_id in revision_ids:
revision = self.source.get_revision(current_revision_id)
tree = self.source.revision_tree(current_revision_id)
signature = self.source.get_signature_text(
current_revision_id)
except errors.NoSuchRevision:
yield revision, tree, signature
my_pb = ui.ui_factory.nested_progress_bar()
symbol_versioning.warn(
symbol_versioning.deprecated_in((1, 14, 0))
% "pb parameter to fetch()")
self._fetch_all_revisions(revision_ids, pb)
install_revisions(self.target, revisions_iterator(),
len(revision_ids), pb)
if my_pb is not None:
my_pb.finished()
return len(revision_ids), 0

def _get_basis(self, first_revision_id):
"""Get a revision and tree which exists in the target.

This assumes that first_revision_id is selected for transmission
because all other ancestors are already present. If we can't find an
ancestor we fall back to NULL_REVISION since we know that is safe.
:return: (basis_id, basis_tree)
first_rev = self.source.get_revision(first_revision_id)
basis_id = first_rev.parent_ids[0]
# only valid as a basis if the target has it
self.target.get_revision(basis_id)
# Try to get a basis tree - if it's a ghost it will hit the
# NoSuchRevision case.
basis_tree = self.source.revision_tree(basis_id)
except (IndexError, errors.NoSuchRevision):
basis_id = _mod_revision.NULL_REVISION
basis_tree = self.source.revision_tree(basis_id)
return basis_id, basis_tree
class InterOtherToRemote(InterRepository):

def __init__(self, source, target):
InterRepository.__init__(self, source, target)
self._real_inter = None

def is_compatible(source, target):
if isinstance(target, remote.RemoteRepository):

def _ensure_real_inter(self):
if self._real_inter is None:
self.target._ensure_real()
real_target = self.target._real_repository
self._real_inter = InterRepository.get(self.source, real_target)

def copy_content(self, revision_id=None):
self._ensure_real_inter()
self._real_inter.copy_content(revision_id=revision_id)

def fetch(self, revision_id=None, pb=None, find_ghosts=False):
self._ensure_real_inter()
return self._real_inter.fetch(revision_id=revision_id, pb=pb,
find_ghosts=find_ghosts)

def _get_repo_format_to_test(self):

class InterRemoteToOther(InterRepository):

def __init__(self, source, target):
InterRepository.__init__(self, source, target)
self._real_inter = None

def is_compatible(source, target):
if not isinstance(source, remote.RemoteRepository):
# Is source's model compatible with target's model?
source._ensure_real()
real_source = source._real_repository
if isinstance(real_source, remote.RemoteRepository):
raise NotImplementedError(
"We don't support remote repos backed by remote repos yet.")
return InterRepository._same_model(real_source, target)

def _ensure_real_inter(self):
if self._real_inter is None:
self.source._ensure_real()
real_source = self.source._real_repository
self._real_inter = InterRepository.get(real_source, self.target)

def fetch(self, revision_id=None, pb=None, find_ghosts=False):
self._ensure_real_inter()
return self._real_inter.fetch(revision_id=revision_id, pb=pb,
find_ghosts=find_ghosts)

def copy_content(self, revision_id=None):
self._ensure_real_inter()
self._real_inter.copy_content(revision_id=revision_id)

def _get_repo_format_to_test(self):

InterRepository.register_optimiser(InterDifferingSerializer)
InterRepository.register_optimiser(InterSameDataRepository)
InterRepository.register_optimiser(InterWeaveRepo)
InterRepository.register_optimiser(InterKnitRepo)
InterRepository.register_optimiser(InterModel1and2)
InterRepository.register_optimiser(InterKnit1and2)
InterRepository.register_optimiser(InterPackRepo)
InterRepository.register_optimiser(InterOtherToRemote)
InterRepository.register_optimiser(InterRemoteToOther)

class CopyConverter(object):
"""A repository conversion tool which just performs a copy of the content.

This is slow but quite reliable.
revision_graph[key] = tuple(parent for parent in parents if parent
in revision_graph)
return revision_graph
class StreamSink(object):
"""An object that can insert a stream into a repository.

This interface handles the complexity of reserialising inventories and
revisions from different formats, and allows unidirectional insertion into
stacked repositories without looking for the missing basis parents

def __init__(self, target_repo):
self.target_repo = target_repo

def insert_stream(self, stream, src_format, resume_tokens):
"""Insert a stream's content into the target repository.

:param src_format: a bzr repository format.
:return: a list of resume tokens and an iterable of keys additional
items required before the insertion can be completed.
self.target_repo.lock_write()
self.target_repo.resume_write_group(resume_tokens)
self.target_repo.start_write_group()
# locked_insert_stream performs a commit|suspend.
return self._locked_insert_stream(stream, src_format, is_resume)
self.target_repo.abort_write_group(suppress_errors=True)
self.target_repo.unlock()

def _locked_insert_stream(self, stream, src_format, is_resume):
to_serializer = self.target_repo._format._serializer
src_serializer = src_format._serializer
if to_serializer == src_serializer:
# If serializers match and the target is a pack repository, set the
# write cache size on the new pack. This avoids poor performance
# on transports where append is unbuffered (such as
# RemoteTransport). This is safe to do because nothing should read
# back from the target repository while a stream with matching
# serialization is being inserted.
# The exception is that a delta record from the source that should
# be a fulltext may need to be expanded by the target (see
# test_fetch_revisions_with_deltas_into_pack); but we take care to
# explicitly flush any buffered writes first in that rare case.
new_pack = self.target_repo._pack_collection._new_pack
except AttributeError:
# Not a pack repository
new_pack.set_write_cache_size(1024*1024)
for substream_type, substream in stream:
if 'stream' in debug.debug_flags:
mutter('inserting substream: %s', substream_type)
if substream_type == 'texts':
self.target_repo.texts.insert_record_stream(substream)
elif substream_type == 'inventories':
if src_serializer == to_serializer:
self.target_repo.inventories.insert_record_stream(
self._extract_and_insert_inventories(
substream, src_serializer)
elif substream_type == 'inventory-deltas':
self._extract_and_insert_inventory_deltas(
substream, src_serializer)
elif substream_type == 'chk_bytes':
# XXX: This doesn't support conversions, as it assumes the
# conversion was done in the fetch code.
self.target_repo.chk_bytes.insert_record_stream(substream)
elif substream_type == 'revisions':
# This may fall back to extract-and-insert more often than
# required if the serializers are different only in terms of
if src_serializer == to_serializer:
self.target_repo.revisions.insert_record_stream(
self._extract_and_insert_revisions(substream,
elif substream_type == 'signatures':
self.target_repo.signatures.insert_record_stream(substream)
raise AssertionError('kaboom! %s' % (substream_type,))
# Done inserting data, and the missing_keys calculations will try to
# read back from the inserted data, so flush the writes to the new pack
# (if this is pack format).
if new_pack is not None:
new_pack._write_data('', flush=True)
# Find all the new revisions (including ones from resume_tokens)
missing_keys = self.target_repo.get_missing_parent_inventories(
check_for_missing_texts=is_resume)
for prefix, versioned_file in (
('texts', self.target_repo.texts),
('inventories', self.target_repo.inventories),
('revisions', self.target_repo.revisions),
('signatures', self.target_repo.signatures),
('chk_bytes', self.target_repo.chk_bytes),
if versioned_file is None:
missing_keys.update((prefix,) + key for key in
versioned_file.get_missing_compression_parent_keys())
except NotImplementedError:
# cannot even attempt suspending, and missing would have failed
# during stream insertion.
missing_keys = set()
# suspend the write group and tell the caller what is
# missing. We know we can suspend or else we would not have
# entered this code path. (All repositories that can handle
# missing keys can handle suspending a write group).
write_group_tokens = self.target_repo.suspend_write_group()
return write_group_tokens, missing_keys
hint = self.target_repo.commit_write_group()
if (to_serializer != src_serializer and
self.target_repo._format.pack_compresses):
self.target_repo.pack(hint=hint)
4335
def _extract_and_insert_inventory_deltas(self, substream, serializer):
4336
target_rich_root = self.target_repo._format.rich_root_data
4337
target_tree_refs = self.target_repo._format.supports_tree_reference
4338
for record in substream:
4339
# Insert the delta directly
4340
inventory_delta_bytes = record.get_bytes_as('fulltext')
4341
deserialiser = inventory_delta.InventoryDeltaDeserializer()
4343
parse_result = deserialiser.parse_text_bytes(
4344
inventory_delta_bytes)
4345
except inventory_delta.IncompatibleInventoryDelta, err:
4346
trace.mutter("Incompatible delta: %s", err.msg)
4347
raise errors.IncompatibleRevision(self.target_repo._format)
4348
basis_id, new_id, rich_root, tree_refs, inv_delta = parse_result
4349
revision_id = new_id
4350
parents = [key[0] for key in record.parents]
4351
self.target_repo.add_inventory_by_delta(
4352
basis_id, inv_delta, revision_id, parents)

    def _extract_and_insert_inventories(self, substream, serializer,
            parse_delta=None):
        """Generate a new inventory versionedfile in target, converting data.

        The inventory is retrieved from the source (deserializing it), and
        stored in the target (reserializing it in a different format).
        """
        target_rich_root = self.target_repo._format.rich_root_data
        target_tree_refs = self.target_repo._format.supports_tree_reference
        for record in substream:
            # It's not a delta, so it must be a fulltext in the source
            # serializer's format.
            bytes = record.get_bytes_as('fulltext')
            revision_id = record.key[0]
            inv = serializer.read_inventory_from_string(bytes, revision_id)
            parents = [key[0] for key in record.parents]
            self.target_repo.add_inventory(revision_id, inv, parents)
            # No need to keep holding this full inv in memory when the rest of
            # the substream is likely to be all deltas.
            del inv

    def _extract_and_insert_revisions(self, substream, serializer):
        for record in substream:
            bytes = record.get_bytes_as('fulltext')
            revision_id = record.key[0]
            rev = serializer.read_revision_from_string(bytes)
            if rev.revision_id != revision_id:
                raise AssertionError(
                    'mismatched revision id: %s != %s' % (rev, revision_id))
            self.target_repo.add_revision(revision_id, rev)

    def finished(self):
        if self.target_repo._format._fetch_reconcile:
            self.target_repo.reconcile()


class StreamSource(object):
    """A source of a stream for fetching between repositories."""

    def __init__(self, from_repository, to_format):
        """Create a StreamSource streaming from from_repository."""
        self.from_repository = from_repository
        self.to_format = to_format

    def delta_on_metadata(self):
        """Return True if deltas are permitted on metadata streams.

        That is, on revisions and signatures.
        """
        src_serializer = self.from_repository._format._serializer
        target_serializer = self.to_format._serializer
        return (self.to_format._fetch_uses_deltas and
            src_serializer == target_serializer)
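    # Added note (not in the original source): metadata deltas are only safe
    # when the serialized bytes can be reused verbatim on the target, which is
    # why this requires both the same serializer on each side and a target
    # format that fetches with deltas; any cross-serializer fetch falls back
    # to full texts for revisions and signatures (see _fetch_revision_texts).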

    def _fetch_revision_texts(self, revs):
        # fetch signatures first and then the revision texts
        # may need to be an InterRevisionStore call here.
        from_sf = self.from_repository.signatures
        # A missing signature is just skipped.
        keys = [(rev_id,) for rev_id in revs]
        signatures = versionedfile.filter_absent(from_sf.get_record_stream(
            keys,
            self.to_format._fetch_order,
            not self.to_format._fetch_uses_deltas))
        # If a revision has a delta, it is now expanded inside the
        # insert_record_stream code instead.
        from_rf = self.from_repository.revisions
        revisions = from_rf.get_record_stream(
            keys,
            self.to_format._fetch_order,
            not self.delta_on_metadata())
        return [('signatures', signatures), ('revisions', revisions)]

    def _generate_root_texts(self, revs):
        """This will be called by get_stream between fetching weave texts and
        fetching the inventory weave.
        """
        if self._rich_root_upgrade():
            import bzrlib.fetch
            return bzrlib.fetch.Inter1and2Helper(
                self.from_repository).generate_root_texts(revs)
        else:
            return []

    def get_stream(self, search):
        phase = 'file'
        revs = search.get_keys()
        graph = self.from_repository.get_graph()
        revs = tsort.topo_sort(graph.get_parent_map(revs))
        data_to_fetch = self.from_repository.item_keys_introduced_by(revs)
        text_keys = []
        for knit_kind, file_id, revisions in data_to_fetch:
            if knit_kind != phase:
                phase = knit_kind
                # Make a new progress bar for this phase
            if knit_kind == "file":
                # Accumulate file texts
                text_keys.extend([(file_id, revision) for revision in
                    revisions])
            elif knit_kind == "inventory":
                # Now copy the file texts.
                from_texts = self.from_repository.texts
                yield ('texts', from_texts.get_record_stream(
                    text_keys, self.to_format._fetch_order,
                    not self.to_format._fetch_uses_deltas))
                # Cause an error if a text occurs after we have done the
                # copy.
                text_keys = None
                # Before we process the inventory we generate the root
                # texts (if necessary) so that the inventories reference
                # them.
                for _ in self._generate_root_texts(revs):
                    yield _
                # we fetch only the referenced inventories because we do not
                # know for unselected inventories whether all their required
                # texts are present in the other repository - it could be
                # corrupt.
                for info in self._get_inventory_stream(revs):
                    yield info
            elif knit_kind == "signatures":
                # Nothing to do here; this will be taken care of when
                # _fetch_revision_texts happens.
                pass
            elif knit_kind == "revisions":
                for record in self._fetch_revision_texts(revs):
                    yield record
            else:
                raise AssertionError("Unknown knit kind %r" % knit_kind)

    def get_stream_for_missing_keys(self, missing_keys):
        # missing keys can only occur when we are byte copying and not
        # translating (because translation means we don't send
        # unreconstructable deltas ever).
        keys = {}
        keys['texts'] = set()
        keys['revisions'] = set()
        keys['inventories'] = set()
        keys['chk_bytes'] = set()
        keys['signatures'] = set()
        for key in missing_keys:
            keys[key[0]].add(key[1:])
        if len(keys['revisions']):
            # If we allowed copying revisions at this point, we could end up
            # copying a revision without copying its required texts: a
            # violation of the requirements for repository integrity.
            raise AssertionError(
                'cannot copy revisions to fill in missing deltas %s' % (
                    keys['revisions'],))
        for substream_kind, keys in keys.iteritems():
            vf = getattr(self.from_repository, substream_kind)
            if vf is None and keys:
                raise AssertionError(
                    "cannot fill in keys for a versioned file we don't"
                    " have: %s needs %s" % (substream_kind, keys))
            if not keys:
                # No need to stream something we don't have
                continue
            if substream_kind == 'inventories':
                # Some missing keys are genuinely ghosts, filter those out.
                present = self.from_repository.inventories.get_parent_map(keys)
                revs = [key[0] for key in present]
                # Get the inventory stream more-or-less as we do for the
                # original stream; there's no reason to assume that records
                # direct from the source will be suitable for the sink. (Think
                # e.g. 2a -> 1.9-rich-root).
                for info in self._get_inventory_stream(revs, missing=True):
                    yield info
                continue
            # Ask for full texts always so that we don't need more round trips
            # after this stream.
            # Some of the missing keys are genuinely ghosts, so filter absent
            # records. The Sink is responsible for doing another check to
            # ensure that ghosts don't introduce missing data for future
            # fetches.
            stream = versionedfile.filter_absent(vf.get_record_stream(keys,
                self.to_format._fetch_order, True))
            yield substream_kind, stream
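    # Added note (not in the original source): the missing_keys handled here
    # use the shape built by the sink, i.e. a versioned-file prefix plus the
    # key within that file, for example ('inventories', 'rev-id') or
    # ('texts', 'file-id', 'rev-id'); key[0] selects the substream and key[1:]
    # is the key to fetch. Treat the concrete examples as illustrative.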

    def inventory_fetch_order(self):
        if self._rich_root_upgrade():
            return 'topological'
        else:
            return self.to_format._fetch_order

    def _rich_root_upgrade(self):
        return (not self.from_repository._format.rich_root_data and
            self.to_format.rich_root_data)

    def _get_inventory_stream(self, revision_ids, missing=False):
        from_format = self.from_repository._format
        if (from_format.supports_chks and self.to_format.supports_chks and
            from_format.network_name() == self.to_format.network_name()):
            raise AssertionError(
                "this case should be handled by GroupCHKStreamSource")
        elif 'forceinvdeltas' in debug.debug_flags:
            return self._get_convertable_inventory_stream(revision_ids,
                    delta_versus_null=missing)
        elif from_format.network_name() == self.to_format.network_name():
            # Same format.
            return self._get_simple_inventory_stream(revision_ids,
                    missing=missing)
        elif (not from_format.supports_chks and not self.to_format.supports_chks
                and from_format._serializer == self.to_format._serializer):
            # Essentially the same format.
            return self._get_simple_inventory_stream(revision_ids,
                    missing=missing)
        else:
            # Any time we switch serializations, we want to use an
            # inventory-delta based approach.
            return self._get_convertable_inventory_stream(revision_ids,
                    delta_versus_null=missing)
4567
def _get_simple_inventory_stream(self, revision_ids, missing=False):
4568
# NB: This currently reopens the inventory weave in source;
4569
# using a single stream interface instead would avoid this.
4570
from_weave = self.from_repository.inventories
4572
delta_closure = True
4574
delta_closure = not self.delta_on_metadata()
4575
yield ('inventories', from_weave.get_record_stream(
4576
[(rev_id,) for rev_id in revision_ids],
4577
self.inventory_fetch_order(), delta_closure))
4579
def _get_convertable_inventory_stream(self, revision_ids,
4580
delta_versus_null=False):
4581
# The source is using CHKs, but the target either doesn't or it has a
4582
# different serializer. The StreamSink code expects to be able to
4583
# convert on the target, so we need to put bytes-on-the-wire that can
4584
# be converted. That means inventory deltas (if the remote is <1.19,
4585
# RemoteStreamSink will fallback to VFS to insert the deltas).
4586
yield ('inventory-deltas',
4587
self._stream_invs_as_deltas(revision_ids,
4588
delta_versus_null=delta_versus_null))
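    # Added note (not in the original source): the substream name here is
    # 'inventory-deltas', not 'inventories'; the sink is expected to dispatch
    # on that name and feed these records through
    # _extract_and_insert_inventory_deltas() rather than inserting them
    # verbatim, which is what allows the conversion to happen on the target.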

    def _stream_invs_as_deltas(self, revision_ids, delta_versus_null=False):
        """Return a stream of inventory-deltas for the given rev ids.

        :param revision_ids: The list of inventories to transmit
        :param delta_versus_null: Don't try to find a minimal delta for this
            entry, instead compute the delta versus the NULL_REVISION. This
            effectively streams a complete inventory. Used for things like
            filling in missing parents, etc.
        """
        from_repo = self.from_repository
        revision_keys = [(rev_id,) for rev_id in revision_ids]
        parent_map = from_repo.inventories.get_parent_map(revision_keys)
        # XXX: possibly repos could implement a more efficient iter_inv_deltas
        # method.
        inventories = self.from_repository.iter_inventories(
            revision_ids, 'topological')
        format = from_repo._format
        invs_sent_so_far = set([_mod_revision.NULL_REVISION])
        inventory_cache = lru_cache.LRUCache(50)
        null_inventory = from_repo.revision_tree(
            _mod_revision.NULL_REVISION).inventory
        # XXX: ideally the rich-root/tree-refs flags would be per-revision, not
        # per-repo (e.g. streaming a non-rich-root revision out of a rich-root
        # repo back into a non-rich-root repo ought to be allowed)
        serializer = inventory_delta.InventoryDeltaSerializer(
            versioned_root=format.rich_root_data,
            tree_references=format.supports_tree_reference)
        for inv in inventories:
            key = (inv.revision_id,)
            parent_keys = parent_map.get(key, ())
            delta = None
            if not delta_versus_null and parent_keys:
                # The caller did not ask for complete inventories and we have
                # some parents that we can delta against. Make a delta against
                # each parent so that we can find the smallest.
                parent_ids = [parent_key[0] for parent_key in parent_keys]
                for parent_id in parent_ids:
                    if parent_id not in invs_sent_so_far:
                        # We don't know that the remote side has this basis, so
                        # we can't use it.
                        continue
                    if parent_id == _mod_revision.NULL_REVISION:
                        parent_inv = null_inventory
                    else:
                        parent_inv = inventory_cache.get(parent_id, None)
                        if parent_inv is None:
                            parent_inv = from_repo.get_inventory(parent_id)
                    candidate_delta = inv._make_delta(parent_inv)
                    if (delta is None or
                        len(delta) > len(candidate_delta)):
                        delta = candidate_delta
                        basis_id = parent_id
            if delta is None:
                # Either none of the parents ended up being suitable, or we
                # were asked to delta against NULL.
                basis_id = _mod_revision.NULL_REVISION
                delta = inv._make_delta(null_inventory)
            invs_sent_so_far.add(inv.revision_id)
            inventory_cache[inv.revision_id] = inv
            delta_serialized = ''.join(
                serializer.delta_to_lines(basis_id, key[-1], delta))
            yield versionedfile.FulltextContentFactory(
                key, parent_keys, None, delta_serialized)
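    # Added note (not in the original source): each yielded record is a
    # fulltext whose bytes are a serialized inventory delta from basis_id to
    # the inventory being sent, keyed by (revision_id,) with the usual
    # inventory parent keys; the sha1 argument is passed as None because there
    # is no precomputed hash for the delta text. The receiving side is
    # expected to rebuild the inventory by applying the delta to basis_id, as
    # _extract_and_insert_inventory_deltas() does above.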


def _iter_for_revno(repo, partial_history_cache, stop_index=None,
                    stop_revision=None):
    """Extend the partial history to include a given index.

    If a stop_index is supplied, stop when that index has been reached.
    If a stop_revision is supplied, stop when that revision is
    encountered. Otherwise, stop when the beginning of history is
    reached.

    :param stop_index: The index which should be present. When it is
        present, history extension will stop.
    :param stop_revision: The revision id which should be present. When
        it is encountered, history extension will stop.
    """
    start_revision = partial_history_cache[-1]
    iterator = repo.iter_reverse_revision_history(start_revision)
    try:
        # skip the last revision in the list
        iterator.next()
        while True:
            if (stop_index is not None and
                len(partial_history_cache) > stop_index):
                break
            if partial_history_cache[-1] == stop_revision:
                break
            revision_id = iterator.next()
            partial_history_cache.append(revision_id)
    except StopIteration: