62
58
# and add in all file versions
65
@deprecated_function(zero_eight)
def greedy_fetch(to_branch, from_branch, revision=None, pb=None):
    """Legacy API, please see branch.fetch(from_branch, last_revision, pb)."""
    # Deprecated entry point: delegate to the Fetcher shim and surface the
    # same (count_copied, failed_revisions) pair the old callers expect.
    fetcher = Fetcher(to_branch, from_branch, revision, pb)
    return fetcher.count_copied, fetcher.failed_revisions
74
class RepoFetcher(object):
75
"""Pull revisions and texts from one repository to another.
78
if set, try to limit to the data this revision references.
68
class Fetcher(object):
69
"""Pull revisions and texts from one branch to another.
71
This doesn't update the destination's history; that can be done
72
separately if desired.
75
If set, pull only up to this revision_id.
79
last_revision -- if last_revision
80
is given it will be that, otherwise the last revision of
81
83
count_copied -- number of revisions copied
83
This should not be used directory, its essential a object to encapsulate
84
the logic in InterRepository.fetch().
85
count_texts -- number of file texts copied
86
def __init__(self, to_repository, from_repository, last_revision=None, pb=None):
87
def __init__(self, to_branch, from_branch, last_revision=None, pb=None):
88
self.to_branch = to_branch
89
self.to_weaves = to_branch.weave_store
90
self.from_branch = from_branch
91
self.from_weaves = from_branch.weave_store
88
92
self.failed_revisions = []
89
93
self.count_copied = 0
90
if to_repository.control_files._transport.base == from_repository.control_files._transport.base:
91
# check that last_revision is in 'from' and then return a no-operation.
92
if last_revision not in (None, NULL_REVISION):
93
from_repository.get_revision(last_revision)
95
self.to_repository = to_repository
96
self.from_repository = from_repository
97
# must not mutate self._last_revision as its potentially a shared instance
98
self._last_revision = last_revision
100
self.pb = bzrlib.ui.ui_factory.nested_progress_bar()
101
self.nested_pb = self.pb
97
self.pb = bzrlib.ui.ui_factory.progress_bar()
104
self.nested_pb = None
105
self.from_repository.lock_read()
107
self.to_repository.lock_write()
111
if self.nested_pb is not None:
112
self.nested_pb.finished()
113
self.to_repository.unlock()
115
self.from_repository.unlock()
118
"""Primary worker function.
120
This initialises all the needed variables, and then fetches the
121
requested revisions, finally clearing the progress bar.
123
self.to_weaves = self.to_repository.weave_store
124
self.to_control = self.to_repository.control_weaves
125
self.from_weaves = self.from_repository.weave_store
126
self.from_control = self.from_repository.control_weaves
128
self.file_ids_names = {}
129
pp = ProgressPhase('Fetch phase', 4, self.pb)
132
revs = self._revids_to_fetch()
136
self._fetch_weave_texts(revs)
138
self._fetch_inventory_weave(revs)
140
self._fetch_revision_texts(revs)
141
self.count_copied += len(revs)
145
def _revids_to_fetch(self):
146
mutter('fetch up to rev {%s}', self._last_revision)
147
if self._last_revision is NULL_REVISION:
148
# explicit limit of no revisions needed
150
if (self._last_revision != None and
151
self.to_repository.has_revision(self._last_revision)):
100
self.last_revision = self._find_last_revision(last_revision)
101
mutter('fetch up to rev {%s}', self.last_revision)
102
revs_to_fetch = self._compare_ancestries()
103
self._copy_revisions(revs_to_fetch)
104
self.new_ancestry = revs_to_fetch
108
def _find_last_revision(self, last_revision):
109
"""Find the limiting source revision.
111
Every ancestor of that revision will be merged across.
113
Returns the revision_id, or returns None if there's no history
114
in the source branch."""
115
self.pb.update('get source history')
116
from_history = self.from_branch.revision_history()
117
self.pb.update('get destination history')
119
if last_revision not in from_history:
120
raise NoSuchRevision(self.from_branch, last_revision)
124
return from_history[-1]
126
return None # no history in the source branch
155
return self.to_repository.missing_revision_ids(self.from_repository,
157
except errors.NoSuchRevision:
158
raise InstallFailed([self._last_revision])
160
def _fetch_weave_texts(self, revs):
161
texts_pb = bzrlib.ui.ui_factory.nested_progress_bar()
163
# fileids_altered_by_revision_ids requires reading the inventory
164
# weave, we will need to read the inventory weave again when
165
# all this is done, so enable caching for that specific weave
166
inv_w = self.from_repository.get_inventory_weave()
168
file_ids = self.from_repository.fileids_altered_by_revision_ids(revs)
170
num_file_ids = len(file_ids)
171
for file_id, required_versions in file_ids.items():
172
texts_pb.update("fetch texts", count, num_file_ids)
174
to_weave = self.to_weaves.get_weave_or_empty(file_id,
175
self.to_repository.get_transaction())
176
from_weave = self.from_weaves.get_weave(file_id,
177
self.from_repository.get_transaction())
178
# we fetch all the texts, because texts do
179
# not reference anything, and its cheap enough
180
to_weave.join(from_weave, version_ids=required_versions)
181
# we don't need *all* of this data anymore, but we dont know
182
# what we do. This cache clearing will result in a new read
183
# of the knit data when we do the checkout, but probably we
184
# want to emit the needed data on the fly rather than at the
186
# the from weave should know not to cache data being joined,
187
# but its ok to ask it to clear.
188
from_weave.clear_cache()
189
to_weave.clear_cache()
193
def _fetch_inventory_weave(self, revs):
194
pb = bzrlib.ui.ui_factory.nested_progress_bar()
196
pb.update("fetch inventory", 0, 2)
197
to_weave = self.to_control.get_weave('inventory',
198
self.to_repository.get_transaction())
200
child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
202
# just merge, this is optimisable and its means we don't
203
# copy unreferenced data such as not-needed inventories.
204
pb.update("fetch inventory", 1, 3)
205
from_weave = self.from_repository.get_inventory_weave()
206
pb.update("fetch inventory", 2, 3)
207
# we fetch only the referenced inventories because we do not
208
# know for unselected inventories whether all their required
209
# texts are present in the other repository - it could be
211
to_weave.join(from_weave, pb=child_pb, msg='merge inventory',
213
from_weave.clear_cache()
220
class GenericRepoFetcher(RepoFetcher):
221
"""This is a generic repo to repo fetcher.
223
This makes minimal assumptions about repo layout and contents.
224
It triggers a reconciliation after fetching to ensure integrity.
227
def _fetch_revision_texts(self, revs):
228
"""Fetch revision object texts"""
229
rev_pb = bzrlib.ui.ui_factory.nested_progress_bar()
231
to_txn = self.to_transaction = self.to_repository.get_transaction()
234
to_store = self.to_repository._revision_store
236
pb = bzrlib.ui.ui_factory.nested_progress_bar()
238
pb.update('copying revisions', count, total)
240
sig_text = self.from_repository.get_signature_text(rev)
241
to_store.add_revision_signature_text(rev, sig_text, to_txn)
242
except errors.NoSuchRevision:
245
to_store.add_revision(self.from_repository.get_revision(rev),
250
# fixup inventory if needed:
251
# this is expensive because we have no inverse index to current ghosts.
252
# but on local disk its a few seconds and sftp push is already insane.
254
# FIXME: repository should inform if this is needed.
255
self.to_repository.reconcile()
260
class KnitRepoFetcher(RepoFetcher):
261
"""This is a knit format repository specific fetcher.
263
This differs from the GenericRepoFetcher by not doing a
264
reconciliation after copying, and using knit joining to
268
def _fetch_revision_texts(self, revs):
269
# may need to be a InterRevisionStore call here.
270
from_transaction = self.from_repository.get_transaction()
271
to_transaction = self.to_repository.get_transaction()
272
to_sf = self.to_repository._revision_store.get_signature_file(
274
from_sf = self.from_repository._revision_store.get_signature_file(
276
to_sf.join(from_sf, version_ids=revs, ignore_missing=True)
277
to_rf = self.to_repository._revision_store.get_revision_file(
279
from_rf = self.from_repository._revision_store.get_revision_file(
281
to_rf.join(from_rf, version_ids=revs)
284
class Fetcher(object):
    """Backwards compatibility glue for branch.fetch()."""

    @deprecated_method(zero_eight)
    def __init__(self, to_branch, from_branch, last_revision=None, pb=None):
        """Please see branch.fetch()."""
        # Pure delegation: the modern Branch.fetch() does all the work, and
        # this object deliberately keeps no state of its own.
        to_branch.fetch(from_branch, last_revision, pb)
129
def _compare_ancestries(self):
130
"""Get a list of revisions that must be copied.
132
That is, every revision that's in the ancestry of the source
133
branch and not in the destination branch."""
134
self.pb.update('get source ancestry')
135
self.from_ancestry = self.from_branch.get_ancestry(self.last_revision)
137
dest_last_rev = self.to_branch.last_revision()
138
self.pb.update('get destination ancestry')
140
dest_ancestry = self.to_branch.get_ancestry(dest_last_rev)
143
ss = set(dest_ancestry)
145
for rev_id in self.from_ancestry:
147
to_fetch.append(rev_id)
148
mutter('need to get revision {%s}', rev_id)
149
mutter('need to get %d revisions in total', len(to_fetch))
150
self.count_total = len(to_fetch)
155
def _copy_revisions(self, revs_to_fetch):
157
for rev_id in revs_to_fetch:
159
if self.to_branch.has_revision(rev_id):
161
self.pb.update('fetch revision', i, self.count_total)
162
self._copy_one_revision(rev_id)
163
self.count_copied += 1
166
def _copy_one_revision(self, rev_id):
167
"""Copy revision and everything referenced by it."""
168
mutter('copying revision {%s}', rev_id)
169
rev_xml = self.from_branch.get_revision_xml(rev_id)
170
inv_xml = self.from_branch.get_inventory_xml(rev_id)
171
rev = serializer_v5.read_revision_from_string(rev_xml)
172
inv = serializer_v5.read_inventory_from_string(inv_xml)
173
assert rev.revision_id == rev_id
174
assert rev.inventory_sha1 == sha_string(inv_xml)
175
mutter(' commiter %s, %d parents',
178
self._copy_new_texts(rev_id, inv)
179
parent_ids = [x.revision_id for x in rev.parents]
180
self._copy_inventory(rev_id, inv_xml, parent_ids)
181
self._copy_ancestry(rev_id, parent_ids)
182
self.to_branch.revision_store.add(StringIO(rev_xml), rev_id)
185
def _copy_inventory(self, rev_id, inv_xml, parent_ids):
186
self.to_weaves.add_text(INVENTORY_FILEID, rev_id,
187
split_lines(inv_xml), parent_ids)
190
def _copy_ancestry(self, rev_id, parent_ids):
191
ancestry_lines = self.from_weaves.get_lines(ANCESTRY_FILEID, rev_id)
192
self.to_weaves.add_text(ANCESTRY_FILEID, rev_id, ancestry_lines,
196
def _copy_new_texts(self, rev_id, inv):
197
"""Copy any new texts occuring in this revision."""
198
# TODO: Rather than writing out weaves every time, hold them
199
# in memory until everything's done? But this way is nicer
200
# if it's interrupted.
201
for path, ie in inv.iter_entries():
202
if ie.kind != 'file':
204
if ie.text_version != rev_id:
206
mutter('%s {%s} is changed in this revision',
208
self._copy_one_text(rev_id, ie.file_id)
211
def _copy_one_text(self, rev_id, file_id):
212
"""Copy one file text."""
213
mutter('copy text version {%s} of file {%s}',
215
from_weave = self.from_weaves.get_weave(file_id)
216
from_idx = from_weave.lookup(rev_id)
217
from_parents = map(from_weave.idx_to_name, from_weave.parents(from_idx))
218
text_lines = from_weave.get(from_idx)
219
to_weave = self.to_weaves.get_weave_or_empty(file_id)
220
to_parents = map(to_weave.lookup, from_parents)
221
# it's ok to add even if the text is already there
222
to_weave.add(rev_id, to_parents, text_lines)
223
self.to_weaves.put_weave(file_id, to_weave)
224
self.count_texts += 1