157
157
# item_keys_introduced_by should have a richer API than it does at the
158
158
# moment, so that it can feed the progress information back to this
160
self.pb = bzrlib.ui.ui_factory.nested_progress_bar()
162
from_format = self.from_repository._format
163
stream = self.get_stream(search, pp)
164
missing_keys = self.sink.insert_stream(stream, from_format)
166
stream = self.get_stream_for_missing_keys(missing_keys)
167
missing_keys = self.sink.insert_stream(stream, from_format)
169
raise AssertionError(
170
"second push failed to complete a fetch %r." % (
174
if self.pb is not None:
177
def get_stream(self, search, pp):
161
pb = bzrlib.ui.ui_factory.nested_progress_bar()
163
revs = search.get_keys()
164
graph = self.from_repository.get_graph()
165
revs = list(graph.iter_topo_order(revs))
166
data_to_fetch = self.from_repository.item_keys_introduced_by(revs,
169
for knit_kind, file_id, revisions in data_to_fetch:
170
if knit_kind != phase:
172
# Make a new progress bar for this phase
175
pb = bzrlib.ui.ui_factory.nested_progress_bar()
176
if knit_kind == "file":
177
# Accumulate file texts
178
text_keys.extend([(file_id, revision) for revision in
180
elif knit_kind == "inventory":
181
# Now copy the file texts.
182
to_texts = self.to_repository.texts
183
from_texts = self.from_repository.texts
184
to_texts.insert_record_stream(from_texts.get_record_stream(
185
text_keys, self.to_repository._fetch_order,
186
not self.to_repository._fetch_uses_deltas))
187
# Cause an error if a text occurs after we have done the
190
# Before we process the inventory we generate the root
191
# texts (if necessary) so that the inventories references
193
self._generate_root_texts(revs)
194
# NB: This currently reopens the inventory weave in source;
195
# using a single stream interface instead would avoid this.
196
self._fetch_inventory_weave(revs, pb)
197
elif knit_kind == "signatures":
198
# Nothing to do here; this will be taken care of when
199
# _fetch_revision_texts happens.
201
elif knit_kind == "revisions":
202
self._fetch_revision_texts(revs, pb)
204
raise AssertionError("Unknown knit kind %r" % knit_kind)
205
if self.to_repository._fetch_reconcile:
206
self.to_repository.reconcile()
179
revs = search.get_keys()
180
graph = self.from_repository.get_graph()
181
revs = list(graph.iter_topo_order(revs))
182
data_to_fetch = self.from_repository.item_keys_introduced_by(
185
for knit_kind, file_id, revisions in data_to_fetch:
186
if knit_kind != phase:
188
# Make a new progress bar for this phase
191
self.pb = bzrlib.ui.ui_factory.nested_progress_bar()
192
if knit_kind == "file":
193
# Accumulate file texts
194
text_keys.extend([(file_id, revision) for revision in
196
elif knit_kind == "inventory":
197
# Now copy the file texts.
198
to_texts = self.to_repository.texts
199
from_texts = self.from_repository.texts
200
yield ('texts', from_texts.get_record_stream(
201
text_keys, self.to_repository._fetch_order,
202
not self.to_repository._fetch_uses_deltas))
203
# Cause an error if a text occurs after we have done the
206
# Before we process the inventory we generate the root
207
# texts (if necessary) so that the inventories references
209
for _ in self._generate_root_texts(revs):
211
# NB: This currently reopens the inventory weave in source;
212
# using a single stream interface instead would avoid this.
213
self.pb.update("fetch inventory", 0, 1)
214
from_weave = self.from_repository.inventories
215
# we fetch only the referenced inventories because we do not
216
# know for unselected inventories whether all their required
217
# texts are present in the other repository - it could be
219
yield ('inventories', from_weave.get_record_stream(
220
[(rev_id,) for rev_id in revs],
221
self.inventory_fetch_order(),
222
not self.delta_on_metadata()))
223
elif knit_kind == "signatures":
224
# Nothing to do here; this will be taken care of when
225
# _fetch_revision_texts happens.
227
elif knit_kind == "revisions":
228
for _ in self._fetch_revision_texts(revs, self.pb):
231
raise AssertionError("Unknown knit kind %r" % knit_kind)
210
232
self.count_copied += len(revs)
234
def get_stream_for_missing_keys(self, missing_keys):
235
# missing keys can only occur when we are byte copying and not
236
# translating (because translation means we don't send
237
# unreconstructable deltas ever).
239
keys['texts'] = set()
240
keys['revisions'] = set()
241
keys['inventories'] = set()
242
keys['signatures'] = set()
243
for key in missing_keys:
244
keys[key[0]].add(key[1:])
245
if len(keys['revisions']):
246
# If we allowed copying revisions at this point, we could end up
247
# copying a revision without copying its required texts: a
248
# violation of the requirements for repository integrity.
249
raise AssertionError(
250
'cannot copy revisions to fill in missing deltas %s' % (
252
for substream_kind, keys in keys.iteritems():
253
vf = getattr(self.from_repository, substream_kind)
254
# Ask for full texts always so that we don't need more round trips
256
stream = vf.get_record_stream(keys,
257
self.to_repository._fetch_order, True)
258
yield substream_kind, stream
212
260
def _revids_to_fetch(self):
213
261
"""Determines the exact revisions needed from self.from_repository to
214
262
install self._last_revision in self.to_repository.
229
277
except errors.NoSuchRevision, e:
230
278
raise InstallFailed([self._last_revision])
232
def _fetch_inventory_weave(self, revs, pb):
    """Copy the inventories for ``revs`` from the source to the target.

    The records are read from ``self.from_repository.inventories`` and
    inserted into ``self.to_repository.inventories``.

    :param revs: Iterable of revision ids; each one is wrapped in a
        1-tuple to form the key requested from the source.
    :param pb: Progress object; only ``pb.update(msg, current, total)``
        is called on it.
    """
    # Three ticks are emitted, so each of them reports a total of 3.
    pb.update("fetch inventory", 0, 3)
    to_weave = self.to_repository.inventories
    # Just merge; this is optimisable, and it means we don't copy
    # unreferenced data such as not-needed inventories.
    pb.update("fetch inventory", 1, 3)
    from_weave = self.from_repository.inventories
    pb.update("fetch inventory", 2, 3)
    # Only the referenced inventories are fetched: for unselected
    # inventories we do not know whether all their required texts are
    # present in the other repository.
    inventory_keys = [(rev_id,) for rev_id in revs]
    # Third argument asks for full texts whenever the target does not
    # fetch using deltas.
    to_weave.insert_record_stream(from_weave.get_record_stream(
        inventory_keys,
        self.to_repository._fetch_order,
        not self.to_repository._fetch_uses_deltas))
249
280
def _fetch_revision_texts(self, revs, pb):
250
281
# fetch signatures first and then the revision texts
251
282
# may need to be a InterRevisionStore call here.
252
to_sf = self.to_repository.signatures
253
283
from_sf = self.from_repository.signatures
254
284
# A missing signature is just skipped.
255
to_sf.insert_record_stream(filter_absent(from_sf.get_record_stream(
256
[(rev_id,) for rev_id in revs],
285
keys = [(rev_id,) for rev_id in revs]
286
signatures = filter_absent(from_sf.get_record_stream(
257
288
self.to_repository._fetch_order,
258
not self.to_repository._fetch_uses_deltas)))
259
self._fetch_just_revision_texts(revs)
261
def _fetch_just_revision_texts(self, version_ids):
262
to_rf = self.to_repository.revisions
263
from_rf = self.from_repository.revisions
289
not self.to_repository._fetch_uses_deltas))
264
290
# If a revision has a delta, this is actually expanded inside the
265
291
# insert_record_stream code now, which is an alternate fix for
267
to_rf.insert_record_stream(from_rf.get_record_stream(
268
[(rev_id,) for rev_id in version_ids],
293
from_rf = self.from_repository.revisions
294
revisions = from_rf.get_record_stream(
269
296
self.to_repository._fetch_order,
270
not self.to_repository._fetch_uses_deltas))
297
not self.delta_on_metadata())
298
return [('signatures', signatures), ('revisions', revisions)]
272
300
def _generate_root_texts(self, revs):
273
301
"""This will be called by __fetch between fetching weave texts and
276
304
Subclasses should override this if they need to generate root texts
277
305
after fetching weave texts.
309
def inventory_fetch_order(self):
    """Return the record ordering to request for inventory streams.

    :return: The target repository's ``_fetch_order`` attribute, unchanged.
    """
    return self.to_repository._fetch_order
312
def delta_on_metadata(self):
    """Report whether metadata records may be transferred as deltas.

    :return: A true value only when the target repository's
        ``_fetch_uses_deltas`` is true and the source and target formats
        have equal ``_serializer`` attributes. When ``_fetch_uses_deltas``
        is false, that false value itself is returned.
    """
    src_serializer = self.from_repository._format._serializer
    target_serializer = self.to_repository._format._serializer
    # The serializers are only compared when the target uses deltas at all.
    return (self.to_repository._fetch_uses_deltas and
            src_serializer == target_serializer)
282
319
class Inter1and2Helper(object):
283
320
"""Helper for operations that convert data from model 1 and 2
285
322
This is for use by fetchers and converters.
288
def __init__(self, source, target):
325
def __init__(self, source):
291
328
:param source: The repository data comes from
292
:param target: The repository data goes to
294
330
self.source = source
297
332
def iter_rev_trees(self, revs):
298
333
"""Iterate through RevisionTrees efficiently.
368
402
if parent != NULL_REVISION and
369
403
rev_id_to_root_id.get(parent, root_id) == root_id)
370
404
yield FulltextContentFactory(key, parent_keys, None, '')
371
to_texts.insert_record_stream(yield_roots())
373
def regenerate_inventory(self, revs):
374
"""Generate a new inventory versionedfile in target, convertin data.
376
The inventory is retrieved from the source, (deserializing it), and
377
stored in the target (reserializing it in a different format).
378
:param revs: The revisions to include
380
for tree in self.iter_rev_trees(revs):
381
parents = tree.get_parent_ids()
382
self.target.add_inventory(tree.get_revision_id(), tree.inventory,
385
def fetch_revisions(self, revision_ids):
    """Copy the given revisions from ``self.source`` into ``self.target``.

    Each object returned by ``self.source.get_revisions`` is added to the
    target under its own ``revision_id``.

    :param revision_ids: Revision ids, passed unchanged to
        ``self.source.get_revisions``.
    """
    # TODO: should this batch the request up rather than asking for a very
    # large number (e.g. 10,000) of revisions in a single call?
    for revision in self.source.get_revisions(revision_ids):
        self.target.add_revision(revision.revision_id, revision)
405
return [('texts', yield_roots())]
392
408
class Model1toKnit2Fetcher(RepoFetcher):
395
411
def __init__(self, to_repository, from_repository, last_revision=None,
396
412
pb=None, find_ghosts=True):
397
self.helper = Inter1and2Helper(from_repository, to_repository)
398
RepoFetcher.__init__(self, to_repository, from_repository,
399
last_revision, pb, find_ghosts)
401
def _generate_root_texts(self, revs):
    """Delegate root-text generation for ``revs`` to ``self.helper``.

    The helper's return value is discarded; this method returns None.
    """
    self.helper.generate_root_texts(revs)
404
def _fetch_inventory_weave(self, revs, pb):
    """Regenerate the inventories for ``revs`` through ``self.helper``.

    :param revs: Revisions handed to ``self.helper.regenerate_inventory``.
    :param pb: Accepted for signature compatibility; not used here.
    """
    self.helper.regenerate_inventory(revs)
407
def _fetch_revision_texts(self, revs, pb):
408
"""Fetch revision object texts"""
412
pb.update('copying revisions', count, total)
414
sig_text = self.from_repository.get_signature_text(rev)
415
self.to_repository.add_signature_text(rev, sig_text)
416
except errors.NoSuchRevision:
419
self._copy_revision(rev)
422
def _copy_revision(self, rev):
    """Copy the single revision ``rev`` via ``self.helper.fetch_revisions``.

    :param rev: One revision id; it is wrapped in a one-element list
        before being handed to the helper.
    """
    self.helper.fetch_revisions([rev])
426
class Knit1to2Fetcher(RepoFetcher):
427
"""Fetch from a Knit1 repository into a Knit2 repository"""
429
def __init__(self, to_repository, from_repository, last_revision=None,
             pb=None, find_ghosts=True):
    """Create the conversion helper, then run the base fetcher set-up.

    All five arguments are forwarded positionally, unchanged, to
    ``RepoFetcher.__init__``.
    """
    # The helper is assigned before the base initialiser is called.
    # NOTE(review): presumably RepoFetcher.__init__ may already rely on
    # self.helper -- confirm before reordering these two statements.
    self.helper = Inter1and2Helper(from_repository, to_repository)
    RepoFetcher.__init__(self, to_repository, from_repository,
                         last_revision, pb, find_ghosts)
435
def _generate_root_texts(self, revs):
    """Delegate root-text generation for ``revs`` to ``self.helper``.

    The helper's return value is discarded; this method returns None.
    """
    self.helper.generate_root_texts(revs)
438
def _fetch_inventory_weave(self, revs, pb):
    """Regenerate the inventories for ``revs`` through ``self.helper``.

    :param revs: Revisions handed to ``self.helper.regenerate_inventory``.
    :param pb: Accepted for signature compatibility; not used here.
    """
    self.helper.regenerate_inventory(revs)
441
def _fetch_just_revision_texts(self, version_ids):
    """Copy the revisions named by ``version_ids`` through ``self.helper``.

    :param version_ids: Passed unchanged to
        ``self.helper.fetch_revisions``.
    """
    self.helper.fetch_revisions(version_ids)
413
self.helper = Inter1and2Helper(from_repository)
414
RepoFetcher.__init__(self, to_repository, from_repository,
415
last_revision, pb, find_ghosts)
417
def _generate_root_texts(self, revs):
    """Return whatever ``self.helper.generate_root_texts(revs)`` returns.

    :param revs: Passed unchanged to the helper.
    """
    return self.helper.generate_root_texts(revs)
420
def inventory_fetch_order(self):
423
Knit1to2Fetcher = Model1toKnit2Fetcher