~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repository.py

create thread for bbc

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
from bzrlib import (
24
24
    bzrdir,
25
25
    check,
 
26
    chk_map,
26
27
    debug,
27
28
    errors,
28
29
    fifo_cache,
29
30
    generate_ids,
30
31
    gpg,
31
32
    graph,
 
33
    inventory,
32
34
    lazy_regex,
33
35
    lockable_files,
34
36
    lockdir,
197
199
 
198
200
    def finish_inventory(self):
199
201
        """Tell the builder that the inventory is finished.
200
 
        
 
202
 
201
203
        :return: The inventory id in the repository, which can be used with
202
204
            repository.get_inventory.
203
205
        """
204
206
        if self.new_inventory is None:
 
207
            # XXX: Using these asserts causes test failures. However, at least
 
208
            #      "self._recording_deletes" seems like a useful check to do,
 
209
            #      as it ensure the delta is completely valid. Most likely this
 
210
            #      just exposes that the test suite isn't using CommitBuilder
 
211
            #      100% correctly.
 
212
            # if (not self.repository._format._commit_inv_deltas
 
213
            #     or not self._recording_deletes):
 
214
            #     raise AssertionError('new_inventory is None, but we did not'
 
215
            #         ' set the flag that the repository format supports'
 
216
            #         ' partial inventory generation.')
205
217
            # an inventory delta was accumulated without creating a new
206
218
            # inventory.
207
219
            basis_id = self.basis_delta_revision
816
828
    which views a particular line of development through that history.
817
829
 
818
830
    The Repository builds on top of some byte storage facilies (the revisions,
819
 
    signatures, inventories and texts attributes) and a Transport, which
820
 
    respectively provide byte storage and a means to access the (possibly
 
831
    signatures, inventories, texts and chk_bytes attributes) and a Transport,
 
832
    which respectively provide byte storage and a means to access the (possibly
821
833
    remote) disk.
822
834
 
823
835
    The byte storage facilities are addressed via tuples, which we refer to
824
836
    as 'keys' throughout the code base. Revision_keys, inventory_keys and
825
837
    signature_keys are all 1-tuples: (revision_id,). text_keys are two-tuples:
826
 
    (file_id, revision_id). We use this interface because it allows low
827
 
    friction with the underlying code that implements disk indices, network
828
 
    encoding and other parts of bzrlib.
 
838
    (file_id, revision_id). chk_bytes uses CHK keys - a 1-tuple with a single
 
839
    byte string made up of a hash identifier and a hash value.
 
840
    We use this interface because it allows low friction with the underlying
 
841
    code that implements disk indices, network encoding and other parts of
 
842
    bzrlib.
829
843
 
830
844
    :ivar revisions: A bzrlib.versionedfile.VersionedFiles instance containing
831
845
        the serialised revisions for the repository. This can be used to obtain
850
864
        The result of trying to insert data into the repository via this store
851
865
        is undefined: it should be considered read-only except for implementors
852
866
        of repositories.
 
867
    :ivar chk_bytes: A bzrlib.versionedfile.VersioedFiles instance containing
 
868
        any data the repository chooses to store or have indexed by its hash.
 
869
        The result of trying to insert data into the repository via this store
 
870
        is undefined: it should be considered read-only except for implementors
 
871
        of repositories.
853
872
    :ivar _transport: Transport for file access to repository, typically
854
873
        pointing to .bzr/repository.
855
874
    """
876
895
        """
877
896
        if self._write_group is not self.get_transaction():
878
897
            # has an unlock or relock occured ?
879
 
            raise errors.BzrError('mismatched lock context and write group.')
 
898
            raise errors.BzrError(
 
899
                'mismatched lock context and write group. %r, %r' %
 
900
                (self._write_group, self.get_transaction()))
880
901
        try:
881
902
            self._abort_write_group()
882
903
        except Exception, exc:
914
935
        self.inventories.add_fallback_versioned_files(repository.inventories)
915
936
        self.revisions.add_fallback_versioned_files(repository.revisions)
916
937
        self.signatures.add_fallback_versioned_files(repository.signatures)
 
938
        if self.chk_bytes is not None:
 
939
            self.chk_bytes.add_fallback_versioned_files(repository.chk_bytes)
 
940
        self._fetch_order = 'topological'
917
941
 
918
942
    def _check_fallback_repository(self, repository):
919
943
        """Check that this repository can fallback to repository safely.
943
967
                % (inv.revision_id, revision_id))
944
968
        if inv.root is None:
945
969
            raise AssertionError()
 
970
        return self._add_inventory_checked(revision_id, inv, parents)
 
971
 
 
972
    def _add_inventory_checked(self, revision_id, inv, parents):
 
973
        """Add inv to the repository after checking the inputs.
 
974
 
 
975
        This function can be overridden to allow different inventory styles.
 
976
 
 
977
        :seealso: add_inventory, for the contract.
 
978
        """
946
979
        inv_lines = self._serialise_inventory_to_lines(inv)
947
980
        return self._inventory_add_lines(revision_id, parents,
948
981
            inv_lines, check_content=False)
949
982
 
950
983
    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
951
 
                               parents):
 
984
                               parents, basis_inv=None, propagate_caches=False):
952
985
        """Add a new inventory expressed as a delta against another revision.
953
986
 
954
987
        :param basis_revision_id: The inventory id the delta was created
962
995
            for repositories that depend on the inventory graph for revision
963
996
            graph access, as well as for those that pun ancestry with delta
964
997
            compression.
 
998
        :param basis_inv: The basis inventory if it is already known,
 
999
            otherwise None.
 
1000
        :param propagate_caches: If True, the caches for this inventory are
 
1001
          copied to and updated for the result if possible.
965
1002
 
966
1003
        :returns: (validator, new_inv)
967
1004
            The validator(which is a sha1 digest, though what is sha'd is
978
1015
            # inventory implementations may support: A better idiom would be to
979
1016
            # return a new inventory, but as there is no revision tree cache in
980
1017
            # repository this is safe for now - RBC 20081013
981
 
            basis_inv = basis_tree.inventory
 
1018
            if basis_inv is None:
 
1019
                basis_inv = basis_tree.inventory
982
1020
            basis_inv.apply_delta(delta)
983
1021
            basis_inv.revision_id = new_revision_id
984
1022
            return (self.add_inventory(new_revision_id, basis_inv, parents),
1941
1979
                            except KeyError:
1942
1980
                                inv = self.revision_tree(parent_id).inventory
1943
1981
                                inventory_cache[parent_id] = inv
1944
 
                            parent_entry = inv._byid.get(text_key[0], None)
 
1982
                            try:
 
1983
                                parent_entry = inv[text_key[0]]
 
1984
                            except (KeyError, errors.NoSuchId):
 
1985
                                parent_entry = None
1945
1986
                            if parent_entry is not None:
1946
1987
                                parent_text_key = (
1947
1988
                                    text_key[0], parent_entry.revision)
1972
2013
            versions).  knit-kind is one of 'file', 'inventory', 'signatures',
1973
2014
            'revisions'.  file-id is None unless knit-kind is 'file'.
1974
2015
        """
 
2016
        for result in self._find_file_keys_to_fetch(revision_ids, _files_pb):
 
2017
            yield result
 
2018
        del _files_pb
 
2019
        for result in self._find_non_file_keys_to_fetch(revision_ids):
 
2020
            yield result
 
2021
 
 
2022
    def _find_file_keys_to_fetch(self, revision_ids, pb):
1975
2023
        # XXX: it's a bit weird to control the inventory weave caching in this
1976
2024
        # generator.  Ideally the caching would be done in fetch.py I think.  Or
1977
2025
        # maybe this generator should explicitly have the contract that it
1984
2032
        count = 0
1985
2033
        num_file_ids = len(file_ids)
1986
2034
        for file_id, altered_versions in file_ids.iteritems():
1987
 
            if _files_pb is not None:
1988
 
                _files_pb.update("fetch texts", count, num_file_ids)
 
2035
            if pb is not None:
 
2036
                pb.update("fetch texts", count, num_file_ids)
1989
2037
            count += 1
1990
2038
            yield ("file", file_id, altered_versions)
1991
 
        # We're done with the files_pb.  Note that it finished by the caller,
1992
 
        # just as it was created by the caller.
1993
 
        del _files_pb
1994
2039
 
 
2040
    def _find_non_file_keys_to_fetch(self, revision_ids):
1995
2041
        # inventory
1996
2042
        yield ("inventory", None, revision_ids)
1997
2043
 
2436
2482
    """
2437
2483
    repository.start_write_group()
2438
2484
    try:
 
2485
        inventory_cache = lru_cache.LRUCache(10)
2439
2486
        for n, (revision, revision_tree, signature) in enumerate(iterable):
2440
 
            _install_revision(repository, revision, revision_tree, signature)
 
2487
            _install_revision(repository, revision, revision_tree, signature,
 
2488
                inventory_cache)
2441
2489
            if pb is not None:
2442
2490
                pb.update('Transferring revisions', n + 1, num_revisions)
2443
2491
    except:
2447
2495
        repository.commit_write_group()
2448
2496
 
2449
2497
 
2450
 
def _install_revision(repository, rev, revision_tree, signature):
 
2498
def _install_revision(repository, rev, revision_tree, signature,
 
2499
    inventory_cache):
2451
2500
    """Install all revision data into a repository."""
2452
2501
    present_parents = []
2453
2502
    parent_trees = {}
2490
2539
        repository.texts.add_lines(text_key, text_parents, lines)
2491
2540
    try:
2492
2541
        # install the inventory
2493
 
        repository.add_inventory(rev.revision_id, inv, present_parents)
 
2542
        if repository._format._commit_inv_deltas and len(rev.parent_ids):
 
2543
            # Cache this inventory
 
2544
            inventory_cache[rev.revision_id] = inv
 
2545
            try:
 
2546
                basis_inv = inventory_cache[rev.parent_ids[0]]
 
2547
            except KeyError:
 
2548
                repository.add_inventory(rev.revision_id, inv, present_parents)
 
2549
            else:
 
2550
                delta = inv._make_delta(basis_inv)
 
2551
                repository.add_inventory_by_delta(rev.parent_ids[0], delta,
 
2552
                    rev.revision_id, present_parents)
 
2553
        else:
 
2554
            repository.add_inventory(rev.revision_id, inv, present_parents)
2494
2555
    except errors.RevisionAlreadyPresent:
2495
2556
        pass
2496
2557
    if signature is not None:
2603
2664
    # Can this repository be given external locations to lookup additional
2604
2665
    # data. Set to True or False in derived classes.
2605
2666
    supports_external_lookups = None
 
2667
    # Does this format support CHK bytestring lookups. Set to True or False in
 
2668
    # derived classes.
 
2669
    supports_chks = None
 
2670
    # Should commit add an inventory, or an inventory delta to the repository.
 
2671
    _commit_inv_deltas = True
2606
2672
    # What order should fetch operations request streams in?
2607
2673
    # The default is unordered as that is the cheapest for an origin to
2608
2674
    # provide.
2888
2954
    'bzrlib.repofmt.pack_repo',
2889
2955
    'RepositoryFormatPackDevelopment2Subtree',
2890
2956
    )
 
2957
# 1.9->1.110 go below here
 
2958
format_registry.register_lazy(
 
2959
    # merge-bbc-dev4-to-bzr.dev
 
2960
    "Bazaar development format 5 (needs bzr.dev from before 1.13)\n",
 
2961
    'bzrlib.repofmt.pack_repo',
 
2962
    'RepositoryFormatPackDevelopment5',
 
2963
    )
 
2964
format_registry.register_lazy(
 
2965
    # merge-bbc-dev4-to-bzr.dev
 
2966
    ("Bazaar development format 5 with subtree support"
 
2967
     " (needs bzr.dev from before 1.13)\n"),
 
2968
    'bzrlib.repofmt.pack_repo',
 
2969
    'RepositoryFormatPackDevelopment5Subtree',
 
2970
    )
 
2971
format_registry.register_lazy(
 
2972
    # merge-bbc-dev4-to-bzr.dev
 
2973
    ('Bazaar development format 5 hash 16'
 
2974
     ' (needs bzr.dev from before 1.13)\n'),
 
2975
    'bzrlib.repofmt.pack_repo',
 
2976
    'RepositoryFormatPackDevelopment5Hash16',
 
2977
    )
 
2978
format_registry.register_lazy(
 
2979
    # merge-bbc-dev4-to-bzr.dev
 
2980
    ('Bazaar development format 5 hash 255'
 
2981
     ' (needs bzr.dev from before 1.13)\n'),
 
2982
    'bzrlib.repofmt.pack_repo',
 
2983
    'RepositoryFormatPackDevelopment5Hash255',
 
2984
    )
 
2985
# XXX: This format is scheduled for termination
 
2986
# format_registry.register_lazy(
 
2987
#     'Bazaar development format - btree+gc (needs bzr.dev from 1.13)\n',
 
2988
#     'bzrlib.repofmt.groupcompress_repo',
 
2989
#     'RepositoryFormatPackGCPlain',
 
2990
#     )
 
2991
format_registry.register_lazy(
 
2992
    'Bazaar development format - hash16chk+gc rich-root (needs bzr.dev from 1.13)\n',
 
2993
    'bzrlib.repofmt.groupcompress_repo',
 
2994
    'RepositoryFormatPackGCCHK16',
 
2995
    )
 
2996
format_registry.register_lazy(
 
2997
    'Bazaar development format - hash255chk+gc rich-root (needs bzr.dev from 1.13)\n',
 
2998
    'bzrlib.repofmt.groupcompress_repo',
 
2999
    'RepositoryFormatPackGCCHK255',
 
3000
    )
 
3001
format_registry.register_lazy(
 
3002
    'Bazaar development format - hash255chk+gc rich-root bigpage (needs bzr.dev from 1.13)\n',
 
3003
    'bzrlib.repofmt.groupcompress_repo',
 
3004
    'RepositoryFormatPackGCCHK255Big',
 
3005
    )
2891
3006
 
2892
3007
 
2893
3008
class InterRepository(InterObject):
3257
3372
        We don't test for the stores being of specific types because that
3258
3373
        could lead to confusing results, and there is no need to be
3259
3374
        overly general.
 
3375
 
 
3376
        Do not support CHK based repositories at this point.
3260
3377
        """
3261
3378
        from bzrlib.repofmt.pack_repo import RepositoryFormatPack
 
3379
        # XXX: This format is scheduled for termination
 
3380
        # from bzrlib.repofmt.groupcompress_repo import (
 
3381
        #     RepositoryFormatPackGCPlain,
 
3382
        #     )
3262
3383
        try:
3263
3384
            are_packs = (isinstance(source._format, RepositoryFormatPack) and
3264
3385
                isinstance(target._format, RepositoryFormatPack))
3265
3386
        except AttributeError:
3266
3387
            return False
3267
 
        return are_packs and InterRepository._same_model(source, target)
 
3388
        if not are_packs:
 
3389
            return False
 
3390
        # if (isinstance(source._format, RepositoryFormatPackGCPlain)
 
3391
        #     or isinstance(target._format, RepositoryFormatPackGCPlain)):
 
3392
        #     return False
 
3393
        return (InterRepository._same_model(source, target) and
 
3394
            not source._format.supports_chks)
3268
3395
 
3269
3396
    @needs_write_lock
3270
3397
    def fetch(self, revision_id=None, pb=None, find_ghosts=False,
3385
3512
    @staticmethod
3386
3513
    def is_compatible(source, target):
3387
3514
        """Be compatible with Knit2 source and Knit3 target"""
3388
 
        if source.supports_rich_root() != target.supports_rich_root():
3389
 
            return False
 
3515
        # XXX: What do we need to do to support fetching them?
 
3516
        # if source.supports_rich_root() != target.supports_rich_root():
 
3517
        #     return False
3390
3518
        # Ideally, we'd support fetching if the source had no tree references
3391
3519
        # even if it supported them...
3392
 
        if (getattr(source, '_format.supports_tree_reference', False) and
3393
 
            not getattr(target, '_format.supports_tree_reference', False)):
3394
 
            return False
 
3520
        # XXX: What do we need to do to support fetching them?
 
3521
        # if (getattr(source._format, 'supports_tree_reference', False) and
 
3522
        #     not getattr(target._format, 'supports_tree_reference', False)):
 
3523
        #    return False
3395
3524
        return True
3396
3525
 
3397
3526
    def _get_delta_for_revision(self, tree, parent_ids, basis_id, cache):
3426
3555
        # Walk though all revisions; get inventory deltas, copy referenced
3427
3556
        # texts that delta references, insert the delta, revision and
3428
3557
        # signature.
 
3558
        root_keys_to_create = set()
3429
3559
        text_keys = set()
3430
3560
        pending_deltas = []
3431
3561
        pending_revisions = []
3432
3562
        parent_map = self.source.get_parent_map(revision_ids)
 
3563
        # NB: This fails with dubious inventory data (when inv A has rev OLD
 
3564
        # for file F, and in B, after A, has rev A for file F) when A and B are
 
3565
        # in different groups.
3433
3566
        for tree in self.source.revision_trees(revision_ids):
3434
3567
            current_revision_id = tree.get_revision_id()
3435
3568
            parent_ids = parent_map.get(current_revision_id, ())
3436
3569
            basis_id, delta = self._get_delta_for_revision(tree, parent_ids,
3437
3570
                                                           basis_id, cache)
 
3571
            if self._converting_to_rich_root:
 
3572
                self._revision_id_to_root_id[current_revision_id] = \
 
3573
                    tree.get_root_id()
3438
3574
            # Find text entries that need to be copied
3439
3575
            for old_path, new_path, file_id, entry in delta:
3440
3576
                if new_path is not None:
3441
 
                    if not (new_path or self.target.supports_rich_root()):
3442
 
                        # We don't copy the text for the root node unless the
3443
 
                        # target supports_rich_root.
3444
 
                        continue
 
3577
                    if not new_path:
 
3578
                        # This is the root
 
3579
                        if not self.target.supports_rich_root():
 
3580
                            # The target doesn't support rich root, so we don't
 
3581
                            # copy
 
3582
                            continue
 
3583
                        if self._converting_to_rich_root:
 
3584
                            # This can't be copied normally, we have to insert
 
3585
                            # it specially
 
3586
                            root_keys_to_create.add((file_id, entry.revision))
 
3587
                            continue
3445
3588
                    text_keys.add((file_id, entry.revision))
3446
3589
            revision = self.source.get_revision(current_revision_id)
3447
3590
            pending_deltas.append((basis_id, delta,
3452
3595
        # Copy file texts
3453
3596
        from_texts = self.source.texts
3454
3597
        to_texts = self.target.texts
 
3598
        if root_keys_to_create:
 
3599
            NULL_REVISION = _mod_revision.NULL_REVISION
 
3600
            def _get_parent_keys(root_key):
 
3601
                root_id, rev_id = root_key
 
3602
                # Include direct parents of the revision, but only if they used
 
3603
                # the same root_id.
 
3604
                parent_keys = tuple([(root_id, parent_id)
 
3605
                    for parent_id in parent_map[rev_id]
 
3606
                    if parent_id != NULL_REVISION and
 
3607
                        self._revision_id_to_root_id.get(parent_id, root_id) == root_id])
 
3608
                return parent_keys
 
3609
            def new_root_data_stream():
 
3610
                for root_key in root_keys_to_create:
 
3611
                    parent_keys = _get_parent_keys(root_key)
 
3612
                    yield versionedfile.FulltextContentFactory(root_key,
 
3613
                        parent_keys, None, '')
 
3614
            to_texts.insert_record_stream(new_root_data_stream())
3455
3615
        to_texts.insert_record_stream(from_texts.get_record_stream(
3456
3616
            text_keys, self.target._format._fetch_order,
3457
3617
            not self.target._format._fetch_uses_deltas))
3504
3664
        """See InterRepository.fetch()."""
3505
3665
        if fetch_spec is not None:
3506
3666
            raise AssertionError("Not implemented yet...")
 
3667
        if (not self.source.supports_rich_root()
 
3668
            and self.target.supports_rich_root()):
 
3669
            self._converting_to_rich_root = True
 
3670
            self._revision_id_to_root_id = {}
 
3671
        else:
 
3672
            self._converting_to_rich_root = False
3507
3673
        revision_ids = self.target.search_missing_revision_ids(self.source,
3508
3674
            revision_id, find_ghosts=find_ghosts).get_keys()
3509
3675
        if not revision_ids:
3510
3676
            return 0, 0
3511
3677
        revision_ids = tsort.topo_sort(
3512
3678
            self.source.get_graph().get_parent_map(revision_ids))
 
3679
        if not revision_ids:
 
3680
            return 0, 0
 
3681
        # Walk though all revisions; get inventory deltas, copy referenced
 
3682
        # texts that delta references, insert the delta, revision and
 
3683
        # signature.
 
3684
        first_rev = self.source.get_revision(revision_ids[0])
3513
3685
        if pb is None:
3514
3686
            my_pb = ui.ui_factory.nested_progress_bar()
3515
3687
            pb = my_pb
3760
3932
                else:
3761
3933
                    self._extract_and_insert_inventories(
3762
3934
                        substream, src_serializer)
 
3935
            elif substream_type == 'chk_bytes':
 
3936
                # XXX: This doesn't support conversions, as it assumes the
 
3937
                #      conversion was done in the fetch code.
 
3938
                self.target_repo.chk_bytes.insert_record_stream(substream)
3763
3939
            elif substream_type == 'revisions':
3764
3940
                # This may fallback to extract-and-insert more often than
3765
3941
                # required if the serializers are different only in terms of
3914
4090
                # know for unselected inventories whether all their required
3915
4091
                # texts are present in the other repository - it could be
3916
4092
                # corrupt.
3917
 
                yield ('inventories', from_weave.get_record_stream(
3918
 
                    [(rev_id,) for rev_id in revs],
3919
 
                    self.inventory_fetch_order(),
3920
 
                    not self.delta_on_metadata()))
 
4093
                for info in self._get_inventory_stream(revs):
 
4094
                    yield info
3921
4095
            elif knit_kind == "signatures":
3922
4096
                # Nothing to do here; this will be taken care of when
3923
4097
                # _fetch_revision_texts happens.
3964
4138
        return (not self.from_repository._format.rich_root_data and
3965
4139
            self.to_format.rich_root_data)
3966
4140
 
 
4141
    def _get_inventory_stream(self, revision_ids):
 
4142
        from_format = self.from_repository._format
 
4143
        if (from_format.supports_chks and self.to_format.supports_chks
 
4144
            and (from_format._serializer == self.to_format._serializer)):
 
4145
            # Both sides support chks, and they use the same serializer, so it
 
4146
            # is safe to transmit the chk pages and inventory pages across
 
4147
            # as-is.
 
4148
            return self._get_chk_inventory_stream(revision_ids)
 
4149
        elif (not from_format.supports_chks):
 
4150
            # Source repository doesn't support chks. So we can transmit the
 
4151
            # inventories 'as-is' and either they are just accepted on the
 
4152
            # target, or the Sink will properly convert it.
 
4153
            return self._get_simple_inventory_stream(revision_ids)
 
4154
        else:
 
4155
            # XXX: Hack to make not-chk->chk fetch: copy the inventories as
 
4156
            #      inventories. Note that this should probably be done somehow
 
4157
            #      as part of bzrlib.repository.StreamSink. Except JAM couldn't
 
4158
            #      figure out how a non-chk repository could possibly handle
 
4159
            #      deserializing an inventory stream from a chk repo, as it
 
4160
            #      doesn't have a way to understand individual pages.
 
4161
            return self._get_convertable_inventory_stream(revision_ids)
 
4162
 
 
4163
    def _get_simple_inventory_stream(self, revision_ids):
 
4164
        from_weave = self.from_repository.inventories
 
4165
        yield ('inventories', from_weave.get_record_stream(
 
4166
            [(rev_id,) for rev_id in revision_ids],
 
4167
            self.inventory_fetch_order(),
 
4168
            not self.delta_on_metadata()))
 
4169
 
 
4170
    def _get_chk_inventory_stream(self, revision_ids):
 
4171
        """Fetch the inventory texts, along with the associated chk maps."""
 
4172
        # We want an inventory outside of the search set, so that we can filter
 
4173
        # out uninteresting chk pages. For now we use
 
4174
        # _find_revision_outside_set, but if we had a Search with cut_revs, we
 
4175
        # could use that instead.
 
4176
        start_rev_id = self.from_repository._find_revision_outside_set(
 
4177
                            revision_ids)
 
4178
        start_rev_key = (start_rev_id,)
 
4179
        inv_keys_to_fetch = [(rev_id,) for rev_id in revision_ids]
 
4180
        if start_rev_id != _mod_revision.NULL_REVISION:
 
4181
            inv_keys_to_fetch.append((start_rev_id,))
 
4182
        # Any repo that supports chk_bytes must also support out-of-order
 
4183
        # insertion. At least, that is how we expect it to work
 
4184
        # We use get_record_stream instead of iter_inventories because we want
 
4185
        # to be able to insert the stream as well. We could instead fetch
 
4186
        # allowing deltas, and then iter_inventories, but we don't know whether
 
4187
        # source or target is more 'local' anway.
 
4188
        inv_stream = self.from_repository.inventories.get_record_stream(
 
4189
            inv_keys_to_fetch, 'unordered',
 
4190
            True) # We need them as full-texts so we can find their references
 
4191
        uninteresting_chk_roots = set()
 
4192
        interesting_chk_roots = set()
 
4193
        def filter_inv_stream(inv_stream):
 
4194
            for idx, record in enumerate(inv_stream):
 
4195
                ### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
 
4196
                bytes = record.get_bytes_as('fulltext')
 
4197
                chk_inv = inventory.CHKInventory.deserialise(
 
4198
                    self.from_repository.chk_bytes, bytes, record.key)
 
4199
                if record.key == start_rev_key:
 
4200
                    uninteresting_chk_roots.add(chk_inv.id_to_entry.key())
 
4201
                    p_id_map = chk_inv.parent_id_basename_to_file_id
 
4202
                    if p_id_map is not None:
 
4203
                        uninteresting_chk_roots.add(p_id_map.key())
 
4204
                else:
 
4205
                    yield record
 
4206
                    interesting_chk_roots.add(chk_inv.id_to_entry.key())
 
4207
                    p_id_map = chk_inv.parent_id_basename_to_file_id
 
4208
                    if p_id_map is not None:
 
4209
                        interesting_chk_roots.add(p_id_map.key())
 
4210
        ### pb.update('fetch inventory', 0, 2)
 
4211
        yield ('inventories', filter_inv_stream(inv_stream))
 
4212
        # Now that we have worked out all of the interesting root nodes, grab
 
4213
        # all of the interesting pages and insert them
 
4214
        ### pb.update('fetch inventory', 1, 2)
 
4215
        interesting = chk_map.iter_interesting_nodes(
 
4216
            self.from_repository.chk_bytes, interesting_chk_roots,
 
4217
            uninteresting_chk_roots)
 
4218
        def to_stream_adapter():
 
4219
            """Adapt the iter_interesting_nodes result to a single stream.
 
4220
 
 
4221
            iter_interesting_nodes returns records as it processes them, which
 
4222
            can be in batches. But we only want a single stream to be inserted.
 
4223
            """
 
4224
            for record, items in interesting:
 
4225
                for value in record.itervalues():
 
4226
                    yield value
 
4227
        # XXX: We could instead call get_record_stream(records.keys())
 
4228
        #      ATM, this will always insert the records as fulltexts, and
 
4229
        #      requires that you can hang on to records once you have gone
 
4230
        #      on to the next one. Further, it causes the target to
 
4231
        #      recompress the data. Testing shows it to be faster than
 
4232
        #      requesting the records again, though.
 
4233
        yield ('chk_bytes', to_stream_adapter())
 
4234
        ### pb.update('fetch inventory', 2, 2)
 
4235
 
 
4236
    def _get_convertable_inventory_stream(self, revision_ids):
 
4237
        # XXX: One of source or target is using chks, and they don't have
 
4238
        #      compatible serializations. The StreamSink code expects to be
 
4239
        #      able to convert on the target, so we need to put
 
4240
        #      bytes-on-the-wire that can be converted
 
4241
        yield ('inventories', self._stream_invs_as_fulltexts(revision_ids))
 
4242
 
 
4243
    def _stream_invs_as_fulltexts(self, revision_ids):
 
4244
        from_repo = self.from_repository
 
4245
        from_serializer = from_repo._format._serializer
 
4246
        revision_keys = [(rev_id,) for rev_id in revision_ids]
 
4247
        parent_map = from_repo.inventories.get_parent_map(revision_keys)
 
4248
        for inv in self.from_repository.iter_inventories(revision_ids):
 
4249
            # XXX: This is a bit hackish, but it works. Basically,
 
4250
            #      CHKSerializer 'accidentally' supports
 
4251
            #      read/write_inventory_to_string, even though that is never
 
4252
            #      the format that is stored on disk. It *does* give us a
 
4253
            #      single string representation for an inventory, so live with
 
4254
            #      it for now.
 
4255
            #      This would be far better if we had a 'serialized inventory
 
4256
            #      delta' form. Then we could use 'inventory._make_delta', and
 
4257
            #      transmit that. This would both be faster to generate, and
 
4258
            #      result in fewer bytes-on-the-wire.
 
4259
            as_bytes = from_serializer.write_inventory_to_string(inv)
 
4260
            key = (inv.revision_id,)
 
4261
            parent_keys = parent_map.get(key, ())
 
4262
            yield versionedfile.FulltextContentFactory(
 
4263
                key, parent_keys, None, as_bytes)
 
4264