60
58
# and add in all file versions
63
@deprecated_function(zero_eight)
def greedy_fetch(to_branch, from_branch, revision=None, pb=None):
    """Deprecated wrapper that runs a fetch through the legacy Fetcher class.

    New code should call branch.fetch(from_branch, last_revision, pb).

    Returns a (count_copied, failed_revisions) tuple read off the Fetcher
    instance that performed the fetch.
    """
    # NOTE(review): assumes the Fetcher in scope exposes count_copied and
    # failed_revisions once constructed -- confirm against its definition.
    fetcher = Fetcher(to_branch, from_branch, revision, pb)
    return (fetcher.count_copied, fetcher.failed_revisions)
72
class RepoFetcher(object):
73
"""Pull revisions and texts from one repository to another.
76
if set, try to limit to the data this revision references.
68
class Fetcher(object):
69
"""Pull revisions and texts from one branch to another.
71
This doesn't update the destination's history; that can be done
72
separately if desired.
75
If set, pull only up to this revision_id.
79
last_revision -- if last_revision
80
is given it will be that, otherwise the last revision of
79
83
count_copied -- number of revisions copied
81
This should not be used directly; it is essentially an object to encapsulate
82
the logic in InterRepository.fetch().
85
count_texts -- number of file texts copied
84
def __init__(self, to_repository, from_repository, last_revision=None, pb=None):
87
def __init__(self, to_branch, from_branch, last_revision=None, pb=None):
88
self.to_branch = to_branch
89
self.to_weaves = to_branch.weave_store
90
self.from_branch = from_branch
91
self.from_weaves = from_branch.weave_store
86
92
self.failed_revisions = []
87
93
self.count_copied = 0
88
if to_repository.control_files._transport.base == from_repository.control_files._transport.base:
89
# check that last_revision is in 'from' and then return a no-operation.
90
if last_revision not in (None, NULL_REVISION):
91
from_repository.get_revision(last_revision)
93
self.to_repository = to_repository
94
self.from_repository = from_repository
95
# must not mutate self._last_revision as it's potentially a shared instance
96
self._last_revision = last_revision
98
self.pb = bzrlib.ui.ui_factory.nested_progress_bar()
99
self.nested_pb = self.pb
97
self.pb = bzrlib.ui.ui_factory.progress_bar()
102
self.nested_pb = None
103
self.from_repository.lock_read()
105
self.to_repository.lock_write()
109
if self.nested_pb is not None:
110
self.nested_pb.finished()
111
self.to_repository.unlock()
113
self.from_repository.unlock()
116
"""Primary worker function.
118
This initialises all the needed variables, and then fetches the
119
requested revisions, finally clearing the progress bar.
121
self.to_weaves = self.to_repository.weave_store
122
self.to_control = self.to_repository.control_weaves
123
self.from_weaves = self.from_repository.weave_store
124
self.from_control = self.from_repository.control_weaves
126
self.file_ids_names = {}
127
pp = ProgressPhase('fetch phase', 4, self.pb)
129
revs = self._revids_to_fetch()
133
self._fetch_weave_texts(revs)
135
self._fetch_inventory_weave(revs)
137
self._fetch_revision_texts(revs)
138
self.count_copied += len(revs)
142
def _revids_to_fetch(self):
143
mutter('fetch up to rev {%s}', self._last_revision)
144
if self._last_revision is NULL_REVISION:
145
# explicit limit of no revisions needed
147
if (self._last_revision != None and
148
self.to_repository.has_revision(self._last_revision)):
100
self.last_revision = self._find_last_revision(last_revision)
101
mutter('fetch up to rev {%s}', self.last_revision)
102
revs_to_fetch = self._compare_ancestries()
103
self._copy_revisions(revs_to_fetch)
104
self.new_ancestry = revs_to_fetch
108
def _find_last_revision(self, last_revision):
109
"""Find the limiting source revision.
111
Every ancestor of that revision will be merged across.
113
Returns the revision_id, or returns None if there's no history
114
in the source branch."""
115
self.pb.update('get source history')
116
from_history = self.from_branch.revision_history()
117
self.pb.update('get destination history')
119
if last_revision not in from_history:
120
raise NoSuchRevision(self.from_branch, last_revision)
124
return from_history[-1]
126
return None # no history in the source branch
152
return self.to_repository.missing_revision_ids(self.from_repository,
154
except errors.NoSuchRevision:
155
raise InstallFailed([self._last_revision])
157
def _fetch_weave_texts(self, revs):
158
texts_pb = bzrlib.ui.ui_factory.nested_progress_bar()
160
file_ids = self.from_repository.fileids_altered_by_revision_ids(revs)
162
num_file_ids = len(file_ids)
163
for file_id, required_versions in file_ids.items():
164
texts_pb.update("fetch texts", count, num_file_ids)
166
to_weave = self.to_weaves.get_weave_or_empty(file_id,
167
self.to_repository.get_transaction())
168
from_weave = self.from_weaves.get_weave(file_id,
169
self.from_repository.get_transaction())
170
# we fetch all the texts, because texts do
171
# not reference anything, and it's cheap enough
172
to_weave.join(from_weave, version_ids=required_versions)
173
# we don't need *all* of this data anymore, but we don't know
174
# what we do. This cache clearing will result in a new read
175
# of the knit data when we do the checkout, but probably we
176
# want to emit the needed data on the fly rather than at the
178
# the from weave should know not to cache data being joined,
179
# but its ok to ask it to clear.
180
from_weave.clear_cache()
181
to_weave.clear_cache()
185
def _fetch_inventory_weave(self, revs):
186
pb = bzrlib.ui.ui_factory.nested_progress_bar()
188
pb.update("fetch inventory", 0, 2)
189
to_weave = self.to_control.get_weave('inventory',
190
self.to_repository.get_transaction())
192
child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
194
# just merge, this is optimisable and it means we don't
195
# copy unreferenced data such as not-needed inventories.
196
pb.update("fetch inventory", 1, 3)
197
from_weave = self.from_repository.get_inventory_weave()
198
pb.update("fetch inventory", 2, 3)
199
# we fetch only the referenced inventories because we do not
200
# know for unselected inventories whether all their required
201
# texts are present in the other repository - it could be
203
to_weave.join(from_weave, pb=child_pb, msg='merge inventory',
211
class GenericRepoFetcher(RepoFetcher):
212
"""This is a generic repo to repo fetcher.
214
This makes minimal assumptions about repo layout and contents.
215
It triggers a reconciliation after fetching to ensure integrity.
218
def _fetch_revision_texts(self, revs):
219
"""Fetch revision object texts"""
220
rev_pb = bzrlib.ui.ui_factory.nested_progress_bar()
222
to_txn = self.to_transaction = self.to_repository.get_transaction()
225
to_store = self.to_repository._revision_store
227
pb = bzrlib.ui.ui_factory.nested_progress_bar()
229
pb.update('copying revisions', count, total)
231
sig_text = self.from_repository.get_signature_text(rev)
232
to_store.add_revision_signature_text(rev, sig_text, to_txn)
233
except errors.NoSuchRevision:
236
to_store.add_revision(self.from_repository.get_revision(rev),
241
# fixup inventory if needed:
242
# this is expensive because we have no inverse index to current ghosts.
243
# but on local disk its a few seconds and sftp push is already insane.
245
# FIXME: repository should inform if this is needed.
246
self.to_repository.reconcile()
251
class KnitRepoFetcher(RepoFetcher):
252
"""This is a knit format repository specific fetcher.
254
This differs from the GenericRepoFetcher by not doing a
255
reconciliation after copying, and using knit joining to
259
def _fetch_revision_texts(self, revs):
260
# may need to be a InterRevisionStore call here.
261
from_transaction = self.from_repository.get_transaction()
262
to_transaction = self.to_repository.get_transaction()
263
to_sf = self.to_repository._revision_store.get_signature_file(
265
from_sf = self.from_repository._revision_store.get_signature_file(
267
to_sf.join(from_sf, version_ids=revs, ignore_missing=True)
268
to_rf = self.to_repository._revision_store.get_revision_file(
270
from_rf = self.from_repository._revision_store.get_revision_file(
272
to_rf.join(from_rf, version_ids=revs)
275
class Fetcher(object):
    """Deprecated shim so that legacy ``Fetcher(...)`` callers keep working.

    Constructing an instance simply forwards the request to
    ``to_branch.fetch()``; use that method directly in new code.
    """

    @deprecated_method(zero_eight)
    def __init__(self, to_branch, from_branch, last_revision=None, pb=None):
        """Perform the fetch immediately by delegating to to_branch.fetch().

        The arguments are passed through positionally and unchanged.

        NOTE(review): nothing is stored on the instance here, so callers
        that read count_copied / failed_revisions from it (as greedy_fetch
        does) will not find them set by this constructor -- confirm whether
        the result of to_branch.fetch() should be recorded.
        """
        to_branch.fetch(from_branch, last_revision, pb)
129
def _compare_ancestries(self):
130
"""Get a list of revisions that must be copied.
132
That is, every revision that's in the ancestry of the source
133
branch and not in the destination branch."""
134
self.pb.update('get source ancestry')
135
self.from_ancestry = self.from_branch.get_ancestry(self.last_revision)
137
dest_last_rev = self.to_branch.last_revision()
138
self.pb.update('get destination ancestry')
140
dest_ancestry = self.to_branch.get_ancestry(dest_last_rev)
143
ss = set(dest_ancestry)
145
for rev_id in self.from_ancestry:
147
to_fetch.append(rev_id)
148
mutter('need to get revision {%s}', rev_id)
149
mutter('need to get %d revisions in total', len(to_fetch))
150
self.count_total = len(to_fetch)
155
def _copy_revisions(self, revs_to_fetch):
157
for rev_id in revs_to_fetch:
159
if self.to_branch.has_revision(rev_id):
161
self.pb.update('fetch revision', i, self.count_total)
162
self._copy_one_revision(rev_id)
163
self.count_copied += 1
166
def _copy_one_revision(self, rev_id):
167
"""Copy revision and everything referenced by it."""
168
mutter('copying revision {%s}', rev_id)
169
rev_xml = self.from_branch.get_revision_xml(rev_id)
170
inv_xml = self.from_branch.get_inventory_xml(rev_id)
171
rev = serializer_v5.read_revision_from_string(rev_xml)
172
inv = serializer_v5.read_inventory_from_string(inv_xml)
173
assert rev.revision_id == rev_id
174
assert rev.inventory_sha1 == sha_string(inv_xml)
175
mutter(' commiter %s, %d parents',
178
self._copy_new_texts(rev_id, inv)
179
parent_ids = [x.revision_id for x in rev.parents]
180
self._copy_inventory(rev_id, inv_xml, parent_ids)
181
self._copy_ancestry(rev_id, parent_ids)
182
self.to_branch.revision_store.add(StringIO(rev_xml), rev_id)
185
def _copy_inventory(self, rev_id, inv_xml, parent_ids):
186
self.to_weaves.add_text(INVENTORY_FILEID, rev_id,
187
split_lines(inv_xml), parent_ids)
190
def _copy_ancestry(self, rev_id, parent_ids):
191
ancestry_lines = self.from_weaves.get_lines(ANCESTRY_FILEID, rev_id)
192
self.to_weaves.add_text(ANCESTRY_FILEID, rev_id, ancestry_lines,
196
def _copy_new_texts(self, rev_id, inv):
197
"""Copy any new texts occurring in this revision."""
198
# TODO: Rather than writing out weaves every time, hold them
199
# in memory until everything's done? But this way is nicer
200
# if it's interrupted.
201
for path, ie in inv.iter_entries():
202
if ie.kind != 'file':
204
if ie.text_version != rev_id:
206
mutter('%s {%s} is changed in this revision',
208
self._copy_one_text(rev_id, ie.file_id)
211
def _copy_one_text(self, rev_id, file_id):
212
"""Copy one file text."""
213
mutter('copy text version {%s} of file {%s}',
215
from_weave = self.from_weaves.get_weave(file_id)
216
from_idx = from_weave.lookup(rev_id)
217
from_parents = map(from_weave.idx_to_name, from_weave.parents(from_idx))
218
text_lines = from_weave.get(from_idx)
219
to_weave = self.to_weaves.get_weave_or_empty(file_id)
220
to_parents = map(to_weave.lookup, from_parents)
221
# it's ok to add even if the text is already there
222
to_weave.add(rev_id, to_parents, text_lines)
223
self.to_weaves.put_weave(file_id, to_weave)
224
self.count_texts += 1