"""Copying of history from one branch to another.

The basic plan is that every branch knows the history of everything
that has merged into it. As the first step of a merge, pull, or
branch operation we copy history from the source into the destination
branch.

The copying is done in a slightly complicated order. We don't want to
add a revision to the store until everything it refers to is also
stored, so that if a revision is present we can totally recreate it.
However, we can't know what files are included in a revision until we
read its inventory. So we query the inventory store of the source for
the ids we need, and then pull those ids and finally actually join
the inventories.
"""

import operator

import bzrlib.errors as errors
import bzrlib.ui
from bzrlib import symbol_versioning
from bzrlib.errors import InstallFailed
from bzrlib.lazy_import import lazy_import
from bzrlib.progress import ProgressPhase
from bzrlib.revision import is_null, NULL_REVISION
from bzrlib.symbol_versioning import (deprecated_function,
    )
from bzrlib.trace import mutter
from bzrlib.tsort import topo_sort
from bzrlib.versionedfile import FulltextContentFactory

# TODO: Avoid repeatedly opening weaves so many times.

# XXX: This doesn't handle ghost (not present in branch) revisions at
# all yet.  I'm not sure they really should be supported.

# NOTE: This doesn't copy revisions which may be present but not
# merged into the last revision.  I'm not sure we want to do that.

# - get a list of revisions that need to be pulled in
# - for each one, pull in that revision file
#   and get the inventory, and store the inventory with right
#   parents.
# - and get the ancestry, and store that with right parents too
# - and keep a note of all file ids and version seen
# - then go through all files; for each one get the weave,
#   and add in all file versions
# An illustrative sketch of the resulting insertion order follows.
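
# Illustrative ordering sketch (not part of the fetch API): the
# constraint described above -- never add a revision before everything
# it references is stored -- means revisions are inserted in
# topological order, parents before children.  topo_sort from
# bzrlib.tsort produces such an order from a parent map.  The revision
# ids below are hypothetical.
def _example_insertion_order():
    parent_map = {
        'rev-1': [],
        'rev-2': ['rev-1'],
        'rev-3': ['rev-1'],
        'rev-4': ['rev-2', 'rev-3'],
    }
    # Returns e.g. ['rev-1', 'rev-2', 'rev-3', 'rev-4']: every revision
    # appears after all of its parents.
    return topo_sort(parent_map)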


class RepoFetcher(object):
    """Pull revisions and texts from one repository to another.

    last_revision
        if set, try to limit to the data this revision references.

    count_copied -- number of revisions copied

    This should not be used directly, it's essentially an object to
    encapsulate the logic in InterRepository.fetch().
    """

    def __init__(self, to_repository, from_repository, last_revision=None,
        pb=None, find_ghosts=True, fetch_spec=None):
        """Create a repo fetcher.

        :param last_revision: If set, try to limit to the data this revision
            references.
        :param find_ghosts: If True search the entire history for ghosts.
        :param pb: ProgressBar object to use; deprecated and ignored.
            This method will just create one on top of the stack.
        """
        if pb is not None:
            symbol_versioning.warn(
                symbol_versioning.deprecated_in((1, 14, 0))
                % "pb parameter to RepoFetcher.__init__")
            # and for simplicity it is in fact ignored
        self.failed_revisions = []
        self.count_copied = 0
        # repository.fetch has the responsibility for short-circuiting
        # attempts to copy between a repository and itself; getting here
        # with the same location is therefore an error.
        if to_repository.has_same_location(from_repository):
            raise errors.BzrError('RepoFetcher run '
                    'between two objects at the same location: '
                    '%r and %r' % (to_repository, from_repository))
        self.to_repository = to_repository
        self.from_repository = from_repository
        self.sink = to_repository._get_sink()
        # must not mutate self._last_revision as it's potentially a shared instance
        self._last_revision = last_revision
        self._fetch_spec = fetch_spec
        self.find_ghosts = find_ghosts
        self.pb = bzrlib.ui.ui_factory.nested_progress_bar()
        self.nested_pb = self.pb
        self.from_repository.lock_read()
        mutter("Using fetch logic to copy between %s(%s) and %s(%s)",
               self.from_repository, self.from_repository._format,
               self.to_repository, self.to_repository._format)
        try:
            self.to_repository.lock_write()
            try:
                self.to_repository.start_write_group()
                try:
                    self.__fetch()
                except:
                    self.to_repository.abort_write_group()
                    raise
                else:
                    self.to_repository.commit_write_group()
            finally:
                if self.nested_pb is not None:
                    self.nested_pb.finished()
                self.to_repository.unlock()
        finally:
            self.from_repository.unlock()

    def __fetch(self):
        """Primary worker function.

        This initialises all the needed variables, and then fetches the
        requested revisions, finally clearing the progress bar.
        """
        # Roughly this is what we're aiming for fetch to become:
        #
        # missing = self.sink.insert_stream(self.source.get_stream(search))
        # if missing:
        #     missing = self.sink.insert_stream(self.source.get_items(missing))
        # assert not missing
        self.to_weaves = self.to_repository.weave_store
        self.from_weaves = self.from_repository.weave_store
        self.count_total = 0
        self.file_ids_names = {}
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
        pb.show_pct = pb.show_count = False
        try:
            pb.update("Finding revisions", 0, 2)
            search = self._revids_to_fetch()
            if search is None:
                return
            pb.update("Fetching revisions", 1, 2)
            self._fetch_everything_for_search(search)
        finally:
            pb.finished()

    def _fetch_everything_for_search(self, search):
        """Fetch all data for the given set of revisions."""
        if (self.from_repository._format.rich_root_data and
            not self.to_repository._format.rich_root_data):
            raise errors.IncompatibleRepositories(
                self.from_repository, self.to_repository,
                "different rich-root support")
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
        try:
            pb.update("Get stream source")
            source = self.from_repository._get_source(
                self.to_repository._format)
            stream = source.get_stream(search)
            from_format = self.from_repository._format
            pb.update("Inserting stream")
            resume_tokens, missing_keys = self.sink.insert_stream(
                stream, from_format, [])
            if self.to_repository._fallback_repositories:
                missing_keys.update(
                    self._parent_inventories(search.get_keys()))
            if missing_keys:
                pb.update("Missing keys")
                stream = source.get_stream_for_missing_keys(missing_keys)
                pb.update("Inserting missing keys")
                resume_tokens, missing_keys = self.sink.insert_stream(
                    stream, from_format, resume_tokens)
            if missing_keys:
                raise AssertionError(
                    "second push failed to complete a fetch %r." % (
                        missing_keys,))
            if resume_tokens:
                raise AssertionError(
                    "second push failed to commit the fetch %r." % (
                        resume_tokens,))
            pb.update("Finishing stream")
            self.sink.finished()
        finally:
            pb.finished()

    def _fetch_everything_for_revisions(self, revs, pp):
        """Fetch all data for the given set of revisions."""
        # The first phase is "file".  We pass the progress bar for it directly
        # into item_keys_introduced_by, which has more information about how
        # that phase is progressing than we do.  Progress updates for the other
        # phases are taken care of in this function.
        # item_keys_introduced_by should have a richer API than it does at the
        # moment, so that it can feed the progress information back to this
        # function.
        phase = 'file'
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
        try:
            data_to_fetch = self.from_repository.item_keys_introduced_by(revs, pb)
            for knit_kind, file_id, revisions in data_to_fetch:
                if knit_kind != phase:
                    phase = knit_kind
                    # Make a new progress bar for this phase
                    pb.finished()
                    pp.next_phase()
                    pb = bzrlib.ui.ui_factory.nested_progress_bar()
                if knit_kind == "file":
                    self._fetch_weave_text(file_id, revisions)
                elif knit_kind == "inventory":
                    # Once we've processed all the files, then we generate the root
                    # texts (if necessary), then we process the inventory.  It's a
                    # bit distasteful to have knit_kind == "inventory" mean this,
                    # perhaps it should happen on the first non-"file" knit, in case
                    # it's not always inventory?
                    self._generate_root_texts(revs)
                    self._fetch_inventory_weave(revs, pb)
                elif knit_kind == "signatures":
                    # Nothing to do here; this will be taken care of when
                    # _fetch_revision_texts happens.
                    pass
                elif knit_kind == "revisions":
                    self._fetch_revision_texts(revs, pb)
                else:
                    raise AssertionError("Unknown knit kind %r" % knit_kind)
        finally:
            pb.finished()
        self.count_copied += len(revs)

    def _revids_to_fetch(self):
        """Determines the exact revisions needed from self.from_repository to
        install self._last_revision in self.to_repository.

        If no revisions need to be fetched, then this just returns None.
        """
        if self._fetch_spec is not None:
            return self._fetch_spec
        mutter('fetch up to rev {%s}', self._last_revision)
        if self._last_revision is NULL_REVISION:
            # explicit limit of no revisions needed
            return None
        if (self._last_revision is not None and
            self.to_repository.has_revision(self._last_revision)):
            return None
        try:
            return self.to_repository.search_missing_revision_ids(
                self.from_repository, self._last_revision,
                find_ghosts=self.find_ghosts)
        except errors.NoSuchRevision:
            raise InstallFailed([self._last_revision])

    def _parent_inventories(self, revision_ids):
        # Find all the parent revisions referenced by the stream, but
        # not present in the stream, and make sure we send their
        # inventories.
        parent_maps = self.to_repository.get_parent_map(revision_ids)
        parents = set()
        map(parents.update, parent_maps.itervalues())
        parents.discard(NULL_REVISION)
        parents.difference_update(revision_ids)
        missing_keys = set(('inventories', rev_id) for rev_id in parents)
        return missing_keys

    def _fetch_weave_text(self, file_id, required_versions):
        to_weave = self.to_weaves.get_weave_or_empty(file_id,
            self.to_repository.get_transaction())
        from_weave = self.from_weaves.get_weave(file_id,
            self.from_repository.get_transaction())
        # we fetch all the texts, because texts do
        # not reference anything, and it's cheap enough
        to_weave.join(from_weave, version_ids=required_versions)
        # we don't need *all* of this data anymore, but we don't know
        # what we do need. This cache clearing will result in a new read
        # of the knit data when we do the checkout, but probably we
        # want to emit the needed data on the fly rather than at the
        # end of everything.
        # the from weave should know not to cache data being joined,
        # but it's ok to ask it to clear.
        from_weave.clear_cache()
        to_weave.clear_cache()

    def _fetch_inventory_weave(self, revs, pb):
        pb.update("fetch inventory", 0, 2)
        to_weave = self.to_repository.get_inventory_weave()
        child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
        try:
            # just merge, this is optimisable and it means we don't
            # copy unreferenced data such as not-needed inventories.
            pb.update("fetch inventory", 1, 3)
            from_weave = self.from_repository.get_inventory_weave()
            pb.update("fetch inventory", 2, 3)
            # we fetch only the referenced inventories because we do not
            # know for unselected inventories whether all their required
            # texts are present in the other repository - it could be
            # corrupt.
            to_weave.join(from_weave, pb=child_pb, msg='merge inventory',
                          version_ids=revs)
            from_weave.clear_cache()
        finally:
            child_pb.finished()

    def _generate_root_texts(self, revs):
        """This will be called by __fetch between fetching weave texts and
        fetching the inventory weave.

        Subclasses should override this if they need to generate root texts
        after fetching weave texts.
        """
        pass
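
# Illustrative usage sketch: callers do not construct RepoFetcher
# directly; they normally go through Repository.fetch, which dispatches
# via InterRepository and ends up here.  The paths and revision id
# below are hypothetical.
def _example_fetch_usage():
    from bzrlib.bzrdir import BzrDir
    source = BzrDir.open('path/to/source').open_repository()
    target = BzrDir.open('path/to/target').open_repository()
    # Repository.fetch short-circuits copies between a repository and
    # itself, and picks a fetcher appropriate to the two formats.
    target.fetch(source, revision_id='some-revision-id')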


class GenericRepoFetcher(RepoFetcher):
    """This is a generic repo to repo fetcher.

    This makes minimal assumptions about repo layout and contents.
    It triggers a reconciliation after fetching to ensure integrity.
    """

    def _fetch_revision_texts(self, revs, pb):
        """Fetch revision object texts."""
        to_txn = self.to_transaction = self.to_repository.get_transaction()
        count = 0
        total = len(revs)
        to_store = self.to_repository._revision_store
        for rev in revs:
            pb.update('copying revisions', count, total)
            try:
                sig_text = self.from_repository.get_signature_text(rev)
                to_store.add_revision_signature_text(rev, sig_text, to_txn)
            except errors.NoSuchRevision:
                # not signed.
                pass
            to_store.add_revision(self.from_repository.get_revision(rev),
                                  to_txn)
            count += 1
        # fixup inventory if needed:
        # this is expensive because we have no inverse index to current ghosts.
        # but on local disk it's a few seconds and sftp push is already insane.
        # so we just-do-it.
        # FIXME: repository should inform if this is needed.
        self.to_repository.reconcile()
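
# Illustrative sketch of the per-revision copy loop above, with
# hypothetical arguments: signatures are optional, so NoSuchRevision
# from get_signature_text is swallowed, but the revision object itself
# must always copy.
def _example_copy_one_revision(from_repo, to_store, to_txn, rev_id):
    try:
        sig_text = from_repo.get_signature_text(rev_id)
        to_store.add_revision_signature_text(rev_id, sig_text, to_txn)
    except errors.NoSuchRevision:
        # unsigned revision; nothing to copy
        pass
    to_store.add_revision(from_repo.get_revision(rev_id), to_txn)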


class KnitRepoFetcher(RepoFetcher):
    """This is a knit format repository specific fetcher.

    This differs from the GenericRepoFetcher by not doing a
    reconciliation after copying, and using knit joining to
    fill in missing data.
    """

    def _fetch_revision_texts(self, revs, pb):
        # may need to be a InterRevisionStore call here.
        from_transaction = self.from_repository.get_transaction()
        to_transaction = self.to_repository.get_transaction()
        to_sf = self.to_repository._revision_store.get_signature_file(
            to_transaction)
        from_sf = self.from_repository._revision_store.get_signature_file(
            from_transaction)
        to_sf.join(from_sf, version_ids=revs, ignore_missing=True)
        to_rf = self.to_repository._revision_store.get_revision_file(
            to_transaction)
        from_rf = self.from_repository._revision_store.get_revision_file(
            from_transaction)
        to_rf.join(from_rf, version_ids=revs)
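
# Illustrative note on the two joins above (a sketch with hypothetical
# store objects, not a new API): signature texts are optional, so that
# join passes ignore_missing=True, while revision texts are mandatory
# and the strict join will fail if any are absent.
def _example_join_semantics(to_sf, from_sf, to_rf, from_rf, revs):
    # Revisions without signatures are simply skipped.
    to_sf.join(from_sf, version_ids=revs, ignore_missing=True)
    # Every requested revision text must exist in the source file.
    to_rf.join(from_rf, version_ids=revs)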


class Inter1and2Helper(object):
    """Helper for operations that convert data from model 1 and 2.

    This is for use by fetchers and converters.
    """

    def __init__(self, source, target):
        """Constructor.

        :param source: The repository data comes from
        :param target: The repository data goes to
        """
        self.source = source
        self.target = target

    def iter_rev_trees(self, revs):
        """Iterate through RevisionTrees efficiently.

        Trees are retrieved in batches of 100, and then yielded in the
        order they were requested.

        :param revs: A list of revision ids
        """
        revs = list(revs)
        while revs:
            for tree in self.source.revision_trees(revs[:100]):
                if tree.inventory.revision_id is None:
                    tree.inventory.revision_id = tree.get_revision_id()
                yield tree
            revs = revs[100:]

    def _find_root_ids(self, revs, parent_map, graph):
        revision_root = {}
        planned_versions = {}
        for tree in self.iter_rev_trees(revs):
            revision_id = tree.inventory.root.revision
            root_id = tree.get_root_id()
            planned_versions.setdefault(root_id, []).append(revision_id)
            revision_root[revision_id] = root_id
        # Find out which parents we don't already know root ids for
        parents = set()
        for revision_parents in parent_map.itervalues():
            parents.update(revision_parents)
        parents.difference_update(revision_root.keys() + [NULL_REVISION])
        # Limit to revisions present in the versionedfile
        parents = graph.get_parent_map(parents).keys()
        for tree in self.iter_rev_trees(parents):
            root_id = tree.get_root_id()
            revision_root[tree.get_revision_id()] = root_id
        return revision_root, planned_versions

    def generate_root_texts(self, revs):
        """Generate VersionedFiles for all root ids.

        :param revs: the revisions to include
        """
        graph = self.source.get_graph()
        parent_map = graph.get_parent_map(revs)
        rev_order = topo_sort(parent_map)
        rev_id_to_root_id, root_id_to_rev_ids = self._find_root_ids(
            revs, parent_map, graph)
        root_id_order = [(rev_id_to_root_id[rev_id], rev_id) for rev_id in
            rev_order]
        # Guaranteed stable, this groups all the file id operations together
        # retaining topological order within the revisions of a file id.
        # File id splits and joins would invalidate this, but they don't exist
        # yet, and are unlikely to in non-rich-root environments anyway.
        root_id_order.sort(key=operator.itemgetter(0))
        # Create a record stream containing the roots to create.
        def yield_roots():
            for key in root_id_order:
                root_id, rev_id = key
                rev_parents = parent_map[rev_id]
                # We drop revision parents with different file-ids, because
                # that represents a rename of the root to a different location
                # - it's not actually a parent for us. (We could look for that
                # file id in the revision tree at considerably more expense,
                # but for now this is sufficient (and reconcile will catch and
                # correct this anyway).
                # When a parent revision is a ghost, we guess that its root id
                # was unchanged (rather than trimming it from the parent list).
                parent_keys = tuple((root_id, parent) for parent in rev_parents
                    if parent != NULL_REVISION and
                        rev_id_to_root_id.get(parent, root_id) == root_id)
                yield FulltextContentFactory(key, parent_keys, None, '')
        return [('texts', yield_roots())]

    def regenerate_inventory(self, revs):
        """Generate a new inventory versionedfile in target, converting data.

        The inventory is retrieved from the source, (deserializing it), and
        stored in the target (reserializing it in a different format).

        :param revs: The revisions to include
        """
        inventory_weave = self.source.get_inventory_weave()
        for tree in self.iter_rev_trees(revs):
            parents = inventory_weave.get_parents(tree.get_revision_id())
            self.target.add_inventory(tree.get_revision_id(), tree.inventory,
                                      parents)
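
# Illustrative sketch of the stable-sort trick used by
# generate_root_texts above: sorting (root_id, rev_id) pairs by root_id
# with Python's stable sort groups the work per file id while keeping
# the topological order of revisions within each group.  The values
# here are hypothetical.
def _example_stable_grouping():
    topo_ordered = [('root-a', 'rev-1'), ('root-b', 'rev-2'),
                    ('root-a', 'rev-3'), ('root-b', 'rev-4')]
    # Stable sort: equal root_ids keep their relative (topological) order.
    # -> [('root-a', 'rev-1'), ('root-a', 'rev-3'),
    #     ('root-b', 'rev-2'), ('root-b', 'rev-4')]
    return sorted(topo_ordered, key=operator.itemgetter(0))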


class Model1toKnit2Fetcher(GenericRepoFetcher):
    """Fetch from a Model1 repository into a Knit2 repository
    """

    def __init__(self, to_repository, from_repository, last_revision=None,
                 pb=None):
        self.helper = Inter1and2Helper(from_repository, to_repository)
        GenericRepoFetcher.__init__(self, to_repository, from_repository,
                                    last_revision, pb)

    def _generate_root_texts(self, revs):
        self.helper.generate_root_texts(revs)

    def _fetch_inventory_weave(self, revs, pb):
        self.helper.regenerate_inventory(revs)


class Knit1to2Fetcher(KnitRepoFetcher):
    """Fetch from a Knit1 repository into a Knit2 repository"""

    def __init__(self, to_repository, from_repository, last_revision=None,
                 pb=None):
        self.helper = Inter1and2Helper(from_repository, to_repository)
        KnitRepoFetcher.__init__(self, to_repository, from_repository,
                                 last_revision, pb)

    def _generate_root_texts(self, revs):
        self.helper.generate_root_texts(revs)

    def _fetch_inventory_weave(self, revs, pb):
        self.helper.regenerate_inventory(revs)
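

# Illustrative sketch (a hypothetical helper, not part of this module's
# API): the choice between the fetchers above is normally made by
# InterRepository based on the source and target formats.  Roughly:
def _example_fetcher_choice(to_repo, from_repo, last_revision=None):
    if to_repo.supports_rich_root() and not from_repo.supports_rich_root():
        # Converting model-1 data into a rich-root (model-2) repository
        # needs the root-text generation done by the helper fetchers.
        return Model1toKnit2Fetcher(to_repo, from_repo, last_revision)
    return RepoFetcher(to_repo, from_repo, last_revision)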