~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repository.py

  • Committer: Vincent Ladeuil
  • Date: 2010-02-11 09:27:55 UTC
  • mfrom: (5017.3.46 test-servers)
  • mto: This revision was merged to the branch mainline in revision 5030.
  • Revision ID: v.ladeuil+lp@free.fr-20100211092755-3vvu4vbwiwjjte3s
Move tests servers from bzrlib.transport to bzrlib.tests.test_server

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
    bzrdir,
25
25
    check,
26
26
    chk_map,
 
27
    config,
27
28
    debug,
28
29
    errors,
 
30
    fetch as _mod_fetch,
29
31
    fifo_cache,
30
32
    generate_ids,
31
33
    gpg,
32
34
    graph,
33
35
    inventory,
 
36
    inventory_delta,
34
37
    lazy_regex,
35
38
    lockable_files,
36
39
    lockdir,
38
41
    osutils,
39
42
    revision as _mod_revision,
40
43
    symbol_versioning,
 
44
    trace,
41
45
    tsort,
42
46
    ui,
43
47
    versionedfile,
48
52
from bzrlib.testament import Testament
49
53
""")
50
54
 
51
 
from bzrlib.decorators import needs_read_lock, needs_write_lock
 
55
from bzrlib.decorators import needs_read_lock, needs_write_lock, only_raises
52
56
from bzrlib.inter import InterObject
53
57
from bzrlib.inventory import (
54
58
    Inventory,
56
60
    ROOT_ID,
57
61
    entry_factory,
58
62
    )
 
63
from bzrlib.lock import _RelockDebugMixin
59
64
from bzrlib import registry
60
65
from bzrlib.trace import (
61
66
    log_exception_quietly, note, mutter, mutter_callsite, warning)
204
209
            # an inventory delta was accumulated without creating a new
205
210
            # inventory.
206
211
            basis_id = self.basis_delta_revision
207
 
            self.inv_sha1 = self.repository.add_inventory_by_delta(
 
212
            # We ignore the 'inventory' returned by add_inventory_by_delta
 
213
            # because self.new_inventory is used to hint to the rest of the
 
214
            # system what code path was taken
 
215
            self.inv_sha1, _ = self.repository.add_inventory_by_delta(
208
216
                basis_id, self._basis_delta, self._new_revision_id,
209
217
                self.parents)
210
218
        else:
464
472
            if content_summary[2] is None:
465
473
                raise ValueError("Files must not have executable = None")
466
474
            if not store:
467
 
                if (# if the file length changed we have to store:
468
 
                    parent_entry.text_size != content_summary[1] or
469
 
                    # if the exec bit has changed we have to store:
 
475
                # We can't trust a check of the file length because of content
 
476
                # filtering...
 
477
                if (# if the exec bit has changed we have to store:
470
478
                    parent_entry.executable != content_summary[2]):
471
479
                    store = True
472
480
                elif parent_entry.text_sha1 == content_summary[3]:
539
547
                ie.revision = parent_entry.revision
540
548
                return self._get_delta(ie, basis_inv, path), False, None
541
549
            ie.reference_revision = content_summary[3]
 
550
            if ie.reference_revision is None:
 
551
                raise AssertionError("invalid content_summary for nested tree: %r"
 
552
                    % (content_summary,))
542
553
            self._add_text_to_weave(ie.file_id, '', heads, None)
543
554
        else:
544
555
            raise NotImplementedError('unknown kind')
806
817
                seen_root = True
807
818
        self.new_inventory = None
808
819
        if len(inv_delta):
 
820
            # This should perhaps be guarded by a check that the basis we
 
821
            # commit against is the basis for the commit and if not do a delta
 
822
            # against the basis.
809
823
            self._any_changes = True
810
824
        if not seen_root:
811
825
            # housekeeping root entry changes do not affect no-change commits.
849
863
# Repositories
850
864
 
851
865
 
852
 
class Repository(object):
 
866
class Repository(_RelockDebugMixin):
853
867
    """Repository holding history for one or more branches.
854
868
 
855
869
    The repository holds and retrieves historical information including
924
938
        """
925
939
        if self._write_group is not self.get_transaction():
926
940
            # has an unlock or relock occured ?
 
941
            if suppress_errors:
 
942
                mutter(
 
943
                '(suppressed) mismatched lock context and write group. %r, %r',
 
944
                self._write_group, self.get_transaction())
 
945
                return
927
946
            raise errors.BzrError(
928
947
                'mismatched lock context and write group. %r, %r' %
929
948
                (self._write_group, self.get_transaction()))
1063
1082
        check_content=True):
1064
1083
        """Store lines in inv_vf and return the sha1 of the inventory."""
1065
1084
        parents = [(parent,) for parent in parents]
1066
 
        return self.inventories.add_lines((revision_id,), parents, lines,
 
1085
        result = self.inventories.add_lines((revision_id,), parents, lines,
1067
1086
            check_content=check_content)[0]
 
1087
        self.inventories._access.flush()
 
1088
        return result
1068
1089
 
1069
1090
    def add_revision(self, revision_id, rev, inv=None, config=None):
1070
1091
        """Add rev to the revision store as revision_id.
1208
1229
                    for record in getattr(self, kind).check(keys=keys[kind]):
1209
1230
                        if record.storage_kind == 'absent':
1210
1231
                            checker._report_items.append(
1211
 
                                'Missing inventory {%s}' % (record.key,))
 
1232
                                'Missing %s {%s}' % (kind, record.key,))
1212
1233
                        else:
1213
1234
                            last_object = self._check_record(kind, record,
1214
1235
                                checker, last_object, current_keys[(kind,) + record.key])
1219
1240
        """Check a single text from this repository."""
1220
1241
        if kind == 'inventories':
1221
1242
            rev_id = record.key[0]
1222
 
            inv = self.deserialise_inventory(rev_id,
 
1243
            inv = self._deserialise_inventory(rev_id,
1223
1244
                record.get_bytes_as('fulltext'))
1224
1245
            if last_object is not None:
1225
1246
                delta = inv._make_delta(last_object)
1285
1306
        self._reconcile_does_inventory_gc = True
1286
1307
        self._reconcile_fixes_text_parents = False
1287
1308
        self._reconcile_backsup_inventory = True
1288
 
        # not right yet - should be more semantically clear ?
1289
 
        #
1290
 
        # TODO: make sure to construct the right store classes, etc, depending
1291
 
        # on whether escaping is required.
1292
 
        self._warn_if_deprecated()
1293
1309
        self._write_group = None
1294
1310
        # Additional places to query for data.
1295
1311
        self._fallback_repositories = []
1296
1312
        # An InventoryEntry cache, used during deserialization
1297
1313
        self._inventory_entry_cache = fifo_cache.FIFOCache(10*1024)
 
1314
        # Is it safe to return inventory entries directly from the entry cache,
 
1315
        # rather copying them?
 
1316
        self._safe_to_return_from_cache = False
1298
1317
 
1299
1318
    def __repr__(self):
1300
1319
        if self._fallback_repositories:
1367
1386
        locked = self.is_locked()
1368
1387
        result = self.control_files.lock_write(token=token)
1369
1388
        if not locked:
 
1389
            self._warn_if_deprecated()
 
1390
            self._note_lock('w')
1370
1391
            for repo in self._fallback_repositories:
1371
1392
                # Writes don't affect fallback repos
1372
1393
                repo.lock_read()
1377
1398
        locked = self.is_locked()
1378
1399
        self.control_files.lock_read()
1379
1400
        if not locked:
 
1401
            self._warn_if_deprecated()
 
1402
            self._note_lock('r')
1380
1403
            for repo in self._fallback_repositories:
1381
1404
                repo.lock_read()
1382
1405
            self._refresh_data()
1529
1552
        """Commit the contents accrued within the current write group.
1530
1553
 
1531
1554
        :seealso: start_write_group.
 
1555
        
 
1556
        :return: it may return an opaque hint that can be passed to 'pack'.
1532
1557
        """
1533
1558
        if self._write_group is not self.get_transaction():
1534
1559
            # has an unlock or relock occured ?
1588
1613
        # but at the moment we're only checking for texts referenced by
1589
1614
        # inventories at the graph's edge.
1590
1615
        key_deps = self.revisions._index._key_dependencies
1591
 
        key_deps.add_keys(present_inventories)
 
1616
        key_deps.satisfy_refs_for_keys(present_inventories)
1592
1617
        referrers = frozenset(r[0] for r in key_deps.get_referrers())
1593
1618
        file_ids = self.fileids_altered_by_revision_ids(referrers)
1594
1619
        missing_texts = set()
1695
1720
        :param revprops: Optional dictionary of revision properties.
1696
1721
        :param revision_id: Optional revision id.
1697
1722
        """
 
1723
        if self._fallback_repositories:
 
1724
            raise errors.BzrError("Cannot commit from a lightweight checkout "
 
1725
                "to a stacked branch. See "
 
1726
                "https://bugs.launchpad.net/bzr/+bug/375013 for details.")
1698
1727
        result = self._commit_builder_class(self, parents, config,
1699
1728
            timestamp, timezone, committer, revprops, revision_id)
1700
1729
        self.start_write_group()
1701
1730
        return result
1702
1731
 
 
1732
    @only_raises(errors.LockNotHeld, errors.LockBroken)
1703
1733
    def unlock(self):
1704
1734
        if (self.control_files._lock_count == 1 and
1705
1735
            self.control_files._lock_mode == 'w'):
2135
2165
        """
2136
2166
        selected_keys = set((revid,) for revid in revision_ids)
2137
2167
        w = _inv_weave or self.inventories
2138
 
        pb = ui.ui_factory.nested_progress_bar()
2139
 
        try:
2140
 
            return self._find_file_ids_from_xml_inventory_lines(
2141
 
                w.iter_lines_added_or_present_in_keys(
2142
 
                    selected_keys, pb=pb),
2143
 
                selected_keys)
2144
 
        finally:
2145
 
            pb.finished()
 
2168
        return self._find_file_ids_from_xml_inventory_lines(
 
2169
            w.iter_lines_added_or_present_in_keys(
 
2170
                selected_keys, pb=None),
 
2171
            selected_keys)
2146
2172
 
2147
2173
    def iter_files_bytes(self, desired_files):
2148
2174
        """Iterate through file versions.
2309
2335
        num_file_ids = len(file_ids)
2310
2336
        for file_id, altered_versions in file_ids.iteritems():
2311
2337
            if pb is not None:
2312
 
                pb.update("fetch texts", count, num_file_ids)
 
2338
                pb.update("Fetch texts", count, num_file_ids)
2313
2339
            count += 1
2314
2340
            yield ("file", file_id, altered_versions)
2315
2341
 
2336
2362
        """Get Inventory object by revision id."""
2337
2363
        return self.iter_inventories([revision_id]).next()
2338
2364
 
2339
 
    def iter_inventories(self, revision_ids):
 
2365
    def iter_inventories(self, revision_ids, ordering=None):
2340
2366
        """Get many inventories by revision_ids.
2341
2367
 
2342
2368
        This will buffer some or all of the texts used in constructing the
2344
2370
        time.
2345
2371
 
2346
2372
        :param revision_ids: The expected revision ids of the inventories.
 
2373
        :param ordering: optional ordering, e.g. 'topological'.  If not
 
2374
            specified, the order of revision_ids will be preserved (by
 
2375
            buffering if necessary).
2347
2376
        :return: An iterator of inventories.
2348
2377
        """
2349
2378
        if ((None in revision_ids)
2350
2379
            or (_mod_revision.NULL_REVISION in revision_ids)):
2351
2380
            raise ValueError('cannot get null revision inventory')
2352
 
        return self._iter_inventories(revision_ids)
 
2381
        return self._iter_inventories(revision_ids, ordering)
2353
2382
 
2354
 
    def _iter_inventories(self, revision_ids):
 
2383
    def _iter_inventories(self, revision_ids, ordering):
2355
2384
        """single-document based inventory iteration."""
2356
 
        for text, revision_id in self._iter_inventory_xmls(revision_ids):
2357
 
            yield self.deserialise_inventory(revision_id, text)
 
2385
        inv_xmls = self._iter_inventory_xmls(revision_ids, ordering)
 
2386
        for text, revision_id in inv_xmls:
 
2387
            yield self._deserialise_inventory(revision_id, text)
2358
2388
 
2359
 
    def _iter_inventory_xmls(self, revision_ids):
 
2389
    def _iter_inventory_xmls(self, revision_ids, ordering):
 
2390
        if ordering is None:
 
2391
            order_as_requested = True
 
2392
            ordering = 'unordered'
 
2393
        else:
 
2394
            order_as_requested = False
2360
2395
        keys = [(revision_id,) for revision_id in revision_ids]
2361
 
        stream = self.inventories.get_record_stream(keys, 'unordered', True)
 
2396
        if not keys:
 
2397
            return
 
2398
        if order_as_requested:
 
2399
            key_iter = iter(keys)
 
2400
            next_key = key_iter.next()
 
2401
        stream = self.inventories.get_record_stream(keys, ordering, True)
2362
2402
        text_chunks = {}
2363
2403
        for record in stream:
2364
2404
            if record.storage_kind != 'absent':
2365
 
                text_chunks[record.key] = record.get_bytes_as('chunked')
 
2405
                chunks = record.get_bytes_as('chunked')
 
2406
                if order_as_requested:
 
2407
                    text_chunks[record.key] = chunks
 
2408
                else:
 
2409
                    yield ''.join(chunks), record.key[-1]
2366
2410
            else:
2367
2411
                raise errors.NoSuchRevision(self, record.key)
2368
 
        for key in keys:
2369
 
            chunks = text_chunks.pop(key)
2370
 
            yield ''.join(chunks), key[-1]
 
2412
            if order_as_requested:
 
2413
                # Yield as many results as we can while preserving order.
 
2414
                while next_key in text_chunks:
 
2415
                    chunks = text_chunks.pop(next_key)
 
2416
                    yield ''.join(chunks), next_key[-1]
 
2417
                    try:
 
2418
                        next_key = key_iter.next()
 
2419
                    except StopIteration:
 
2420
                        # We still want to fully consume the get_record_stream,
 
2421
                        # just in case it is not actually finished at this point
 
2422
                        next_key = None
 
2423
                        break
2371
2424
 
2372
 
    def deserialise_inventory(self, revision_id, xml):
 
2425
    def _deserialise_inventory(self, revision_id, xml):
2373
2426
        """Transform the xml into an inventory object.
2374
2427
 
2375
2428
        :param revision_id: The expected revision id of the inventory.
2376
2429
        :param xml: A serialised inventory.
2377
2430
        """
2378
2431
        result = self._serializer.read_inventory_from_string(xml, revision_id,
2379
 
                    entry_cache=self._inventory_entry_cache)
 
2432
                    entry_cache=self._inventory_entry_cache,
 
2433
                    return_from_cache=self._safe_to_return_from_cache)
2380
2434
        if result.revision_id != revision_id:
2381
2435
            raise AssertionError('revision id mismatch %s != %s' % (
2382
2436
                result.revision_id, revision_id))
2383
2437
        return result
2384
2438
 
2385
 
    def serialise_inventory(self, inv):
 
2439
    def _serialise_inventory(self, inv):
2386
2440
        return self._serializer.write_inventory_to_string(inv)
2387
2441
 
2388
2442
    def _serialise_inventory_to_lines(self, inv):
2392
2446
        return self._serializer.format_num
2393
2447
 
2394
2448
    @needs_read_lock
2395
 
    def get_inventory_xml(self, revision_id):
2396
 
        """Get inventory XML as a file object."""
2397
 
        texts = self._iter_inventory_xmls([revision_id])
 
2449
    def _get_inventory_xml(self, revision_id):
 
2450
        """Get serialized inventory as a string."""
 
2451
        texts = self._iter_inventory_xmls([revision_id], 'unordered')
2398
2452
        try:
2399
2453
            text, revision_id = texts.next()
2400
2454
        except StopIteration:
2401
2455
            raise errors.HistoryMissing(self, 'inventory', revision_id)
2402
2456
        return text
2403
2457
 
2404
 
    @needs_read_lock
2405
 
    def get_inventory_sha1(self, revision_id):
2406
 
        """Return the sha1 hash of the inventory entry
2407
 
        """
2408
 
        return self.get_revision(revision_id).inventory_sha1
2409
 
 
2410
2458
    def get_rev_id_for_revno(self, revno, known_pair):
2411
2459
        """Return the revision id of a revno, given a later (revno, revid)
2412
2460
        pair in the same history.
2614
2662
        for ((revision_id,), parent_keys) in \
2615
2663
                self.revisions.get_parent_map(query_keys).iteritems():
2616
2664
            if parent_keys:
2617
 
                result[revision_id] = tuple(parent_revid
2618
 
                    for (parent_revid,) in parent_keys)
 
2665
                result[revision_id] = tuple([parent_revid
 
2666
                    for (parent_revid,) in parent_keys])
2619
2667
            else:
2620
2668
                result[revision_id] = (_mod_revision.NULL_REVISION,)
2621
2669
        return result
2723
2771
        result.check(callback_refs)
2724
2772
        return result
2725
2773
 
2726
 
    def _warn_if_deprecated(self):
 
2774
    def _warn_if_deprecated(self, branch=None):
2727
2775
        global _deprecation_warning_done
2728
2776
        if _deprecation_warning_done:
2729
2777
            return
2730
 
        _deprecation_warning_done = True
2731
 
        warning("Format %s for %s is deprecated - please use 'bzr upgrade' to get better performance"
2732
 
                % (self._format, self.bzrdir.transport.base))
 
2778
        try:
 
2779
            if branch is None:
 
2780
                conf = config.GlobalConfig()
 
2781
            else:
 
2782
                conf = branch.get_config()
 
2783
            if conf.suppress_warning('format_deprecation'):
 
2784
                return
 
2785
            warning("Format %s for %s is deprecated -"
 
2786
                    " please use 'bzr upgrade' to get better performance"
 
2787
                    % (self._format, self.bzrdir.transport.base))
 
2788
        finally:
 
2789
            _deprecation_warning_done = True
2733
2790
 
2734
2791
    def supports_rich_root(self):
2735
2792
        return self._format.rich_root_data
3016
3073
    # help), and for fetching when data won't have come from the same
3017
3074
    # compressor.
3018
3075
    pack_compresses = False
 
3076
    # Does the repository inventory storage understand references to trees?
 
3077
    supports_tree_reference = None
 
3078
    # Is the format experimental ?
 
3079
    experimental = False
3019
3080
 
3020
3081
    def __str__(self):
3021
3082
        return "<%s>" % self.__class__.__name__
3037
3098
        """
3038
3099
        try:
3039
3100
            transport = a_bzrdir.get_repository_transport(None)
3040
 
            format_string = transport.get("format").read()
 
3101
            format_string = transport.get_bytes("format")
3041
3102
            return format_registry.get(format_string)
3042
3103
        except errors.NoSuchFile:
3043
3104
            raise errors.NoRepositoryPresent(a_bzrdir)
3125
3186
        raise NotImplementedError(self.network_name)
3126
3187
 
3127
3188
    def check_conversion_target(self, target_format):
3128
 
        raise NotImplementedError(self.check_conversion_target)
 
3189
        if self.rich_root_data and not target_format.rich_root_data:
 
3190
            raise errors.BadConversionTarget(
 
3191
                'Does not support rich root data.', target_format,
 
3192
                from_format=self)
 
3193
        if (self.supports_tree_reference and 
 
3194
            not getattr(target_format, 'supports_tree_reference', False)):
 
3195
            raise errors.BadConversionTarget(
 
3196
                'Does not support nested trees', target_format,
 
3197
                from_format=self)
3129
3198
 
3130
3199
    def open(self, a_bzrdir, _found=False):
3131
3200
        """Return an instance of this format for the bzrdir a_bzrdir.
3344
3413
 
3345
3414
        :param revision_id: if None all content is copied, if NULL_REVISION no
3346
3415
                            content is copied.
3347
 
        :param pb: optional progress bar to use for progress reports. If not
3348
 
                   provided a default one will be created.
 
3416
        :param pb: ignored.
3349
3417
        :return: None.
3350
3418
        """
3351
 
        from bzrlib.fetch import RepoFetcher
3352
 
        f = RepoFetcher(to_repository=self.target,
 
3419
        ui.ui_factory.warn_experimental_format_fetch(self)
 
3420
        f = _mod_fetch.RepoFetcher(to_repository=self.target,
3353
3421
                               from_repository=self.source,
3354
3422
                               last_revision=revision_id,
3355
3423
                               fetch_spec=fetch_spec,
3356
 
                               pb=pb, find_ghosts=find_ghosts)
 
3424
                               find_ghosts=find_ghosts)
3357
3425
 
3358
3426
    def _walk_to_common_revisions(self, revision_ids):
3359
3427
        """Walk out from revision_ids in source to revisions target has.
3528
3596
                self.target.texts.insert_record_stream(
3529
3597
                    self.source.texts.get_record_stream(
3530
3598
                        self.source.texts.keys(), 'topological', False))
3531
 
                pb.update('copying inventory', 0, 1)
 
3599
                pb.update('Copying inventory', 0, 1)
3532
3600
                self.target.inventories.insert_record_stream(
3533
3601
                    self.source.inventories.get_record_stream(
3534
3602
                        self.source.inventories.keys(), 'topological', False))
3660
3728
        # This is redundant with format.check_conversion_target(), however that
3661
3729
        # raises an exception, and we just want to say "False" as in we won't
3662
3730
        # support converting between these formats.
 
3731
        if 'IDS_never' in debug.debug_flags:
 
3732
            return False
3663
3733
        if source.supports_rich_root() and not target.supports_rich_root():
3664
3734
            return False
3665
3735
        if (source._format.supports_tree_reference
3666
3736
            and not target._format.supports_tree_reference):
3667
3737
            return False
 
3738
        if target._fallback_repositories and target._format.supports_chks:
 
3739
            # IDS doesn't know how to copy CHKs for the parent inventories it
 
3740
            # adds to stacked repos.
 
3741
            return False
 
3742
        if 'IDS_always' in debug.debug_flags:
 
3743
            return True
 
3744
        # Only use this code path for local source and target.  IDS does far
 
3745
        # too much IO (both bandwidth and roundtrips) over a network.
 
3746
        if not source.bzrdir.transport.base.startswith('file:///'):
 
3747
            return False
 
3748
        if not target.bzrdir.transport.base.startswith('file:///'):
 
3749
            return False
3668
3750
        return True
3669
3751
 
3670
 
    def _get_delta_for_revision(self, tree, parent_ids, basis_id, cache):
 
3752
    def _get_trees(self, revision_ids, cache):
 
3753
        possible_trees = []
 
3754
        for rev_id in revision_ids:
 
3755
            if rev_id in cache:
 
3756
                possible_trees.append((rev_id, cache[rev_id]))
 
3757
            else:
 
3758
                # Not cached, but inventory might be present anyway.
 
3759
                try:
 
3760
                    tree = self.source.revision_tree(rev_id)
 
3761
                except errors.NoSuchRevision:
 
3762
                    # Nope, parent is ghost.
 
3763
                    pass
 
3764
                else:
 
3765
                    cache[rev_id] = tree
 
3766
                    possible_trees.append((rev_id, tree))
 
3767
        return possible_trees
 
3768
 
 
3769
    def _get_delta_for_revision(self, tree, parent_ids, possible_trees):
3671
3770
        """Get the best delta and base for this revision.
3672
3771
 
3673
3772
        :return: (basis_id, delta)
3674
3773
        """
3675
 
        possible_trees = [(parent_id, cache[parent_id])
3676
 
                          for parent_id in parent_ids
3677
 
                           if parent_id in cache]
3678
 
        if len(possible_trees) == 0:
3679
 
            # There either aren't any parents, or the parents aren't in the
3680
 
            # cache, so just use the last converted tree
3681
 
            possible_trees.append((basis_id, cache[basis_id]))
3682
3774
        deltas = []
 
3775
        # Generate deltas against each tree, to find the shortest.
 
3776
        texts_possibly_new_in_tree = set()
3683
3777
        for basis_id, basis_tree in possible_trees:
3684
3778
            delta = tree.inventory._make_delta(basis_tree.inventory)
 
3779
            for old_path, new_path, file_id, new_entry in delta:
 
3780
                if new_path is None:
 
3781
                    # This file_id isn't present in the new rev, so we don't
 
3782
                    # care about it.
 
3783
                    continue
 
3784
                if not new_path:
 
3785
                    # Rich roots are handled elsewhere...
 
3786
                    continue
 
3787
                kind = new_entry.kind
 
3788
                if kind != 'directory' and kind != 'file':
 
3789
                    # No text record associated with this inventory entry.
 
3790
                    continue
 
3791
                # This is a directory or file that has changed somehow.
 
3792
                texts_possibly_new_in_tree.add((file_id, new_entry.revision))
3685
3793
            deltas.append((len(delta), basis_id, delta))
3686
3794
        deltas.sort()
3687
3795
        return deltas[0][1:]
3688
3796
 
3689
 
    def _get_parent_keys(self, root_key, parent_map):
3690
 
        """Get the parent keys for a given root id."""
3691
 
        root_id, rev_id = root_key
3692
 
        # Include direct parents of the revision, but only if they used
3693
 
        # the same root_id and are heads.
3694
 
        parent_keys = []
3695
 
        for parent_id in parent_map[rev_id]:
3696
 
            if parent_id == _mod_revision.NULL_REVISION:
3697
 
                continue
3698
 
            if parent_id not in self._revision_id_to_root_id:
3699
 
                # We probably didn't read this revision, go spend the
3700
 
                # extra effort to actually check
3701
 
                try:
3702
 
                    tree = self.source.revision_tree(parent_id)
3703
 
                except errors.NoSuchRevision:
3704
 
                    # Ghost, fill out _revision_id_to_root_id in case we
3705
 
                    # encounter this again.
3706
 
                    # But set parent_root_id to None since we don't really know
3707
 
                    parent_root_id = None
3708
 
                else:
3709
 
                    parent_root_id = tree.get_root_id()
3710
 
                self._revision_id_to_root_id[parent_id] = None
3711
 
            else:
3712
 
                parent_root_id = self._revision_id_to_root_id[parent_id]
3713
 
            if root_id == parent_root_id:
3714
 
                # With stacking we _might_ want to refer to a non-local
3715
 
                # revision, but this code path only applies when we have the
3716
 
                # full content available, so ghosts really are ghosts, not just
3717
 
                # the edge of local data.
3718
 
                parent_keys.append((parent_id,))
3719
 
            else:
3720
 
                # root_id may be in the parent anyway.
3721
 
                try:
3722
 
                    tree = self.source.revision_tree(parent_id)
3723
 
                except errors.NoSuchRevision:
3724
 
                    # ghost, can't refer to it.
3725
 
                    pass
3726
 
                else:
3727
 
                    try:
3728
 
                        parent_keys.append((tree.inventory[root_id].revision,))
3729
 
                    except errors.NoSuchId:
3730
 
                        # not in the tree
3731
 
                        pass
3732
 
        g = graph.Graph(self.source.revisions)
3733
 
        heads = g.heads(parent_keys)
3734
 
        selected_keys = []
3735
 
        for key in parent_keys:
3736
 
            if key in heads and key not in selected_keys:
3737
 
                selected_keys.append(key)
3738
 
        return tuple([(root_id,)+ key for key in selected_keys])
3739
 
 
3740
 
    def _new_root_data_stream(self, root_keys_to_create, parent_map):
3741
 
        for root_key in root_keys_to_create:
3742
 
            parent_keys = self._get_parent_keys(root_key, parent_map)
3743
 
            yield versionedfile.FulltextContentFactory(root_key,
3744
 
                parent_keys, None, '')
3745
 
 
3746
 
    def _fetch_batch(self, revision_ids, basis_id, cache):
 
3797
    def _fetch_parent_invs_for_stacking(self, parent_map, cache):
 
3798
        """Find all parent revisions that are absent, but for which the
 
3799
        inventory is present, and copy those inventories.
 
3800
 
 
3801
        This is necessary to preserve correctness when the source is stacked
 
3802
        without fallbacks configured.  (Note that in cases like upgrade the
 
3803
        source may be not have _fallback_repositories even though it is
 
3804
        stacked.)
 
3805
        """
 
3806
        parent_revs = set()
 
3807
        for parents in parent_map.values():
 
3808
            parent_revs.update(parents)
 
3809
        present_parents = self.source.get_parent_map(parent_revs)
 
3810
        absent_parents = set(parent_revs).difference(present_parents)
 
3811
        parent_invs_keys_for_stacking = self.source.inventories.get_parent_map(
 
3812
            (rev_id,) for rev_id in absent_parents)
 
3813
        parent_inv_ids = [key[-1] for key in parent_invs_keys_for_stacking]
 
3814
        for parent_tree in self.source.revision_trees(parent_inv_ids):
 
3815
            current_revision_id = parent_tree.get_revision_id()
 
3816
            parents_parents_keys = parent_invs_keys_for_stacking[
 
3817
                (current_revision_id,)]
 
3818
            parents_parents = [key[-1] for key in parents_parents_keys]
 
3819
            basis_id = _mod_revision.NULL_REVISION
 
3820
            basis_tree = self.source.revision_tree(basis_id)
 
3821
            delta = parent_tree.inventory._make_delta(basis_tree.inventory)
 
3822
            self.target.add_inventory_by_delta(
 
3823
                basis_id, delta, current_revision_id, parents_parents)
 
3824
            cache[current_revision_id] = parent_tree
 
3825
 
 
3826
    def _fetch_batch(self, revision_ids, basis_id, cache, a_graph=None):
3747
3827
        """Fetch across a few revisions.
3748
3828
 
3749
3829
        :param revision_ids: The revisions to copy
3750
3830
        :param basis_id: The revision_id of a tree that must be in cache, used
3751
3831
            as a basis for delta when no other base is available
3752
3832
        :param cache: A cache of RevisionTrees that we can use.
 
3833
        :param a_graph: A Graph object to determine the heads() of the
 
3834
            rich-root data stream.
3753
3835
        :return: The revision_id of the last converted tree. The RevisionTree
3754
3836
            for it will be in cache
3755
3837
        """
3761
3843
        pending_deltas = []
3762
3844
        pending_revisions = []
3763
3845
        parent_map = self.source.get_parent_map(revision_ids)
 
3846
        self._fetch_parent_invs_for_stacking(parent_map, cache)
 
3847
        self.source._safe_to_return_from_cache = True
3764
3848
        for tree in self.source.revision_trees(revision_ids):
 
3849
            # Find a inventory delta for this revision.
 
3850
            # Find text entries that need to be copied, too.
3765
3851
            current_revision_id = tree.get_revision_id()
3766
3852
            parent_ids = parent_map.get(current_revision_id, ())
 
3853
            parent_trees = self._get_trees(parent_ids, cache)
 
3854
            possible_trees = list(parent_trees)
 
3855
            if len(possible_trees) == 0:
 
3856
                # There either aren't any parents, or the parents are ghosts,
 
3857
                # so just use the last converted tree.
 
3858
                possible_trees.append((basis_id, cache[basis_id]))
3767
3859
            basis_id, delta = self._get_delta_for_revision(tree, parent_ids,
3768
 
                                                           basis_id, cache)
 
3860
                                                           possible_trees)
 
3861
            revision = self.source.get_revision(current_revision_id)
 
3862
            pending_deltas.append((basis_id, delta,
 
3863
                current_revision_id, revision.parent_ids))
3769
3864
            if self._converting_to_rich_root:
3770
3865
                self._revision_id_to_root_id[current_revision_id] = \
3771
3866
                    tree.get_root_id()
3772
 
            # Find text entries that need to be copied
 
3867
            # Determine which texts are in present in this revision but not in
 
3868
            # any of the available parents.
 
3869
            texts_possibly_new_in_tree = set()
3773
3870
            for old_path, new_path, file_id, entry in delta:
3774
 
                if new_path is not None:
3775
 
                    if not new_path:
3776
 
                        # This is the root
3777
 
                        if not self.target.supports_rich_root():
3778
 
                            # The target doesn't support rich root, so we don't
3779
 
                            # copy
3780
 
                            continue
3781
 
                        if self._converting_to_rich_root:
3782
 
                            # This can't be copied normally, we have to insert
3783
 
                            # it specially
3784
 
                            root_keys_to_create.add((file_id, entry.revision))
3785
 
                            continue
3786
 
                    text_keys.add((file_id, entry.revision))
3787
 
            revision = self.source.get_revision(current_revision_id)
3788
 
            pending_deltas.append((basis_id, delta,
3789
 
                current_revision_id, revision.parent_ids))
 
3871
                if new_path is None:
 
3872
                    # This file_id isn't present in the new rev
 
3873
                    continue
 
3874
                if not new_path:
 
3875
                    # This is the root
 
3876
                    if not self.target.supports_rich_root():
 
3877
                        # The target doesn't support rich root, so we don't
 
3878
                        # copy
 
3879
                        continue
 
3880
                    if self._converting_to_rich_root:
 
3881
                        # This can't be copied normally, we have to insert
 
3882
                        # it specially
 
3883
                        root_keys_to_create.add((file_id, entry.revision))
 
3884
                        continue
 
3885
                kind = entry.kind
 
3886
                texts_possibly_new_in_tree.add((file_id, entry.revision))
 
3887
            for basis_id, basis_tree in possible_trees:
 
3888
                basis_inv = basis_tree.inventory
 
3889
                for file_key in list(texts_possibly_new_in_tree):
 
3890
                    file_id, file_revision = file_key
 
3891
                    try:
 
3892
                        entry = basis_inv[file_id]
 
3893
                    except errors.NoSuchId:
 
3894
                        continue
 
3895
                    if entry.revision == file_revision:
 
3896
                        texts_possibly_new_in_tree.remove(file_key)
 
3897
            text_keys.update(texts_possibly_new_in_tree)
3790
3898
            pending_revisions.append(revision)
3791
3899
            cache[current_revision_id] = tree
3792
3900
            basis_id = current_revision_id
 
3901
        self.source._safe_to_return_from_cache = False
3793
3902
        # Copy file texts
3794
3903
        from_texts = self.source.texts
3795
3904
        to_texts = self.target.texts
3796
3905
        if root_keys_to_create:
3797
 
            root_stream = self._new_root_data_stream(root_keys_to_create,
3798
 
                                                     parent_map)
 
3906
            root_stream = _mod_fetch._new_root_data_stream(
 
3907
                root_keys_to_create, self._revision_id_to_root_id, parent_map,
 
3908
                self.source, graph=a_graph)
3799
3909
            to_texts.insert_record_stream(root_stream)
3800
3910
        to_texts.insert_record_stream(from_texts.get_record_stream(
3801
3911
            text_keys, self.target._format._fetch_order,
3808
3918
            # for the new revisions that we are about to insert.  We do this
3809
3919
            # before adding the revisions so that no revision is added until
3810
3920
            # all the inventories it may depend on are added.
 
3921
            # Note that this is overzealous, as we may have fetched these in an
 
3922
            # earlier batch.
3811
3923
            parent_ids = set()
3812
3924
            revision_ids = set()
3813
3925
            for revision in pending_revisions:
3816
3928
            parent_ids.difference_update(revision_ids)
3817
3929
            parent_ids.discard(_mod_revision.NULL_REVISION)
3818
3930
            parent_map = self.source.get_parent_map(parent_ids)
3819
 
            for parent_tree in self.source.revision_trees(parent_ids):
3820
 
                basis_id, delta = self._get_delta_for_revision(tree, parent_ids, basis_id, cache)
 
3931
            # we iterate over parent_map and not parent_ids because we don't
 
3932
            # want to try copying any revision which is a ghost
 
3933
            for parent_tree in self.source.revision_trees(parent_map):
3821
3934
                current_revision_id = parent_tree.get_revision_id()
3822
3935
                parents_parents = parent_map[current_revision_id]
 
3936
                possible_trees = self._get_trees(parents_parents, cache)
 
3937
                if len(possible_trees) == 0:
 
3938
                    # There either aren't any parents, or the parents are
 
3939
                    # ghosts, so just use the last converted tree.
 
3940
                    possible_trees.append((basis_id, cache[basis_id]))
 
3941
                basis_id, delta = self._get_delta_for_revision(parent_tree,
 
3942
                    parents_parents, possible_trees)
3823
3943
                self.target.add_inventory_by_delta(
3824
3944
                    basis_id, delta, current_revision_id, parents_parents)
3825
3945
        # insert signatures and revisions
3848
3968
        cache[basis_id] = basis_tree
3849
3969
        del basis_tree # We don't want to hang on to it here
3850
3970
        hints = []
 
3971
        if self._converting_to_rich_root and len(revision_ids) > 100:
 
3972
            a_graph = _mod_fetch._get_rich_root_heads_graph(self.source,
 
3973
                                                            revision_ids)
 
3974
        else:
 
3975
            a_graph = None
 
3976
 
3851
3977
        for offset in range(0, len(revision_ids), batch_size):
3852
3978
            self.target.start_write_group()
3853
3979
            try:
3854
3980
                pb.update('Transferring revisions', offset,
3855
3981
                          len(revision_ids))
3856
3982
                batch = revision_ids[offset:offset+batch_size]
3857
 
                basis_id = self._fetch_batch(batch, basis_id, cache)
 
3983
                basis_id = self._fetch_batch(batch, basis_id, cache,
 
3984
                                             a_graph=a_graph)
3858
3985
            except:
 
3986
                self.source._safe_to_return_from_cache = False
3859
3987
                self.target.abort_write_group()
3860
3988
                raise
3861
3989
            else:
3873
4001
        """See InterRepository.fetch()."""
3874
4002
        if fetch_spec is not None:
3875
4003
            raise AssertionError("Not implemented yet...")
 
4004
        # See <https://launchpad.net/bugs/456077> asking for a warning here
 
4005
        #
 
4006
        # nb this is only active for local-local fetches; other things using
 
4007
        # streaming.
 
4008
        ui.ui_factory.warn_cross_format_fetch(self.source._format,
 
4009
            self.target._format)
 
4010
        ui.ui_factory.warn_experimental_format_fetch(self)
3876
4011
        if (not self.source.supports_rich_root()
3877
4012
            and self.target.supports_rich_root()):
3878
4013
            self._converting_to_rich_root = True
3890
4025
        # Walk though all revisions; get inventory deltas, copy referenced
3891
4026
        # texts that delta references, insert the delta, revision and
3892
4027
        # signature.
3893
 
        first_rev = self.source.get_revision(revision_ids[0])
3894
4028
        if pb is None:
3895
4029
            my_pb = ui.ui_factory.nested_progress_bar()
3896
4030
            pb = my_pb
3954
4088
        :param to_convert: The disk object to convert.
3955
4089
        :param pb: a progress bar to use for progress information.
3956
4090
        """
3957
 
        self.pb = pb
 
4091
        pb = ui.ui_factory.nested_progress_bar()
3958
4092
        self.count = 0
3959
4093
        self.total = 4
3960
4094
        # this is only useful with metadir layouts - separated repo content.
3961
4095
        # trigger an assertion if not such
3962
4096
        repo._format.get_format_string()
3963
4097
        self.repo_dir = repo.bzrdir
3964
 
        self.step('Moving repository to repository.backup')
 
4098
        pb.update('Moving repository to repository.backup')
3965
4099
        self.repo_dir.transport.move('repository', 'repository.backup')
3966
4100
        backup_transport =  self.repo_dir.transport.clone('repository.backup')
3967
4101
        repo._format.check_conversion_target(self.target_format)
3968
4102
        self.source_repo = repo._format.open(self.repo_dir,
3969
4103
            _found=True,
3970
4104
            _override_transport=backup_transport)
3971
 
        self.step('Creating new repository')
 
4105
        pb.update('Creating new repository')
3972
4106
        converted = self.target_format.initialize(self.repo_dir,
3973
4107
                                                  self.source_repo.is_shared())
3974
4108
        converted.lock_write()
3975
4109
        try:
3976
 
            self.step('Copying content into repository.')
 
4110
            pb.update('Copying content')
3977
4111
            self.source_repo.copy_content_into(converted)
3978
4112
        finally:
3979
4113
            converted.unlock()
3980
 
        self.step('Deleting old repository content.')
 
4114
        pb.update('Deleting old repository content')
3981
4115
        self.repo_dir.transport.delete_tree('repository.backup')
3982
 
        self.pb.note('repository converted')
3983
 
 
3984
 
    def step(self, message):
3985
 
        """Update the pb by a step."""
3986
 
        self.count +=1
3987
 
        self.pb.update(message, self.count, self.total)
 
4116
        ui.ui_factory.note('repository converted')
 
4117
        pb.finished()
3988
4118
 
3989
4119
 
3990
4120
_unescape_map = {
4062
4192
        self.file_ids = set([file_id for file_id, _ in
4063
4193
            self.text_index.iterkeys()])
4064
4194
        # text keys is now grouped by file_id
4065
 
        n_weaves = len(self.file_ids)
4066
 
        files_in_revisions = {}
4067
 
        revisions_of_files = {}
4068
4195
        n_versions = len(self.text_index)
4069
4196
        progress_bar.update('loading text store', 0, n_versions)
4070
4197
        parent_map = self.repository.texts.get_parent_map(self.text_index)
4163
4290
            else:
4164
4291
                new_pack.set_write_cache_size(1024*1024)
4165
4292
        for substream_type, substream in stream:
 
4293
            if 'stream' in debug.debug_flags:
 
4294
                mutter('inserting substream: %s', substream_type)
4166
4295
            if substream_type == 'texts':
4167
4296
                self.target_repo.texts.insert_record_stream(substream)
4168
4297
            elif substream_type == 'inventories':
4172
4301
                else:
4173
4302
                    self._extract_and_insert_inventories(
4174
4303
                        substream, src_serializer)
 
4304
            elif substream_type == 'inventory-deltas':
 
4305
                ui.ui_factory.warn_cross_format_fetch(src_format,
 
4306
                    self.target_repo._format)
 
4307
                self._extract_and_insert_inventory_deltas(
 
4308
                    substream, src_serializer)
4175
4309
            elif substream_type == 'chk_bytes':
4176
4310
                # XXX: This doesn't support conversions, as it assumes the
4177
4311
                #      conversion was done in the fetch code.
4208
4342
                ):
4209
4343
                if versioned_file is None:
4210
4344
                    continue
 
4345
                # TODO: key is often going to be a StaticTuple object
 
4346
                #       I don't believe we can define a method by which
 
4347
                #       (prefix,) + StaticTuple will work, though we could
 
4348
                #       define a StaticTuple.sq_concat that would allow you to
 
4349
                #       pass in either a tuple or a StaticTuple as the second
 
4350
                #       object, so instead we could have:
 
4351
                #       StaticTuple(prefix) + key here...
4211
4352
                missing_keys.update((prefix,) + key for key in
4212
4353
                    versioned_file.get_missing_compression_parent_keys())
4213
4354
        except NotImplementedError:
4228
4369
            self.target_repo.pack(hint=hint)
4229
4370
        return [], set()
4230
4371
 
4231
 
    def _extract_and_insert_inventories(self, substream, serializer):
 
4372
    def _extract_and_insert_inventory_deltas(self, substream, serializer):
 
4373
        target_rich_root = self.target_repo._format.rich_root_data
 
4374
        target_tree_refs = self.target_repo._format.supports_tree_reference
 
4375
        for record in substream:
 
4376
            # Insert the delta directly
 
4377
            inventory_delta_bytes = record.get_bytes_as('fulltext')
 
4378
            deserialiser = inventory_delta.InventoryDeltaDeserializer()
 
4379
            try:
 
4380
                parse_result = deserialiser.parse_text_bytes(
 
4381
                    inventory_delta_bytes)
 
4382
            except inventory_delta.IncompatibleInventoryDelta, err:
 
4383
                trace.mutter("Incompatible delta: %s", err.msg)
 
4384
                raise errors.IncompatibleRevision(self.target_repo._format)
 
4385
            basis_id, new_id, rich_root, tree_refs, inv_delta = parse_result
 
4386
            revision_id = new_id
 
4387
            parents = [key[0] for key in record.parents]
 
4388
            self.target_repo.add_inventory_by_delta(
 
4389
                basis_id, inv_delta, revision_id, parents)
 
4390
 
 
4391
    def _extract_and_insert_inventories(self, substream, serializer,
 
4392
            parse_delta=None):
4232
4393
        """Generate a new inventory versionedfile in target, converting data.
4233
4394
 
4234
4395
        The inventory is retrieved from the source, (deserializing it), and
4235
4396
        stored in the target (reserializing it in a different format).
4236
4397
        """
 
4398
        target_rich_root = self.target_repo._format.rich_root_data
 
4399
        target_tree_refs = self.target_repo._format.supports_tree_reference
4237
4400
        for record in substream:
 
4401
            # It's not a delta, so it must be a fulltext in the source
 
4402
            # serializer's format.
4238
4403
            bytes = record.get_bytes_as('fulltext')
4239
4404
            revision_id = record.key[0]
4240
4405
            inv = serializer.read_inventory_from_string(bytes, revision_id)
4241
4406
            parents = [key[0] for key in record.parents]
4242
4407
            self.target_repo.add_inventory(revision_id, inv, parents)
 
4408
            # No need to keep holding this full inv in memory when the rest of
 
4409
            # the substream is likely to be all deltas.
 
4410
            del inv
4243
4411
 
4244
4412
    def _extract_and_insert_revisions(self, substream, serializer):
4245
4413
        for record in substream:
4294
4462
        return [('signatures', signatures), ('revisions', revisions)]
4295
4463
 
4296
4464
    def _generate_root_texts(self, revs):
4297
 
        """This will be called by __fetch between fetching weave texts and
 
4465
        """This will be called by get_stream between fetching weave texts and
4298
4466
        fetching the inventory weave.
4299
 
 
4300
 
        Subclasses should override this if they need to generate root texts
4301
 
        after fetching weave texts.
4302
4467
        """
4303
4468
        if self._rich_root_upgrade():
4304
 
            import bzrlib.fetch
4305
 
            return bzrlib.fetch.Inter1and2Helper(
 
4469
            return _mod_fetch.Inter1and2Helper(
4306
4470
                self.from_repository).generate_root_texts(revs)
4307
4471
        else:
4308
4472
            return []
4311
4475
        phase = 'file'
4312
4476
        revs = search.get_keys()
4313
4477
        graph = self.from_repository.get_graph()
4314
 
        revs = list(graph.iter_topo_order(revs))
 
4478
        revs = tsort.topo_sort(graph.get_parent_map(revs))
4315
4479
        data_to_fetch = self.from_repository.item_keys_introduced_by(revs)
4316
4480
        text_keys = []
4317
4481
        for knit_kind, file_id, revisions in data_to_fetch:
4336
4500
                # will be valid.
4337
4501
                for _ in self._generate_root_texts(revs):
4338
4502
                    yield _
4339
 
                # NB: This currently reopens the inventory weave in source;
4340
 
                # using a single stream interface instead would avoid this.
4341
 
                from_weave = self.from_repository.inventories
4342
4503
                # we fetch only the referenced inventories because we do not
4343
4504
                # know for unselected inventories whether all their required
4344
4505
                # texts are present in the other repository - it could be
4383
4544
            if not keys:
4384
4545
                # No need to stream something we don't have
4385
4546
                continue
 
4547
            if substream_kind == 'inventories':
 
4548
                # Some missing keys are genuinely ghosts, filter those out.
 
4549
                present = self.from_repository.inventories.get_parent_map(keys)
 
4550
                revs = [key[0] for key in present]
 
4551
                # Get the inventory stream more-or-less as we do for the
 
4552
                # original stream; there's no reason to assume that records
 
4553
                # direct from the source will be suitable for the sink.  (Think
 
4554
                # e.g. 2a -> 1.9-rich-root).
 
4555
                for info in self._get_inventory_stream(revs, missing=True):
 
4556
                    yield info
 
4557
                continue
 
4558
 
4386
4559
            # Ask for full texts always so that we don't need more round trips
4387
4560
            # after this stream.
4388
4561
            # Some of the missing keys are genuinely ghosts, so filter absent
4403
4576
        return (not self.from_repository._format.rich_root_data and
4404
4577
            self.to_format.rich_root_data)
4405
4578
 
4406
 
    def _get_inventory_stream(self, revision_ids):
 
4579
    def _get_inventory_stream(self, revision_ids, missing=False):
4407
4580
        from_format = self.from_repository._format
4408
 
        if (from_format.supports_chks and self.to_format.supports_chks
4409
 
            and (from_format._serializer == self.to_format._serializer)):
4410
 
            # Both sides support chks, and they use the same serializer, so it
4411
 
            # is safe to transmit the chk pages and inventory pages across
4412
 
            # as-is.
4413
 
            return self._get_chk_inventory_stream(revision_ids)
4414
 
        elif (not from_format.supports_chks):
4415
 
            # Source repository doesn't support chks. So we can transmit the
4416
 
            # inventories 'as-is' and either they are just accepted on the
4417
 
            # target, or the Sink will properly convert it.
4418
 
            return self._get_simple_inventory_stream(revision_ids)
 
4581
        if (from_format.supports_chks and self.to_format.supports_chks and
 
4582
            from_format.network_name() == self.to_format.network_name()):
 
4583
            raise AssertionError(
 
4584
                "this case should be handled by GroupCHKStreamSource")
 
4585
        elif 'forceinvdeltas' in debug.debug_flags:
 
4586
            return self._get_convertable_inventory_stream(revision_ids,
 
4587
                    delta_versus_null=missing)
 
4588
        elif from_format.network_name() == self.to_format.network_name():
 
4589
            # Same format.
 
4590
            return self._get_simple_inventory_stream(revision_ids,
 
4591
                    missing=missing)
 
4592
        elif (not from_format.supports_chks and not self.to_format.supports_chks
 
4593
                and from_format._serializer == self.to_format._serializer):
 
4594
            # Essentially the same format.
 
4595
            return self._get_simple_inventory_stream(revision_ids,
 
4596
                    missing=missing)
4419
4597
        else:
4420
 
            # XXX: Hack to make not-chk->chk fetch: copy the inventories as
4421
 
            #      inventories. Note that this should probably be done somehow
4422
 
            #      as part of bzrlib.repository.StreamSink. Except JAM couldn't
4423
 
            #      figure out how a non-chk repository could possibly handle
4424
 
            #      deserializing an inventory stream from a chk repo, as it
4425
 
            #      doesn't have a way to understand individual pages.
4426
 
            return self._get_convertable_inventory_stream(revision_ids)
 
4598
            # Any time we switch serializations, we want to use an
 
4599
            # inventory-delta based approach.
 
4600
            return self._get_convertable_inventory_stream(revision_ids,
 
4601
                    delta_versus_null=missing)
4427
4602
 
4428
 
    def _get_simple_inventory_stream(self, revision_ids):
 
4603
    def _get_simple_inventory_stream(self, revision_ids, missing=False):
 
4604
        # NB: This currently reopens the inventory weave in source;
 
4605
        # using a single stream interface instead would avoid this.
4429
4606
        from_weave = self.from_repository.inventories
 
4607
        if missing:
 
4608
            delta_closure = True
 
4609
        else:
 
4610
            delta_closure = not self.delta_on_metadata()
4430
4611
        yield ('inventories', from_weave.get_record_stream(
4431
4612
            [(rev_id,) for rev_id in revision_ids],
4432
 
            self.inventory_fetch_order(),
4433
 
            not self.delta_on_metadata()))
4434
 
 
4435
 
    def _get_chk_inventory_stream(self, revision_ids):
4436
 
        """Fetch the inventory texts, along with the associated chk maps."""
4437
 
        # We want an inventory outside of the search set, so that we can filter
4438
 
        # out uninteresting chk pages. For now we use
4439
 
        # _find_revision_outside_set, but if we had a Search with cut_revs, we
4440
 
        # could use that instead.
4441
 
        start_rev_id = self.from_repository._find_revision_outside_set(
4442
 
                            revision_ids)
4443
 
        start_rev_key = (start_rev_id,)
4444
 
        inv_keys_to_fetch = [(rev_id,) for rev_id in revision_ids]
4445
 
        if start_rev_id != _mod_revision.NULL_REVISION:
4446
 
            inv_keys_to_fetch.append((start_rev_id,))
4447
 
        # Any repo that supports chk_bytes must also support out-of-order
4448
 
        # insertion. At least, that is how we expect it to work
4449
 
        # We use get_record_stream instead of iter_inventories because we want
4450
 
        # to be able to insert the stream as well. We could instead fetch
4451
 
        # allowing deltas, and then iter_inventories, but we don't know whether
4452
 
        # source or target is more 'local' anway.
4453
 
        inv_stream = self.from_repository.inventories.get_record_stream(
4454
 
            inv_keys_to_fetch, 'unordered',
4455
 
            True) # We need them as full-texts so we can find their references
4456
 
        uninteresting_chk_roots = set()
4457
 
        interesting_chk_roots = set()
4458
 
        def filter_inv_stream(inv_stream):
4459
 
            for idx, record in enumerate(inv_stream):
4460
 
                ### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
4461
 
                bytes = record.get_bytes_as('fulltext')
4462
 
                chk_inv = inventory.CHKInventory.deserialise(
4463
 
                    self.from_repository.chk_bytes, bytes, record.key)
4464
 
                if record.key == start_rev_key:
4465
 
                    uninteresting_chk_roots.add(chk_inv.id_to_entry.key())
4466
 
                    p_id_map = chk_inv.parent_id_basename_to_file_id
4467
 
                    if p_id_map is not None:
4468
 
                        uninteresting_chk_roots.add(p_id_map.key())
4469
 
                else:
4470
 
                    yield record
4471
 
                    interesting_chk_roots.add(chk_inv.id_to_entry.key())
4472
 
                    p_id_map = chk_inv.parent_id_basename_to_file_id
4473
 
                    if p_id_map is not None:
4474
 
                        interesting_chk_roots.add(p_id_map.key())
4475
 
        ### pb.update('fetch inventory', 0, 2)
4476
 
        yield ('inventories', filter_inv_stream(inv_stream))
4477
 
        # Now that we have worked out all of the interesting root nodes, grab
4478
 
        # all of the interesting pages and insert them
4479
 
        ### pb.update('fetch inventory', 1, 2)
4480
 
        interesting = chk_map.iter_interesting_nodes(
4481
 
            self.from_repository.chk_bytes, interesting_chk_roots,
4482
 
            uninteresting_chk_roots)
4483
 
        def to_stream_adapter():
4484
 
            """Adapt the iter_interesting_nodes result to a single stream.
4485
 
 
4486
 
            iter_interesting_nodes returns records as it processes them, along
4487
 
            with keys. However, we only want to return the records themselves.
4488
 
            """
4489
 
            for record, items in interesting:
4490
 
                if record is not None:
4491
 
                    yield record
4492
 
        # XXX: We could instead call get_record_stream(records.keys())
4493
 
        #      ATM, this will always insert the records as fulltexts, and
4494
 
        #      requires that you can hang on to records once you have gone
4495
 
        #      on to the next one. Further, it causes the target to
4496
 
        #      recompress the data. Testing shows it to be faster than
4497
 
        #      requesting the records again, though.
4498
 
        yield ('chk_bytes', to_stream_adapter())
4499
 
        ### pb.update('fetch inventory', 2, 2)
4500
 
 
4501
 
    def _get_convertable_inventory_stream(self, revision_ids):
4502
 
        # XXX: One of source or target is using chks, and they don't have
4503
 
        #      compatible serializations. The StreamSink code expects to be
4504
 
        #      able to convert on the target, so we need to put
4505
 
        #      bytes-on-the-wire that can be converted
4506
 
        yield ('inventories', self._stream_invs_as_fulltexts(revision_ids))
4507
 
 
4508
 
    def _stream_invs_as_fulltexts(self, revision_ids):
 
4613
            self.inventory_fetch_order(), delta_closure))
 
4614
 
 
4615
    def _get_convertable_inventory_stream(self, revision_ids,
 
4616
                                          delta_versus_null=False):
 
4617
        # The two formats are sufficiently different that there is no fast
 
4618
        # path, so we need to send just inventorydeltas, which any
 
4619
        # sufficiently modern client can insert into any repository.
 
4620
        # The StreamSink code expects to be able to
 
4621
        # convert on the target, so we need to put bytes-on-the-wire that can
 
4622
        # be converted.  That means inventory deltas (if the remote is <1.19,
 
4623
        # RemoteStreamSink will fallback to VFS to insert the deltas).
 
4624
        yield ('inventory-deltas',
 
4625
           self._stream_invs_as_deltas(revision_ids,
 
4626
                                       delta_versus_null=delta_versus_null))
 
4627
 
 
4628
    def _stream_invs_as_deltas(self, revision_ids, delta_versus_null=False):
 
4629
        """Return a stream of inventory-deltas for the given rev ids.
 
4630
 
 
4631
        :param revision_ids: The list of inventories to transmit
 
4632
        :param delta_versus_null: Don't try to find a minimal delta for this
 
4633
            entry, instead compute the delta versus the NULL_REVISION. This
 
4634
            effectively streams a complete inventory. Used for stuff like
 
4635
            filling in missing parents, etc.
 
4636
        """
4509
4637
        from_repo = self.from_repository
4510
 
        from_serializer = from_repo._format._serializer
4511
4638
        revision_keys = [(rev_id,) for rev_id in revision_ids]
4512
4639
        parent_map = from_repo.inventories.get_parent_map(revision_keys)
4513
 
        for inv in self.from_repository.iter_inventories(revision_ids):
4514
 
            # XXX: This is a bit hackish, but it works. Basically,
4515
 
            #      CHKSerializer 'accidentally' supports
4516
 
            #      read/write_inventory_to_string, even though that is never
4517
 
            #      the format that is stored on disk. It *does* give us a
4518
 
            #      single string representation for an inventory, so live with
4519
 
            #      it for now.
4520
 
            #      This would be far better if we had a 'serialized inventory
4521
 
            #      delta' form. Then we could use 'inventory._make_delta', and
4522
 
            #      transmit that. This would both be faster to generate, and
4523
 
            #      result in fewer bytes-on-the-wire.
4524
 
            as_bytes = from_serializer.write_inventory_to_string(inv)
 
4640
        # XXX: possibly repos could implement a more efficient iter_inv_deltas
 
4641
        # method...
 
4642
        inventories = self.from_repository.iter_inventories(
 
4643
            revision_ids, 'topological')
 
4644
        format = from_repo._format
 
4645
        invs_sent_so_far = set([_mod_revision.NULL_REVISION])
 
4646
        inventory_cache = lru_cache.LRUCache(50)
 
4647
        null_inventory = from_repo.revision_tree(
 
4648
            _mod_revision.NULL_REVISION).inventory
 
4649
        # XXX: ideally the rich-root/tree-refs flags would be per-revision, not
 
4650
        # per-repo (e.g.  streaming a non-rich-root revision out of a rich-root
 
4651
        # repo back into a non-rich-root repo ought to be allowed)
 
4652
        serializer = inventory_delta.InventoryDeltaSerializer(
 
4653
            versioned_root=format.rich_root_data,
 
4654
            tree_references=format.supports_tree_reference)
 
4655
        for inv in inventories:
4525
4656
            key = (inv.revision_id,)
4526
4657
            parent_keys = parent_map.get(key, ())
 
4658
            delta = None
 
4659
            if not delta_versus_null and parent_keys:
 
4660
                # The caller did not ask for complete inventories and we have
 
4661
                # some parents that we can delta against.  Make a delta against
 
4662
                # each parent so that we can find the smallest.
 
4663
                parent_ids = [parent_key[0] for parent_key in parent_keys]
 
4664
                for parent_id in parent_ids:
 
4665
                    if parent_id not in invs_sent_so_far:
 
4666
                        # We don't know that the remote side has this basis, so
 
4667
                        # we can't use it.
 
4668
                        continue
 
4669
                    if parent_id == _mod_revision.NULL_REVISION:
 
4670
                        parent_inv = null_inventory
 
4671
                    else:
 
4672
                        parent_inv = inventory_cache.get(parent_id, None)
 
4673
                        if parent_inv is None:
 
4674
                            parent_inv = from_repo.get_inventory(parent_id)
 
4675
                    candidate_delta = inv._make_delta(parent_inv)
 
4676
                    if (delta is None or
 
4677
                        len(delta) > len(candidate_delta)):
 
4678
                        delta = candidate_delta
 
4679
                        basis_id = parent_id
 
4680
            if delta is None:
 
4681
                # Either none of the parents ended up being suitable, or we
 
4682
                # were asked to delta against NULL
 
4683
                basis_id = _mod_revision.NULL_REVISION
 
4684
                delta = inv._make_delta(null_inventory)
 
4685
            invs_sent_so_far.add(inv.revision_id)
 
4686
            inventory_cache[inv.revision_id] = inv
 
4687
            delta_serialized = ''.join(
 
4688
                serializer.delta_to_lines(basis_id, key[-1], delta))
4527
4689
            yield versionedfile.FulltextContentFactory(
4528
 
                key, parent_keys, None, as_bytes)
 
4690
                key, parent_keys, None, delta_serialized)
4529
4691
 
4530
4692
 
4531
4693
def _iter_for_revno(repo, partial_history_cache, stop_index=None,