~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/vf_repository.py

  • Committer: Jelmer Vernooij
  • Date: 2011-05-02 15:29:58 UTC
  • mto: This revision was merged to the branch mainline in revision 5844.
  • Revision ID: jelmer@samba.org-20110502152958-hx8vedal093rh64t
Split versionedfile-specific stuff out into VersionedFileRepository.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Repository formats built around versioned files."""
 
18
 
 
19
 
 
20
from bzrlib.lazy_import import lazy_import
 
21
lazy_import(globals(), """
 
22
from bzrlib import (
 
23
    debug,
 
24
    fetch as _mod_fetch,
 
25
    fifo_cache,
 
26
    gpg,
 
27
    graph,
 
28
    inventory_delta,
 
29
    lru_cache,
 
30
    osutils,
 
31
    revision as _mod_revision,
 
32
    serializer as _mod_serializer,
 
33
    static_tuple,
 
34
    tsort,
 
35
    ui,
 
36
    versionedfile,
 
37
    )
 
38
 
 
39
from bzrlib.recordcounter import RecordCounter
 
40
from bzrlib.revisiontree import InventoryRevisionTree
 
41
from bzrlib.testament import Testament
 
42
""")
 
43
 
 
44
from bzrlib import (
 
45
    errors,
 
46
    )
 
47
from bzrlib.decorators import (
 
48
    needs_read_lock,
 
49
    needs_write_lock,
 
50
    only_raises,
 
51
    )
 
52
from bzrlib.inventory import (
 
53
    Inventory,
 
54
    )
 
55
 
 
56
from bzrlib.repository import (
 
57
    InterRepository,
 
58
    MetaDirRepository,
 
59
    Repository,
 
60
    )
 
61
 
 
62
from bzrlib.trace import (
 
63
    mutter,
 
64
    )
 
65
 
 
66
 
 
67
class VersionedFileRepository(Repository):
 
68
    """Repository holding history for one or more branches.
 
69
 
 
70
    The repository holds and retrieves historical information including
 
71
    revisions and file history.  It's normally accessed only by the Branch,
 
72
    which views a particular line of development through that history.
 
73
 
 
74
    The Repository builds on top of some byte storage facilies (the revisions,
 
75
    signatures, inventories, texts and chk_bytes attributes) and a Transport,
 
76
    which respectively provide byte storage and a means to access the (possibly
 
77
    remote) disk.
 
78
 
 
79
    The byte storage facilities are addressed via tuples, which we refer to
 
80
    as 'keys' throughout the code base. Revision_keys, inventory_keys and
 
81
    signature_keys are all 1-tuples: (revision_id,). text_keys are two-tuples:
 
82
    (file_id, revision_id). chk_bytes uses CHK keys - a 1-tuple with a single
 
83
    byte string made up of a hash identifier and a hash value.
 
84
    We use this interface because it allows low friction with the underlying
 
85
    code that implements disk indices, network encoding and other parts of
 
86
    bzrlib.
 
87
 
 
88
    :ivar revisions: A bzrlib.versionedfile.VersionedFiles instance containing
 
89
        the serialised revisions for the repository. This can be used to obtain
 
90
        revision graph information or to access raw serialised revisions.
 
91
        The result of trying to insert data into the repository via this store
 
92
        is undefined: it should be considered read-only except for implementors
 
93
        of repositories.
 
94
    :ivar signatures: A bzrlib.versionedfile.VersionedFiles instance containing
 
95
        the serialised signatures for the repository. This can be used to
 
96
        obtain access to raw serialised signatures.  The result of trying to
 
97
        insert data into the repository via this store is undefined: it should
 
98
        be considered read-only except for implementors of repositories.
 
99
    :ivar inventories: A bzrlib.versionedfile.VersionedFiles instance containing
 
100
        the serialised inventories for the repository. This can be used to
 
101
        obtain unserialised inventories.  The result of trying to insert data
 
102
        into the repository via this store is undefined: it should be
 
103
        considered read-only except for implementors of repositories.
 
104
    :ivar texts: A bzrlib.versionedfile.VersionedFiles instance containing the
 
105
        texts of files and directories for the repository. This can be used to
 
106
        obtain file texts or file graphs. Note that Repository.iter_file_bytes
 
107
        is usually a better interface for accessing file texts.
 
108
        The result of trying to insert data into the repository via this store
 
109
        is undefined: it should be considered read-only except for implementors
 
110
        of repositories.
 
111
    :ivar chk_bytes: A bzrlib.versionedfile.VersionedFiles instance containing
 
112
        any data the repository chooses to store or have indexed by its hash.
 
113
        The result of trying to insert data into the repository via this store
 
114
        is undefined: it should be considered read-only except for implementors
 
115
        of repositories.
 
116
    :ivar _transport: Transport for file access to repository, typically
 
117
        pointing to .bzr/repository.
 
118
    """
 
119
 
 
120
    def add_fallback_repository(self, repository):
 
121
        """Add a repository to use for looking up data not held locally.
 
122
 
 
123
        :param repository: A repository.
 
124
        """
 
125
        if not self._format.supports_external_lookups:
 
126
            raise errors.UnstackableRepositoryFormat(self._format, self.base)
 
127
        if self.is_locked():
 
128
            # This repository will call fallback.unlock() when we transition to
 
129
            # the unlocked state, so we make sure to increment the lock count
 
130
            repository.lock_read()
 
131
        self._check_fallback_repository(repository)
 
132
        self._fallback_repositories.append(repository)
 
133
        self.texts.add_fallback_versioned_files(repository.texts)
 
134
        self.inventories.add_fallback_versioned_files(repository.inventories)
 
135
        self.revisions.add_fallback_versioned_files(repository.revisions)
 
136
        self.signatures.add_fallback_versioned_files(repository.signatures)
 
137
        if self.chk_bytes is not None:
 
138
            self.chk_bytes.add_fallback_versioned_files(repository.chk_bytes)
 
139
 
 
140
    @only_raises(errors.LockNotHeld, errors.LockBroken)
 
141
    def unlock(self):
 
142
        super(VersionedFileRepository, self).unlock()
 
143
        if self.control_files._lock_count == 0:
 
144
            self._inventory_entry_cache.clear()
 
145
 
 
146
    def add_inventory(self, revision_id, inv, parents):
 
147
        """Add the inventory inv to the repository as revision_id.
 
148
 
 
149
        :param parents: The revision ids of the parents that revision_id
 
150
                        is known to have and are in the repository already.
 
151
 
 
152
        :returns: The validator(which is a sha1 digest, though what is sha'd is
 
153
            repository format specific) of the serialized inventory.
 
154
        """
 
155
        if not self.is_in_write_group():
 
156
            raise AssertionError("%r not in write group" % (self,))
 
157
        _mod_revision.check_not_reserved_id(revision_id)
 
158
        if not (inv.revision_id is None or inv.revision_id == revision_id):
 
159
            raise AssertionError(
 
160
                "Mismatch between inventory revision"
 
161
                " id and insertion revid (%r, %r)"
 
162
                % (inv.revision_id, revision_id))
 
163
        if inv.root is None:
 
164
            raise errors.RootMissing()
 
165
        return self._add_inventory_checked(revision_id, inv, parents)
 
166
 
 
167
    def _add_inventory_checked(self, revision_id, inv, parents):
 
168
        """Add inv to the repository after checking the inputs.
 
169
 
 
170
        This function can be overridden to allow different inventory styles.
 
171
 
 
172
        :seealso: add_inventory, for the contract.
 
173
        """
 
174
        inv_lines = self._serializer.write_inventory_to_lines(inv)
 
175
        return self._inventory_add_lines(revision_id, parents,
 
176
            inv_lines, check_content=False)
 
177
 
 
178
    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
 
179
                               parents, basis_inv=None, propagate_caches=False):
 
180
        """Add a new inventory expressed as a delta against another revision.
 
181
 
 
182
        See the inventory developers documentation for the theory behind
 
183
        inventory deltas.
 
184
 
 
185
        :param basis_revision_id: The inventory id the delta was created
 
186
            against. (This does not have to be a direct parent.)
 
187
        :param delta: The inventory delta (see Inventory.apply_delta for
 
188
            details).
 
189
        :param new_revision_id: The revision id that the inventory is being
 
190
            added for.
 
191
        :param parents: The revision ids of the parents that revision_id is
 
192
            known to have and are in the repository already. These are supplied
 
193
            for repositories that depend on the inventory graph for revision
 
194
            graph access, as well as for those that pun ancestry with delta
 
195
            compression.
 
196
        :param basis_inv: The basis inventory if it is already known,
 
197
            otherwise None.
 
198
        :param propagate_caches: If True, the caches for this inventory are
 
199
          copied to and updated for the result if possible.
 
200
 
 
201
        :returns: (validator, new_inv)
 
202
            The validator(which is a sha1 digest, though what is sha'd is
 
203
            repository format specific) of the serialized inventory, and the
 
204
            resulting inventory.
 
205
        """
 
206
        if not self.is_in_write_group():
 
207
            raise AssertionError("%r not in write group" % (self,))
 
208
        _mod_revision.check_not_reserved_id(new_revision_id)
 
209
        basis_tree = self.revision_tree(basis_revision_id)
 
210
        basis_tree.lock_read()
 
211
        try:
 
212
            # Note that this mutates the inventory of basis_tree, which not all
 
213
            # inventory implementations may support: A better idiom would be to
 
214
            # return a new inventory, but as there is no revision tree cache in
 
215
            # repository this is safe for now - RBC 20081013
 
216
            if basis_inv is None:
 
217
                basis_inv = basis_tree.inventory
 
218
            basis_inv.apply_delta(delta)
 
219
            basis_inv.revision_id = new_revision_id
 
220
            return (self.add_inventory(new_revision_id, basis_inv, parents),
 
221
                    basis_inv)
 
222
        finally:
 
223
            basis_tree.unlock()
 
224
 
 
225
    def _inventory_add_lines(self, revision_id, parents, lines,
 
226
        check_content=True):
 
227
        """Store lines in inv_vf and return the sha1 of the inventory."""
 
228
        parents = [(parent,) for parent in parents]
 
229
        result = self.inventories.add_lines((revision_id,), parents, lines,
 
230
            check_content=check_content)[0]
 
231
        self.inventories._access.flush()
 
232
        return result
 
233
 
 
234
    def add_revision(self, revision_id, rev, inv=None, config=None):
 
235
        """Add rev to the revision store as revision_id.
 
236
 
 
237
        :param revision_id: the revision id to use.
 
238
        :param rev: The revision object.
 
239
        :param inv: The inventory for the revision. if None, it will be looked
 
240
                    up in the inventory storer
 
241
        :param config: If None no digital signature will be created.
 
242
                       If supplied its signature_needed method will be used
 
243
                       to determine if a signature should be made.
 
244
        """
 
245
        # TODO: jam 20070210 Shouldn't we check rev.revision_id and
 
246
        #       rev.parent_ids?
 
247
        _mod_revision.check_not_reserved_id(revision_id)
 
248
        if config is not None and config.signature_needed():
 
249
            if inv is None:
 
250
                inv = self.get_inventory(revision_id)
 
251
            tree = InventoryRevisionTree(self, inv, revision_id)
 
252
            testament = Testament(rev, tree)
 
253
            plaintext = testament.as_short_text()
 
254
            self.store_revision_signature(
 
255
                gpg.GPGStrategy(config), plaintext, revision_id)
 
256
        # check inventory present
 
257
        if not self.inventories.get_parent_map([(revision_id,)]):
 
258
            if inv is None:
 
259
                raise errors.WeaveRevisionNotPresent(revision_id,
 
260
                                                     self.inventories)
 
261
            else:
 
262
                # yes, this is not suitable for adding with ghosts.
 
263
                rev.inventory_sha1 = self.add_inventory(revision_id, inv,
 
264
                                                        rev.parent_ids)
 
265
        else:
 
266
            key = (revision_id,)
 
267
            rev.inventory_sha1 = self.inventories.get_sha1s([key])[key]
 
268
        self._add_revision(rev)
 
269
 
 
270
    def _add_revision(self, revision):
 
271
        text = self._serializer.write_revision_to_string(revision)
 
272
        key = (revision.revision_id,)
 
273
        parents = tuple((parent,) for parent in revision.parent_ids)
 
274
        self.revisions.add_lines(key, parents, osutils.split_lines(text))
 
275
 
 
276
    def _check_inventories(self, checker):
 
277
        """Check the inventories found from the revision scan.
 
278
        
 
279
        This is responsible for verifying the sha1 of inventories and
 
280
        creating a pending_keys set that covers data referenced by inventories.
 
281
        """
 
282
        bar = ui.ui_factory.nested_progress_bar()
 
283
        try:
 
284
            self._do_check_inventories(checker, bar)
 
285
        finally:
 
286
            bar.finished()
 
287
 
 
288
    def _do_check_inventories(self, checker, bar):
 
289
        """Helper for _check_inventories."""
 
290
        revno = 0
 
291
        keys = {'chk_bytes':set(), 'inventories':set(), 'texts':set()}
 
292
        kinds = ['chk_bytes', 'texts']
 
293
        count = len(checker.pending_keys)
 
294
        bar.update("inventories", 0, 2)
 
295
        current_keys = checker.pending_keys
 
296
        checker.pending_keys = {}
 
297
        # Accumulate current checks.
 
298
        for key in current_keys:
 
299
            if key[0] != 'inventories' and key[0] not in kinds:
 
300
                checker._report_items.append('unknown key type %r' % (key,))
 
301
            keys[key[0]].add(key[1:])
 
302
        if keys['inventories']:
 
303
            # NB: output order *should* be roughly sorted - topo or
 
304
            # inverse topo depending on repository - either way decent
 
305
            # to just delta against. However, pre-CHK formats didn't
 
306
            # try to optimise inventory layout on disk. As such the
 
307
            # pre-CHK code path does not use inventory deltas.
 
308
            last_object = None
 
309
            for record in self.inventories.check(keys=keys['inventories']):
 
310
                if record.storage_kind == 'absent':
 
311
                    checker._report_items.append(
 
312
                        'Missing inventory {%s}' % (record.key,))
 
313
                else:
 
314
                    last_object = self._check_record('inventories', record,
 
315
                        checker, last_object,
 
316
                        current_keys[('inventories',) + record.key])
 
317
            del keys['inventories']
 
318
        else:
 
319
            return
 
320
        bar.update("texts", 1)
 
321
        while (checker.pending_keys or keys['chk_bytes']
 
322
            or keys['texts']):
 
323
            # Something to check.
 
324
            current_keys = checker.pending_keys
 
325
            checker.pending_keys = {}
 
326
            # Accumulate current checks.
 
327
            for key in current_keys:
 
328
                if key[0] not in kinds:
 
329
                    checker._report_items.append('unknown key type %r' % (key,))
 
330
                keys[key[0]].add(key[1:])
 
331
            # Check the outermost kind only - inventories || chk_bytes || texts
 
332
            for kind in kinds:
 
333
                if keys[kind]:
 
334
                    last_object = None
 
335
                    for record in getattr(self, kind).check(keys=keys[kind]):
 
336
                        if record.storage_kind == 'absent':
 
337
                            checker._report_items.append(
 
338
                                'Missing %s {%s}' % (kind, record.key,))
 
339
                        else:
 
340
                            last_object = self._check_record(kind, record,
 
341
                                checker, last_object, current_keys[(kind,) + record.key])
 
342
                    keys[kind] = set()
 
343
                    break
 
344
 
 
345
    def _check_record(self, kind, record, checker, last_object, item_data):
 
346
        """Check a single text from this repository."""
 
347
        if kind == 'inventories':
 
348
            rev_id = record.key[0]
 
349
            inv = self._deserialise_inventory(rev_id,
 
350
                record.get_bytes_as('fulltext'))
 
351
            if last_object is not None:
 
352
                delta = inv._make_delta(last_object)
 
353
                for old_path, path, file_id, ie in delta:
 
354
                    if ie is None:
 
355
                        continue
 
356
                    ie.check(checker, rev_id, inv)
 
357
            else:
 
358
                for path, ie in inv.iter_entries():
 
359
                    ie.check(checker, rev_id, inv)
 
360
            if self._format.fast_deltas:
 
361
                return inv
 
362
        elif kind == 'chk_bytes':
 
363
            # No code written to check chk_bytes for this repo format.
 
364
            checker._report_items.append(
 
365
                'unsupported key type chk_bytes for %s' % (record.key,))
 
366
        elif kind == 'texts':
 
367
            self._check_text(record, checker, item_data)
 
368
        else:
 
369
            checker._report_items.append(
 
370
                'unknown key type %s for %s' % (kind, record.key))
 
371
 
 
372
    def _check_text(self, record, checker, item_data):
 
373
        """Check a single text."""
 
374
        # Check it is extractable.
 
375
        # TODO: check length.
 
376
        if record.storage_kind == 'chunked':
 
377
            chunks = record.get_bytes_as(record.storage_kind)
 
378
            sha1 = osutils.sha_strings(chunks)
 
379
            length = sum(map(len, chunks))
 
380
        else:
 
381
            content = record.get_bytes_as('fulltext')
 
382
            sha1 = osutils.sha_string(content)
 
383
            length = len(content)
 
384
        if item_data and sha1 != item_data[1]:
 
385
            checker._report_items.append(
 
386
                'sha1 mismatch: %s has sha1 %s expected %s referenced by %s' %
 
387
                (record.key, sha1, item_data[1], item_data[2]))
 
388
 
 
389
    def __init__(self, _format, a_bzrdir, control_files):
 
390
        """Instantiate a VersionedFileRepository.
 
391
 
 
392
        :param _format: The format of the repository on disk.
 
393
        :param a_bzrdir: The BzrDir of the repository.
 
394
        :param control_files: Control files to use for locking, etc.
 
395
        """
 
396
        # In the future we will have a single api for all stores for
 
397
        # getting file texts, inventories and revisions, then
 
398
        # this construct will accept instances of those things.
 
399
        super(VersionedFileRepository, self).__init__(_format, a_bzrdir,
 
400
            control_files)
 
401
        # for tests
 
402
        self._reconcile_does_inventory_gc = True
 
403
        self._reconcile_fixes_text_parents = False
 
404
        self._reconcile_backsup_inventory = True
 
405
        # An InventoryEntry cache, used during deserialization
 
406
        self._inventory_entry_cache = fifo_cache.FIFOCache(10*1024)
 
407
        # Is it safe to return inventory entries directly from the entry cache,
 
408
        # rather copying them?
 
409
        self._safe_to_return_from_cache = False
 
410
 
 
411
    def get_missing_parent_inventories(self, check_for_missing_texts=True):
 
412
        """Return the keys of missing inventory parents for revisions added in
 
413
        this write group.
 
414
 
 
415
        A revision is not complete if the inventory delta for that revision
 
416
        cannot be calculated.  Therefore if the parent inventories of a
 
417
        revision are not present, the revision is incomplete, and e.g. cannot
 
418
        be streamed by a smart server.  This method finds missing inventory
 
419
        parents for revisions added in this write group.
 
420
        """
 
421
        if not self._format.supports_external_lookups:
 
422
            # This is only an issue for stacked repositories
 
423
            return set()
 
424
        if not self.is_in_write_group():
 
425
            raise AssertionError('not in a write group')
 
426
 
 
427
        # XXX: We assume that every added revision already has its
 
428
        # corresponding inventory, so we only check for parent inventories that
 
429
        # might be missing, rather than all inventories.
 
430
        parents = set(self.revisions._index.get_missing_parents())
 
431
        parents.discard(_mod_revision.NULL_REVISION)
 
432
        unstacked_inventories = self.inventories._index
 
433
        present_inventories = unstacked_inventories.get_parent_map(
 
434
            key[-1:] for key in parents)
 
435
        parents.difference_update(present_inventories)
 
436
        if len(parents) == 0:
 
437
            # No missing parent inventories.
 
438
            return set()
 
439
        if not check_for_missing_texts:
 
440
            return set(('inventories', rev_id) for (rev_id,) in parents)
 
441
        # Ok, now we have a list of missing inventories.  But these only matter
 
442
        # if the inventories that reference them are missing some texts they
 
443
        # appear to introduce.
 
444
        # XXX: Texts referenced by all added inventories need to be present,
 
445
        # but at the moment we're only checking for texts referenced by
 
446
        # inventories at the graph's edge.
 
447
        key_deps = self.revisions._index._key_dependencies
 
448
        key_deps.satisfy_refs_for_keys(present_inventories)
 
449
        referrers = frozenset(r[0] for r in key_deps.get_referrers())
 
450
        file_ids = self.fileids_altered_by_revision_ids(referrers)
 
451
        missing_texts = set()
 
452
        for file_id, version_ids in file_ids.iteritems():
 
453
            missing_texts.update(
 
454
                (file_id, version_id) for version_id in version_ids)
 
455
        present_texts = self.texts.get_parent_map(missing_texts)
 
456
        missing_texts.difference_update(present_texts)
 
457
        if not missing_texts:
 
458
            # No texts are missing, so all revisions and their deltas are
 
459
            # reconstructable.
 
460
            return set()
 
461
        # Alternatively the text versions could be returned as the missing
 
462
        # keys, but this is likely to be less data.
 
463
        missing_keys = set(('inventories', rev_id) for (rev_id,) in parents)
 
464
        return missing_keys
 
465
 
 
466
    @needs_read_lock
 
467
    def has_revisions(self, revision_ids):
 
468
        """Probe to find out the presence of multiple revisions.
 
469
 
 
470
        :param revision_ids: An iterable of revision_ids.
 
471
        :return: A set of the revision_ids that were present.
 
472
        """
 
473
        parent_map = self.revisions.get_parent_map(
 
474
            [(rev_id,) for rev_id in revision_ids])
 
475
        result = set()
 
476
        if _mod_revision.NULL_REVISION in revision_ids:
 
477
            result.add(_mod_revision.NULL_REVISION)
 
478
        result.update([key[0] for key in parent_map])
 
479
        return result
 
480
 
 
481
    @needs_read_lock
 
482
    def get_revision_reconcile(self, revision_id):
 
483
        """'reconcile' helper routine that allows access to a revision always.
 
484
 
 
485
        This variant of get_revision does not cross check the weave graph
 
486
        against the revision one as get_revision does: but it should only
 
487
        be used by reconcile, or reconcile-alike commands that are correcting
 
488
        or testing the revision graph.
 
489
        """
 
490
        return self._get_revisions([revision_id])[0]
 
491
 
 
492
    @needs_read_lock
 
493
    def get_revisions(self, revision_ids):
 
494
        """Get many revisions at once.
 
495
        
 
496
        Repositories that need to check data on every revision read should 
 
497
        subclass this method.
 
498
        """
 
499
        return self._get_revisions(revision_ids)
 
500
 
 
501
    @needs_read_lock
 
502
    def _get_revisions(self, revision_ids):
 
503
        """Core work logic to get many revisions without sanity checks."""
 
504
        revs = {}
 
505
        for revid, rev in self._iter_revisions(revision_ids):
 
506
            if rev is None:
 
507
                raise errors.NoSuchRevision(self, revid)
 
508
            revs[revid] = rev
 
509
        return [revs[revid] for revid in revision_ids]
 
510
 
 
511
    def _iter_revisions(self, revision_ids):
 
512
        """Iterate over revision objects.
 
513
 
 
514
        :param revision_ids: An iterable of revisions to examine. None may be
 
515
            passed to request all revisions known to the repository. Note that
 
516
            not all repositories can find unreferenced revisions; for those
 
517
            repositories only referenced ones will be returned.
 
518
        :return: An iterator of (revid, revision) tuples. Absent revisions (
 
519
            those asked for but not available) are returned as (revid, None).
 
520
        """
 
521
        if revision_ids is None:
 
522
            revision_ids = self.all_revision_ids()
 
523
        else:
 
524
            for rev_id in revision_ids:
 
525
                if not rev_id or not isinstance(rev_id, basestring):
 
526
                    raise errors.InvalidRevisionId(revision_id=rev_id, branch=self)
 
527
        keys = [(key,) for key in revision_ids]
 
528
        stream = self.revisions.get_record_stream(keys, 'unordered', True)
 
529
        for record in stream:
 
530
            revid = record.key[0]
 
531
            if record.storage_kind == 'absent':
 
532
                yield (revid, None)
 
533
            else:
 
534
                text = record.get_bytes_as('fulltext')
 
535
                rev = self._serializer.read_revision_from_string(text)
 
536
                yield (revid, rev)
 
537
 
 
538
    @needs_write_lock
 
539
    def add_signature_text(self, revision_id, signature):
 
540
        """Store a signature text for a revision.
 
541
 
 
542
        :param revision_id: Revision id of the revision
 
543
        :param signature: Signature text.
 
544
        """
 
545
        self.signatures.add_lines((revision_id,), (),
 
546
            osutils.split_lines(signature))
 
547
 
 
548
    def find_text_key_references(self):
 
549
        """Find the text key references within the repository.
 
550
 
 
551
        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
 
552
            to whether they were referred to by the inventory of the
 
553
            revision_id that they contain. The inventory texts from all present
 
554
            revision ids are assessed to generate this report.
 
555
        """
 
556
        revision_keys = self.revisions.keys()
 
557
        w = self.inventories
 
558
        pb = ui.ui_factory.nested_progress_bar()
 
559
        try:
 
560
            return self._serializer._find_text_key_references(
 
561
                w.iter_lines_added_or_present_in_keys(revision_keys, pb=pb))
 
562
        finally:
 
563
            pb.finished()
 
564
 
 
565
    def _inventory_xml_lines_for_keys(self, keys):
 
566
        """Get a line iterator of the sort needed for findind references.
 
567
 
 
568
        Not relevant for non-xml inventory repositories.
 
569
 
 
570
        Ghosts in revision_keys are ignored.
 
571
 
 
572
        :param revision_keys: The revision keys for the inventories to inspect.
 
573
        :return: An iterator over (inventory line, revid) for the fulltexts of
 
574
            all of the xml inventories specified by revision_keys.
 
575
        """
 
576
        stream = self.inventories.get_record_stream(keys, 'unordered', True)
 
577
        for record in stream:
 
578
            if record.storage_kind != 'absent':
 
579
                chunks = record.get_bytes_as('chunked')
 
580
                revid = record.key[-1]
 
581
                lines = osutils.chunks_to_lines(chunks)
 
582
                for line in lines:
 
583
                    yield line, revid
 
584
 
 
585
    def _find_file_ids_from_xml_inventory_lines(self, line_iterator,
 
586
        revision_keys):
 
587
        """Helper routine for fileids_altered_by_revision_ids.
 
588
 
 
589
        This performs the translation of xml lines to revision ids.
 
590
 
 
591
        :param line_iterator: An iterator of lines, origin_version_id
 
592
        :param revision_keys: The revision ids to filter for. This should be a
 
593
            set or other type which supports efficient __contains__ lookups, as
 
594
            the revision key from each parsed line will be looked up in the
 
595
            revision_keys filter.
 
596
        :return: a dictionary mapping altered file-ids to an iterable of
 
597
        revision_ids. Each altered file-ids has the exact revision_ids that
 
598
        altered it listed explicitly.
 
599
        """
 
600
        seen = set(self._serializer._find_text_key_references(
 
601
                line_iterator).iterkeys())
 
602
        parent_keys = self._find_parent_keys_of_revisions(revision_keys)
 
603
        parent_seen = set(self._serializer._find_text_key_references(
 
604
            self._inventory_xml_lines_for_keys(parent_keys)))
 
605
        new_keys = seen - parent_seen
 
606
        result = {}
 
607
        setdefault = result.setdefault
 
608
        for key in new_keys:
 
609
            setdefault(key[0], set()).add(key[-1])
 
610
        return result
 
611
 
 
612
    def _find_parent_keys_of_revisions(self, revision_keys):
 
613
        """Similar to _find_parent_ids_of_revisions, but used with keys.
 
614
 
 
615
        :param revision_keys: An iterable of revision_keys.
 
616
        :return: The parents of all revision_keys that are not already in
 
617
            revision_keys
 
618
        """
 
619
        parent_map = self.revisions.get_parent_map(revision_keys)
 
620
        parent_keys = set()
 
621
        map(parent_keys.update, parent_map.itervalues())
 
622
        parent_keys.difference_update(revision_keys)
 
623
        parent_keys.discard(_mod_revision.NULL_REVISION)
 
624
        return parent_keys
 
625
 
 
626
    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
 
627
        """Find the file ids and versions affected by revisions.
 
628
 
 
629
        :param revisions: an iterable containing revision ids.
 
630
        :param _inv_weave: The inventory weave from this repository or None.
 
631
            If None, the inventory weave will be opened automatically.
 
632
        :return: a dictionary mapping altered file-ids to an iterable of
 
633
        revision_ids. Each altered file-ids has the exact revision_ids that
 
634
        altered it listed explicitly.
 
635
        """
 
636
        selected_keys = set((revid,) for revid in revision_ids)
 
637
        w = _inv_weave or self.inventories
 
638
        return self._find_file_ids_from_xml_inventory_lines(
 
639
            w.iter_lines_added_or_present_in_keys(
 
640
                selected_keys, pb=None),
 
641
            selected_keys)
 
642
 
 
643
    def iter_files_bytes(self, desired_files):
 
644
        """Iterate through file versions.
 
645
 
 
646
        Files will not necessarily be returned in the order they occur in
 
647
        desired_files.  No specific order is guaranteed.
 
648
 
 
649
        Yields pairs of identifier, bytes_iterator.  identifier is an opaque
 
650
        value supplied by the caller as part of desired_files.  It should
 
651
        uniquely identify the file version in the caller's context.  (Examples:
 
652
        an index number or a TreeTransform trans_id.)
 
653
 
 
654
        bytes_iterator is an iterable of bytestrings for the file.  The
 
655
        kind of iterable and length of the bytestrings are unspecified, but for
 
656
        this implementation, it is a list of bytes produced by
 
657
        VersionedFile.get_record_stream().
 
658
 
 
659
        :param desired_files: a list of (file_id, revision_id, identifier)
 
660
            triples
 
661
        """
 
662
        text_keys = {}
 
663
        for file_id, revision_id, callable_data in desired_files:
 
664
            text_keys[(file_id, revision_id)] = callable_data
 
665
        for record in self.texts.get_record_stream(text_keys, 'unordered', True):
 
666
            if record.storage_kind == 'absent':
 
667
                raise errors.RevisionNotPresent(record.key, self)
 
668
            yield text_keys[record.key], record.get_bytes_as('chunked')
 
669
 
 
670
    def _generate_text_key_index(self, text_key_references=None,
 
671
        ancestors=None):
 
672
        """Generate a new text key index for the repository.
 
673
 
 
674
        This is an expensive function that will take considerable time to run.
 
675
 
 
676
        :return: A dict mapping text keys ((file_id, revision_id) tuples) to a
 
677
            list of parents, also text keys. When a given key has no parents,
 
678
            the parents list will be [NULL_REVISION].
 
679
        """
 
680
        # All revisions, to find inventory parents.
 
681
        if ancestors is None:
 
682
            graph = self.get_graph()
 
683
            ancestors = graph.get_parent_map(self.all_revision_ids())
 
684
        if text_key_references is None:
 
685
            text_key_references = self.find_text_key_references()
 
686
        pb = ui.ui_factory.nested_progress_bar()
 
687
        try:
 
688
            return self._do_generate_text_key_index(ancestors,
 
689
                text_key_references, pb)
 
690
        finally:
 
691
            pb.finished()
 
692
 
 
693
    def _do_generate_text_key_index(self, ancestors, text_key_references, pb):
 
694
        """Helper for _generate_text_key_index to avoid deep nesting."""
 
695
        revision_order = tsort.topo_sort(ancestors)
 
696
        invalid_keys = set()
 
697
        revision_keys = {}
 
698
        for revision_id in revision_order:
 
699
            revision_keys[revision_id] = set()
 
700
        text_count = len(text_key_references)
 
701
        # a cache of the text keys to allow reuse; costs a dict of all the
 
702
        # keys, but saves a 2-tuple for every child of a given key.
 
703
        text_key_cache = {}
 
704
        for text_key, valid in text_key_references.iteritems():
 
705
            if not valid:
 
706
                invalid_keys.add(text_key)
 
707
            else:
 
708
                revision_keys[text_key[1]].add(text_key)
 
709
            text_key_cache[text_key] = text_key
 
710
        del text_key_references
 
711
        text_index = {}
 
712
        text_graph = graph.Graph(graph.DictParentsProvider(text_index))
 
713
        NULL_REVISION = _mod_revision.NULL_REVISION
 
714
        # Set a cache with a size of 10 - this suffices for bzr.dev but may be
 
715
        # too small for large or very branchy trees. However, for 55K path
 
716
        # trees, it would be easy to use too much memory trivially. Ideally we
 
717
        # could gauge this by looking at available real memory etc, but this is
 
718
        # always a tricky proposition.
 
719
        inventory_cache = lru_cache.LRUCache(10)
 
720
        batch_size = 10 # should be ~150MB on a 55K path tree
 
721
        batch_count = len(revision_order) / batch_size + 1
 
722
        processed_texts = 0
 
723
        pb.update("Calculating text parents", processed_texts, text_count)
 
724
        for offset in xrange(batch_count):
 
725
            to_query = revision_order[offset * batch_size:(offset + 1) *
 
726
                batch_size]
 
727
            if not to_query:
 
728
                break
 
729
            for revision_id in to_query:
 
730
                parent_ids = ancestors[revision_id]
 
731
                for text_key in revision_keys[revision_id]:
 
732
                    pb.update("Calculating text parents", processed_texts)
 
733
                    processed_texts += 1
 
734
                    candidate_parents = []
 
735
                    for parent_id in parent_ids:
 
736
                        parent_text_key = (text_key[0], parent_id)
 
737
                        try:
 
738
                            check_parent = parent_text_key not in \
 
739
                                revision_keys[parent_id]
 
740
                        except KeyError:
 
741
                            # the parent parent_id is a ghost:
 
742
                            check_parent = False
 
743
                            # truncate the derived graph against this ghost.
 
744
                            parent_text_key = None
 
745
                        if check_parent:
 
746
                            # look at the parent commit details inventories to
 
747
                            # determine possible candidates in the per file graph.
 
748
                            # TODO: cache here.
 
749
                            try:
 
750
                                inv = inventory_cache[parent_id]
 
751
                            except KeyError:
 
752
                                inv = self.revision_tree(parent_id).inventory
 
753
                                inventory_cache[parent_id] = inv
 
754
                            try:
 
755
                                parent_entry = inv[text_key[0]]
 
756
                            except (KeyError, errors.NoSuchId):
 
757
                                parent_entry = None
 
758
                            if parent_entry is not None:
 
759
                                parent_text_key = (
 
760
                                    text_key[0], parent_entry.revision)
 
761
                            else:
 
762
                                parent_text_key = None
 
763
                        if parent_text_key is not None:
 
764
                            candidate_parents.append(
 
765
                                text_key_cache[parent_text_key])
 
766
                    parent_heads = text_graph.heads(candidate_parents)
 
767
                    new_parents = list(parent_heads)
 
768
                    new_parents.sort(key=lambda x:candidate_parents.index(x))
 
769
                    if new_parents == []:
 
770
                        new_parents = [NULL_REVISION]
 
771
                    text_index[text_key] = new_parents
 
772
 
 
773
        for text_key in invalid_keys:
 
774
            text_index[text_key] = [NULL_REVISION]
 
775
        return text_index
 
776
 
 
777
    def item_keys_introduced_by(self, revision_ids, _files_pb=None):
 
778
        """Get an iterable listing the keys of all the data introduced by a set
 
779
        of revision IDs.
 
780
 
 
781
        The keys will be ordered so that the corresponding items can be safely
 
782
        fetched and inserted in that order.
 
783
 
 
784
        :returns: An iterable producing tuples of (knit-kind, file-id,
 
785
            versions).  knit-kind is one of 'file', 'inventory', 'signatures',
 
786
            'revisions'.  file-id is None unless knit-kind is 'file'.
 
787
        """
 
788
        for result in self._find_file_keys_to_fetch(revision_ids, _files_pb):
 
789
            yield result
 
790
        del _files_pb
 
791
        for result in self._find_non_file_keys_to_fetch(revision_ids):
 
792
            yield result
 
793
 
 
794
    def _find_file_keys_to_fetch(self, revision_ids, pb):
 
795
        # XXX: it's a bit weird to control the inventory weave caching in this
 
796
        # generator.  Ideally the caching would be done in fetch.py I think.  Or
 
797
        # maybe this generator should explicitly have the contract that it
 
798
        # should not be iterated until the previously yielded item has been
 
799
        # processed?
 
800
        inv_w = self.inventories
 
801
 
 
802
        # file ids that changed
 
803
        file_ids = self.fileids_altered_by_revision_ids(revision_ids, inv_w)
 
804
        count = 0
 
805
        num_file_ids = len(file_ids)
 
806
        for file_id, altered_versions in file_ids.iteritems():
 
807
            if pb is not None:
 
808
                pb.update("Fetch texts", count, num_file_ids)
 
809
            count += 1
 
810
            yield ("file", file_id, altered_versions)
 
811
 
 
812
    def _find_non_file_keys_to_fetch(self, revision_ids):
 
813
        # inventory
 
814
        yield ("inventory", None, revision_ids)
 
815
 
 
816
        # signatures
 
817
        # XXX: Note ATM no callers actually pay attention to this return
 
818
        #      instead they just use the list of revision ids and ignore
 
819
        #      missing sigs. Consider removing this work entirely
 
820
        revisions_with_signatures = set(self.signatures.get_parent_map(
 
821
            [(r,) for r in revision_ids]))
 
822
        revisions_with_signatures = set(
 
823
            [r for (r,) in revisions_with_signatures])
 
824
        revisions_with_signatures.intersection_update(revision_ids)
 
825
        yield ("signatures", None, revisions_with_signatures)
 
826
 
 
827
        # revisions
 
828
        yield ("revisions", None, revision_ids)
 
829
 
 
830
    @needs_read_lock
 
831
    def get_inventory(self, revision_id):
 
832
        """Get Inventory object by revision id."""
 
833
        return self.iter_inventories([revision_id]).next()
 
834
 
 
835
    def iter_inventories(self, revision_ids, ordering=None):
 
836
        """Get many inventories by revision_ids.
 
837
 
 
838
        This will buffer some or all of the texts used in constructing the
 
839
        inventories in memory, but will only parse a single inventory at a
 
840
        time.
 
841
 
 
842
        :param revision_ids: The expected revision ids of the inventories.
 
843
        :param ordering: optional ordering, e.g. 'topological'.  If not
 
844
            specified, the order of revision_ids will be preserved (by
 
845
            buffering if necessary).
 
846
        :return: An iterator of inventories.
 
847
        """
 
848
        if ((None in revision_ids)
 
849
            or (_mod_revision.NULL_REVISION in revision_ids)):
 
850
            raise ValueError('cannot get null revision inventory')
 
851
        return self._iter_inventories(revision_ids, ordering)
 
852
 
 
853
    def _iter_inventories(self, revision_ids, ordering):
 
854
        """single-document based inventory iteration."""
 
855
        inv_xmls = self._iter_inventory_xmls(revision_ids, ordering)
 
856
        for text, revision_id in inv_xmls:
 
857
            yield self._deserialise_inventory(revision_id, text)
 
858
 
 
859
    def _iter_inventory_xmls(self, revision_ids, ordering):
 
860
        if ordering is None:
 
861
            order_as_requested = True
 
862
            ordering = 'unordered'
 
863
        else:
 
864
            order_as_requested = False
 
865
        keys = [(revision_id,) for revision_id in revision_ids]
 
866
        if not keys:
 
867
            return
 
868
        if order_as_requested:
 
869
            key_iter = iter(keys)
 
870
            next_key = key_iter.next()
 
871
        stream = self.inventories.get_record_stream(keys, ordering, True)
 
872
        text_chunks = {}
 
873
        for record in stream:
 
874
            if record.storage_kind != 'absent':
 
875
                chunks = record.get_bytes_as('chunked')
 
876
                if order_as_requested:
 
877
                    text_chunks[record.key] = chunks
 
878
                else:
 
879
                    yield ''.join(chunks), record.key[-1]
 
880
            else:
 
881
                raise errors.NoSuchRevision(self, record.key)
 
882
            if order_as_requested:
 
883
                # Yield as many results as we can while preserving order.
 
884
                while next_key in text_chunks:
 
885
                    chunks = text_chunks.pop(next_key)
 
886
                    yield ''.join(chunks), next_key[-1]
 
887
                    try:
 
888
                        next_key = key_iter.next()
 
889
                    except StopIteration:
 
890
                        # We still want to fully consume the get_record_stream,
 
891
                        # just in case it is not actually finished at this point
 
892
                        next_key = None
 
893
                        break
 
894
 
 
895
    def _deserialise_inventory(self, revision_id, xml):
 
896
        """Transform the xml into an inventory object.
 
897
 
 
898
        :param revision_id: The expected revision id of the inventory.
 
899
        :param xml: A serialised inventory.
 
900
        """
 
901
        result = self._serializer.read_inventory_from_string(xml, revision_id,
 
902
                    entry_cache=self._inventory_entry_cache,
 
903
                    return_from_cache=self._safe_to_return_from_cache)
 
904
        if result.revision_id != revision_id:
 
905
            raise AssertionError('revision id mismatch %s != %s' % (
 
906
                result.revision_id, revision_id))
 
907
        return result
 
908
 
 
909
    def get_serializer_format(self):
 
910
        return self._serializer.format_num
 
911
 
 
912
    @needs_read_lock
 
913
    def _get_inventory_xml(self, revision_id):
 
914
        """Get serialized inventory as a string."""
 
915
        texts = self._iter_inventory_xmls([revision_id], 'unordered')
 
916
        try:
 
917
            text, revision_id = texts.next()
 
918
        except StopIteration:
 
919
            raise errors.HistoryMissing(self, 'inventory', revision_id)
 
920
        return text
 
921
 
 
922
    @needs_read_lock
 
923
    def revision_tree(self, revision_id):
 
924
        """Return Tree for a revision on this branch.
 
925
 
 
926
        `revision_id` may be NULL_REVISION for the empty tree revision.
 
927
        """
 
928
        revision_id = _mod_revision.ensure_null(revision_id)
 
929
        # TODO: refactor this to use an existing revision object
 
930
        # so we don't need to read it in twice.
 
931
        if revision_id == _mod_revision.NULL_REVISION:
 
932
            return InventoryRevisionTree(self,
 
933
                Inventory(root_id=None), _mod_revision.NULL_REVISION)
 
934
        else:
 
935
            inv = self.get_inventory(revision_id)
 
936
            return InventoryRevisionTree(self, inv, revision_id)
 
937
 
 
938
    def revision_trees(self, revision_ids):
 
939
        """Return Trees for revisions in this repository.
 
940
 
 
941
        :param revision_ids: a sequence of revision-ids;
 
942
          a revision-id may not be None or 'null:'
 
943
        """
 
944
        inventories = self.iter_inventories(revision_ids)
 
945
        for inv in inventories:
 
946
            yield InventoryRevisionTree(self, inv, inv.revision_id)
 
947
 
 
948
    def _filtered_revision_trees(self, revision_ids, file_ids):
 
949
        """Return Tree for a revision on this branch with only some files.
 
950
 
 
951
        :param revision_ids: a sequence of revision-ids;
 
952
          a revision-id may not be None or 'null:'
 
953
        :param file_ids: if not None, the result is filtered
 
954
          so that only those file-ids, their parents and their
 
955
          children are included.
 
956
        """
 
957
        inventories = self.iter_inventories(revision_ids)
 
958
        for inv in inventories:
 
959
            # Should we introduce a FilteredRevisionTree class rather
 
960
            # than pre-filter the inventory here?
 
961
            filtered_inv = inv.filter(file_ids)
 
962
            yield InventoryRevisionTree(self, filtered_inv, filtered_inv.revision_id)
 
963
 
 
964
    def get_parent_map(self, revision_ids):
 
965
        """See graph.StackedParentsProvider.get_parent_map"""
 
966
        # revisions index works in keys; this just works in revisions
 
967
        # therefore wrap and unwrap
 
968
        query_keys = []
 
969
        result = {}
 
970
        for revision_id in revision_ids:
 
971
            if revision_id == _mod_revision.NULL_REVISION:
 
972
                result[revision_id] = ()
 
973
            elif revision_id is None:
 
974
                raise ValueError('get_parent_map(None) is not valid')
 
975
            else:
 
976
                query_keys.append((revision_id ,))
 
977
        for ((revision_id,), parent_keys) in \
 
978
                self.revisions.get_parent_map(query_keys).iteritems():
 
979
            if parent_keys:
 
980
                result[revision_id] = tuple([parent_revid
 
981
                    for (parent_revid,) in parent_keys])
 
982
            else:
 
983
                result[revision_id] = (_mod_revision.NULL_REVISION,)
 
984
        return result
 
985
 
 
986
    @needs_read_lock
 
987
    def get_known_graph_ancestry(self, revision_ids):
 
988
        """Return the known graph for a set of revision ids and their ancestors.
 
989
        """
 
990
        st = static_tuple.StaticTuple
 
991
        revision_keys = [st(r_id).intern() for r_id in revision_ids]
 
992
        known_graph = self.revisions.get_known_graph_ancestry(revision_keys)
 
993
        return graph.GraphThunkIdsToKeys(known_graph)
 
994
 
 
995
    def _get_versioned_file_checker(self, text_key_references=None,
 
996
        ancestors=None):
 
997
        """Return an object suitable for checking versioned files.
 
998
        
 
999
        :param text_key_references: if non-None, an already built
 
1000
            dictionary mapping text keys ((fileid, revision_id) tuples)
 
1001
            to whether they were referred to by the inventory of the
 
1002
            revision_id that they contain. If None, this will be
 
1003
            calculated.
 
1004
        :param ancestors: Optional result from
 
1005
            self.get_graph().get_parent_map(self.all_revision_ids()) if already
 
1006
            available.
 
1007
        """
 
1008
        return _VersionedFileChecker(self,
 
1009
            text_key_references=text_key_references, ancestors=ancestors)
 
1010
 
 
1011
    @needs_read_lock
 
1012
    def has_signature_for_revision_id(self, revision_id):
 
1013
        """Query for a revision signature for revision_id in the repository."""
 
1014
        if not self.has_revision(revision_id):
 
1015
            raise errors.NoSuchRevision(self, revision_id)
 
1016
        sig_present = (1 == len(
 
1017
            self.signatures.get_parent_map([(revision_id,)])))
 
1018
        return sig_present
 
1019
 
 
1020
    @needs_read_lock
 
1021
    def get_signature_text(self, revision_id):
 
1022
        """Return the text for a signature."""
 
1023
        stream = self.signatures.get_record_stream([(revision_id,)],
 
1024
            'unordered', True)
 
1025
        record = stream.next()
 
1026
        if record.storage_kind == 'absent':
 
1027
            raise errors.NoSuchRevision(self, revision_id)
 
1028
        return record.get_bytes_as('fulltext')
 
1029
 
 
1030
    def _find_inconsistent_revision_parents(self, revisions_iterator=None):
 
1031
        """Find revisions with different parent lists in the revision object
 
1032
        and in the index graph.
 
1033
 
 
1034
        :param revisions_iterator: None, or an iterator of (revid,
 
1035
            Revision-or-None). This iterator controls the revisions checked.
 
1036
        :returns: an iterator yielding tuples of (revison-id, parents-in-index,
 
1037
            parents-in-revision).
 
1038
        """
 
1039
        if not self.is_locked():
 
1040
            raise AssertionError()
 
1041
        vf = self.revisions
 
1042
        if revisions_iterator is None:
 
1043
            revisions_iterator = self._iter_revisions(None)
 
1044
        for revid, revision in revisions_iterator:
 
1045
            if revision is None:
 
1046
                pass
 
1047
            parent_map = vf.get_parent_map([(revid,)])
 
1048
            parents_according_to_index = tuple(parent[-1] for parent in
 
1049
                parent_map[(revid,)])
 
1050
            parents_according_to_revision = tuple(revision.parent_ids)
 
1051
            if parents_according_to_index != parents_according_to_revision:
 
1052
                yield (revid, parents_according_to_index,
 
1053
                    parents_according_to_revision)
 
1054
 
 
1055
    def _check_for_inconsistent_revision_parents(self):
 
1056
        inconsistencies = list(self._find_inconsistent_revision_parents())
 
1057
        if inconsistencies:
 
1058
            raise errors.BzrCheckError(
 
1059
                "Revision knit has inconsistent parents.")
 
1060
 
 
1061
    def _get_sink(self):
 
1062
        """Return a sink for streaming into this repository."""
 
1063
        return StreamSink(self)
 
1064
 
 
1065
    def _get_source(self, to_format):
 
1066
        """Return a source for streaming from this repository."""
 
1067
        return StreamSource(self, to_format)
 
1068
 
 
1069
 
 
1070
class MetaDirVersionedFileRepository(MetaDirRepository,
 
1071
                                     VersionedFileRepository):
 
1072
    """Repositories in a meta-dir, that work via versioned file objects."""
 
1073
 
 
1074
    def __init__(self, _format, a_bzrdir, control_files):
 
1075
        super(MetaDirVersionedFileRepository, self).__init__(_format, a_bzrdir,
 
1076
            control_files)
 
1077
 
 
1078
 
 
1079
class StreamSink(object):
 
1080
    """An object that can insert a stream into a repository.
 
1081
 
 
1082
    This interface handles the complexity of reserialising inventories and
 
1083
    revisions from different formats, and allows unidirectional insertion into
 
1084
    stacked repositories without looking for the missing basis parents
 
1085
    beforehand.
 
1086
    """
 
1087
 
 
1088
    def __init__(self, target_repo):
 
1089
        self.target_repo = target_repo
 
1090
 
 
1091
    def insert_stream(self, stream, src_format, resume_tokens):
 
1092
        """Insert a stream's content into the target repository.
 
1093
 
 
1094
        :param src_format: a bzr repository format.
 
1095
 
 
1096
        :return: a list of resume tokens and an  iterable of keys additional
 
1097
            items required before the insertion can be completed.
 
1098
        """
 
1099
        self.target_repo.lock_write()
 
1100
        try:
 
1101
            if resume_tokens:
 
1102
                self.target_repo.resume_write_group(resume_tokens)
 
1103
                is_resume = True
 
1104
            else:
 
1105
                self.target_repo.start_write_group()
 
1106
                is_resume = False
 
1107
            try:
 
1108
                # locked_insert_stream performs a commit|suspend.
 
1109
                missing_keys = self.insert_stream_without_locking(stream,
 
1110
                                    src_format, is_resume)
 
1111
                if missing_keys:
 
1112
                    # suspend the write group and tell the caller what we is
 
1113
                    # missing. We know we can suspend or else we would not have
 
1114
                    # entered this code path. (All repositories that can handle
 
1115
                    # missing keys can handle suspending a write group).
 
1116
                    write_group_tokens = self.target_repo.suspend_write_group()
 
1117
                    return write_group_tokens, missing_keys
 
1118
                hint = self.target_repo.commit_write_group()
 
1119
                to_serializer = self.target_repo._format._serializer
 
1120
                src_serializer = src_format._serializer
 
1121
                if (to_serializer != src_serializer and
 
1122
                    self.target_repo._format.pack_compresses):
 
1123
                    self.target_repo.pack(hint=hint)
 
1124
                return [], set()
 
1125
            except:
 
1126
                self.target_repo.abort_write_group(suppress_errors=True)
 
1127
                raise
 
1128
        finally:
 
1129
            self.target_repo.unlock()
 
1130
 
 
1131
    def insert_stream_without_locking(self, stream, src_format,
 
1132
                                      is_resume=False):
 
1133
        """Insert a stream's content into the target repository.
 
1134
 
 
1135
        This assumes that you already have a locked repository and an active
 
1136
        write group.
 
1137
 
 
1138
        :param src_format: a bzr repository format.
 
1139
        :param is_resume: Passed down to get_missing_parent_inventories to
 
1140
            indicate if we should be checking for missing texts at the same
 
1141
            time.
 
1142
 
 
1143
        :return: A set of keys that are missing.
 
1144
        """
 
1145
        if not self.target_repo.is_write_locked():
 
1146
            raise errors.ObjectNotLocked(self)
 
1147
        if not self.target_repo.is_in_write_group():
 
1148
            raise errors.BzrError('you must already be in a write group')
 
1149
        to_serializer = self.target_repo._format._serializer
 
1150
        src_serializer = src_format._serializer
 
1151
        new_pack = None
 
1152
        if to_serializer == src_serializer:
 
1153
            # If serializers match and the target is a pack repository, set the
 
1154
            # write cache size on the new pack.  This avoids poor performance
 
1155
            # on transports where append is unbuffered (such as
 
1156
            # RemoteTransport).  This is safe to do because nothing should read
 
1157
            # back from the target repository while a stream with matching
 
1158
            # serialization is being inserted.
 
1159
            # The exception is that a delta record from the source that should
 
1160
            # be a fulltext may need to be expanded by the target (see
 
1161
            # test_fetch_revisions_with_deltas_into_pack); but we take care to
 
1162
            # explicitly flush any buffered writes first in that rare case.
 
1163
            try:
 
1164
                new_pack = self.target_repo._pack_collection._new_pack
 
1165
            except AttributeError:
 
1166
                # Not a pack repository
 
1167
                pass
 
1168
            else:
 
1169
                new_pack.set_write_cache_size(1024*1024)
 
1170
        for substream_type, substream in stream:
 
1171
            if 'stream' in debug.debug_flags:
 
1172
                mutter('inserting substream: %s', substream_type)
 
1173
            if substream_type == 'texts':
 
1174
                self.target_repo.texts.insert_record_stream(substream)
 
1175
            elif substream_type == 'inventories':
 
1176
                if src_serializer == to_serializer:
 
1177
                    self.target_repo.inventories.insert_record_stream(
 
1178
                        substream)
 
1179
                else:
 
1180
                    self._extract_and_insert_inventories(
 
1181
                        substream, src_serializer)
 
1182
            elif substream_type == 'inventory-deltas':
 
1183
                self._extract_and_insert_inventory_deltas(
 
1184
                    substream, src_serializer)
 
1185
            elif substream_type == 'chk_bytes':
 
1186
                # XXX: This doesn't support conversions, as it assumes the
 
1187
                #      conversion was done in the fetch code.
 
1188
                self.target_repo.chk_bytes.insert_record_stream(substream)
 
1189
            elif substream_type == 'revisions':
 
1190
                # This may fallback to extract-and-insert more often than
 
1191
                # required if the serializers are different only in terms of
 
1192
                # the inventory.
 
1193
                if src_serializer == to_serializer:
 
1194
                    self.target_repo.revisions.insert_record_stream(substream)
 
1195
                else:
 
1196
                    self._extract_and_insert_revisions(substream,
 
1197
                        src_serializer)
 
1198
            elif substream_type == 'signatures':
 
1199
                self.target_repo.signatures.insert_record_stream(substream)
 
1200
            else:
 
1201
                raise AssertionError('kaboom! %s' % (substream_type,))
 
1202
        # Done inserting data, and the missing_keys calculations will try to
 
1203
        # read back from the inserted data, so flush the writes to the new pack
 
1204
        # (if this is pack format).
 
1205
        if new_pack is not None:
 
1206
            new_pack._write_data('', flush=True)
 
1207
        # Find all the new revisions (including ones from resume_tokens)
 
1208
        missing_keys = self.target_repo.get_missing_parent_inventories(
 
1209
            check_for_missing_texts=is_resume)
 
1210
        try:
 
1211
            for prefix, versioned_file in (
 
1212
                ('texts', self.target_repo.texts),
 
1213
                ('inventories', self.target_repo.inventories),
 
1214
                ('revisions', self.target_repo.revisions),
 
1215
                ('signatures', self.target_repo.signatures),
 
1216
                ('chk_bytes', self.target_repo.chk_bytes),
 
1217
                ):
 
1218
                if versioned_file is None:
 
1219
                    continue
 
1220
                # TODO: key is often going to be a StaticTuple object
 
1221
                #       I don't believe we can define a method by which
 
1222
                #       (prefix,) + StaticTuple will work, though we could
 
1223
                #       define a StaticTuple.sq_concat that would allow you to
 
1224
                #       pass in either a tuple or a StaticTuple as the second
 
1225
                #       object, so instead we could have:
 
1226
                #       StaticTuple(prefix) + key here...
 
1227
                missing_keys.update((prefix,) + key for key in
 
1228
                    versioned_file.get_missing_compression_parent_keys())
 
1229
        except NotImplementedError:
 
1230
            # cannot even attempt suspending, and missing would have failed
 
1231
            # during stream insertion.
 
1232
            missing_keys = set()
 
1233
        return missing_keys
 
1234
 
 
1235
    def _extract_and_insert_inventory_deltas(self, substream, serializer):
 
1236
        target_rich_root = self.target_repo._format.rich_root_data
 
1237
        target_tree_refs = self.target_repo._format.supports_tree_reference
 
1238
        for record in substream:
 
1239
            # Insert the delta directly
 
1240
            inventory_delta_bytes = record.get_bytes_as('fulltext')
 
1241
            deserialiser = inventory_delta.InventoryDeltaDeserializer()
 
1242
            try:
 
1243
                parse_result = deserialiser.parse_text_bytes(
 
1244
                    inventory_delta_bytes)
 
1245
            except inventory_delta.IncompatibleInventoryDelta, err:
 
1246
                mutter("Incompatible delta: %s", err.msg)
 
1247
                raise errors.IncompatibleRevision(self.target_repo._format)
 
1248
            basis_id, new_id, rich_root, tree_refs, inv_delta = parse_result
 
1249
            revision_id = new_id
 
1250
            parents = [key[0] for key in record.parents]
 
1251
            self.target_repo.add_inventory_by_delta(
 
1252
                basis_id, inv_delta, revision_id, parents)
 
1253
 
 
1254
    def _extract_and_insert_inventories(self, substream, serializer,
 
1255
            parse_delta=None):
 
1256
        """Generate a new inventory versionedfile in target, converting data.
 
1257
 
 
1258
        The inventory is retrieved from the source, (deserializing it), and
 
1259
        stored in the target (reserializing it in a different format).
 
1260
        """
 
1261
        target_rich_root = self.target_repo._format.rich_root_data
 
1262
        target_tree_refs = self.target_repo._format.supports_tree_reference
 
1263
        for record in substream:
 
1264
            # It's not a delta, so it must be a fulltext in the source
 
1265
            # serializer's format.
 
1266
            bytes = record.get_bytes_as('fulltext')
 
1267
            revision_id = record.key[0]
 
1268
            inv = serializer.read_inventory_from_string(bytes, revision_id)
 
1269
            parents = [key[0] for key in record.parents]
 
1270
            self.target_repo.add_inventory(revision_id, inv, parents)
 
1271
            # No need to keep holding this full inv in memory when the rest of
 
1272
            # the substream is likely to be all deltas.
 
1273
            del inv
 
1274
 
 
1275
    def _extract_and_insert_revisions(self, substream, serializer):
 
1276
        for record in substream:
 
1277
            bytes = record.get_bytes_as('fulltext')
 
1278
            revision_id = record.key[0]
 
1279
            rev = serializer.read_revision_from_string(bytes)
 
1280
            if rev.revision_id != revision_id:
 
1281
                raise AssertionError('wtf: %s != %s' % (rev, revision_id))
 
1282
            self.target_repo.add_revision(revision_id, rev)
 
1283
 
 
1284
    def finished(self):
 
1285
        if self.target_repo._format._fetch_reconcile:
 
1286
            self.target_repo.reconcile()
 
1287
 
 
1288
 
 
1289
class StreamSource(object):
 
1290
    """A source of a stream for fetching between repositories."""
 
1291
 
 
1292
    def __init__(self, from_repository, to_format):
 
1293
        """Create a StreamSource streaming from from_repository."""
 
1294
        self.from_repository = from_repository
 
1295
        self.to_format = to_format
 
1296
        self._record_counter = RecordCounter()
 
1297
 
 
1298
    def delta_on_metadata(self):
 
1299
        """Return True if delta's are permitted on metadata streams.
 
1300
 
 
1301
        That is on revisions and signatures.
 
1302
        """
 
1303
        src_serializer = self.from_repository._format._serializer
 
1304
        target_serializer = self.to_format._serializer
 
1305
        return (self.to_format._fetch_uses_deltas and
 
1306
            src_serializer == target_serializer)
 
1307
 
 
1308
    def _fetch_revision_texts(self, revs):
 
1309
        # fetch signatures first and then the revision texts
 
1310
        # may need to be a InterRevisionStore call here.
 
1311
        from_sf = self.from_repository.signatures
 
1312
        # A missing signature is just skipped.
 
1313
        keys = [(rev_id,) for rev_id in revs]
 
1314
        signatures = versionedfile.filter_absent(from_sf.get_record_stream(
 
1315
            keys,
 
1316
            self.to_format._fetch_order,
 
1317
            not self.to_format._fetch_uses_deltas))
 
1318
        # If a revision has a delta, this is actually expanded inside the
 
1319
        # insert_record_stream code now, which is an alternate fix for
 
1320
        # bug #261339
 
1321
        from_rf = self.from_repository.revisions
 
1322
        revisions = from_rf.get_record_stream(
 
1323
            keys,
 
1324
            self.to_format._fetch_order,
 
1325
            not self.delta_on_metadata())
 
1326
        return [('signatures', signatures), ('revisions', revisions)]
 
1327
 
 
1328
    def _generate_root_texts(self, revs):
 
1329
        """This will be called by get_stream between fetching weave texts and
 
1330
        fetching the inventory weave.
 
1331
        """
 
1332
        if self._rich_root_upgrade():
 
1333
            return _mod_fetch.Inter1and2Helper(
 
1334
                self.from_repository).generate_root_texts(revs)
 
1335
        else:
 
1336
            return []
 
1337
 
 
1338
    def get_stream(self, search):
 
1339
        phase = 'file'
 
1340
        revs = search.get_keys()
 
1341
        graph = self.from_repository.get_graph()
 
1342
        revs = tsort.topo_sort(graph.get_parent_map(revs))
 
1343
        data_to_fetch = self.from_repository.item_keys_introduced_by(revs)
 
1344
        text_keys = []
 
1345
        for knit_kind, file_id, revisions in data_to_fetch:
 
1346
            if knit_kind != phase:
 
1347
                phase = knit_kind
 
1348
                # Make a new progress bar for this phase
 
1349
            if knit_kind == "file":
 
1350
                # Accumulate file texts
 
1351
                text_keys.extend([(file_id, revision) for revision in
 
1352
                    revisions])
 
1353
            elif knit_kind == "inventory":
 
1354
                # Now copy the file texts.
 
1355
                from_texts = self.from_repository.texts
 
1356
                yield ('texts', from_texts.get_record_stream(
 
1357
                    text_keys, self.to_format._fetch_order,
 
1358
                    not self.to_format._fetch_uses_deltas))
 
1359
                # Cause an error if a text occurs after we have done the
 
1360
                # copy.
 
1361
                text_keys = None
 
1362
                # Before we process the inventory we generate the root
 
1363
                # texts (if necessary) so that the inventories references
 
1364
                # will be valid.
 
1365
                for _ in self._generate_root_texts(revs):
 
1366
                    yield _
 
1367
                # we fetch only the referenced inventories because we do not
 
1368
                # know for unselected inventories whether all their required
 
1369
                # texts are present in the other repository - it could be
 
1370
                # corrupt.
 
1371
                for info in self._get_inventory_stream(revs):
 
1372
                    yield info
 
1373
            elif knit_kind == "signatures":
 
1374
                # Nothing to do here; this will be taken care of when
 
1375
                # _fetch_revision_texts happens.
 
1376
                pass
 
1377
            elif knit_kind == "revisions":
 
1378
                for record in self._fetch_revision_texts(revs):
 
1379
                    yield record
 
1380
            else:
 
1381
                raise AssertionError("Unknown knit kind %r" % knit_kind)
 
1382
 
 
1383
    def get_stream_for_missing_keys(self, missing_keys):
 
1384
        # missing keys can only occur when we are byte copying and not
 
1385
        # translating (because translation means we don't send
 
1386
        # unreconstructable deltas ever).
 
1387
        keys = {}
 
1388
        keys['texts'] = set()
 
1389
        keys['revisions'] = set()
 
1390
        keys['inventories'] = set()
 
1391
        keys['chk_bytes'] = set()
 
1392
        keys['signatures'] = set()
 
1393
        for key in missing_keys:
 
1394
            keys[key[0]].add(key[1:])
 
1395
        if len(keys['revisions']):
 
1396
            # If we allowed copying revisions at this point, we could end up
 
1397
            # copying a revision without copying its required texts: a
 
1398
            # violation of the requirements for repository integrity.
 
1399
            raise AssertionError(
 
1400
                'cannot copy revisions to fill in missing deltas %s' % (
 
1401
                    keys['revisions'],))
 
1402
        for substream_kind, keys in keys.iteritems():
 
1403
            vf = getattr(self.from_repository, substream_kind)
 
1404
            if vf is None and keys:
 
1405
                    raise AssertionError(
 
1406
                        "cannot fill in keys for a versioned file we don't"
 
1407
                        " have: %s needs %s" % (substream_kind, keys))
 
1408
            if not keys:
 
1409
                # No need to stream something we don't have
 
1410
                continue
 
1411
            if substream_kind == 'inventories':
 
1412
                # Some missing keys are genuinely ghosts, filter those out.
 
1413
                present = self.from_repository.inventories.get_parent_map(keys)
 
1414
                revs = [key[0] for key in present]
 
1415
                # Get the inventory stream more-or-less as we do for the
 
1416
                # original stream; there's no reason to assume that records
 
1417
                # direct from the source will be suitable for the sink.  (Think
 
1418
                # e.g. 2a -> 1.9-rich-root).
 
1419
                for info in self._get_inventory_stream(revs, missing=True):
 
1420
                    yield info
 
1421
                continue
 
1422
 
 
1423
            # Ask for full texts always so that we don't need more round trips
 
1424
            # after this stream.
 
1425
            # Some of the missing keys are genuinely ghosts, so filter absent
 
1426
            # records. The Sink is responsible for doing another check to
 
1427
            # ensure that ghosts don't introduce missing data for future
 
1428
            # fetches.
 
1429
            stream = versionedfile.filter_absent(vf.get_record_stream(keys,
 
1430
                self.to_format._fetch_order, True))
 
1431
            yield substream_kind, stream
 
1432
 
 
1433
    def inventory_fetch_order(self):
 
1434
        if self._rich_root_upgrade():
 
1435
            return 'topological'
 
1436
        else:
 
1437
            return self.to_format._fetch_order
 
1438
 
 
1439
    def _rich_root_upgrade(self):
 
1440
        return (not self.from_repository._format.rich_root_data and
 
1441
            self.to_format.rich_root_data)
 
1442
 
 
1443
    def _get_inventory_stream(self, revision_ids, missing=False):
 
1444
        from_format = self.from_repository._format
 
1445
        if (from_format.supports_chks and self.to_format.supports_chks and
 
1446
            from_format.network_name() == self.to_format.network_name()):
 
1447
            raise AssertionError(
 
1448
                "this case should be handled by GroupCHKStreamSource")
 
1449
        elif 'forceinvdeltas' in debug.debug_flags:
 
1450
            return self._get_convertable_inventory_stream(revision_ids,
 
1451
                    delta_versus_null=missing)
 
1452
        elif from_format.network_name() == self.to_format.network_name():
 
1453
            # Same format.
 
1454
            return self._get_simple_inventory_stream(revision_ids,
 
1455
                    missing=missing)
 
1456
        elif (not from_format.supports_chks and not self.to_format.supports_chks
 
1457
                and from_format._serializer == self.to_format._serializer):
 
1458
            # Essentially the same format.
 
1459
            return self._get_simple_inventory_stream(revision_ids,
 
1460
                    missing=missing)
 
1461
        else:
 
1462
            # Any time we switch serializations, we want to use an
 
1463
            # inventory-delta based approach.
 
1464
            return self._get_convertable_inventory_stream(revision_ids,
 
1465
                    delta_versus_null=missing)
 
1466
 
 
1467
    def _get_simple_inventory_stream(self, revision_ids, missing=False):
 
1468
        # NB: This currently reopens the inventory weave in source;
 
1469
        # using a single stream interface instead would avoid this.
 
1470
        from_weave = self.from_repository.inventories
 
1471
        if missing:
 
1472
            delta_closure = True
 
1473
        else:
 
1474
            delta_closure = not self.delta_on_metadata()
 
1475
        yield ('inventories', from_weave.get_record_stream(
 
1476
            [(rev_id,) for rev_id in revision_ids],
 
1477
            self.inventory_fetch_order(), delta_closure))
 
1478
 
 
1479
    def _get_convertable_inventory_stream(self, revision_ids,
 
1480
                                          delta_versus_null=False):
 
1481
        # The two formats are sufficiently different that there is no fast
 
1482
        # path, so we need to send just inventorydeltas, which any
 
1483
        # sufficiently modern client can insert into any repository.
 
1484
        # The StreamSink code expects to be able to
 
1485
        # convert on the target, so we need to put bytes-on-the-wire that can
 
1486
        # be converted.  That means inventory deltas (if the remote is <1.19,
 
1487
        # RemoteStreamSink will fallback to VFS to insert the deltas).
 
1488
        yield ('inventory-deltas',
 
1489
           self._stream_invs_as_deltas(revision_ids,
 
1490
                                       delta_versus_null=delta_versus_null))
 
1491
 
 
1492
    def _stream_invs_as_deltas(self, revision_ids, delta_versus_null=False):
 
1493
        """Return a stream of inventory-deltas for the given rev ids.
 
1494
 
 
1495
        :param revision_ids: The list of inventories to transmit
 
1496
        :param delta_versus_null: Don't try to find a minimal delta for this
 
1497
            entry, instead compute the delta versus the NULL_REVISION. This
 
1498
            effectively streams a complete inventory. Used for stuff like
 
1499
            filling in missing parents, etc.
 
1500
        """
 
1501
        from_repo = self.from_repository
 
1502
        revision_keys = [(rev_id,) for rev_id in revision_ids]
 
1503
        parent_map = from_repo.inventories.get_parent_map(revision_keys)
 
1504
        # XXX: possibly repos could implement a more efficient iter_inv_deltas
 
1505
        # method...
 
1506
        inventories = self.from_repository.iter_inventories(
 
1507
            revision_ids, 'topological')
 
1508
        format = from_repo._format
 
1509
        invs_sent_so_far = set([_mod_revision.NULL_REVISION])
 
1510
        inventory_cache = lru_cache.LRUCache(50)
 
1511
        null_inventory = from_repo.revision_tree(
 
1512
            _mod_revision.NULL_REVISION).inventory
 
1513
        # XXX: ideally the rich-root/tree-refs flags would be per-revision, not
 
1514
        # per-repo (e.g.  streaming a non-rich-root revision out of a rich-root
 
1515
        # repo back into a non-rich-root repo ought to be allowed)
 
1516
        serializer = inventory_delta.InventoryDeltaSerializer(
 
1517
            versioned_root=format.rich_root_data,
 
1518
            tree_references=format.supports_tree_reference)
 
1519
        for inv in inventories:
 
1520
            key = (inv.revision_id,)
 
1521
            parent_keys = parent_map.get(key, ())
 
1522
            delta = None
 
1523
            if not delta_versus_null and parent_keys:
 
1524
                # The caller did not ask for complete inventories and we have
 
1525
                # some parents that we can delta against.  Make a delta against
 
1526
                # each parent so that we can find the smallest.
 
1527
                parent_ids = [parent_key[0] for parent_key in parent_keys]
 
1528
                for parent_id in parent_ids:
 
1529
                    if parent_id not in invs_sent_so_far:
 
1530
                        # We don't know that the remote side has this basis, so
 
1531
                        # we can't use it.
 
1532
                        continue
 
1533
                    if parent_id == _mod_revision.NULL_REVISION:
 
1534
                        parent_inv = null_inventory
 
1535
                    else:
 
1536
                        parent_inv = inventory_cache.get(parent_id, None)
 
1537
                        if parent_inv is None:
 
1538
                            parent_inv = from_repo.get_inventory(parent_id)
 
1539
                    candidate_delta = inv._make_delta(parent_inv)
 
1540
                    if (delta is None or
 
1541
                        len(delta) > len(candidate_delta)):
 
1542
                        delta = candidate_delta
 
1543
                        basis_id = parent_id
 
1544
            if delta is None:
 
1545
                # Either none of the parents ended up being suitable, or we
 
1546
                # were asked to delta against NULL
 
1547
                basis_id = _mod_revision.NULL_REVISION
 
1548
                delta = inv._make_delta(null_inventory)
 
1549
            invs_sent_so_far.add(inv.revision_id)
 
1550
            inventory_cache[inv.revision_id] = inv
 
1551
            delta_serialized = ''.join(
 
1552
                serializer.delta_to_lines(basis_id, key[-1], delta))
 
1553
            yield versionedfile.FulltextContentFactory(
 
1554
                key, parent_keys, None, delta_serialized)
 
1555
 
 
1556
 
 
1557
class _VersionedFileChecker(object):
 
1558
 
 
1559
    def __init__(self, repository, text_key_references=None, ancestors=None):
 
1560
        self.repository = repository
 
1561
        self.text_index = self.repository._generate_text_key_index(
 
1562
            text_key_references=text_key_references, ancestors=ancestors)
 
1563
 
 
1564
    def calculate_file_version_parents(self, text_key):
 
1565
        """Calculate the correct parents for a file version according to
 
1566
        the inventories.
 
1567
        """
 
1568
        parent_keys = self.text_index[text_key]
 
1569
        if parent_keys == [_mod_revision.NULL_REVISION]:
 
1570
            return ()
 
1571
        return tuple(parent_keys)
 
1572
 
 
1573
    def check_file_version_parents(self, texts, progress_bar=None):
 
1574
        """Check the parents stored in a versioned file are correct.
 
1575
 
 
1576
        It also detects file versions that are not referenced by their
 
1577
        corresponding revision's inventory.
 
1578
 
 
1579
        :returns: A tuple of (wrong_parents, dangling_file_versions).
 
1580
            wrong_parents is a dict mapping {revision_id: (stored_parents,
 
1581
            correct_parents)} for each revision_id where the stored parents
 
1582
            are not correct.  dangling_file_versions is a set of (file_id,
 
1583
            revision_id) tuples for versions that are present in this versioned
 
1584
            file, but not used by the corresponding inventory.
 
1585
        """
 
1586
        local_progress = None
 
1587
        if progress_bar is None:
 
1588
            local_progress = ui.ui_factory.nested_progress_bar()
 
1589
            progress_bar = local_progress
 
1590
        try:
 
1591
            return self._check_file_version_parents(texts, progress_bar)
 
1592
        finally:
 
1593
            if local_progress:
 
1594
                local_progress.finished()
 
1595
 
 
1596
    def _check_file_version_parents(self, texts, progress_bar):
 
1597
        """See check_file_version_parents."""
 
1598
        wrong_parents = {}
 
1599
        self.file_ids = set([file_id for file_id, _ in
 
1600
            self.text_index.iterkeys()])
 
1601
        # text keys is now grouped by file_id
 
1602
        n_versions = len(self.text_index)
 
1603
        progress_bar.update('loading text store', 0, n_versions)
 
1604
        parent_map = self.repository.texts.get_parent_map(self.text_index)
 
1605
        # On unlistable transports this could well be empty/error...
 
1606
        text_keys = self.repository.texts.keys()
 
1607
        unused_keys = frozenset(text_keys) - set(self.text_index)
 
1608
        for num, key in enumerate(self.text_index.iterkeys()):
 
1609
            progress_bar.update('checking text graph', num, n_versions)
 
1610
            correct_parents = self.calculate_file_version_parents(key)
 
1611
            try:
 
1612
                knit_parents = parent_map[key]
 
1613
            except errors.RevisionNotPresent:
 
1614
                # Missing text!
 
1615
                knit_parents = None
 
1616
            if correct_parents != knit_parents:
 
1617
                wrong_parents[key] = (knit_parents, correct_parents)
 
1618
        return wrong_parents, unused_keys
 
1619
 
 
1620
 
 
1621
class InterDifferingSerializer(InterRepository):
 
1622
 
 
1623
    @classmethod
 
1624
    def _get_repo_format_to_test(self):
 
1625
        return None
 
1626
 
 
1627
    @staticmethod
 
1628
    def is_compatible(source, target):
 
1629
        # This is redundant with format.check_conversion_target(), however that
 
1630
        # raises an exception, and we just want to say "False" as in we won't
 
1631
        # support converting between these formats.
 
1632
        if 'IDS_never' in debug.debug_flags:
 
1633
            return False
 
1634
        if source.supports_rich_root() and not target.supports_rich_root():
 
1635
            return False
 
1636
        if (source._format.supports_tree_reference
 
1637
            and not target._format.supports_tree_reference):
 
1638
            return False
 
1639
        if target._fallback_repositories and target._format.supports_chks:
 
1640
            # IDS doesn't know how to copy CHKs for the parent inventories it
 
1641
            # adds to stacked repos.
 
1642
            return False
 
1643
        if 'IDS_always' in debug.debug_flags:
 
1644
            return True
 
1645
        # Only use this code path for local source and target.  IDS does far
 
1646
        # too much IO (both bandwidth and roundtrips) over a network.
 
1647
        if not source.bzrdir.transport.base.startswith('file:///'):
 
1648
            return False
 
1649
        if not target.bzrdir.transport.base.startswith('file:///'):
 
1650
            return False
 
1651
        if not source._format.supports_full_versioned_files:
 
1652
            return False
 
1653
        if not target._format.supports_full_versioned_files:
 
1654
            return False
 
1655
        return True
 
1656
 
 
1657
    def _get_trees(self, revision_ids, cache):
 
1658
        possible_trees = []
 
1659
        for rev_id in revision_ids:
 
1660
            if rev_id in cache:
 
1661
                possible_trees.append((rev_id, cache[rev_id]))
 
1662
            else:
 
1663
                # Not cached, but inventory might be present anyway.
 
1664
                try:
 
1665
                    tree = self.source.revision_tree(rev_id)
 
1666
                except errors.NoSuchRevision:
 
1667
                    # Nope, parent is ghost.
 
1668
                    pass
 
1669
                else:
 
1670
                    cache[rev_id] = tree
 
1671
                    possible_trees.append((rev_id, tree))
 
1672
        return possible_trees
 
1673
 
 
1674
    def _get_delta_for_revision(self, tree, parent_ids, possible_trees):
 
1675
        """Get the best delta and base for this revision.
 
1676
 
 
1677
        :return: (basis_id, delta)
 
1678
        """
 
1679
        deltas = []
 
1680
        # Generate deltas against each tree, to find the shortest.
 
1681
        texts_possibly_new_in_tree = set()
 
1682
        for basis_id, basis_tree in possible_trees:
 
1683
            delta = tree.inventory._make_delta(basis_tree.inventory)
 
1684
            for old_path, new_path, file_id, new_entry in delta:
 
1685
                if new_path is None:
 
1686
                    # This file_id isn't present in the new rev, so we don't
 
1687
                    # care about it.
 
1688
                    continue
 
1689
                if not new_path:
 
1690
                    # Rich roots are handled elsewhere...
 
1691
                    continue
 
1692
                kind = new_entry.kind
 
1693
                if kind != 'directory' and kind != 'file':
 
1694
                    # No text record associated with this inventory entry.
 
1695
                    continue
 
1696
                # This is a directory or file that has changed somehow.
 
1697
                texts_possibly_new_in_tree.add((file_id, new_entry.revision))
 
1698
            deltas.append((len(delta), basis_id, delta))
 
1699
        deltas.sort()
 
1700
        return deltas[0][1:]
 
1701
 
 
1702
    def _fetch_parent_invs_for_stacking(self, parent_map, cache):
 
1703
        """Find all parent revisions that are absent, but for which the
 
1704
        inventory is present, and copy those inventories.
 
1705
 
 
1706
        This is necessary to preserve correctness when the source is stacked
 
1707
        without fallbacks configured.  (Note that in cases like upgrade the
 
1708
        source may be not have _fallback_repositories even though it is
 
1709
        stacked.)
 
1710
        """
 
1711
        parent_revs = set()
 
1712
        for parents in parent_map.values():
 
1713
            parent_revs.update(parents)
 
1714
        present_parents = self.source.get_parent_map(parent_revs)
 
1715
        absent_parents = set(parent_revs).difference(present_parents)
 
1716
        parent_invs_keys_for_stacking = self.source.inventories.get_parent_map(
 
1717
            (rev_id,) for rev_id in absent_parents)
 
1718
        parent_inv_ids = [key[-1] for key in parent_invs_keys_for_stacking]
 
1719
        for parent_tree in self.source.revision_trees(parent_inv_ids):
 
1720
            current_revision_id = parent_tree.get_revision_id()
 
1721
            parents_parents_keys = parent_invs_keys_for_stacking[
 
1722
                (current_revision_id,)]
 
1723
            parents_parents = [key[-1] for key in parents_parents_keys]
 
1724
            basis_id = _mod_revision.NULL_REVISION
 
1725
            basis_tree = self.source.revision_tree(basis_id)
 
1726
            delta = parent_tree.inventory._make_delta(basis_tree.inventory)
 
1727
            self.target.add_inventory_by_delta(
 
1728
                basis_id, delta, current_revision_id, parents_parents)
 
1729
            cache[current_revision_id] = parent_tree
 
1730
 
 
1731
    def _fetch_batch(self, revision_ids, basis_id, cache):
 
1732
        """Fetch across a few revisions.
 
1733
 
 
1734
        :param revision_ids: The revisions to copy
 
1735
        :param basis_id: The revision_id of a tree that must be in cache, used
 
1736
            as a basis for delta when no other base is available
 
1737
        :param cache: A cache of RevisionTrees that we can use.
 
1738
        :return: The revision_id of the last converted tree. The RevisionTree
 
1739
            for it will be in cache
 
1740
        """
 
1741
        # Walk though all revisions; get inventory deltas, copy referenced
 
1742
        # texts that delta references, insert the delta, revision and
 
1743
        # signature.
 
1744
        root_keys_to_create = set()
 
1745
        text_keys = set()
 
1746
        pending_deltas = []
 
1747
        pending_revisions = []
 
1748
        parent_map = self.source.get_parent_map(revision_ids)
 
1749
        self._fetch_parent_invs_for_stacking(parent_map, cache)
 
1750
        self.source._safe_to_return_from_cache = True
 
1751
        for tree in self.source.revision_trees(revision_ids):
 
1752
            # Find a inventory delta for this revision.
 
1753
            # Find text entries that need to be copied, too.
 
1754
            current_revision_id = tree.get_revision_id()
 
1755
            parent_ids = parent_map.get(current_revision_id, ())
 
1756
            parent_trees = self._get_trees(parent_ids, cache)
 
1757
            possible_trees = list(parent_trees)
 
1758
            if len(possible_trees) == 0:
 
1759
                # There either aren't any parents, or the parents are ghosts,
 
1760
                # so just use the last converted tree.
 
1761
                possible_trees.append((basis_id, cache[basis_id]))
 
1762
            basis_id, delta = self._get_delta_for_revision(tree, parent_ids,
 
1763
                                                           possible_trees)
 
1764
            revision = self.source.get_revision(current_revision_id)
 
1765
            pending_deltas.append((basis_id, delta,
 
1766
                current_revision_id, revision.parent_ids))
 
1767
            if self._converting_to_rich_root:
 
1768
                self._revision_id_to_root_id[current_revision_id] = \
 
1769
                    tree.get_root_id()
 
1770
            # Determine which texts are in present in this revision but not in
 
1771
            # any of the available parents.
 
1772
            texts_possibly_new_in_tree = set()
 
1773
            for old_path, new_path, file_id, entry in delta:
 
1774
                if new_path is None:
 
1775
                    # This file_id isn't present in the new rev
 
1776
                    continue
 
1777
                if not new_path:
 
1778
                    # This is the root
 
1779
                    if not self.target.supports_rich_root():
 
1780
                        # The target doesn't support rich root, so we don't
 
1781
                        # copy
 
1782
                        continue
 
1783
                    if self._converting_to_rich_root:
 
1784
                        # This can't be copied normally, we have to insert
 
1785
                        # it specially
 
1786
                        root_keys_to_create.add((file_id, entry.revision))
 
1787
                        continue
 
1788
                kind = entry.kind
 
1789
                texts_possibly_new_in_tree.add((file_id, entry.revision))
 
1790
            for basis_id, basis_tree in possible_trees:
 
1791
                basis_inv = basis_tree.inventory
 
1792
                for file_key in list(texts_possibly_new_in_tree):
 
1793
                    file_id, file_revision = file_key
 
1794
                    try:
 
1795
                        entry = basis_inv[file_id]
 
1796
                    except errors.NoSuchId:
 
1797
                        continue
 
1798
                    if entry.revision == file_revision:
 
1799
                        texts_possibly_new_in_tree.remove(file_key)
 
1800
            text_keys.update(texts_possibly_new_in_tree)
 
1801
            pending_revisions.append(revision)
 
1802
            cache[current_revision_id] = tree
 
1803
            basis_id = current_revision_id
 
1804
        self.source._safe_to_return_from_cache = False
 
1805
        # Copy file texts
 
1806
        from_texts = self.source.texts
 
1807
        to_texts = self.target.texts
 
1808
        if root_keys_to_create:
 
1809
            root_stream = _mod_fetch._new_root_data_stream(
 
1810
                root_keys_to_create, self._revision_id_to_root_id, parent_map,
 
1811
                self.source)
 
1812
            to_texts.insert_record_stream(root_stream)
 
1813
        to_texts.insert_record_stream(from_texts.get_record_stream(
 
1814
            text_keys, self.target._format._fetch_order,
 
1815
            not self.target._format._fetch_uses_deltas))
 
1816
        # insert inventory deltas
 
1817
        for delta in pending_deltas:
 
1818
            self.target.add_inventory_by_delta(*delta)
 
1819
        if self.target._fallback_repositories:
 
1820
            # Make sure this stacked repository has all the parent inventories
 
1821
            # for the new revisions that we are about to insert.  We do this
 
1822
            # before adding the revisions so that no revision is added until
 
1823
            # all the inventories it may depend on are added.
 
1824
            # Note that this is overzealous, as we may have fetched these in an
 
1825
            # earlier batch.
 
1826
            parent_ids = set()
 
1827
            revision_ids = set()
 
1828
            for revision in pending_revisions:
 
1829
                revision_ids.add(revision.revision_id)
 
1830
                parent_ids.update(revision.parent_ids)
 
1831
            parent_ids.difference_update(revision_ids)
 
1832
            parent_ids.discard(_mod_revision.NULL_REVISION)
 
1833
            parent_map = self.source.get_parent_map(parent_ids)
 
1834
            # we iterate over parent_map and not parent_ids because we don't
 
1835
            # want to try copying any revision which is a ghost
 
1836
            for parent_tree in self.source.revision_trees(parent_map):
 
1837
                current_revision_id = parent_tree.get_revision_id()
 
1838
                parents_parents = parent_map[current_revision_id]
 
1839
                possible_trees = self._get_trees(parents_parents, cache)
 
1840
                if len(possible_trees) == 0:
 
1841
                    # There either aren't any parents, or the parents are
 
1842
                    # ghosts, so just use the last converted tree.
 
1843
                    possible_trees.append((basis_id, cache[basis_id]))
 
1844
                basis_id, delta = self._get_delta_for_revision(parent_tree,
 
1845
                    parents_parents, possible_trees)
 
1846
                self.target.add_inventory_by_delta(
 
1847
                    basis_id, delta, current_revision_id, parents_parents)
 
1848
        # insert signatures and revisions
 
1849
        for revision in pending_revisions:
 
1850
            try:
 
1851
                signature = self.source.get_signature_text(
 
1852
                    revision.revision_id)
 
1853
                self.target.add_signature_text(revision.revision_id,
 
1854
                    signature)
 
1855
            except errors.NoSuchRevision:
 
1856
                pass
 
1857
            self.target.add_revision(revision.revision_id, revision)
 
1858
        return basis_id
 
1859
 
 
1860
    def _fetch_all_revisions(self, revision_ids, pb):
 
1861
        """Fetch everything for the list of revisions.
 
1862
 
 
1863
        :param revision_ids: The list of revisions to fetch. Must be in
 
1864
            topological order.
 
1865
        :param pb: A ProgressTask
 
1866
        :return: None
 
1867
        """
 
1868
        basis_id, basis_tree = self._get_basis(revision_ids[0])
 
1869
        batch_size = 100
 
1870
        cache = lru_cache.LRUCache(100)
 
1871
        cache[basis_id] = basis_tree
 
1872
        del basis_tree # We don't want to hang on to it here
 
1873
        hints = []
 
1874
        a_graph = None
 
1875
 
 
1876
        for offset in range(0, len(revision_ids), batch_size):
 
1877
            self.target.start_write_group()
 
1878
            try:
 
1879
                pb.update('Transferring revisions', offset,
 
1880
                          len(revision_ids))
 
1881
                batch = revision_ids[offset:offset+batch_size]
 
1882
                basis_id = self._fetch_batch(batch, basis_id, cache)
 
1883
            except:
 
1884
                self.source._safe_to_return_from_cache = False
 
1885
                self.target.abort_write_group()
 
1886
                raise
 
1887
            else:
 
1888
                hint = self.target.commit_write_group()
 
1889
                if hint:
 
1890
                    hints.extend(hint)
 
1891
        if hints and self.target._format.pack_compresses:
 
1892
            self.target.pack(hint=hints)
 
1893
        pb.update('Transferring revisions', len(revision_ids),
 
1894
                  len(revision_ids))
 
1895
 
 
1896
    @needs_write_lock
 
1897
    def fetch(self, revision_id=None, find_ghosts=False,
 
1898
            fetch_spec=None):
 
1899
        """See InterRepository.fetch()."""
 
1900
        if fetch_spec is not None:
 
1901
            revision_ids = fetch_spec.get_keys()
 
1902
        else:
 
1903
            revision_ids = None
 
1904
        ui.ui_factory.warn_experimental_format_fetch(self)
 
1905
        if (not self.source.supports_rich_root()
 
1906
            and self.target.supports_rich_root()):
 
1907
            self._converting_to_rich_root = True
 
1908
            self._revision_id_to_root_id = {}
 
1909
        else:
 
1910
            self._converting_to_rich_root = False
 
1911
        # See <https://launchpad.net/bugs/456077> asking for a warning here
 
1912
        if self.source._format.network_name() != self.target._format.network_name():
 
1913
            ui.ui_factory.show_user_warning('cross_format_fetch',
 
1914
                from_format=self.source._format,
 
1915
                to_format=self.target._format)
 
1916
        if revision_ids is None:
 
1917
            if revision_id:
 
1918
                search_revision_ids = [revision_id]
 
1919
            else:
 
1920
                search_revision_ids = None
 
1921
            revision_ids = self.target.search_missing_revision_ids(self.source,
 
1922
                revision_ids=search_revision_ids,
 
1923
                find_ghosts=find_ghosts).get_keys()
 
1924
        if not revision_ids:
 
1925
            return 0, 0
 
1926
        revision_ids = tsort.topo_sort(
 
1927
            self.source.get_graph().get_parent_map(revision_ids))
 
1928
        if not revision_ids:
 
1929
            return 0, 0
 
1930
        # Walk though all revisions; get inventory deltas, copy referenced
 
1931
        # texts that delta references, insert the delta, revision and
 
1932
        # signature.
 
1933
        pb = ui.ui_factory.nested_progress_bar()
 
1934
        try:
 
1935
            self._fetch_all_revisions(revision_ids, pb)
 
1936
        finally:
 
1937
            pb.finished()
 
1938
        return len(revision_ids), 0
 
1939
 
 
1940
    def _get_basis(self, first_revision_id):
 
1941
        """Get a revision and tree which exists in the target.
 
1942
 
 
1943
        This assumes that first_revision_id is selected for transmission
 
1944
        because all other ancestors are already present. If we can't find an
 
1945
        ancestor we fall back to NULL_REVISION since we know that is safe.
 
1946
 
 
1947
        :return: (basis_id, basis_tree)
 
1948
        """
 
1949
        first_rev = self.source.get_revision(first_revision_id)
 
1950
        try:
 
1951
            basis_id = first_rev.parent_ids[0]
 
1952
            # only valid as a basis if the target has it
 
1953
            self.target.get_revision(basis_id)
 
1954
            # Try to get a basis tree - if it's a ghost it will hit the
 
1955
            # NoSuchRevision case.
 
1956
            basis_tree = self.source.revision_tree(basis_id)
 
1957
        except (IndexError, errors.NoSuchRevision):
 
1958
            basis_id = _mod_revision.NULL_REVISION
 
1959
            basis_tree = self.source.revision_tree(basis_id)
 
1960
        return basis_id, basis_tree
 
1961
 
 
1962
 
 
1963
InterRepository.register_optimiser(InterDifferingSerializer)
 
1964
 
 
1965
 
 
1966
def install_revisions(repository, iterable, num_revisions=None, pb=None):
 
1967
    """Install all revision data into a repository.
 
1968
 
 
1969
    Accepts an iterable of revision, tree, signature tuples.  The signature
 
1970
    may be None.
 
1971
    """
 
1972
    repository.start_write_group()
 
1973
    try:
 
1974
        inventory_cache = lru_cache.LRUCache(10)
 
1975
        for n, (revision, revision_tree, signature) in enumerate(iterable):
 
1976
            _install_revision(repository, revision, revision_tree, signature,
 
1977
                inventory_cache)
 
1978
            if pb is not None:
 
1979
                pb.update('Transferring revisions', n + 1, num_revisions)
 
1980
    except:
 
1981
        repository.abort_write_group()
 
1982
        raise
 
1983
    else:
 
1984
        repository.commit_write_group()
 
1985
 
 
1986
 
 
1987
def _install_revision(repository, rev, revision_tree, signature,
 
1988
    inventory_cache):
 
1989
    """Install all revision data into a repository."""
 
1990
    present_parents = []
 
1991
    parent_trees = {}
 
1992
    for p_id in rev.parent_ids:
 
1993
        if repository.has_revision(p_id):
 
1994
            present_parents.append(p_id)
 
1995
            parent_trees[p_id] = repository.revision_tree(p_id)
 
1996
        else:
 
1997
            parent_trees[p_id] = repository.revision_tree(
 
1998
                                     _mod_revision.NULL_REVISION)
 
1999
 
 
2000
    inv = revision_tree.inventory
 
2001
    entries = inv.iter_entries()
 
2002
    # backwards compatibility hack: skip the root id.
 
2003
    if not repository.supports_rich_root():
 
2004
        path, root = entries.next()
 
2005
        if root.revision != rev.revision_id:
 
2006
            raise errors.IncompatibleRevision(repr(repository))
 
2007
    text_keys = {}
 
2008
    for path, ie in entries:
 
2009
        text_keys[(ie.file_id, ie.revision)] = ie
 
2010
    text_parent_map = repository.texts.get_parent_map(text_keys)
 
2011
    missing_texts = set(text_keys) - set(text_parent_map)
 
2012
    # Add the texts that are not already present
 
2013
    for text_key in missing_texts:
 
2014
        ie = text_keys[text_key]
 
2015
        text_parents = []
 
2016
        # FIXME: TODO: The following loop overlaps/duplicates that done by
 
2017
        # commit to determine parents. There is a latent/real bug here where
 
2018
        # the parents inserted are not those commit would do - in particular
 
2019
        # they are not filtered by heads(). RBC, AB
 
2020
        for revision, tree in parent_trees.iteritems():
 
2021
            if ie.file_id not in tree:
 
2022
                continue
 
2023
            parent_id = tree.get_file_revision(ie.file_id)
 
2024
            if parent_id in text_parents:
 
2025
                continue
 
2026
            text_parents.append((ie.file_id, parent_id))
 
2027
        lines = revision_tree.get_file(ie.file_id).readlines()
 
2028
        repository.texts.add_lines(text_key, text_parents, lines)
 
2029
    try:
 
2030
        # install the inventory
 
2031
        if repository._format._commit_inv_deltas and len(rev.parent_ids):
 
2032
            # Cache this inventory
 
2033
            inventory_cache[rev.revision_id] = inv
 
2034
            try:
 
2035
                basis_inv = inventory_cache[rev.parent_ids[0]]
 
2036
            except KeyError:
 
2037
                repository.add_inventory(rev.revision_id, inv, present_parents)
 
2038
            else:
 
2039
                delta = inv._make_delta(basis_inv)
 
2040
                repository.add_inventory_by_delta(rev.parent_ids[0], delta,
 
2041
                    rev.revision_id, present_parents)
 
2042
        else:
 
2043
            repository.add_inventory(rev.revision_id, inv, present_parents)
 
2044
    except errors.RevisionAlreadyPresent:
 
2045
        pass
 
2046
    if signature is not None:
 
2047
        repository.add_signature_text(rev.revision_id, signature)
 
2048
    repository.add_revision(rev.revision_id, rev, inv)
 
2049
 
 
2050
 
 
2051
def install_revision(repository, rev, revision_tree):
 
2052
    """Install all revision data into a repository."""
 
2053
    install_revisions(repository, [(rev, revision_tree, None)])