~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/fetch.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-12-20 16:16:34 UTC
  • mfrom: (3123.5.18 hardlinks)
  • Revision ID: pqm@pqm.ubuntu.com-20071220161634-2kcjb650o21ydko4
Accelerate build_tree using similar workingtrees (abentley)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2008 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
27
27
stored, so that if a revision is present we can totally recreate it.
28
28
However, we can't know what files are included in a revision until we
29
29
read its inventory.  So we query the inventory store of the source for
30
 
the ids we need, and then pull those ids and then return to the inventories.
 
30
the ids we need, and then pull those ids and finally actually join
 
31
the inventories.
31
32
"""
32
33
 
33
 
import operator
34
 
 
35
34
import bzrlib
36
35
import bzrlib.errors as errors
37
 
from bzrlib.errors import InstallFailed
 
36
from bzrlib.errors import (InstallFailed,
 
37
                           )
38
38
from bzrlib.progress import ProgressPhase
39
39
from bzrlib.revision import is_null, NULL_REVISION
40
40
from bzrlib.symbol_versioning import (deprecated_function,
41
41
        deprecated_method,
42
42
        )
43
 
from bzrlib.tsort import topo_sort
44
43
from bzrlib.trace import mutter
45
44
import bzrlib.ui
46
 
from bzrlib.versionedfile import filter_absent, FulltextContentFactory
 
45
 
 
46
from bzrlib.lazy_import import lazy_import
47
47
 
48
48
# TODO: Avoid repeatedly opening weaves so many times.
49
49
 
75
75
    This should not be used directly, it's essential a object to encapsulate
76
76
    the logic in InterRepository.fetch().
77
77
    """
78
 
 
79
 
    def __init__(self, to_repository, from_repository, last_revision=None, pb=None,
80
 
        find_ghosts=True):
81
 
        """Create a repo fetcher.
82
 
 
83
 
        :param find_ghosts: If True search the entire history for ghosts.
84
 
        """
 
78
    def __init__(self, to_repository, from_repository, last_revision=None, pb=None):
85
79
        # result variables.
86
80
        self.failed_revisions = []
87
81
        self.count_copied = 0
94
88
        self.from_repository = from_repository
95
89
        # must not mutate self._last_revision as its potentially a shared instance
96
90
        self._last_revision = last_revision
97
 
        self.find_ghosts = find_ghosts
98
91
        if pb is None:
99
92
            self.pb = bzrlib.ui.ui_factory.nested_progress_bar()
100
93
            self.nested_pb = self.pb
114
107
                else:
115
108
                    self.to_repository.commit_write_group()
116
109
            finally:
117
 
                try:
118
 
                    if self.nested_pb is not None:
119
 
                        self.nested_pb.finished()
120
 
                finally:
121
 
                    self.to_repository.unlock()
 
110
                if self.nested_pb is not None:
 
111
                    self.nested_pb.finished()
 
112
                self.to_repository.unlock()
122
113
        finally:
123
114
            self.from_repository.unlock()
124
115
 
128
119
        This initialises all the needed variables, and then fetches the 
129
120
        requested revisions, finally clearing the progress bar.
130
121
        """
 
122
        self.to_weaves = self.to_repository.weave_store
 
123
        self.from_weaves = self.from_repository.weave_store
131
124
        self.count_total = 0
132
125
        self.file_ids_names = {}
133
126
        pp = ProgressPhase('Transferring', 4, self.pb)
134
127
        try:
135
128
            pp.next_phase()
136
 
            search = self._revids_to_fetch()
137
 
            if search is None:
 
129
            revs = self._revids_to_fetch()
 
130
            if revs is None:
138
131
                return
139
 
            if getattr(self, '_fetch_everything_for_search', None) is not None:
140
 
                self._fetch_everything_for_search(search, pp)
141
 
            else:
142
 
                # backward compatibility
143
 
                self._fetch_everything_for_revisions(search.get_keys, pp)
 
132
            self._fetch_everything_for_revisions(revs, pp)
144
133
        finally:
145
134
            self.pb.clear()
146
135
 
147
 
    def _fetch_everything_for_search(self, search, pp):
 
136
    def _fetch_everything_for_revisions(self, revs, pp):
148
137
        """Fetch all data for the given set of revisions."""
149
138
        # The first phase is "file".  We pass the progress bar for it directly
150
139
        # into item_keys_introduced_by, which has more information about how
157
146
        phase = 'file'
158
147
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
159
148
        try:
160
 
            revs = search.get_keys()
161
 
            graph = self.from_repository.get_graph()
162
 
            revs = list(graph.iter_topo_order(revs))
163
 
            data_to_fetch = self.from_repository.item_keys_introduced_by(revs,
164
 
                                                                         pb)
165
 
            text_keys = []
 
149
            data_to_fetch = self.from_repository.item_keys_introduced_by(revs, pb)
166
150
            for knit_kind, file_id, revisions in data_to_fetch:
167
151
                if knit_kind != phase:
168
152
                    phase = knit_kind
171
155
                    pp.next_phase()
172
156
                    pb = bzrlib.ui.ui_factory.nested_progress_bar()
173
157
                if knit_kind == "file":
174
 
                    # Accumulate file texts
175
 
                    text_keys.extend([(file_id, revision) for revision in
176
 
                        revisions])
 
158
                    self._fetch_weave_text(file_id, revisions)
177
159
                elif knit_kind == "inventory":
178
 
                    # Now copy the file texts.
179
 
                    to_texts = self.to_repository.texts
180
 
                    from_texts = self.from_repository.texts
181
 
                    to_texts.insert_record_stream(from_texts.get_record_stream(
182
 
                        text_keys, self.to_repository._fetch_order,
183
 
                        not self.to_repository._fetch_uses_deltas))
184
 
                    # Cause an error if a text occurs after we have done the
185
 
                    # copy.
186
 
                    text_keys = None
187
 
                    # Before we process the inventory we generate the root
188
 
                    # texts (if necessary) so that the inventories references
189
 
                    # will be valid.
 
160
                    # XXX:
 
161
                    # Once we've processed all the files, then we generate the root
 
162
                    # texts (if necessary), then we process the inventory.  It's a
 
163
                    # bit distasteful to have knit_kind == "inventory" mean this,
 
164
                    # perhaps it should happen on the first non-"file" knit, in case
 
165
                    # it's not always inventory?
190
166
                    self._generate_root_texts(revs)
191
 
                    # NB: This currently reopens the inventory weave in source;
192
 
                    # using a single stream interface instead would avoid this.
193
167
                    self._fetch_inventory_weave(revs, pb)
194
168
                elif knit_kind == "signatures":
195
169
                    # Nothing to do here; this will be taken care of when
199
173
                    self._fetch_revision_texts(revs, pb)
200
174
                else:
201
175
                    raise AssertionError("Unknown knit kind %r" % knit_kind)
202
 
            if self.to_repository._fetch_reconcile:
203
 
                self.to_repository.reconcile()
204
176
        finally:
205
177
            if pb is not None:
206
178
                pb.finished()
219
191
        if (self._last_revision is not None and
220
192
            self.to_repository.has_revision(self._last_revision)):
221
193
            return None
 
194
            
222
195
        try:
223
 
            return self.to_repository.search_missing_revision_ids(
224
 
                self.from_repository, self._last_revision,
225
 
                find_ghosts=self.find_ghosts)
226
 
        except errors.NoSuchRevision, e:
 
196
            # XXX: this gets the full graph on both sides, and will make sure
 
197
            # that ghosts are filled whether or not you care about them.
 
198
            return self.to_repository.missing_revision_ids(self.from_repository,
 
199
                                                           self._last_revision)
 
200
        except errors.NoSuchRevision:
227
201
            raise InstallFailed([self._last_revision])
228
202
 
 
203
    def _fetch_weave_text(self, file_id, required_versions):
 
204
        to_weave = self.to_weaves.get_weave_or_empty(file_id,
 
205
            self.to_repository.get_transaction())
 
206
        from_weave = self.from_weaves.get_weave(file_id,
 
207
            self.from_repository.get_transaction())
 
208
        # we fetch all the texts, because texts do
 
209
        # not reference anything, and its cheap enough
 
210
        to_weave.join(from_weave, version_ids=required_versions)
 
211
        # we don't need *all* of this data anymore, but we dont know
 
212
        # what we do. This cache clearing will result in a new read 
 
213
        # of the knit data when we do the checkout, but probably we
 
214
        # want to emit the needed data on the fly rather than at the
 
215
        # end anyhow.
 
216
        # the from weave should know not to cache data being joined,
 
217
        # but its ok to ask it to clear.
 
218
        from_weave.clear_cache()
 
219
        to_weave.clear_cache()
 
220
 
229
221
    def _fetch_inventory_weave(self, revs, pb):
230
222
        pb.update("fetch inventory", 0, 2)
231
 
        to_weave = self.to_repository.inventories
 
223
        to_weave = self.to_repository.get_inventory_weave()
232
224
        child_pb = bzrlib.ui.ui_factory.nested_progress_bar()
233
225
        try:
234
226
            # just merge, this is optimisable and its means we don't
235
227
            # copy unreferenced data such as not-needed inventories.
236
228
            pb.update("fetch inventory", 1, 3)
237
 
            from_weave = self.from_repository.inventories
 
229
            from_weave = self.from_repository.get_inventory_weave()
238
230
            pb.update("fetch inventory", 2, 3)
239
231
            # we fetch only the referenced inventories because we do not
240
232
            # know for unselected inventories whether all their required
241
233
            # texts are present in the other repository - it could be
242
234
            # corrupt.
243
 
            to_weave.insert_record_stream(from_weave.get_record_stream(
244
 
                [(rev_id,) for rev_id in revs],
245
 
                self.to_repository._fetch_order,
246
 
                not self.to_repository._fetch_uses_deltas))
 
235
            to_weave.join(from_weave, pb=child_pb, msg='merge inventory',
 
236
                          version_ids=revs)
 
237
            from_weave.clear_cache()
247
238
        finally:
248
239
            child_pb.finished()
249
240
 
250
 
    def _fetch_revision_texts(self, revs, pb):
251
 
        # may need to be a InterRevisionStore call here.
252
 
        to_sf = self.to_repository.signatures
253
 
        from_sf = self.from_repository.signatures
254
 
        # A missing signature is just skipped.
255
 
        to_sf.insert_record_stream(filter_absent(from_sf.get_record_stream(
256
 
            [(rev_id,) for rev_id in revs],
257
 
            self.to_repository._fetch_order,
258
 
            not self.to_repository._fetch_uses_deltas)))
259
 
        self._fetch_just_revision_texts(revs)
260
 
 
261
 
    def _fetch_just_revision_texts(self, version_ids):
262
 
        to_rf = self.to_repository.revisions
263
 
        from_rf = self.from_repository.revisions
264
 
        to_rf.insert_record_stream(from_rf.get_record_stream(
265
 
            [(rev_id,) for rev_id in version_ids],
266
 
            self.to_repository._fetch_order,
267
 
            not self.to_repository._fetch_uses_deltas))
268
 
 
269
241
    def _generate_root_texts(self, revs):
270
242
        """This will be called by __fetch between fetching weave texts and
271
243
        fetching the inventory weave.
276
248
        pass
277
249
 
278
250
 
 
251
class GenericRepoFetcher(RepoFetcher):
 
252
    """This is a generic repo to repo fetcher.
 
253
 
 
254
    This makes minimal assumptions about repo layout and contents.
 
255
    It triggers a reconciliation after fetching to ensure integrity.
 
256
    """
 
257
 
 
258
    def _fetch_revision_texts(self, revs, pb):
 
259
        """Fetch revision object texts"""
 
260
        to_txn = self.to_transaction = self.to_repository.get_transaction()
 
261
        count = 0
 
262
        total = len(revs)
 
263
        to_store = self.to_repository._revision_store
 
264
        for rev in revs:
 
265
            pb.update('copying revisions', count, total)
 
266
            try:
 
267
                sig_text = self.from_repository.get_signature_text(rev)
 
268
                to_store.add_revision_signature_text(rev, sig_text, to_txn)
 
269
            except errors.NoSuchRevision:
 
270
                # not signed.
 
271
                pass
 
272
            to_store.add_revision(self.from_repository.get_revision(rev),
 
273
                                  to_txn)
 
274
            count += 1
 
275
        # fixup inventory if needed: 
 
276
        # this is expensive because we have no inverse index to current ghosts.
 
277
        # but on local disk its a few seconds and sftp push is already insane.
 
278
        # so we just-do-it.
 
279
        # FIXME: repository should inform if this is needed.
 
280
        self.to_repository.reconcile()
 
281
    
 
282
 
 
283
class KnitRepoFetcher(RepoFetcher):
 
284
    """This is a knit format repository specific fetcher.
 
285
 
 
286
    This differs from the GenericRepoFetcher by not doing a 
 
287
    reconciliation after copying, and using knit joining to
 
288
    copy revision texts.
 
289
    """
 
290
 
 
291
    def _fetch_revision_texts(self, revs, pb):
 
292
        # may need to be a InterRevisionStore call here.
 
293
        from_transaction = self.from_repository.get_transaction()
 
294
        to_transaction = self.to_repository.get_transaction()
 
295
        to_sf = self.to_repository._revision_store.get_signature_file(
 
296
            to_transaction)
 
297
        from_sf = self.from_repository._revision_store.get_signature_file(
 
298
            from_transaction)
 
299
        to_sf.join(from_sf, version_ids=revs, ignore_missing=True)
 
300
        to_rf = self.to_repository._revision_store.get_revision_file(
 
301
            to_transaction)
 
302
        from_rf = self.from_repository._revision_store.get_revision_file(
 
303
            from_transaction)
 
304
        to_rf.join(from_rf, version_ids=revs)
 
305
 
 
306
 
279
307
class Inter1and2Helper(object):
280
308
    """Helper for operations that convert data from model 1 and 2
281
309
    
301
329
 
302
330
        :param revs: A list of revision ids
303
331
        """
304
 
        # In case that revs is not a list.
305
 
        revs = list(revs)
306
332
        while revs:
307
333
            for tree in self.source.revision_trees(revs[:100]):
308
334
                if tree.inventory.revision_id is None:
310
336
                yield tree
311
337
            revs = revs[100:]
312
338
 
313
 
    def _find_root_ids(self, revs, parent_map, graph):
314
 
        revision_root = {}
315
 
        planned_versions = {}
316
 
        for tree in self.iter_rev_trees(revs):
317
 
            revision_id = tree.inventory.root.revision
318
 
            root_id = tree.get_root_id()
319
 
            planned_versions.setdefault(root_id, []).append(revision_id)
320
 
            revision_root[revision_id] = root_id
321
 
        # Find out which parents we don't already know root ids for
322
 
        parents = set()
323
 
        for revision_parents in parent_map.itervalues():
324
 
            parents.update(revision_parents)
325
 
        parents.difference_update(revision_root.keys() + [NULL_REVISION])
326
 
        # Limit to revisions present in the versionedfile
327
 
        parents = graph.get_parent_map(parents).keys()
328
 
        for tree in self.iter_rev_trees(parents):
329
 
            root_id = tree.get_root_id()
330
 
            revision_root[tree.get_revision_id()] = root_id
331
 
        return revision_root, planned_versions
332
 
 
333
339
    def generate_root_texts(self, revs):
334
340
        """Generate VersionedFiles for all root ids.
335
 
 
 
341
        
336
342
        :param revs: the revisions to include
337
343
        """
338
 
        to_texts = self.target.texts
339
 
        graph = self.source.get_graph()
340
 
        parent_map = graph.get_parent_map(revs)
341
 
        rev_order = topo_sort(parent_map)
342
 
        rev_id_to_root_id, root_id_to_rev_ids = self._find_root_ids(
343
 
            revs, parent_map, graph)
344
 
        root_id_order = [(rev_id_to_root_id[rev_id], rev_id) for rev_id in
345
 
            rev_order]
346
 
        # Guaranteed stable, this groups all the file id operations together
347
 
        # retaining topological order within the revisions of a file id.
348
 
        # File id splits and joins would invalidate this, but they don't exist
349
 
        # yet, and are unlikely to in non-rich-root environments anyway.
350
 
        root_id_order.sort(key=operator.itemgetter(0))
351
 
        # Create a record stream containing the roots to create.
352
 
        def yield_roots():
353
 
            for key in root_id_order:
354
 
                root_id, rev_id = key
355
 
                rev_parents = parent_map[rev_id]
356
 
                # We drop revision parents with different file-ids, because
357
 
                # that represents a rename of the root to a different location
358
 
                # - its not actually a parent for us. (We could look for that
359
 
                # file id in the revision tree at considerably more expense,
360
 
                # but for now this is sufficient (and reconcile will catch and
361
 
                # correct this anyway).
362
 
                # When a parent revision is a ghost, we guess that its root id
363
 
                # was unchanged (rather than trimming it from the parent list).
364
 
                parent_keys = tuple((root_id, parent) for parent in rev_parents
365
 
                    if parent != NULL_REVISION and
366
 
                        rev_id_to_root_id.get(parent, root_id) == root_id)
367
 
                yield FulltextContentFactory(key, parent_keys, None, '')
368
 
        to_texts.insert_record_stream(yield_roots())
 
344
        inventory_weave = self.source.get_inventory_weave()
 
345
        parent_texts = {}
 
346
        versionedfile = {}
 
347
        to_store = self.target.weave_store
 
348
        for tree in self.iter_rev_trees(revs):
 
349
            revision_id = tree.inventory.root.revision
 
350
            root_id = tree.get_root_id()
 
351
            parents = inventory_weave.get_parents(revision_id)
 
352
            if root_id not in versionedfile:
 
353
                versionedfile[root_id] = to_store.get_weave_or_empty(root_id, 
 
354
                    self.target.get_transaction())
 
355
            _, _, parent_texts[root_id] = versionedfile[root_id].add_lines(
 
356
                revision_id, parents, [], parent_texts)
369
357
 
370
358
    def regenerate_inventory(self, revs):
371
359
        """Generate a new inventory versionedfile in target, convertin data.
374
362
        stored in the target (reserializing it in a different format).
375
363
        :param revs: The revisions to include
376
364
        """
 
365
        inventory_weave = self.source.get_inventory_weave()
377
366
        for tree in self.iter_rev_trees(revs):
378
 
            parents = tree.get_parent_ids()
 
367
            parents = inventory_weave.get_parents(tree.get_revision_id())
379
368
            self.target.add_inventory(tree.get_revision_id(), tree.inventory,
380
369
                                      parents)
381
370
 
382
 
    def fetch_revisions(self, revision_ids):
383
 
        for revision in self.source.get_revisions(revision_ids):
384
 
            self.target.add_revision(revision.revision_id, revision)
385
 
 
386
 
 
387
 
class Model1toKnit2Fetcher(RepoFetcher):
 
371
 
 
372
class Model1toKnit2Fetcher(GenericRepoFetcher):
388
373
    """Fetch from a Model1 repository into a Knit2 repository
389
374
    """
390
 
    def __init__(self, to_repository, from_repository, last_revision=None,
391
 
                 pb=None, find_ghosts=True):
 
375
    def __init__(self, to_repository, from_repository, last_revision=None, 
 
376
                 pb=None):
392
377
        self.helper = Inter1and2Helper(from_repository, to_repository)
393
 
        RepoFetcher.__init__(self, to_repository, from_repository,
394
 
            last_revision, pb, find_ghosts)
 
378
        GenericRepoFetcher.__init__(self, to_repository, from_repository,
 
379
                                    last_revision, pb)
395
380
 
396
381
    def _generate_root_texts(self, revs):
397
382
        self.helper.generate_root_texts(revs)
398
383
 
399
384
    def _fetch_inventory_weave(self, revs, pb):
400
385
        self.helper.regenerate_inventory(revs)
401
 
 
402
 
    def _fetch_revision_texts(self, revs, pb):
403
 
        """Fetch revision object texts"""
404
 
        count = 0
405
 
        total = len(revs)
406
 
        for rev in revs:
407
 
            pb.update('copying revisions', count, total)
408
 
            try:
409
 
                sig_text = self.from_repository.get_signature_text(rev)
410
 
                self.to_repository.add_signature_text(rev, sig_text)
411
 
            except errors.NoSuchRevision:
412
 
                # not signed.
413
 
                pass
414
 
            self._copy_revision(rev)
415
 
            count += 1
416
 
 
417
 
    def _copy_revision(self, rev):
418
 
        self.helper.fetch_revisions([rev])
419
 
 
420
 
 
421
 
class Knit1to2Fetcher(RepoFetcher):
 
386
 
 
387
 
 
388
class Knit1to2Fetcher(KnitRepoFetcher):
422
389
    """Fetch from a Knit1 repository into a Knit2 repository"""
423
390
 
424
 
    def __init__(self, to_repository, from_repository, last_revision=None,
425
 
                 pb=None, find_ghosts=True):
 
391
    def __init__(self, to_repository, from_repository, last_revision=None, 
 
392
                 pb=None):
426
393
        self.helper = Inter1and2Helper(from_repository, to_repository)
427
 
        RepoFetcher.__init__(self, to_repository, from_repository,
428
 
            last_revision, pb, find_ghosts)
 
394
        KnitRepoFetcher.__init__(self, to_repository, from_repository,
 
395
                                 last_revision, pb)
429
396
 
430
397
    def _generate_root_texts(self, revs):
431
398
        self.helper.generate_root_texts(revs)
433
400
    def _fetch_inventory_weave(self, revs, pb):
434
401
        self.helper.regenerate_inventory(revs)
435
402
 
436
 
    def _fetch_just_revision_texts(self, version_ids):
437
 
        self.helper.fetch_revisions(version_ids)
 
403
 
 
404
class RemoteToOtherFetcher(GenericRepoFetcher):
 
405
 
 
406
    def _fetch_everything_for_revisions(self, revs, pp):
 
407
        data_stream = self.from_repository.get_data_stream(revs)
 
408
        self.to_repository.insert_data_stream(data_stream)
 
409
 
 
410