~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/repository.py

  • Committer: Ian Clatworthy
  • Date: 2010-02-19 03:02:07 UTC
  • mto: (4797.23.1 integration-2.1)
  • mto: This revision was merged to the branch mainline in revision 5055.
  • Revision ID: ian.clatworthy@canonical.com-20100219030207-zpbzx021zavx4sqt
What's New in 2.1 - a summary of changes since 2.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
    bzrdir,
25
25
    check,
26
26
    chk_map,
 
27
    config,
27
28
    debug,
28
29
    errors,
 
30
    fetch as _mod_fetch,
29
31
    fifo_cache,
30
32
    generate_ids,
31
33
    gpg,
32
34
    graph,
33
35
    inventory,
 
36
    inventory_delta,
34
37
    lazy_regex,
35
38
    lockable_files,
36
39
    lockdir,
38
41
    osutils,
39
42
    revision as _mod_revision,
40
43
    symbol_versioning,
 
44
    trace,
41
45
    tsort,
42
46
    ui,
43
47
    versionedfile,
48
52
from bzrlib.testament import Testament
49
53
""")
50
54
 
51
 
from bzrlib.decorators import needs_read_lock, needs_write_lock
 
55
from bzrlib.decorators import needs_read_lock, needs_write_lock, only_raises
52
56
from bzrlib.inter import InterObject
53
57
from bzrlib.inventory import (
54
58
    Inventory,
56
60
    ROOT_ID,
57
61
    entry_factory,
58
62
    )
 
63
from bzrlib.lock import _RelockDebugMixin
59
64
from bzrlib import registry
60
65
from bzrlib.trace import (
61
66
    log_exception_quietly, note, mutter, mutter_callsite, warning)
204
209
            # an inventory delta was accumulated without creating a new
205
210
            # inventory.
206
211
            basis_id = self.basis_delta_revision
207
 
            self.inv_sha1 = self.repository.add_inventory_by_delta(
 
212
            # We ignore the 'inventory' returned by add_inventory_by_delta
 
213
            # because self.new_inventory is used to hint to the rest of the
 
214
            # system what code path was taken
 
215
            self.inv_sha1, _ = self.repository.add_inventory_by_delta(
208
216
                basis_id, self._basis_delta, self._new_revision_id,
209
217
                self.parents)
210
218
        else:
464
472
            if content_summary[2] is None:
465
473
                raise ValueError("Files must not have executable = None")
466
474
            if not store:
467
 
                if (# if the file length changed we have to store:
468
 
                    parent_entry.text_size != content_summary[1] or
469
 
                    # if the exec bit has changed we have to store:
 
475
                # We can't trust a check of the file length because of content
 
476
                # filtering...
 
477
                if (# if the exec bit has changed we have to store:
470
478
                    parent_entry.executable != content_summary[2]):
471
479
                    store = True
472
480
                elif parent_entry.text_sha1 == content_summary[3]:
539
547
                ie.revision = parent_entry.revision
540
548
                return self._get_delta(ie, basis_inv, path), False, None
541
549
            ie.reference_revision = content_summary[3]
 
550
            if ie.reference_revision is None:
 
551
                raise AssertionError("invalid content_summary for nested tree: %r"
 
552
                    % (content_summary,))
542
553
            self._add_text_to_weave(ie.file_id, '', heads, None)
543
554
        else:
544
555
            raise NotImplementedError('unknown kind')
806
817
                seen_root = True
807
818
        self.new_inventory = None
808
819
        if len(inv_delta):
 
820
            # This should perhaps be guarded by a check that the basis we
 
821
            # commit against is the basis for the commit and if not do a delta
 
822
            # against the basis.
809
823
            self._any_changes = True
810
824
        if not seen_root:
811
825
            # housekeeping root entry changes do not affect no-change commits.
849
863
# Repositories
850
864
 
851
865
 
852
 
class Repository(object):
 
866
class Repository(_RelockDebugMixin):
853
867
    """Repository holding history for one or more branches.
854
868
 
855
869
    The repository holds and retrieves historical information including
924
938
        """
925
939
        if self._write_group is not self.get_transaction():
926
940
            # has an unlock or relock occured ?
 
941
            if suppress_errors:
 
942
                mutter(
 
943
                '(suppressed) mismatched lock context and write group. %r, %r',
 
944
                self._write_group, self.get_transaction())
 
945
                return
927
946
            raise errors.BzrError(
928
947
                'mismatched lock context and write group. %r, %r' %
929
948
                (self._write_group, self.get_transaction()))
1063
1082
        check_content=True):
1064
1083
        """Store lines in inv_vf and return the sha1 of the inventory."""
1065
1084
        parents = [(parent,) for parent in parents]
1066
 
        return self.inventories.add_lines((revision_id,), parents, lines,
 
1085
        result = self.inventories.add_lines((revision_id,), parents, lines,
1067
1086
            check_content=check_content)[0]
 
1087
        self.inventories._access.flush()
 
1088
        return result
1068
1089
 
1069
1090
    def add_revision(self, revision_id, rev, inv=None, config=None):
1070
1091
        """Add rev to the revision store as revision_id.
1146
1167
        # The old API returned a list, should this actually be a set?
1147
1168
        return parent_map.keys()
1148
1169
 
 
1170
    def _check_inventories(self, checker):
 
1171
        """Check the inventories found from the revision scan.
 
1172
        
 
1173
        This is responsible for verifying the sha1 of inventories and
 
1174
        creating a pending_keys set that covers data referenced by inventories.
 
1175
        """
 
1176
        bar = ui.ui_factory.nested_progress_bar()
 
1177
        try:
 
1178
            self._do_check_inventories(checker, bar)
 
1179
        finally:
 
1180
            bar.finished()
 
1181
 
 
1182
    def _do_check_inventories(self, checker, bar):
 
1183
        """Helper for _check_inventories."""
 
1184
        revno = 0
 
1185
        keys = {'chk_bytes':set(), 'inventories':set(), 'texts':set()}
 
1186
        kinds = ['chk_bytes', 'texts']
 
1187
        count = len(checker.pending_keys)
 
1188
        bar.update("inventories", 0, 2)
 
1189
        current_keys = checker.pending_keys
 
1190
        checker.pending_keys = {}
 
1191
        # Accumulate current checks.
 
1192
        for key in current_keys:
 
1193
            if key[0] != 'inventories' and key[0] not in kinds:
 
1194
                checker._report_items.append('unknown key type %r' % (key,))
 
1195
            keys[key[0]].add(key[1:])
 
1196
        if keys['inventories']:
 
1197
            # NB: output order *should* be roughly sorted - topo or
 
1198
            # inverse topo depending on repository - either way decent
 
1199
            # to just delta against. However, pre-CHK formats didn't
 
1200
            # try to optimise inventory layout on disk. As such the
 
1201
            # pre-CHK code path does not use inventory deltas.
 
1202
            last_object = None
 
1203
            for record in self.inventories.check(keys=keys['inventories']):
 
1204
                if record.storage_kind == 'absent':
 
1205
                    checker._report_items.append(
 
1206
                        'Missing inventory {%s}' % (record.key,))
 
1207
                else:
 
1208
                    last_object = self._check_record('inventories', record,
 
1209
                        checker, last_object,
 
1210
                        current_keys[('inventories',) + record.key])
 
1211
            del keys['inventories']
 
1212
        else:
 
1213
            return
 
1214
        bar.update("texts", 1)
 
1215
        while (checker.pending_keys or keys['chk_bytes']
 
1216
            or keys['texts']):
 
1217
            # Something to check.
 
1218
            current_keys = checker.pending_keys
 
1219
            checker.pending_keys = {}
 
1220
            # Accumulate current checks.
 
1221
            for key in current_keys:
 
1222
                if key[0] not in kinds:
 
1223
                    checker._report_items.append('unknown key type %r' % (key,))
 
1224
                keys[key[0]].add(key[1:])
 
1225
            # Check the outermost kind only - inventories || chk_bytes || texts
 
1226
            for kind in kinds:
 
1227
                if keys[kind]:
 
1228
                    last_object = None
 
1229
                    for record in getattr(self, kind).check(keys=keys[kind]):
 
1230
                        if record.storage_kind == 'absent':
 
1231
                            checker._report_items.append(
 
1232
                                'Missing %s {%s}' % (kind, record.key,))
 
1233
                        else:
 
1234
                            last_object = self._check_record(kind, record,
 
1235
                                checker, last_object, current_keys[(kind,) + record.key])
 
1236
                    keys[kind] = set()
 
1237
                    break
 
1238
 
 
1239
    def _check_record(self, kind, record, checker, last_object, item_data):
 
1240
        """Check a single text from this repository."""
 
1241
        if kind == 'inventories':
 
1242
            rev_id = record.key[0]
 
1243
            inv = self.deserialise_inventory(rev_id,
 
1244
                record.get_bytes_as('fulltext'))
 
1245
            if last_object is not None:
 
1246
                delta = inv._make_delta(last_object)
 
1247
                for old_path, path, file_id, ie in delta:
 
1248
                    if ie is None:
 
1249
                        continue
 
1250
                    ie.check(checker, rev_id, inv)
 
1251
            else:
 
1252
                for path, ie in inv.iter_entries():
 
1253
                    ie.check(checker, rev_id, inv)
 
1254
            if self._format.fast_deltas:
 
1255
                return inv
 
1256
        elif kind == 'chk_bytes':
 
1257
            # No code written to check chk_bytes for this repo format.
 
1258
            checker._report_items.append(
 
1259
                'unsupported key type chk_bytes for %s' % (record.key,))
 
1260
        elif kind == 'texts':
 
1261
            self._check_text(record, checker, item_data)
 
1262
        else:
 
1263
            checker._report_items.append(
 
1264
                'unknown key type %s for %s' % (kind, record.key))
 
1265
 
 
1266
    def _check_text(self, record, checker, item_data):
 
1267
        """Check a single text."""
 
1268
        # Check it is extractable.
 
1269
        # TODO: check length.
 
1270
        if record.storage_kind == 'chunked':
 
1271
            chunks = record.get_bytes_as(record.storage_kind)
 
1272
            sha1 = osutils.sha_strings(chunks)
 
1273
            length = sum(map(len, chunks))
 
1274
        else:
 
1275
            content = record.get_bytes_as('fulltext')
 
1276
            sha1 = osutils.sha_string(content)
 
1277
            length = len(content)
 
1278
        if item_data and sha1 != item_data[1]:
 
1279
            checker._report_items.append(
 
1280
                'sha1 mismatch: %s has sha1 %s expected %s referenced by %s' %
 
1281
                (record.key, sha1, item_data[1], item_data[2]))
 
1282
 
1149
1283
    @staticmethod
1150
1284
    def create(a_bzrdir):
1151
1285
        """Construct the current default format repository in a_bzrdir."""
1172
1306
        self._reconcile_does_inventory_gc = True
1173
1307
        self._reconcile_fixes_text_parents = False
1174
1308
        self._reconcile_backsup_inventory = True
1175
 
        # not right yet - should be more semantically clear ?
1176
 
        #
1177
 
        # TODO: make sure to construct the right store classes, etc, depending
1178
 
        # on whether escaping is required.
1179
 
        self._warn_if_deprecated()
1180
1309
        self._write_group = None
1181
1310
        # Additional places to query for data.
1182
1311
        self._fallback_repositories = []
1183
1312
        # An InventoryEntry cache, used during deserialization
1184
1313
        self._inventory_entry_cache = fifo_cache.FIFOCache(10*1024)
 
1314
        # Is it safe to return inventory entries directly from the entry cache,
 
1315
        # rather copying them?
 
1316
        self._safe_to_return_from_cache = False
1185
1317
 
1186
1318
    def __repr__(self):
1187
1319
        if self._fallback_repositories:
1254
1386
        locked = self.is_locked()
1255
1387
        result = self.control_files.lock_write(token=token)
1256
1388
        if not locked:
 
1389
            self._warn_if_deprecated()
 
1390
            self._note_lock('w')
1257
1391
            for repo in self._fallback_repositories:
1258
1392
                # Writes don't affect fallback repos
1259
1393
                repo.lock_read()
1264
1398
        locked = self.is_locked()
1265
1399
        self.control_files.lock_read()
1266
1400
        if not locked:
 
1401
            self._warn_if_deprecated()
 
1402
            self._note_lock('r')
1267
1403
            for repo in self._fallback_repositories:
1268
1404
                repo.lock_read()
1269
1405
            self._refresh_data()
1416
1552
        """Commit the contents accrued within the current write group.
1417
1553
 
1418
1554
        :seealso: start_write_group.
 
1555
        
 
1556
        :return: it may return an opaque hint that can be passed to 'pack'.
1419
1557
        """
1420
1558
        if self._write_group is not self.get_transaction():
1421
1559
            # has an unlock or relock occured ?
1475
1613
        # but at the moment we're only checking for texts referenced by
1476
1614
        # inventories at the graph's edge.
1477
1615
        key_deps = self.revisions._index._key_dependencies
1478
 
        key_deps.add_keys(present_inventories)
 
1616
        key_deps.satisfy_refs_for_keys(present_inventories)
1479
1617
        referrers = frozenset(r[0] for r in key_deps.get_referrers())
1480
1618
        file_ids = self.fileids_altered_by_revision_ids(referrers)
1481
1619
        missing_texts = set()
1582
1720
        :param revprops: Optional dictionary of revision properties.
1583
1721
        :param revision_id: Optional revision id.
1584
1722
        """
 
1723
        if self._fallback_repositories:
 
1724
            raise errors.BzrError("Cannot commit from a lightweight checkout "
 
1725
                "to a stacked branch. See "
 
1726
                "https://bugs.launchpad.net/bzr/+bug/375013 for details.")
1585
1727
        result = self._commit_builder_class(self, parents, config,
1586
1728
            timestamp, timezone, committer, revprops, revision_id)
1587
1729
        self.start_write_group()
1588
1730
        return result
1589
1731
 
 
1732
    @only_raises(errors.LockNotHeld, errors.LockBroken)
1590
1733
    def unlock(self):
1591
1734
        if (self.control_files._lock_count == 1 and
1592
1735
            self.control_files._lock_mode == 'w'):
1714
1857
 
1715
1858
    @needs_read_lock
1716
1859
    def get_revisions(self, revision_ids):
1717
 
        """Get many revisions at once."""
 
1860
        """Get many revisions at once.
 
1861
        
 
1862
        Repositories that need to check data on every revision read should 
 
1863
        subclass this method.
 
1864
        """
1718
1865
        return self._get_revisions(revision_ids)
1719
1866
 
1720
1867
    @needs_read_lock
1721
1868
    def _get_revisions(self, revision_ids):
1722
1869
        """Core work logic to get many revisions without sanity checks."""
1723
 
        for rev_id in revision_ids:
1724
 
            if not rev_id or not isinstance(rev_id, basestring):
1725
 
                raise errors.InvalidRevisionId(revision_id=rev_id, branch=self)
 
1870
        revs = {}
 
1871
        for revid, rev in self._iter_revisions(revision_ids):
 
1872
            if rev is None:
 
1873
                raise errors.NoSuchRevision(self, revid)
 
1874
            revs[revid] = rev
 
1875
        return [revs[revid] for revid in revision_ids]
 
1876
 
 
1877
    def _iter_revisions(self, revision_ids):
 
1878
        """Iterate over revision objects.
 
1879
 
 
1880
        :param revision_ids: An iterable of revisions to examine. None may be
 
1881
            passed to request all revisions known to the repository. Note that
 
1882
            not all repositories can find unreferenced revisions; for those
 
1883
            repositories only referenced ones will be returned.
 
1884
        :return: An iterator of (revid, revision) tuples. Absent revisions (
 
1885
            those asked for but not available) are returned as (revid, None).
 
1886
        """
 
1887
        if revision_ids is None:
 
1888
            revision_ids = self.all_revision_ids()
 
1889
        else:
 
1890
            for rev_id in revision_ids:
 
1891
                if not rev_id or not isinstance(rev_id, basestring):
 
1892
                    raise errors.InvalidRevisionId(revision_id=rev_id, branch=self)
1726
1893
        keys = [(key,) for key in revision_ids]
1727
1894
        stream = self.revisions.get_record_stream(keys, 'unordered', True)
1728
 
        revs = {}
1729
1895
        for record in stream:
 
1896
            revid = record.key[0]
1730
1897
            if record.storage_kind == 'absent':
1731
 
                raise errors.NoSuchRevision(self, record.key[0])
1732
 
            text = record.get_bytes_as('fulltext')
1733
 
            rev = self._serializer.read_revision_from_string(text)
1734
 
            revs[record.key[0]] = rev
1735
 
        return [revs[revid] for revid in revision_ids]
 
1898
                yield (revid, None)
 
1899
            else:
 
1900
                text = record.get_bytes_as('fulltext')
 
1901
                rev = self._serializer.read_revision_from_string(text)
 
1902
                yield (revid, rev)
1736
1903
 
1737
1904
    @needs_read_lock
1738
1905
    def get_revision_xml(self, revision_id):
2093
2260
                batch_size]
2094
2261
            if not to_query:
2095
2262
                break
2096
 
            for rev_tree in self.revision_trees(to_query):
2097
 
                revision_id = rev_tree.get_revision_id()
 
2263
            for revision_id in to_query:
2098
2264
                parent_ids = ancestors[revision_id]
2099
2265
                for text_key in revision_keys[revision_id]:
2100
2266
                    pb.update("Calculating text parents", processed_texts)
2173
2339
        num_file_ids = len(file_ids)
2174
2340
        for file_id, altered_versions in file_ids.iteritems():
2175
2341
            if pb is not None:
2176
 
                pb.update("fetch texts", count, num_file_ids)
 
2342
                pb.update("Fetch texts", count, num_file_ids)
2177
2343
            count += 1
2178
2344
            yield ("file", file_id, altered_versions)
2179
2345
 
2200
2366
        """Get Inventory object by revision id."""
2201
2367
        return self.iter_inventories([revision_id]).next()
2202
2368
 
2203
 
    def iter_inventories(self, revision_ids):
 
2369
    def iter_inventories(self, revision_ids, ordering=None):
2204
2370
        """Get many inventories by revision_ids.
2205
2371
 
2206
2372
        This will buffer some or all of the texts used in constructing the
2208
2374
        time.
2209
2375
 
2210
2376
        :param revision_ids: The expected revision ids of the inventories.
 
2377
        :param ordering: optional ordering, e.g. 'topological'.  If not
 
2378
            specified, the order of revision_ids will be preserved (by
 
2379
            buffering if necessary).
2211
2380
        :return: An iterator of inventories.
2212
2381
        """
2213
2382
        if ((None in revision_ids)
2214
2383
            or (_mod_revision.NULL_REVISION in revision_ids)):
2215
2384
            raise ValueError('cannot get null revision inventory')
2216
 
        return self._iter_inventories(revision_ids)
 
2385
        return self._iter_inventories(revision_ids, ordering)
2217
2386
 
2218
 
    def _iter_inventories(self, revision_ids):
 
2387
    def _iter_inventories(self, revision_ids, ordering):
2219
2388
        """single-document based inventory iteration."""
2220
 
        for text, revision_id in self._iter_inventory_xmls(revision_ids):
 
2389
        inv_xmls = self._iter_inventory_xmls(revision_ids, ordering)
 
2390
        for text, revision_id in inv_xmls:
2221
2391
            yield self.deserialise_inventory(revision_id, text)
2222
2392
 
2223
 
    def _iter_inventory_xmls(self, revision_ids):
 
2393
    def _iter_inventory_xmls(self, revision_ids, ordering):
 
2394
        if ordering is None:
 
2395
            order_as_requested = True
 
2396
            ordering = 'unordered'
 
2397
        else:
 
2398
            order_as_requested = False
2224
2399
        keys = [(revision_id,) for revision_id in revision_ids]
2225
 
        stream = self.inventories.get_record_stream(keys, 'unordered', True)
 
2400
        if not keys:
 
2401
            return
 
2402
        if order_as_requested:
 
2403
            key_iter = iter(keys)
 
2404
            next_key = key_iter.next()
 
2405
        stream = self.inventories.get_record_stream(keys, ordering, True)
2226
2406
        text_chunks = {}
2227
2407
        for record in stream:
2228
2408
            if record.storage_kind != 'absent':
2229
 
                text_chunks[record.key] = record.get_bytes_as('chunked')
 
2409
                chunks = record.get_bytes_as('chunked')
 
2410
                if order_as_requested:
 
2411
                    text_chunks[record.key] = chunks
 
2412
                else:
 
2413
                    yield ''.join(chunks), record.key[-1]
2230
2414
            else:
2231
2415
                raise errors.NoSuchRevision(self, record.key)
2232
 
        for key in keys:
2233
 
            chunks = text_chunks.pop(key)
2234
 
            yield ''.join(chunks), key[-1]
 
2416
            if order_as_requested:
 
2417
                # Yield as many results as we can while preserving order.
 
2418
                while next_key in text_chunks:
 
2419
                    chunks = text_chunks.pop(next_key)
 
2420
                    yield ''.join(chunks), next_key[-1]
 
2421
                    try:
 
2422
                        next_key = key_iter.next()
 
2423
                    except StopIteration:
 
2424
                        # We still want to fully consume the get_record_stream,
 
2425
                        # just in case it is not actually finished at this point
 
2426
                        next_key = None
 
2427
                        break
2235
2428
 
2236
2429
    def deserialise_inventory(self, revision_id, xml):
2237
2430
        """Transform the xml into an inventory object.
2240
2433
        :param xml: A serialised inventory.
2241
2434
        """
2242
2435
        result = self._serializer.read_inventory_from_string(xml, revision_id,
2243
 
                    entry_cache=self._inventory_entry_cache)
 
2436
                    entry_cache=self._inventory_entry_cache,
 
2437
                    return_from_cache=self._safe_to_return_from_cache)
2244
2438
        if result.revision_id != revision_id:
2245
2439
            raise AssertionError('revision id mismatch %s != %s' % (
2246
2440
                result.revision_id, revision_id))
2258
2452
    @needs_read_lock
2259
2453
    def get_inventory_xml(self, revision_id):
2260
2454
        """Get inventory XML as a file object."""
2261
 
        texts = self._iter_inventory_xmls([revision_id])
 
2455
        texts = self._iter_inventory_xmls([revision_id], 'unordered')
2262
2456
        try:
2263
2457
            text, revision_id = texts.next()
2264
2458
        except StopIteration:
2478
2672
        for ((revision_id,), parent_keys) in \
2479
2673
                self.revisions.get_parent_map(query_keys).iteritems():
2480
2674
            if parent_keys:
2481
 
                result[revision_id] = tuple(parent_revid
2482
 
                    for (parent_revid,) in parent_keys)
 
2675
                result[revision_id] = tuple([parent_revid
 
2676
                    for (parent_revid,) in parent_keys])
2483
2677
            else:
2484
2678
                result[revision_id] = (_mod_revision.NULL_REVISION,)
2485
2679
        return result
2496
2690
                [parents_provider, other_repository._make_parents_provider()])
2497
2691
        return graph.Graph(parents_provider)
2498
2692
 
2499
 
    def _get_versioned_file_checker(self, text_key_references=None):
 
2693
    def _get_versioned_file_checker(self, text_key_references=None,
 
2694
        ancestors=None):
2500
2695
        """Return an object suitable for checking versioned files.
2501
2696
        
2502
2697
        :param text_key_references: if non-None, an already built
2504
2699
            to whether they were referred to by the inventory of the
2505
2700
            revision_id that they contain. If None, this will be
2506
2701
            calculated.
 
2702
        :param ancestors: Optional result from
 
2703
            self.get_graph().get_parent_map(self.all_revision_ids()) if already
 
2704
            available.
2507
2705
        """
2508
2706
        return _VersionedFileChecker(self,
2509
 
            text_key_references=text_key_references)
 
2707
            text_key_references=text_key_references, ancestors=ancestors)
2510
2708
 
2511
2709
    def revision_ids_to_search_result(self, result_set):
2512
2710
        """Convert a set of revision ids to a graph SearchResult."""
2562
2760
        return record.get_bytes_as('fulltext')
2563
2761
 
2564
2762
    @needs_read_lock
2565
 
    def check(self, revision_ids=None):
 
2763
    def check(self, revision_ids=None, callback_refs=None, check_repo=True):
2566
2764
        """Check consistency of all history of given revision_ids.
2567
2765
 
2568
2766
        Different repository implementations should override _check().
2569
2767
 
2570
2768
        :param revision_ids: A non-empty list of revision_ids whose ancestry
2571
2769
             will be checked.  Typically the last revision_id of a branch.
 
2770
        :param callback_refs: A dict of check-refs to resolve and callback
 
2771
            the check/_check method on the items listed as wanting the ref.
 
2772
            see bzrlib.check.
 
2773
        :param check_repo: If False do not check the repository contents, just 
 
2774
            calculate the data callback_refs requires and call them back.
2572
2775
        """
2573
 
        return self._check(revision_ids)
 
2776
        return self._check(revision_ids, callback_refs=callback_refs,
 
2777
            check_repo=check_repo)
2574
2778
 
2575
 
    def _check(self, revision_ids):
2576
 
        result = check.Check(self)
2577
 
        result.check()
 
2779
    def _check(self, revision_ids, callback_refs, check_repo):
 
2780
        result = check.Check(self, check_repo=check_repo)
 
2781
        result.check(callback_refs)
2578
2782
        return result
2579
2783
 
2580
 
    def _warn_if_deprecated(self):
 
2784
    def _warn_if_deprecated(self, branch=None):
2581
2785
        global _deprecation_warning_done
2582
2786
        if _deprecation_warning_done:
2583
2787
            return
2584
 
        _deprecation_warning_done = True
2585
 
        warning("Format %s for %s is deprecated - please use 'bzr upgrade' to get better performance"
2586
 
                % (self._format, self.bzrdir.transport.base))
 
2788
        try:
 
2789
            if branch is None:
 
2790
                conf = config.GlobalConfig()
 
2791
            else:
 
2792
                conf = branch.get_config()
 
2793
            if conf.suppress_warning('format_deprecation'):
 
2794
                return
 
2795
            warning("Format %s for %s is deprecated -"
 
2796
                    " please use 'bzr upgrade' to get better performance"
 
2797
                    % (self._format, self.bzrdir.transport.base))
 
2798
        finally:
 
2799
            _deprecation_warning_done = True
2587
2800
 
2588
2801
    def supports_rich_root(self):
2589
2802
        return self._format.rich_root_data
2870
3083
    # help), and for fetching when data won't have come from the same
2871
3084
    # compressor.
2872
3085
    pack_compresses = False
 
3086
    # Does the repository inventory storage understand references to trees?
 
3087
    supports_tree_reference = None
2873
3088
 
2874
3089
    def __str__(self):
2875
3090
        return "<%s>" % self.__class__.__name__
2891
3106
        """
2892
3107
        try:
2893
3108
            transport = a_bzrdir.get_repository_transport(None)
2894
 
            format_string = transport.get("format").read()
 
3109
            format_string = transport.get_bytes("format")
2895
3110
            return format_registry.get(format_string)
2896
3111
        except errors.NoSuchFile:
2897
3112
            raise errors.NoRepositoryPresent(a_bzrdir)
2979
3194
        raise NotImplementedError(self.network_name)
2980
3195
 
2981
3196
    def check_conversion_target(self, target_format):
2982
 
        raise NotImplementedError(self.check_conversion_target)
 
3197
        if self.rich_root_data and not target_format.rich_root_data:
 
3198
            raise errors.BadConversionTarget(
 
3199
                'Does not support rich root data.', target_format,
 
3200
                from_format=self)
 
3201
        if (self.supports_tree_reference and 
 
3202
            not getattr(target_format, 'supports_tree_reference', False)):
 
3203
            raise errors.BadConversionTarget(
 
3204
                'Does not support nested trees', target_format,
 
3205
                from_format=self)
2983
3206
 
2984
3207
    def open(self, a_bzrdir, _found=False):
2985
3208
        """Return an instance of this format for the bzrdir a_bzrdir.
3202
3425
                   provided a default one will be created.
3203
3426
        :return: None.
3204
3427
        """
3205
 
        from bzrlib.fetch import RepoFetcher
3206
 
        f = RepoFetcher(to_repository=self.target,
 
3428
        f = _mod_fetch.RepoFetcher(to_repository=self.target,
3207
3429
                               from_repository=self.source,
3208
3430
                               last_revision=revision_id,
3209
3431
                               fetch_spec=fetch_spec,
3382
3604
                self.target.texts.insert_record_stream(
3383
3605
                    self.source.texts.get_record_stream(
3384
3606
                        self.source.texts.keys(), 'topological', False))
3385
 
                pb.update('copying inventory', 0, 1)
 
3607
                pb.update('Copying inventory', 0, 1)
3386
3608
                self.target.inventories.insert_record_stream(
3387
3609
                    self.source.inventories.get_record_stream(
3388
3610
                        self.source.inventories.keys(), 'topological', False))
3514
3736
        # This is redundant with format.check_conversion_target(), however that
3515
3737
        # raises an exception, and we just want to say "False" as in we won't
3516
3738
        # support converting between these formats.
 
3739
        if 'IDS_never' in debug.debug_flags:
 
3740
            return False
3517
3741
        if source.supports_rich_root() and not target.supports_rich_root():
3518
3742
            return False
3519
3743
        if (source._format.supports_tree_reference
3520
3744
            and not target._format.supports_tree_reference):
3521
3745
            return False
 
3746
        if target._fallback_repositories and target._format.supports_chks:
 
3747
            # IDS doesn't know how to copy CHKs for the parent inventories it
 
3748
            # adds to stacked repos.
 
3749
            return False
 
3750
        if 'IDS_always' in debug.debug_flags:
 
3751
            return True
 
3752
        # Only use this code path for local source and target.  IDS does far
 
3753
        # too much IO (both bandwidth and roundtrips) over a network.
 
3754
        if not source.bzrdir.transport.base.startswith('file:///'):
 
3755
            return False
 
3756
        if not target.bzrdir.transport.base.startswith('file:///'):
 
3757
            return False
3522
3758
        return True
3523
3759
 
3524
 
    def _get_delta_for_revision(self, tree, parent_ids, basis_id, cache):
 
3760
    def _get_trees(self, revision_ids, cache):
 
3761
        possible_trees = []
 
3762
        for rev_id in revision_ids:
 
3763
            if rev_id in cache:
 
3764
                possible_trees.append((rev_id, cache[rev_id]))
 
3765
            else:
 
3766
                # Not cached, but inventory might be present anyway.
 
3767
                try:
 
3768
                    tree = self.source.revision_tree(rev_id)
 
3769
                except errors.NoSuchRevision:
 
3770
                    # Nope, parent is ghost.
 
3771
                    pass
 
3772
                else:
 
3773
                    cache[rev_id] = tree
 
3774
                    possible_trees.append((rev_id, tree))
 
3775
        return possible_trees
 
3776
 
 
3777
    def _get_delta_for_revision(self, tree, parent_ids, possible_trees):
3525
3778
        """Get the best delta and base for this revision.
3526
3779
 
3527
3780
        :return: (basis_id, delta)
3528
3781
        """
3529
 
        possible_trees = [(parent_id, cache[parent_id])
3530
 
                          for parent_id in parent_ids
3531
 
                           if parent_id in cache]
3532
 
        if len(possible_trees) == 0:
3533
 
            # There either aren't any parents, or the parents aren't in the
3534
 
            # cache, so just use the last converted tree
3535
 
            possible_trees.append((basis_id, cache[basis_id]))
3536
3782
        deltas = []
 
3783
        # Generate deltas against each tree, to find the shortest.
 
3784
        texts_possibly_new_in_tree = set()
3537
3785
        for basis_id, basis_tree in possible_trees:
3538
3786
            delta = tree.inventory._make_delta(basis_tree.inventory)
 
3787
            for old_path, new_path, file_id, new_entry in delta:
 
3788
                if new_path is None:
 
3789
                    # This file_id isn't present in the new rev, so we don't
 
3790
                    # care about it.
 
3791
                    continue
 
3792
                if not new_path:
 
3793
                    # Rich roots are handled elsewhere...
 
3794
                    continue
 
3795
                kind = new_entry.kind
 
3796
                if kind != 'directory' and kind != 'file':
 
3797
                    # No text record associated with this inventory entry.
 
3798
                    continue
 
3799
                # This is a directory or file that has changed somehow.
 
3800
                texts_possibly_new_in_tree.add((file_id, new_entry.revision))
3539
3801
            deltas.append((len(delta), basis_id, delta))
3540
3802
        deltas.sort()
3541
3803
        return deltas[0][1:]
3542
3804
 
3543
 
    def _get_parent_keys(self, root_key, parent_map):
3544
 
        """Get the parent keys for a given root id."""
3545
 
        root_id, rev_id = root_key
3546
 
        # Include direct parents of the revision, but only if they used
3547
 
        # the same root_id and are heads.
3548
 
        parent_keys = []
3549
 
        for parent_id in parent_map[rev_id]:
3550
 
            if parent_id == _mod_revision.NULL_REVISION:
3551
 
                continue
3552
 
            if parent_id not in self._revision_id_to_root_id:
3553
 
                # We probably didn't read this revision, go spend the
3554
 
                # extra effort to actually check
3555
 
                try:
3556
 
                    tree = self.source.revision_tree(parent_id)
3557
 
                except errors.NoSuchRevision:
3558
 
                    # Ghost, fill out _revision_id_to_root_id in case we
3559
 
                    # encounter this again.
3560
 
                    # But set parent_root_id to None since we don't really know
3561
 
                    parent_root_id = None
3562
 
                else:
3563
 
                    parent_root_id = tree.get_root_id()
3564
 
                self._revision_id_to_root_id[parent_id] = None
3565
 
            else:
3566
 
                parent_root_id = self._revision_id_to_root_id[parent_id]
3567
 
            if root_id == parent_root_id:
3568
 
                # With stacking we _might_ want to refer to a non-local
3569
 
                # revision, but this code path only applies when we have the
3570
 
                # full content available, so ghosts really are ghosts, not just
3571
 
                # the edge of local data.
3572
 
                parent_keys.append((parent_id,))
3573
 
            else:
3574
 
                # root_id may be in the parent anyway.
3575
 
                try:
3576
 
                    tree = self.source.revision_tree(parent_id)
3577
 
                except errors.NoSuchRevision:
3578
 
                    # ghost, can't refer to it.
3579
 
                    pass
3580
 
                else:
3581
 
                    try:
3582
 
                        parent_keys.append((tree.inventory[root_id].revision,))
3583
 
                    except errors.NoSuchId:
3584
 
                        # not in the tree
3585
 
                        pass
3586
 
        g = graph.Graph(self.source.revisions)
3587
 
        heads = g.heads(parent_keys)
3588
 
        selected_keys = []
3589
 
        for key in parent_keys:
3590
 
            if key in heads and key not in selected_keys:
3591
 
                selected_keys.append(key)
3592
 
        return tuple([(root_id,)+ key for key in selected_keys])
3593
 
 
3594
 
    def _new_root_data_stream(self, root_keys_to_create, parent_map):
3595
 
        for root_key in root_keys_to_create:
3596
 
            parent_keys = self._get_parent_keys(root_key, parent_map)
3597
 
            yield versionedfile.FulltextContentFactory(root_key,
3598
 
                parent_keys, None, '')
3599
 
 
3600
 
    def _fetch_batch(self, revision_ids, basis_id, cache):
 
3805
    def _fetch_parent_invs_for_stacking(self, parent_map, cache):
 
3806
        """Find all parent revisions that are absent, but for which the
 
3807
        inventory is present, and copy those inventories.
 
3808
 
 
3809
        This is necessary to preserve correctness when the source is stacked
 
3810
        without fallbacks configured.  (Note that in cases like upgrade the
 
3811
        source may be not have _fallback_repositories even though it is
 
3812
        stacked.)
 
3813
        """
 
3814
        parent_revs = set()
 
3815
        for parents in parent_map.values():
 
3816
            parent_revs.update(parents)
 
3817
        present_parents = self.source.get_parent_map(parent_revs)
 
3818
        absent_parents = set(parent_revs).difference(present_parents)
 
3819
        parent_invs_keys_for_stacking = self.source.inventories.get_parent_map(
 
3820
            (rev_id,) for rev_id in absent_parents)
 
3821
        parent_inv_ids = [key[-1] for key in parent_invs_keys_for_stacking]
 
3822
        for parent_tree in self.source.revision_trees(parent_inv_ids):
 
3823
            current_revision_id = parent_tree.get_revision_id()
 
3824
            parents_parents_keys = parent_invs_keys_for_stacking[
 
3825
                (current_revision_id,)]
 
3826
            parents_parents = [key[-1] for key in parents_parents_keys]
 
3827
            basis_id = _mod_revision.NULL_REVISION
 
3828
            basis_tree = self.source.revision_tree(basis_id)
 
3829
            delta = parent_tree.inventory._make_delta(basis_tree.inventory)
 
3830
            self.target.add_inventory_by_delta(
 
3831
                basis_id, delta, current_revision_id, parents_parents)
 
3832
            cache[current_revision_id] = parent_tree
 
3833
 
 
3834
    def _fetch_batch(self, revision_ids, basis_id, cache, a_graph=None):
3601
3835
        """Fetch across a few revisions.
3602
3836
 
3603
3837
        :param revision_ids: The revisions to copy
3604
3838
        :param basis_id: The revision_id of a tree that must be in cache, used
3605
3839
            as a basis for delta when no other base is available
3606
3840
        :param cache: A cache of RevisionTrees that we can use.
 
3841
        :param a_graph: A Graph object to determine the heads() of the
 
3842
            rich-root data stream.
3607
3843
        :return: The revision_id of the last converted tree. The RevisionTree
3608
3844
            for it will be in cache
3609
3845
        """
3615
3851
        pending_deltas = []
3616
3852
        pending_revisions = []
3617
3853
        parent_map = self.source.get_parent_map(revision_ids)
 
3854
        self._fetch_parent_invs_for_stacking(parent_map, cache)
 
3855
        self.source._safe_to_return_from_cache = True
3618
3856
        for tree in self.source.revision_trees(revision_ids):
 
3857
            # Find a inventory delta for this revision.
 
3858
            # Find text entries that need to be copied, too.
3619
3859
            current_revision_id = tree.get_revision_id()
3620
3860
            parent_ids = parent_map.get(current_revision_id, ())
 
3861
            parent_trees = self._get_trees(parent_ids, cache)
 
3862
            possible_trees = list(parent_trees)
 
3863
            if len(possible_trees) == 0:
 
3864
                # There either aren't any parents, or the parents are ghosts,
 
3865
                # so just use the last converted tree.
 
3866
                possible_trees.append((basis_id, cache[basis_id]))
3621
3867
            basis_id, delta = self._get_delta_for_revision(tree, parent_ids,
3622
 
                                                           basis_id, cache)
 
3868
                                                           possible_trees)
 
3869
            revision = self.source.get_revision(current_revision_id)
 
3870
            pending_deltas.append((basis_id, delta,
 
3871
                current_revision_id, revision.parent_ids))
3623
3872
            if self._converting_to_rich_root:
3624
3873
                self._revision_id_to_root_id[current_revision_id] = \
3625
3874
                    tree.get_root_id()
3626
 
            # Find text entries that need to be copied
 
3875
            # Determine which texts are in present in this revision but not in
 
3876
            # any of the available parents.
 
3877
            texts_possibly_new_in_tree = set()
3627
3878
            for old_path, new_path, file_id, entry in delta:
3628
 
                if new_path is not None:
3629
 
                    if not new_path:
3630
 
                        # This is the root
3631
 
                        if not self.target.supports_rich_root():
3632
 
                            # The target doesn't support rich root, so we don't
3633
 
                            # copy
3634
 
                            continue
3635
 
                        if self._converting_to_rich_root:
3636
 
                            # This can't be copied normally, we have to insert
3637
 
                            # it specially
3638
 
                            root_keys_to_create.add((file_id, entry.revision))
3639
 
                            continue
3640
 
                    text_keys.add((file_id, entry.revision))
3641
 
            revision = self.source.get_revision(current_revision_id)
3642
 
            pending_deltas.append((basis_id, delta,
3643
 
                current_revision_id, revision.parent_ids))
 
3879
                if new_path is None:
 
3880
                    # This file_id isn't present in the new rev
 
3881
                    continue
 
3882
                if not new_path:
 
3883
                    # This is the root
 
3884
                    if not self.target.supports_rich_root():
 
3885
                        # The target doesn't support rich root, so we don't
 
3886
                        # copy
 
3887
                        continue
 
3888
                    if self._converting_to_rich_root:
 
3889
                        # This can't be copied normally, we have to insert
 
3890
                        # it specially
 
3891
                        root_keys_to_create.add((file_id, entry.revision))
 
3892
                        continue
 
3893
                kind = entry.kind
 
3894
                texts_possibly_new_in_tree.add((file_id, entry.revision))
 
3895
            for basis_id, basis_tree in possible_trees:
 
3896
                basis_inv = basis_tree.inventory
 
3897
                for file_key in list(texts_possibly_new_in_tree):
 
3898
                    file_id, file_revision = file_key
 
3899
                    try:
 
3900
                        entry = basis_inv[file_id]
 
3901
                    except errors.NoSuchId:
 
3902
                        continue
 
3903
                    if entry.revision == file_revision:
 
3904
                        texts_possibly_new_in_tree.remove(file_key)
 
3905
            text_keys.update(texts_possibly_new_in_tree)
3644
3906
            pending_revisions.append(revision)
3645
3907
            cache[current_revision_id] = tree
3646
3908
            basis_id = current_revision_id
 
3909
        self.source._safe_to_return_from_cache = False
3647
3910
        # Copy file texts
3648
3911
        from_texts = self.source.texts
3649
3912
        to_texts = self.target.texts
3650
3913
        if root_keys_to_create:
3651
 
            root_stream = self._new_root_data_stream(root_keys_to_create,
3652
 
                                                     parent_map)
 
3914
            root_stream = _mod_fetch._new_root_data_stream(
 
3915
                root_keys_to_create, self._revision_id_to_root_id, parent_map,
 
3916
                self.source, graph=a_graph)
3653
3917
            to_texts.insert_record_stream(root_stream)
3654
3918
        to_texts.insert_record_stream(from_texts.get_record_stream(
3655
3919
            text_keys, self.target._format._fetch_order,
3662
3926
            # for the new revisions that we are about to insert.  We do this
3663
3927
            # before adding the revisions so that no revision is added until
3664
3928
            # all the inventories it may depend on are added.
 
3929
            # Note that this is overzealous, as we may have fetched these in an
 
3930
            # earlier batch.
3665
3931
            parent_ids = set()
3666
3932
            revision_ids = set()
3667
3933
            for revision in pending_revisions:
3670
3936
            parent_ids.difference_update(revision_ids)
3671
3937
            parent_ids.discard(_mod_revision.NULL_REVISION)
3672
3938
            parent_map = self.source.get_parent_map(parent_ids)
3673
 
            for parent_tree in self.source.revision_trees(parent_ids):
3674
 
                basis_id, delta = self._get_delta_for_revision(tree, parent_ids, basis_id, cache)
 
3939
            # we iterate over parent_map and not parent_ids because we don't
 
3940
            # want to try copying any revision which is a ghost
 
3941
            for parent_tree in self.source.revision_trees(parent_map):
3675
3942
                current_revision_id = parent_tree.get_revision_id()
3676
3943
                parents_parents = parent_map[current_revision_id]
 
3944
                possible_trees = self._get_trees(parents_parents, cache)
 
3945
                if len(possible_trees) == 0:
 
3946
                    # There either aren't any parents, or the parents are
 
3947
                    # ghosts, so just use the last converted tree.
 
3948
                    possible_trees.append((basis_id, cache[basis_id]))
 
3949
                basis_id, delta = self._get_delta_for_revision(parent_tree,
 
3950
                    parents_parents, possible_trees)
3677
3951
                self.target.add_inventory_by_delta(
3678
3952
                    basis_id, delta, current_revision_id, parents_parents)
3679
3953
        # insert signatures and revisions
3693
3967
 
3694
3968
        :param revision_ids: The list of revisions to fetch. Must be in
3695
3969
            topological order.
3696
 
        :param pb: A ProgressBar
 
3970
        :param pb: A ProgressTask
3697
3971
        :return: None
3698
3972
        """
3699
3973
        basis_id, basis_tree = self._get_basis(revision_ids[0])
3702
3976
        cache[basis_id] = basis_tree
3703
3977
        del basis_tree # We don't want to hang on to it here
3704
3978
        hints = []
 
3979
        if self._converting_to_rich_root and len(revision_ids) > 100:
 
3980
            a_graph = _mod_fetch._get_rich_root_heads_graph(self.source,
 
3981
                                                            revision_ids)
 
3982
        else:
 
3983
            a_graph = None
 
3984
 
3705
3985
        for offset in range(0, len(revision_ids), batch_size):
3706
3986
            self.target.start_write_group()
3707
3987
            try:
3708
3988
                pb.update('Transferring revisions', offset,
3709
3989
                          len(revision_ids))
3710
3990
                batch = revision_ids[offset:offset+batch_size]
3711
 
                basis_id = self._fetch_batch(batch, basis_id, cache)
 
3991
                basis_id = self._fetch_batch(batch, basis_id, cache,
 
3992
                                             a_graph=a_graph)
3712
3993
            except:
 
3994
                self.source._safe_to_return_from_cache = False
3713
3995
                self.target.abort_write_group()
3714
3996
                raise
3715
3997
            else:
3727
4009
        """See InterRepository.fetch()."""
3728
4010
        if fetch_spec is not None:
3729
4011
            raise AssertionError("Not implemented yet...")
 
4012
        # See <https://launchpad.net/bugs/456077> asking for a warning here
 
4013
        #
 
4014
        # nb this is only active for local-local fetches; other things using
 
4015
        # streaming.
 
4016
        ui.ui_factory.warn_cross_format_fetch(self.source._format,
 
4017
            self.target._format)
3730
4018
        if (not self.source.supports_rich_root()
3731
4019
            and self.target.supports_rich_root()):
3732
4020
            self._converting_to_rich_root = True
3744
4032
        # Walk though all revisions; get inventory deltas, copy referenced
3745
4033
        # texts that delta references, insert the delta, revision and
3746
4034
        # signature.
3747
 
        first_rev = self.source.get_revision(revision_ids[0])
3748
4035
        if pb is None:
3749
4036
            my_pb = ui.ui_factory.nested_progress_bar()
3750
4037
            pb = my_pb
3827
4114
                                                  self.source_repo.is_shared())
3828
4115
        converted.lock_write()
3829
4116
        try:
3830
 
            self.step('Copying content into repository.')
 
4117
            self.step('Copying content')
3831
4118
            self.source_repo.copy_content_into(converted)
3832
4119
        finally:
3833
4120
            converted.unlock()
3834
 
        self.step('Deleting old repository content.')
 
4121
        self.step('Deleting old repository content')
3835
4122
        self.repo_dir.transport.delete_tree('repository.backup')
3836
 
        self.pb.note('repository converted')
 
4123
        ui.ui_factory.note('repository converted')
3837
4124
 
3838
4125
    def step(self, message):
3839
4126
        """Update the pb by a step."""
3873
4160
 
3874
4161
class _VersionedFileChecker(object):
3875
4162
 
3876
 
    def __init__(self, repository, text_key_references=None):
 
4163
    def __init__(self, repository, text_key_references=None, ancestors=None):
3877
4164
        self.repository = repository
3878
4165
        self.text_index = self.repository._generate_text_key_index(
3879
 
            text_key_references=text_key_references)
 
4166
            text_key_references=text_key_references, ancestors=ancestors)
3880
4167
 
3881
4168
    def calculate_file_version_parents(self, text_key):
3882
4169
        """Calculate the correct parents for a file version according to
3900
4187
            revision_id) tuples for versions that are present in this versioned
3901
4188
            file, but not used by the corresponding inventory.
3902
4189
        """
 
4190
        local_progress = None
 
4191
        if progress_bar is None:
 
4192
            local_progress = ui.ui_factory.nested_progress_bar()
 
4193
            progress_bar = local_progress
 
4194
        try:
 
4195
            return self._check_file_version_parents(texts, progress_bar)
 
4196
        finally:
 
4197
            if local_progress:
 
4198
                local_progress.finished()
 
4199
 
 
4200
    def _check_file_version_parents(self, texts, progress_bar):
 
4201
        """See check_file_version_parents."""
3903
4202
        wrong_parents = {}
3904
4203
        self.file_ids = set([file_id for file_id, _ in
3905
4204
            self.text_index.iterkeys()])
3906
4205
        # text keys is now grouped by file_id
3907
 
        n_weaves = len(self.file_ids)
3908
 
        files_in_revisions = {}
3909
 
        revisions_of_files = {}
3910
4206
        n_versions = len(self.text_index)
3911
4207
        progress_bar.update('loading text store', 0, n_versions)
3912
4208
        parent_map = self.repository.texts.get_parent_map(self.text_index)
3914
4210
        text_keys = self.repository.texts.keys()
3915
4211
        unused_keys = frozenset(text_keys) - set(self.text_index)
3916
4212
        for num, key in enumerate(self.text_index.iterkeys()):
3917
 
            if progress_bar is not None:
3918
 
                progress_bar.update('checking text graph', num, n_versions)
 
4213
            progress_bar.update('checking text graph', num, n_versions)
3919
4214
            correct_parents = self.calculate_file_version_parents(key)
3920
4215
            try:
3921
4216
                knit_parents = parent_map[key]
4006
4301
            else:
4007
4302
                new_pack.set_write_cache_size(1024*1024)
4008
4303
        for substream_type, substream in stream:
 
4304
            if 'stream' in debug.debug_flags:
 
4305
                mutter('inserting substream: %s', substream_type)
4009
4306
            if substream_type == 'texts':
4010
4307
                self.target_repo.texts.insert_record_stream(substream)
4011
4308
            elif substream_type == 'inventories':
4015
4312
                else:
4016
4313
                    self._extract_and_insert_inventories(
4017
4314
                        substream, src_serializer)
 
4315
            elif substream_type == 'inventory-deltas':
 
4316
                ui.ui_factory.warn_cross_format_fetch(src_format,
 
4317
                    self.target_repo._format)
 
4318
                self._extract_and_insert_inventory_deltas(
 
4319
                    substream, src_serializer)
4018
4320
            elif substream_type == 'chk_bytes':
4019
4321
                # XXX: This doesn't support conversions, as it assumes the
4020
4322
                #      conversion was done in the fetch code.
4051
4353
                ):
4052
4354
                if versioned_file is None:
4053
4355
                    continue
 
4356
                # TODO: key is often going to be a StaticTuple object
 
4357
                #       I don't believe we can define a method by which
 
4358
                #       (prefix,) + StaticTuple will work, though we could
 
4359
                #       define a StaticTuple.sq_concat that would allow you to
 
4360
                #       pass in either a tuple or a StaticTuple as the second
 
4361
                #       object, so instead we could have:
 
4362
                #       StaticTuple(prefix) + key here...
4054
4363
                missing_keys.update((prefix,) + key for key in
4055
4364
                    versioned_file.get_missing_compression_parent_keys())
4056
4365
        except NotImplementedError:
4071
4380
            self.target_repo.pack(hint=hint)
4072
4381
        return [], set()
4073
4382
 
4074
 
    def _extract_and_insert_inventories(self, substream, serializer):
 
4383
    def _extract_and_insert_inventory_deltas(self, substream, serializer):
 
4384
        target_rich_root = self.target_repo._format.rich_root_data
 
4385
        target_tree_refs = self.target_repo._format.supports_tree_reference
 
4386
        for record in substream:
 
4387
            # Insert the delta directly
 
4388
            inventory_delta_bytes = record.get_bytes_as('fulltext')
 
4389
            deserialiser = inventory_delta.InventoryDeltaDeserializer()
 
4390
            try:
 
4391
                parse_result = deserialiser.parse_text_bytes(
 
4392
                    inventory_delta_bytes)
 
4393
            except inventory_delta.IncompatibleInventoryDelta, err:
 
4394
                trace.mutter("Incompatible delta: %s", err.msg)
 
4395
                raise errors.IncompatibleRevision(self.target_repo._format)
 
4396
            basis_id, new_id, rich_root, tree_refs, inv_delta = parse_result
 
4397
            revision_id = new_id
 
4398
            parents = [key[0] for key in record.parents]
 
4399
            self.target_repo.add_inventory_by_delta(
 
4400
                basis_id, inv_delta, revision_id, parents)
 
4401
 
 
4402
    def _extract_and_insert_inventories(self, substream, serializer,
 
4403
            parse_delta=None):
4075
4404
        """Generate a new inventory versionedfile in target, converting data.
4076
4405
 
4077
4406
        The inventory is retrieved from the source, (deserializing it), and
4078
4407
        stored in the target (reserializing it in a different format).
4079
4408
        """
 
4409
        target_rich_root = self.target_repo._format.rich_root_data
 
4410
        target_tree_refs = self.target_repo._format.supports_tree_reference
4080
4411
        for record in substream:
 
4412
            # It's not a delta, so it must be a fulltext in the source
 
4413
            # serializer's format.
4081
4414
            bytes = record.get_bytes_as('fulltext')
4082
4415
            revision_id = record.key[0]
4083
4416
            inv = serializer.read_inventory_from_string(bytes, revision_id)
4084
4417
            parents = [key[0] for key in record.parents]
4085
4418
            self.target_repo.add_inventory(revision_id, inv, parents)
 
4419
            # No need to keep holding this full inv in memory when the rest of
 
4420
            # the substream is likely to be all deltas.
 
4421
            del inv
4086
4422
 
4087
4423
    def _extract_and_insert_revisions(self, substream, serializer):
4088
4424
        for record in substream:
4137
4473
        return [('signatures', signatures), ('revisions', revisions)]
4138
4474
 
4139
4475
    def _generate_root_texts(self, revs):
4140
 
        """This will be called by __fetch between fetching weave texts and
 
4476
        """This will be called by get_stream between fetching weave texts and
4141
4477
        fetching the inventory weave.
4142
 
 
4143
 
        Subclasses should override this if they need to generate root texts
4144
 
        after fetching weave texts.
4145
4478
        """
4146
4479
        if self._rich_root_upgrade():
4147
 
            import bzrlib.fetch
4148
 
            return bzrlib.fetch.Inter1and2Helper(
 
4480
            return _mod_fetch.Inter1and2Helper(
4149
4481
                self.from_repository).generate_root_texts(revs)
4150
4482
        else:
4151
4483
            return []
4154
4486
        phase = 'file'
4155
4487
        revs = search.get_keys()
4156
4488
        graph = self.from_repository.get_graph()
4157
 
        revs = list(graph.iter_topo_order(revs))
 
4489
        revs = tsort.topo_sort(graph.get_parent_map(revs))
4158
4490
        data_to_fetch = self.from_repository.item_keys_introduced_by(revs)
4159
4491
        text_keys = []
4160
4492
        for knit_kind, file_id, revisions in data_to_fetch:
4179
4511
                # will be valid.
4180
4512
                for _ in self._generate_root_texts(revs):
4181
4513
                    yield _
4182
 
                # NB: This currently reopens the inventory weave in source;
4183
 
                # using a single stream interface instead would avoid this.
4184
 
                from_weave = self.from_repository.inventories
4185
4514
                # we fetch only the referenced inventories because we do not
4186
4515
                # know for unselected inventories whether all their required
4187
4516
                # texts are present in the other repository - it could be
4226
4555
            if not keys:
4227
4556
                # No need to stream something we don't have
4228
4557
                continue
 
4558
            if substream_kind == 'inventories':
 
4559
                # Some missing keys are genuinely ghosts, filter those out.
 
4560
                present = self.from_repository.inventories.get_parent_map(keys)
 
4561
                revs = [key[0] for key in present]
 
4562
                # Get the inventory stream more-or-less as we do for the
 
4563
                # original stream; there's no reason to assume that records
 
4564
                # direct from the source will be suitable for the sink.  (Think
 
4565
                # e.g. 2a -> 1.9-rich-root).
 
4566
                for info in self._get_inventory_stream(revs, missing=True):
 
4567
                    yield info
 
4568
                continue
 
4569
 
4229
4570
            # Ask for full texts always so that we don't need more round trips
4230
4571
            # after this stream.
4231
4572
            # Some of the missing keys are genuinely ghosts, so filter absent
4246
4587
        return (not self.from_repository._format.rich_root_data and
4247
4588
            self.to_format.rich_root_data)
4248
4589
 
4249
 
    def _get_inventory_stream(self, revision_ids):
 
4590
    def _get_inventory_stream(self, revision_ids, missing=False):
4250
4591
        from_format = self.from_repository._format
4251
 
        if (from_format.supports_chks and self.to_format.supports_chks
4252
 
            and (from_format._serializer == self.to_format._serializer)):
4253
 
            # Both sides support chks, and they use the same serializer, so it
4254
 
            # is safe to transmit the chk pages and inventory pages across
4255
 
            # as-is.
4256
 
            return self._get_chk_inventory_stream(revision_ids)
4257
 
        elif (not from_format.supports_chks):
4258
 
            # Source repository doesn't support chks. So we can transmit the
4259
 
            # inventories 'as-is' and either they are just accepted on the
4260
 
            # target, or the Sink will properly convert it.
4261
 
            return self._get_simple_inventory_stream(revision_ids)
 
4592
        if (from_format.supports_chks and self.to_format.supports_chks and
 
4593
            from_format.network_name() == self.to_format.network_name()):
 
4594
            raise AssertionError(
 
4595
                "this case should be handled by GroupCHKStreamSource")
 
4596
        elif 'forceinvdeltas' in debug.debug_flags:
 
4597
            return self._get_convertable_inventory_stream(revision_ids,
 
4598
                    delta_versus_null=missing)
 
4599
        elif from_format.network_name() == self.to_format.network_name():
 
4600
            # Same format.
 
4601
            return self._get_simple_inventory_stream(revision_ids,
 
4602
                    missing=missing)
 
4603
        elif (not from_format.supports_chks and not self.to_format.supports_chks
 
4604
                and from_format._serializer == self.to_format._serializer):
 
4605
            # Essentially the same format.
 
4606
            return self._get_simple_inventory_stream(revision_ids,
 
4607
                    missing=missing)
4262
4608
        else:
4263
 
            # XXX: Hack to make not-chk->chk fetch: copy the inventories as
4264
 
            #      inventories. Note that this should probably be done somehow
4265
 
            #      as part of bzrlib.repository.StreamSink. Except JAM couldn't
4266
 
            #      figure out how a non-chk repository could possibly handle
4267
 
            #      deserializing an inventory stream from a chk repo, as it
4268
 
            #      doesn't have a way to understand individual pages.
4269
 
            return self._get_convertable_inventory_stream(revision_ids)
 
4609
            # Any time we switch serializations, we want to use an
 
4610
            # inventory-delta based approach.
 
4611
            return self._get_convertable_inventory_stream(revision_ids,
 
4612
                    delta_versus_null=missing)
4270
4613
 
4271
 
    def _get_simple_inventory_stream(self, revision_ids):
 
4614
    def _get_simple_inventory_stream(self, revision_ids, missing=False):
 
4615
        # NB: This currently reopens the inventory weave in source;
 
4616
        # using a single stream interface instead would avoid this.
4272
4617
        from_weave = self.from_repository.inventories
 
4618
        if missing:
 
4619
            delta_closure = True
 
4620
        else:
 
4621
            delta_closure = not self.delta_on_metadata()
4273
4622
        yield ('inventories', from_weave.get_record_stream(
4274
4623
            [(rev_id,) for rev_id in revision_ids],
4275
 
            self.inventory_fetch_order(),
4276
 
            not self.delta_on_metadata()))
4277
 
 
4278
 
    def _get_chk_inventory_stream(self, revision_ids):
4279
 
        """Fetch the inventory texts, along with the associated chk maps."""
4280
 
        # We want an inventory outside of the search set, so that we can filter
4281
 
        # out uninteresting chk pages. For now we use
4282
 
        # _find_revision_outside_set, but if we had a Search with cut_revs, we
4283
 
        # could use that instead.
4284
 
        start_rev_id = self.from_repository._find_revision_outside_set(
4285
 
                            revision_ids)
4286
 
        start_rev_key = (start_rev_id,)
4287
 
        inv_keys_to_fetch = [(rev_id,) for rev_id in revision_ids]
4288
 
        if start_rev_id != _mod_revision.NULL_REVISION:
4289
 
            inv_keys_to_fetch.append((start_rev_id,))
4290
 
        # Any repo that supports chk_bytes must also support out-of-order
4291
 
        # insertion. At least, that is how we expect it to work
4292
 
        # We use get_record_stream instead of iter_inventories because we want
4293
 
        # to be able to insert the stream as well. We could instead fetch
4294
 
        # allowing deltas, and then iter_inventories, but we don't know whether
4295
 
        # source or target is more 'local' anway.
4296
 
        inv_stream = self.from_repository.inventories.get_record_stream(
4297
 
            inv_keys_to_fetch, 'unordered',
4298
 
            True) # We need them as full-texts so we can find their references
4299
 
        uninteresting_chk_roots = set()
4300
 
        interesting_chk_roots = set()
4301
 
        def filter_inv_stream(inv_stream):
4302
 
            for idx, record in enumerate(inv_stream):
4303
 
                ### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
4304
 
                bytes = record.get_bytes_as('fulltext')
4305
 
                chk_inv = inventory.CHKInventory.deserialise(
4306
 
                    self.from_repository.chk_bytes, bytes, record.key)
4307
 
                if record.key == start_rev_key:
4308
 
                    uninteresting_chk_roots.add(chk_inv.id_to_entry.key())
4309
 
                    p_id_map = chk_inv.parent_id_basename_to_file_id
4310
 
                    if p_id_map is not None:
4311
 
                        uninteresting_chk_roots.add(p_id_map.key())
4312
 
                else:
4313
 
                    yield record
4314
 
                    interesting_chk_roots.add(chk_inv.id_to_entry.key())
4315
 
                    p_id_map = chk_inv.parent_id_basename_to_file_id
4316
 
                    if p_id_map is not None:
4317
 
                        interesting_chk_roots.add(p_id_map.key())
4318
 
        ### pb.update('fetch inventory', 0, 2)
4319
 
        yield ('inventories', filter_inv_stream(inv_stream))
4320
 
        # Now that we have worked out all of the interesting root nodes, grab
4321
 
        # all of the interesting pages and insert them
4322
 
        ### pb.update('fetch inventory', 1, 2)
4323
 
        interesting = chk_map.iter_interesting_nodes(
4324
 
            self.from_repository.chk_bytes, interesting_chk_roots,
4325
 
            uninteresting_chk_roots)
4326
 
        def to_stream_adapter():
4327
 
            """Adapt the iter_interesting_nodes result to a single stream.
4328
 
 
4329
 
            iter_interesting_nodes returns records as it processes them, along
4330
 
            with keys. However, we only want to return the records themselves.
4331
 
            """
4332
 
            for record, items in interesting:
4333
 
                if record is not None:
4334
 
                    yield record
4335
 
        # XXX: We could instead call get_record_stream(records.keys())
4336
 
        #      ATM, this will always insert the records as fulltexts, and
4337
 
        #      requires that you can hang on to records once you have gone
4338
 
        #      on to the next one. Further, it causes the target to
4339
 
        #      recompress the data. Testing shows it to be faster than
4340
 
        #      requesting the records again, though.
4341
 
        yield ('chk_bytes', to_stream_adapter())
4342
 
        ### pb.update('fetch inventory', 2, 2)
4343
 
 
4344
 
    def _get_convertable_inventory_stream(self, revision_ids):
4345
 
        # XXX: One of source or target is using chks, and they don't have
4346
 
        #      compatible serializations. The StreamSink code expects to be
4347
 
        #      able to convert on the target, so we need to put
4348
 
        #      bytes-on-the-wire that can be converted
4349
 
        yield ('inventories', self._stream_invs_as_fulltexts(revision_ids))
4350
 
 
4351
 
    def _stream_invs_as_fulltexts(self, revision_ids):
 
4624
            self.inventory_fetch_order(), delta_closure))
 
4625
 
 
4626
    def _get_convertable_inventory_stream(self, revision_ids,
 
4627
                                          delta_versus_null=False):
 
4628
        # The two formats are sufficiently different that there is no fast
 
4629
        # path, so we need to send just inventorydeltas, which any
 
4630
        # sufficiently modern client can insert into any repository.
 
4631
        # The StreamSink code expects to be able to
 
4632
        # convert on the target, so we need to put bytes-on-the-wire that can
 
4633
        # be converted.  That means inventory deltas (if the remote is <1.19,
 
4634
        # RemoteStreamSink will fallback to VFS to insert the deltas).
 
4635
        yield ('inventory-deltas',
 
4636
           self._stream_invs_as_deltas(revision_ids,
 
4637
                                       delta_versus_null=delta_versus_null))
 
4638
 
 
4639
    def _stream_invs_as_deltas(self, revision_ids, delta_versus_null=False):
 
4640
        """Return a stream of inventory-deltas for the given rev ids.
 
4641
 
 
4642
        :param revision_ids: The list of inventories to transmit
 
4643
        :param delta_versus_null: Don't try to find a minimal delta for this
 
4644
            entry, instead compute the delta versus the NULL_REVISION. This
 
4645
            effectively streams a complete inventory. Used for stuff like
 
4646
            filling in missing parents, etc.
 
4647
        """
4352
4648
        from_repo = self.from_repository
4353
 
        from_serializer = from_repo._format._serializer
4354
4649
        revision_keys = [(rev_id,) for rev_id in revision_ids]
4355
4650
        parent_map = from_repo.inventories.get_parent_map(revision_keys)
4356
 
        for inv in self.from_repository.iter_inventories(revision_ids):
4357
 
            # XXX: This is a bit hackish, but it works. Basically,
4358
 
            #      CHKSerializer 'accidentally' supports
4359
 
            #      read/write_inventory_to_string, even though that is never
4360
 
            #      the format that is stored on disk. It *does* give us a
4361
 
            #      single string representation for an inventory, so live with
4362
 
            #      it for now.
4363
 
            #      This would be far better if we had a 'serialized inventory
4364
 
            #      delta' form. Then we could use 'inventory._make_delta', and
4365
 
            #      transmit that. This would both be faster to generate, and
4366
 
            #      result in fewer bytes-on-the-wire.
4367
 
            as_bytes = from_serializer.write_inventory_to_string(inv)
 
4651
        # XXX: possibly repos could implement a more efficient iter_inv_deltas
 
4652
        # method...
 
4653
        inventories = self.from_repository.iter_inventories(
 
4654
            revision_ids, 'topological')
 
4655
        format = from_repo._format
 
4656
        invs_sent_so_far = set([_mod_revision.NULL_REVISION])
 
4657
        inventory_cache = lru_cache.LRUCache(50)
 
4658
        null_inventory = from_repo.revision_tree(
 
4659
            _mod_revision.NULL_REVISION).inventory
 
4660
        # XXX: ideally the rich-root/tree-refs flags would be per-revision, not
 
4661
        # per-repo (e.g.  streaming a non-rich-root revision out of a rich-root
 
4662
        # repo back into a non-rich-root repo ought to be allowed)
 
4663
        serializer = inventory_delta.InventoryDeltaSerializer(
 
4664
            versioned_root=format.rich_root_data,
 
4665
            tree_references=format.supports_tree_reference)
 
4666
        for inv in inventories:
4368
4667
            key = (inv.revision_id,)
4369
4668
            parent_keys = parent_map.get(key, ())
 
4669
            delta = None
 
4670
            if not delta_versus_null and parent_keys:
 
4671
                # The caller did not ask for complete inventories and we have
 
4672
                # some parents that we can delta against.  Make a delta against
 
4673
                # each parent so that we can find the smallest.
 
4674
                parent_ids = [parent_key[0] for parent_key in parent_keys]
 
4675
                for parent_id in parent_ids:
 
4676
                    if parent_id not in invs_sent_so_far:
 
4677
                        # We don't know that the remote side has this basis, so
 
4678
                        # we can't use it.
 
4679
                        continue
 
4680
                    if parent_id == _mod_revision.NULL_REVISION:
 
4681
                        parent_inv = null_inventory
 
4682
                    else:
 
4683
                        parent_inv = inventory_cache.get(parent_id, None)
 
4684
                        if parent_inv is None:
 
4685
                            parent_inv = from_repo.get_inventory(parent_id)
 
4686
                    candidate_delta = inv._make_delta(parent_inv)
 
4687
                    if (delta is None or
 
4688
                        len(delta) > len(candidate_delta)):
 
4689
                        delta = candidate_delta
 
4690
                        basis_id = parent_id
 
4691
            if delta is None:
 
4692
                # Either none of the parents ended up being suitable, or we
 
4693
                # were asked to delta against NULL
 
4694
                basis_id = _mod_revision.NULL_REVISION
 
4695
                delta = inv._make_delta(null_inventory)
 
4696
            invs_sent_so_far.add(inv.revision_id)
 
4697
            inventory_cache[inv.revision_id] = inv
 
4698
            delta_serialized = ''.join(
 
4699
                serializer.delta_to_lines(basis_id, key[-1], delta))
4370
4700
            yield versionedfile.FulltextContentFactory(
4371
 
                key, parent_keys, None, as_bytes)
 
4701
                key, parent_keys, None, delta_serialized)
4372
4702
 
4373
4703
 
4374
4704
def _iter_for_revno(repo, partial_history_cache, stop_index=None,