~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Vincent Ladeuil
  • Date: 2011-02-10 12:37:27 UTC
  • mto: This revision was merged to the branch mainline in revision 5661.
  • Revision ID: v.ladeuil+lp@free.fr-20110210123727-8e0pu4wtlt6fj7nf
thread is already a python module, avoid confusion and use cethread instead.

Show diffs side-by-side

added added

removed removed

Lines of Context:
54
54
 
55
55
from cStringIO import StringIO
56
56
from itertools import izip
 
57
import gzip
57
58
import operator
58
59
import os
 
60
import sys
59
61
 
60
62
from bzrlib.lazy_import import lazy_import
61
63
lazy_import(globals(), """
62
 
import gzip
63
 
 
64
64
from bzrlib import (
 
65
    annotate,
65
66
    debug,
66
67
    diff,
67
68
    graph as _mod_graph,
68
69
    index as _mod_index,
 
70
    lru_cache,
69
71
    pack,
70
72
    patiencediff,
 
73
    progress,
71
74
    static_tuple,
72
75
    trace,
73
76
    tsort,
74
77
    tuned_gzip,
75
78
    ui,
76
79
    )
77
 
 
78
 
from bzrlib.repofmt import pack_repo
79
 
from bzrlib.i18n import gettext
80
80
""")
81
81
from bzrlib import (
82
 
    annotate,
83
82
    errors,
84
83
    osutils,
85
84
    )
86
85
from bzrlib.errors import (
 
86
    FileExists,
87
87
    NoSuchFile,
 
88
    KnitError,
88
89
    InvalidRevisionId,
89
90
    KnitCorrupt,
90
91
    KnitHeaderError,
91
92
    RevisionNotPresent,
 
93
    RevisionAlreadyPresent,
92
94
    SHA1KnitCorrupt,
93
95
    )
94
96
from bzrlib.osutils import (
95
97
    contains_whitespace,
 
98
    contains_linebreaks,
96
99
    sha_string,
97
100
    sha_strings,
98
101
    split_lines,
99
102
    )
100
103
from bzrlib.versionedfile import (
101
 
    _KeyRefs,
102
104
    AbsentContentFactory,
103
105
    adapter_registry,
104
106
    ConstantMapper,
105
107
    ContentFactory,
 
108
    ChunkedContentFactory,
106
109
    sort_groupcompress,
107
 
    VersionedFilesWithFallbacks,
 
110
    VersionedFile,
 
111
    VersionedFiles,
108
112
    )
109
113
 
110
114
 
409
413
class KnitContent(object):
410
414
    """Content of a knit version to which deltas can be applied.
411
415
 
412
 
    This is always stored in memory as a list of lines with \\n at the end,
 
416
    This is always stored in memory as a list of lines with \n at the end,
413
417
    plus a flag saying if the final ending is really there or not, because that
414
418
    corresponds to the on-disk knit representation.
415
419
    """
802
806
        writer.begin()
803
807
        index = _KnitGraphIndex(graph_index, lambda:True, parents=parents,
804
808
            deltas=delta, add_callback=graph_index.add_nodes)
805
 
        access = pack_repo._DirectPackAccess({})
 
809
        access = _DirectPackAccess({})
806
810
        access.set_writer(writer, graph_index, (transport, 'newpack'))
807
811
        result = KnitVersionedFiles(index, access,
808
812
            max_delta_chain=max_delta_chain)
846
850
                in all_build_index_memos.itervalues()])
847
851
 
848
852
 
849
 
class KnitVersionedFiles(VersionedFilesWithFallbacks):
 
853
class KnitVersionedFiles(VersionedFiles):
850
854
    """Storage for many versioned files using knit compression.
851
855
 
852
856
    Backend storage is managed by indices and data objects.
879
883
            self._factory = KnitAnnotateFactory()
880
884
        else:
881
885
            self._factory = KnitPlainFactory()
882
 
        self._immediate_fallback_vfs = []
 
886
        self._fallback_vfs = []
883
887
        self._reload_func = reload_func
884
888
 
885
889
    def __repr__(self):
888
892
            self._index,
889
893
            self._access)
890
894
 
891
 
    def without_fallbacks(self):
892
 
        """Return a clone of this object without any fallbacks configured."""
893
 
        return KnitVersionedFiles(self._index, self._access,
894
 
            self._max_delta_chain, self._factory.annotated,
895
 
            self._reload_func)
896
 
 
897
895
    def add_fallback_versioned_files(self, a_versioned_files):
898
896
        """Add a source of texts for texts not present in this knit.
899
897
 
900
898
        :param a_versioned_files: A VersionedFiles object.
901
899
        """
902
 
        self._immediate_fallback_vfs.append(a_versioned_files)
 
900
        self._fallback_vfs.append(a_versioned_files)
903
901
 
904
902
    def add_lines(self, key, parents, lines, parent_texts=None,
905
903
        left_matching_blocks=None, nostore_sha=None, random_id=False,
1072
1070
                    raise errors.KnitCorrupt(self,
1073
1071
                        "Missing basis parent %s for %s" % (
1074
1072
                        compression_parent, key))
1075
 
        for fallback_vfs in self._immediate_fallback_vfs:
 
1073
        for fallback_vfs in self._fallback_vfs:
1076
1074
            fallback_vfs.check()
1077
1075
 
1078
1076
    def _check_add(self, key, lines, random_id, check_content):
1156
1154
 
1157
1155
        A dict of key to (record_details, index_memo, next, parents) is
1158
1156
        returned.
1159
 
 
1160
 
        * method is the way referenced data should be applied.
1161
 
        * index_memo is the handle to pass to the data access to actually get
1162
 
          the data
1163
 
        * next is the build-parent of the version, or None for fulltexts.
1164
 
        * parents is the version_ids of the parents of this version
1165
 
 
1166
 
        :param allow_missing: If True do not raise an error on a missing
1167
 
            component, just ignore it.
 
1157
        method is the way referenced data should be applied.
 
1158
        index_memo is the handle to pass to the data access to actually get the
 
1159
            data
 
1160
        next is the build-parent of the version, or None for fulltexts.
 
1161
        parents is the version_ids of the parents of this version
 
1162
 
 
1163
        :param allow_missing: If True do not raise an error on a missing component,
 
1164
            just ignore it.
1168
1165
        """
1169
1166
        component_data = {}
1170
1167
        pending_components = keys
1196
1193
        generator = _VFContentMapGenerator(self, [key])
1197
1194
        return generator._get_content(key)
1198
1195
 
 
1196
    def get_known_graph_ancestry(self, keys):
 
1197
        """Get a KnownGraph instance with the ancestry of keys."""
 
1198
        parent_map, missing_keys = self._index.find_ancestry(keys)
 
1199
        for fallback in self._fallback_vfs:
 
1200
            if not missing_keys:
 
1201
                break
 
1202
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
 
1203
                                                missing_keys)
 
1204
            parent_map.update(f_parent_map)
 
1205
            missing_keys = f_missing_keys
 
1206
        kg = _mod_graph.KnownGraph(parent_map)
 
1207
        return kg
 
1208
 
1199
1209
    def get_parent_map(self, keys):
1200
1210
        """Get a map of the graph parents of keys.
1201
1211
 
1216
1226
            and so on.
1217
1227
        """
1218
1228
        result = {}
1219
 
        sources = [self._index] + self._immediate_fallback_vfs
 
1229
        sources = [self._index] + self._fallback_vfs
1220
1230
        source_results = []
1221
1231
        missing = set(keys)
1222
1232
        for source in sources:
1232
1242
        """Produce a dictionary of knit records.
1233
1243
 
1234
1244
        :return: {key:(record, record_details, digest, next)}
1235
 
 
1236
 
            * record: data returned from read_records (a KnitContentobject)
1237
 
            * record_details: opaque information to pass to parse_record
1238
 
            * digest: SHA1 digest of the full text after all steps are done
1239
 
            * next: build-parent of the version, i.e. the leftmost ancestor.
 
1245
            record
 
1246
                data returned from read_records (a KnitContentobject)
 
1247
            record_details
 
1248
                opaque information to pass to parse_record
 
1249
            digest
 
1250
                SHA1 digest of the full text after all steps are done
 
1251
            next
 
1252
                build-parent of the version, i.e. the leftmost ancestor.
1240
1253
                Will be None if the record is not a delta.
1241
 
 
1242
1254
        :param keys: The keys to build a map for
1243
1255
        :param allow_missing: If some records are missing, rather than
1244
1256
            error, just return the data that could be generated.
1514
1526
                        yield KnitContentFactory(key, global_map[key],
1515
1527
                            record_details, None, raw_data, self._factory.annotated, None)
1516
1528
                else:
1517
 
                    vf = self._immediate_fallback_vfs[parent_maps.index(source) - 1]
 
1529
                    vf = self._fallback_vfs[parent_maps.index(source) - 1]
1518
1530
                    for record in vf.get_record_stream(keys, ordering,
1519
1531
                        include_delta_closure):
1520
1532
                        yield record
1530
1542
            # record entry 2 is the 'digest'.
1531
1543
            result[key] = details[2]
1532
1544
        missing.difference_update(set(result))
1533
 
        for source in self._immediate_fallback_vfs:
 
1545
        for source in self._fallback_vfs:
1534
1546
            if not missing:
1535
1547
                break
1536
1548
            new_result = source.get_sha1s(missing)
1607
1619
                raise RevisionNotPresent([record.key], self)
1608
1620
            elif ((record.storage_kind in knit_types)
1609
1621
                  and (compression_parent is None
1610
 
                       or not self._immediate_fallback_vfs
 
1622
                       or not self._fallback_vfs
1611
1623
                       or self._index.has_key(compression_parent)
1612
1624
                       or not self.has_key(compression_parent))):
1613
1625
                # we can insert the knit record literally if either it has no
1761
1773
                        key_records.append((key, details[0]))
1762
1774
                records_iter = enumerate(self._read_records_iter(key_records))
1763
1775
                for (key_idx, (key, data, sha_value)) in records_iter:
1764
 
                    pb.update(gettext('Walking content'), key_idx, total)
 
1776
                    pb.update('Walking content', key_idx, total)
1765
1777
                    compression_parent = build_details[key][1]
1766
1778
                    if compression_parent is None:
1767
1779
                        # fulltext
1785
1797
        # vfs, and hope to find them there.  Note that if the keys are found
1786
1798
        # but had no changes or no content, the fallback may not return
1787
1799
        # anything.
1788
 
        if keys and not self._immediate_fallback_vfs:
 
1800
        if keys and not self._fallback_vfs:
1789
1801
            # XXX: strictly the second parameter is meant to be the file id
1790
1802
            # but it's not easily accessible here.
1791
1803
            raise RevisionNotPresent(keys, repr(self))
1792
 
        for source in self._immediate_fallback_vfs:
 
1804
        for source in self._fallback_vfs:
1793
1805
            if not keys:
1794
1806
                break
1795
1807
            source_keys = set()
1797
1809
                source_keys.add(key)
1798
1810
                yield line, key
1799
1811
            keys.difference_update(source_keys)
1800
 
        pb.update(gettext('Walking content'), total, total)
 
1812
        pb.update('Walking content', total, total)
1801
1813
 
1802
1814
    def _make_line_delta(self, delta_seq, new_content):
1803
1815
        """Generate a line delta from delta_seq and new_content."""
1911
1923
        The result will be returned in whatever is the fastest to read.
1912
1924
        Not by the order requested. Also, multiple requests for the same
1913
1925
        record will only yield 1 response.
1914
 
 
1915
1926
        :param records: A list of (key, access_memo) entries
1916
1927
        :return: Yields (key, contents, digest) in the order
1917
1928
                 read, not the order requested
1975
1986
        :param key: The key of the record. Currently keys are always serialised
1976
1987
            using just the trailing component.
1977
1988
        :param dense_lines: The bytes of lines but in a denser form. For
1978
 
            instance, if lines is a list of 1000 bytestrings each ending in
1979
 
            \\n, dense_lines may be a list with one line in it, containing all
1980
 
            the 1000's lines and their \\n's. Using dense_lines if it is
1981
 
            already known is a win because the string join to create bytes in
1982
 
            this function spends less time resizing the final string.
 
1989
            instance, if lines is a list of 1000 bytestrings each ending in \n,
 
1990
            dense_lines may be a list with one line in it, containing all the
 
1991
            1000's lines and their \n's. Using dense_lines if it is already
 
1992
            known is a win because the string join to create bytes in this
 
1993
            function spends less time resizing the final string.
1983
1994
        :return: (len, a StringIO instance with the raw data ready to read.)
1984
1995
        """
1985
1996
        chunks = ["version %s %d %s\n" % (key[-1], len(lines), digest)]
2005
2016
        """See VersionedFiles.keys."""
2006
2017
        if 'evil' in debug.debug_flags:
2007
2018
            trace.mutter_callsite(2, "keys scales with size of history")
2008
 
        sources = [self._index] + self._immediate_fallback_vfs
 
2019
        sources = [self._index] + self._fallback_vfs
2009
2020
        result = set()
2010
2021
        for source in sources:
2011
2022
            result.update(source.keys())
2051
2062
 
2052
2063
        missing_keys = set(nonlocal_keys)
2053
2064
        # Read from remote versioned file instances and provide to our caller.
2054
 
        for source in self.vf._immediate_fallback_vfs:
 
2065
        for source in self.vf._fallback_vfs:
2055
2066
            if not missing_keys:
2056
2067
                break
2057
2068
            # Loop over fallback repositories asking them for texts - ignore
2776
2787
        return key[:-1], key[-1]
2777
2788
 
2778
2789
 
 
2790
class _KeyRefs(object):
 
2791
 
 
2792
    def __init__(self, track_new_keys=False):
 
2793
        # dict mapping 'key' to 'set of keys referring to that key'
 
2794
        self.refs = {}
 
2795
        if track_new_keys:
 
2796
            # set remembering all new keys
 
2797
            self.new_keys = set()
 
2798
        else:
 
2799
            self.new_keys = None
 
2800
 
 
2801
    def clear(self):
 
2802
        if self.refs:
 
2803
            self.refs.clear()
 
2804
        if self.new_keys:
 
2805
            self.new_keys.clear()
 
2806
 
 
2807
    def add_references(self, key, refs):
 
2808
        # Record the new references
 
2809
        for referenced in refs:
 
2810
            try:
 
2811
                needed_by = self.refs[referenced]
 
2812
            except KeyError:
 
2813
                needed_by = self.refs[referenced] = set()
 
2814
            needed_by.add(key)
 
2815
        # Discard references satisfied by the new key
 
2816
        self.add_key(key)
 
2817
 
 
2818
    def get_new_keys(self):
 
2819
        return self.new_keys
 
2820
    
 
2821
    def get_unsatisfied_refs(self):
 
2822
        return self.refs.iterkeys()
 
2823
 
 
2824
    def _satisfy_refs_for_key(self, key):
 
2825
        try:
 
2826
            del self.refs[key]
 
2827
        except KeyError:
 
2828
            # No keys depended on this key.  That's ok.
 
2829
            pass
 
2830
 
 
2831
    def add_key(self, key):
 
2832
        # satisfy refs for key, and remember that we've seen this key.
 
2833
        self._satisfy_refs_for_key(key)
 
2834
        if self.new_keys is not None:
 
2835
            self.new_keys.add(key)
 
2836
 
 
2837
    def satisfy_refs_for_keys(self, keys):
 
2838
        for key in keys:
 
2839
            self._satisfy_refs_for_key(key)
 
2840
 
 
2841
    def get_referrers(self):
 
2842
        result = set()
 
2843
        for referrers in self.refs.itervalues():
 
2844
            result.update(referrers)
 
2845
        return result
 
2846
 
 
2847
 
2779
2848
class _KnitGraphIndex(object):
2780
2849
    """A KnitVersionedFiles index layered on GraphIndex."""
2781
2850
 
3210
3279
                yield data
3211
3280
 
3212
3281
 
 
3282
class _DirectPackAccess(object):
 
3283
    """Access to data in one or more packs with less translation."""
 
3284
 
 
3285
    def __init__(self, index_to_packs, reload_func=None, flush_func=None):
 
3286
        """Create a _DirectPackAccess object.
 
3287
 
 
3288
        :param index_to_packs: A dict mapping index objects to the transport
 
3289
            and file names for obtaining data.
 
3290
        :param reload_func: A function to call if we determine that the pack
 
3291
            files have moved and we need to reload our caches. See
 
3292
            bzrlib.repo_fmt.pack_repo.AggregateIndex for more details.
 
3293
        """
 
3294
        self._container_writer = None
 
3295
        self._write_index = None
 
3296
        self._indices = index_to_packs
 
3297
        self._reload_func = reload_func
 
3298
        self._flush_func = flush_func
 
3299
 
 
3300
    def add_raw_records(self, key_sizes, raw_data):
 
3301
        """Add raw knit bytes to a storage area.
 
3302
 
 
3303
        The data is spooled to the container writer in one bytes-record per
 
3304
        raw data item.
 
3305
 
 
3306
        :param sizes: An iterable of tuples containing the key and size of each
 
3307
            raw data segment.
 
3308
        :param raw_data: A bytestring containing the data.
 
3309
        :return: A list of memos to retrieve the record later. Each memo is an
 
3310
            opaque index memo. For _DirectPackAccess the memo is (index, pos,
 
3311
            length), where the index field is the write_index object supplied
 
3312
            to the PackAccess object.
 
3313
        """
 
3314
        if type(raw_data) is not str:
 
3315
            raise AssertionError(
 
3316
                'data must be plain bytes was %s' % type(raw_data))
 
3317
        result = []
 
3318
        offset = 0
 
3319
        for key, size in key_sizes:
 
3320
            p_offset, p_length = self._container_writer.add_bytes_record(
 
3321
                raw_data[offset:offset+size], [])
 
3322
            offset += size
 
3323
            result.append((self._write_index, p_offset, p_length))
 
3324
        return result
 
3325
 
 
3326
    def flush(self):
 
3327
        """Flush pending writes on this access object.
 
3328
 
 
3329
        This will flush any buffered writes to a NewPack.
 
3330
        """
 
3331
        if self._flush_func is not None:
 
3332
            self._flush_func()
 
3333
            
 
3334
    def get_raw_records(self, memos_for_retrieval):
 
3335
        """Get the raw bytes for a records.
 
3336
 
 
3337
        :param memos_for_retrieval: An iterable containing the (index, pos,
 
3338
            length) memo for retrieving the bytes. The Pack access method
 
3339
            looks up the pack to use for a given record in its index_to_pack
 
3340
            map.
 
3341
        :return: An iterator over the bytes of the records.
 
3342
        """
 
3343
        # first pass, group into same-index requests
 
3344
        request_lists = []
 
3345
        current_index = None
 
3346
        for (index, offset, length) in memos_for_retrieval:
 
3347
            if current_index == index:
 
3348
                current_list.append((offset, length))
 
3349
            else:
 
3350
                if current_index is not None:
 
3351
                    request_lists.append((current_index, current_list))
 
3352
                current_index = index
 
3353
                current_list = [(offset, length)]
 
3354
        # handle the last entry
 
3355
        if current_index is not None:
 
3356
            request_lists.append((current_index, current_list))
 
3357
        for index, offsets in request_lists:
 
3358
            try:
 
3359
                transport, path = self._indices[index]
 
3360
            except KeyError:
 
3361
                # A KeyError here indicates that someone has triggered an index
 
3362
                # reload, and this index has gone missing, we need to start
 
3363
                # over.
 
3364
                if self._reload_func is None:
 
3365
                    # If we don't have a _reload_func there is nothing that can
 
3366
                    # be done
 
3367
                    raise
 
3368
                raise errors.RetryWithNewPacks(index,
 
3369
                                               reload_occurred=True,
 
3370
                                               exc_info=sys.exc_info())
 
3371
            try:
 
3372
                reader = pack.make_readv_reader(transport, path, offsets)
 
3373
                for names, read_func in reader.iter_records():
 
3374
                    yield read_func(None)
 
3375
            except errors.NoSuchFile:
 
3376
                # A NoSuchFile error indicates that a pack file has gone
 
3377
                # missing on disk, we need to trigger a reload, and start over.
 
3378
                if self._reload_func is None:
 
3379
                    raise
 
3380
                raise errors.RetryWithNewPacks(transport.abspath(path),
 
3381
                                               reload_occurred=False,
 
3382
                                               exc_info=sys.exc_info())
 
3383
 
 
3384
    def set_writer(self, writer, index, transport_packname):
 
3385
        """Set a writer to use for adding data."""
 
3386
        if index is not None:
 
3387
            self._indices[index] = transport_packname
 
3388
        self._container_writer = writer
 
3389
        self._write_index = index
 
3390
 
 
3391
    def reload_or_raise(self, retry_exc):
 
3392
        """Try calling the reload function, or re-raise the original exception.
 
3393
 
 
3394
        This should be called after _DirectPackAccess raises a
 
3395
        RetryWithNewPacks exception. This function will handle the common logic
 
3396
        of determining when the error is fatal versus being temporary.
 
3397
        It will also make sure that the original exception is raised, rather
 
3398
        than the RetryWithNewPacks exception.
 
3399
 
 
3400
        If this function returns, then the calling function should retry
 
3401
        whatever operation was being performed. Otherwise an exception will
 
3402
        be raised.
 
3403
 
 
3404
        :param retry_exc: A RetryWithNewPacks exception.
 
3405
        """
 
3406
        is_error = False
 
3407
        if self._reload_func is None:
 
3408
            is_error = True
 
3409
        elif not self._reload_func():
 
3410
            # The reload claimed that nothing changed
 
3411
            if not retry_exc.reload_occurred:
 
3412
                # If there wasn't an earlier reload, then we really were
 
3413
                # expecting to find changes. We didn't find them, so this is a
 
3414
                # hard error
 
3415
                is_error = True
 
3416
        if is_error:
 
3417
            exc_class, exc_value, exc_traceback = retry_exc.exc_info
 
3418
            raise exc_class, exc_value, exc_traceback
 
3419
 
 
3420
 
3213
3421
def annotate_knit(knit, revision_id):
3214
3422
    """Annotate a knit with no cached annotations.
3215
3423
 
3313
3521
        return records, ann_keys
3314
3522
 
3315
3523
    def _get_needed_texts(self, key, pb=None):
3316
 
        # if True or len(self._vf._immediate_fallback_vfs) > 0:
3317
 
        if len(self._vf._immediate_fallback_vfs) > 0:
 
3524
        # if True or len(self._vf._fallback_vfs) > 0:
 
3525
        if len(self._vf._fallback_vfs) > 0:
3318
3526
            # If we have fallbacks, go to the generic path
3319
3527
            for v in annotate.Annotator._get_needed_texts(self, key, pb=pb):
3320
3528
                yield v
3325
3533
                for idx, (sub_key, text, num_lines) in enumerate(
3326
3534
                                                self._extract_texts(records)):
3327
3535
                    if pb is not None:
3328
 
                        pb.update(gettext('annotating'), idx, len(records))
 
3536
                        pb.update('annotating', idx, len(records))
3329
3537
                    yield sub_key, text, num_lines
3330
3538
                for sub_key in ann_keys:
3331
3539
                    text = self._text_cache[sub_key]