~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2011-05-11 11:35:28 UTC
  • mto: This revision was merged to the branch mainline in revision 5851.
  • Revision ID: john@arbash-meinel.com-20110511113528-qepibuwxicjrbb2h
Break compatibility with python <2.6.

This includes auditing the code for places where we were doing
explicit 'sys.version' checks and removing them as appropriate.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
23
23
except ImportError:
24
24
    pylzma = None
25
25
 
 
26
from bzrlib.lazy_import import lazy_import
 
27
lazy_import(globals(), """
26
28
from bzrlib import (
27
29
    annotate,
28
30
    debug,
29
31
    errors,
30
32
    graph as _mod_graph,
31
 
    knit,
32
33
    osutils,
33
34
    pack,
34
35
    static_tuple,
35
36
    trace,
 
37
    tsort,
36
38
    )
 
39
 
 
40
from bzrlib.repofmt import pack_repo
 
41
""")
 
42
 
37
43
from bzrlib.btree_index import BTreeBuilder
38
44
from bzrlib.lru_cache import LRUSizeCache
39
 
from bzrlib.tsort import topo_sort
40
45
from bzrlib.versionedfile import (
 
46
    _KeyRefs,
41
47
    adapter_registry,
42
48
    AbsentContentFactory,
43
49
    ChunkedContentFactory,
44
50
    FulltextContentFactory,
45
 
    VersionedFiles,
 
51
    VersionedFilesWithFallbacks,
46
52
    )
47
53
 
48
54
# Minimum number of uncompressed bytes to try fetch at once when retrieving
77
83
 
78
84
    present_keys = []
79
85
    for prefix in sorted(per_prefix_map):
80
 
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
 
86
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
81
87
    return present_keys
82
88
 
83
89
 
101
107
    def __init__(self):
102
108
        # map by key? or just order in file?
103
109
        self._compressor_name = None
104
 
        self._z_content = None
 
110
        self._z_content_chunks = None
105
111
        self._z_content_decompressor = None
106
112
        self._z_content_length = None
107
113
        self._content_length = None
135
141
                self._content = ''.join(self._content_chunks)
136
142
                self._content_chunks = None
137
143
        if self._content is None:
138
 
            if self._z_content is None:
 
144
            # We join self._z_content_chunks here, because if we are
 
145
            # decompressing, then it is *very* likely that we have a single
 
146
            # chunk
 
147
            if self._z_content_chunks is None:
139
148
                raise AssertionError('No content to decompress')
140
 
            if self._z_content == '':
 
149
            z_content = ''.join(self._z_content_chunks)
 
150
            if z_content == '':
141
151
                self._content = ''
142
152
            elif self._compressor_name == 'lzma':
143
153
                # We don't do partial lzma decomp yet
144
 
                self._content = pylzma.decompress(self._z_content)
 
154
                self._content = pylzma.decompress(z_content)
145
155
            elif self._compressor_name == 'zlib':
146
156
                # Start a zlib decompressor
147
157
                if num_bytes * 4 > self._content_length * 3:
148
158
                    # If we are requesting more that 3/4ths of the content,
149
159
                    # just extract the whole thing in a single pass
150
160
                    num_bytes = self._content_length
151
 
                    self._content = zlib.decompress(self._z_content)
 
161
                    self._content = zlib.decompress(z_content)
152
162
                else:
153
163
                    self._z_content_decompressor = zlib.decompressobj()
154
164
                    # Seed the decompressor with the uncompressed bytes, so
155
165
                    # that the rest of the code is simplified
156
166
                    self._content = self._z_content_decompressor.decompress(
157
 
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
167
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
168
                    if not self._z_content_decompressor.unconsumed_tail:
159
169
                        self._z_content_decompressor = None
160
170
            else:
207
217
            # XXX: Define some GCCorrupt error ?
208
218
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
209
219
                                 (len(bytes), pos, self._z_content_length))
210
 
        self._z_content = bytes[pos:]
 
220
        self._z_content_chunks = (bytes[pos:],)
 
221
 
 
222
    @property
 
223
    def _z_content(self):
 
224
        """Return z_content_chunks as a simple string.
 
225
 
 
226
        Meant only to be used by the test suite.
 
227
        """
 
228
        if self._z_content_chunks is not None:
 
229
            return ''.join(self._z_content_chunks)
 
230
        return None
211
231
 
212
232
    @classmethod
213
233
    def from_bytes(cls, bytes):
269
289
        self._content_length = length
270
290
        self._content_chunks = content_chunks
271
291
        self._content = None
272
 
        self._z_content = None
 
292
        self._z_content_chunks = None
273
293
 
274
294
    def set_content(self, content):
275
295
        """Set the content of this block."""
276
296
        self._content_length = len(content)
277
297
        self._content = content
278
 
        self._z_content = None
 
298
        self._z_content_chunks = None
279
299
 
280
300
    def _create_z_content_using_lzma(self):
281
301
        if self._content_chunks is not None:
283
303
            self._content_chunks = None
284
304
        if self._content is None:
285
305
            raise AssertionError('Nothing to compress')
286
 
        self._z_content = pylzma.compress(self._content)
287
 
        self._z_content_length = len(self._z_content)
 
306
        z_content = pylzma.compress(self._content)
 
307
        self._z_content_chunks = (z_content,)
 
308
        self._z_content_length = len(z_content)
288
309
 
289
 
    def _create_z_content_from_chunks(self):
 
310
    def _create_z_content_from_chunks(self, chunks):
290
311
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
291
 
        compressed_chunks = map(compressor.compress, self._content_chunks)
 
312
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
 
313
        # (measured peak is maybe 30MB over the above...)
 
314
        compressed_chunks = map(compressor.compress, chunks)
292
315
        compressed_chunks.append(compressor.flush())
293
 
        self._z_content = ''.join(compressed_chunks)
294
 
        self._z_content_length = len(self._z_content)
 
316
        # Ignore empty chunks
 
317
        self._z_content_chunks = [c for c in compressed_chunks if c]
 
318
        self._z_content_length = sum(map(len, self._z_content_chunks))
295
319
 
296
320
    def _create_z_content(self):
297
 
        if self._z_content is not None:
 
321
        if self._z_content_chunks is not None:
298
322
            return
299
323
        if _USE_LZMA:
300
324
            self._create_z_content_using_lzma()
301
325
            return
302
326
        if self._content_chunks is not None:
303
 
            self._create_z_content_from_chunks()
304
 
            return
305
 
        self._z_content = zlib.compress(self._content)
306
 
        self._z_content_length = len(self._z_content)
 
327
            chunks = self._content_chunks
 
328
        else:
 
329
            chunks = (self._content,)
 
330
        self._create_z_content_from_chunks(chunks)
307
331
 
308
 
    def to_bytes(self):
309
 
        """Encode the information into a byte stream."""
 
332
    def to_chunks(self):
 
333
        """Create the byte stream as a series of 'chunks'"""
310
334
        self._create_z_content()
311
335
        if _USE_LZMA:
312
336
            header = self.GCB_LZ_HEADER
313
337
        else:
314
338
            header = self.GCB_HEADER
315
 
        chunks = [header,
316
 
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
317
 
                  self._z_content,
 
339
        chunks = ['%s%d\n%d\n'
 
340
                  % (header, self._z_content_length, self._content_length),
318
341
                 ]
 
342
        chunks.extend(self._z_content_chunks)
 
343
        total_len = sum(map(len, chunks))
 
344
        return total_len, chunks
 
345
 
 
346
    def to_bytes(self):
 
347
        """Encode the information into a byte stream."""
 
348
        total_len, chunks = self.to_chunks()
319
349
        return ''.join(chunks)
320
350
 
321
351
    def _dump(self, include_text=False):
679
709
        z_header_bytes = zlib.compress(header_bytes)
680
710
        del header_bytes
681
711
        z_header_bytes_len = len(z_header_bytes)
682
 
        block_bytes = self._block.to_bytes()
 
712
        block_bytes_len, block_chunks = self._block.to_chunks()
683
713
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
684
 
                                       len(block_bytes)))
 
714
                                       block_bytes_len))
685
715
        lines.append(z_header_bytes)
686
 
        lines.append(block_bytes)
687
 
        del z_header_bytes, block_bytes
 
716
        lines.extend(block_chunks)
 
717
        del z_header_bytes, block_chunks
 
718
        # TODO: This is a point where we will double the memory consumption. To
 
719
        #       avoid this, we probably have to switch to a 'chunked' api
688
720
        return ''.join(lines)
689
721
 
690
722
    @classmethod
691
723
    def from_bytes(cls, bytes):
692
724
        # TODO: This does extra string copying, probably better to do it a
693
 
        #       different way
 
725
        #       different way. At a minimum this creates 2 copies of the
 
726
        #       compressed content
694
727
        (storage_kind, z_header_len, header_len,
695
728
         block_len, rest) = bytes.split('\n', 4)
696
729
        del bytes
854
887
 
855
888
        After calling this, the compressor should no longer be used
856
889
        """
857
 
        # TODO: this causes us to 'bloat' to 2x the size of content in the
858
 
        #       group. This has an impact for 'commit' of large objects.
859
 
        #       One possibility is to use self._content_chunks, and be lazy and
860
 
        #       only fill out self._content as a full string when we actually
861
 
        #       need it. That would at least drop the peak memory consumption
862
 
        #       for 'commit' down to ~1x the size of the largest file, at a
863
 
        #       cost of increased complexity within this code. 2x is still <<
864
 
        #       3x the size of the largest file, so we are doing ok.
865
890
        self._block.set_chunked_content(self.chunks, self.endpoint)
866
891
        self.chunks = None
867
892
        self._delta_index = None
1027
1052
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1028
1053
            add_callback=graph_index.add_nodes,
1029
1054
            inconsistency_fatal=inconsistency_fatal)
1030
 
        access = knit._DirectPackAccess({})
 
1055
        access = pack_repo._DirectPackAccess({})
1031
1056
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1032
1057
        result = GroupCompressVersionedFiles(index, access, delta)
1033
1058
        result.stream = stream
1149
1174
        self.total_bytes = 0
1150
1175
 
1151
1176
 
1152
 
class GroupCompressVersionedFiles(VersionedFiles):
 
1177
class GroupCompressVersionedFiles(VersionedFilesWithFallbacks):
1153
1178
    """A group-compress based VersionedFiles implementation."""
1154
1179
 
1155
 
    def __init__(self, index, access, delta=True, _unadded_refs=None):
 
1180
    def __init__(self, index, access, delta=True, _unadded_refs=None,
 
1181
            _group_cache=None):
1156
1182
        """Create a GroupCompressVersionedFiles object.
1157
1183
 
1158
1184
        :param index: The index object storing access and graph data.
1159
1185
        :param access: The access object storing raw data.
1160
1186
        :param delta: Whether to delta compress or just entropy compress.
1161
1187
        :param _unadded_refs: private parameter, don't use.
 
1188
        :param _group_cache: private parameter, don't use.
1162
1189
        """
1163
1190
        self._index = index
1164
1191
        self._access = access
1166
1193
        if _unadded_refs is None:
1167
1194
            _unadded_refs = {}
1168
1195
        self._unadded_refs = _unadded_refs
1169
 
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1170
 
        self._fallback_vfs = []
 
1196
        if _group_cache is None:
 
1197
            _group_cache = LRUSizeCache(max_size=50*1024*1024)
 
1198
        self._group_cache = _group_cache
 
1199
        self._immediate_fallback_vfs = []
1171
1200
 
1172
1201
    def without_fallbacks(self):
1173
1202
        """Return a clone of this object without any fallbacks configured."""
1174
1203
        return GroupCompressVersionedFiles(self._index, self._access,
1175
 
            self._delta, _unadded_refs=dict(self._unadded_refs))
 
1204
            self._delta, _unadded_refs=dict(self._unadded_refs),
 
1205
            _group_cache=self._group_cache)
1176
1206
 
1177
1207
    def add_lines(self, key, parents, lines, parent_texts=None,
1178
1208
        left_matching_blocks=None, nostore_sha=None, random_id=False,
1247
1277
 
1248
1278
        :param a_versioned_files: A VersionedFiles object.
1249
1279
        """
1250
 
        self._fallback_vfs.append(a_versioned_files)
 
1280
        self._immediate_fallback_vfs.append(a_versioned_files)
1251
1281
 
1252
1282
    def annotate(self, key):
1253
1283
        """See VersionedFiles.annotate."""
1293
1323
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1294
1324
        # ancestry.
1295
1325
        parent_map, missing_keys = self._index.find_ancestry(keys)
1296
 
        for fallback in self._fallback_vfs:
 
1326
        for fallback in self._transitive_fallbacks():
1297
1327
            if not missing_keys:
1298
1328
                break
1299
1329
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1323
1353
            and so on.
1324
1354
        """
1325
1355
        result = {}
1326
 
        sources = [self._index] + self._fallback_vfs
 
1356
        sources = [self._index] + self._immediate_fallback_vfs
1327
1357
        source_results = []
1328
1358
        missing = set(keys)
1329
1359
        for source in sources:
1430
1460
        parent_map = {}
1431
1461
        key_to_source_map = {}
1432
1462
        source_results = []
1433
 
        for source in self._fallback_vfs:
 
1463
        for source in self._immediate_fallback_vfs:
1434
1464
            if not missing:
1435
1465
                break
1436
1466
            source_parents = source.get_parent_map(missing)
1451
1481
            the defined order, regardless of source.
1452
1482
        """
1453
1483
        if ordering == 'topological':
1454
 
            present_keys = topo_sort(parent_map)
 
1484
            present_keys = tsort.topo_sort(parent_map)
1455
1485
        else:
1456
1486
            # ordering == 'groupcompress'
1457
1487
            # XXX: This only optimizes for the target ordering. We may need
1630
1660
        self._unadded_refs = {}
1631
1661
        keys_to_add = []
1632
1662
        def flush():
1633
 
            bytes = self._compressor.flush().to_bytes()
 
1663
            bytes_len, chunks = self._compressor.flush().to_chunks()
1634
1664
            self._compressor = GroupCompressor()
 
1665
            # Note: At this point we still have 1 copy of the fulltext (in
 
1666
            #       record and the var 'bytes'), and this generates 2 copies of
 
1667
            #       the compressed text (one for bytes, one in chunks)
 
1668
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1669
            #       have to double compressed memory here
 
1670
            # TODO: Figure out how to indicate that we would be happy to free
 
1671
            #       the fulltext content at this point. Note that sometimes we
 
1672
            #       will want it later (streaming CHK pages), but most of the
 
1673
            #       time we won't (everything else)
 
1674
            bytes = ''.join(chunks)
 
1675
            del chunks
1635
1676
            index, start, length = self._access.add_raw_records(
1636
1677
                [(None, len(bytes))], bytes)[0]
1637
1678
            nodes = []
1802
1843
        """See VersionedFiles.keys."""
1803
1844
        if 'evil' in debug.debug_flags:
1804
1845
            trace.mutter_callsite(2, "keys scales with size of history")
1805
 
        sources = [self._index] + self._fallback_vfs
 
1846
        sources = [self._index] + self._immediate_fallback_vfs
1806
1847
        result = set()
1807
1848
        for source in sources:
1808
1849
            result.update(source.keys())
1809
1850
        return result
1810
1851
 
1811
1852
 
 
1853
class _GCBuildDetails(object):
 
1854
    """A blob of data about the build details.
 
1855
 
 
1856
    This stores the minimal data, which then allows compatibility with the old
 
1857
    api, without taking as much memory.
 
1858
    """
 
1859
 
 
1860
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
 
1861
                 '_delta_end', '_parents')
 
1862
 
 
1863
    method = 'group'
 
1864
    compression_parent = None
 
1865
 
 
1866
    def __init__(self, parents, position_info):
 
1867
        self._parents = parents
 
1868
        (self._index, self._group_start, self._group_end, self._basis_end,
 
1869
         self._delta_end) = position_info
 
1870
 
 
1871
    def __repr__(self):
 
1872
        return '%s(%s, %s)' % (self.__class__.__name__,
 
1873
            self.index_memo, self._parents)
 
1874
 
 
1875
    @property
 
1876
    def index_memo(self):
 
1877
        return (self._index, self._group_start, self._group_end,
 
1878
                self._basis_end, self._delta_end)
 
1879
 
 
1880
    @property
 
1881
    def record_details(self):
 
1882
        return static_tuple.StaticTuple(self.method, None)
 
1883
 
 
1884
    def __getitem__(self, offset):
 
1885
        """Compatibility thunk to act like a tuple."""
 
1886
        if offset == 0:
 
1887
            return self.index_memo
 
1888
        elif offset == 1:
 
1889
            return self.compression_parent # Always None
 
1890
        elif offset == 2:
 
1891
            return self._parents
 
1892
        elif offset == 3:
 
1893
            return self.record_details
 
1894
        else:
 
1895
            raise IndexError('offset out of range')
 
1896
            
 
1897
    def __len__(self):
 
1898
        return 4
 
1899
 
 
1900
 
1812
1901
class _GCGraphIndex(object):
1813
1902
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1814
1903
 
1843
1932
        # repeated over and over, this creates a surplus of ints
1844
1933
        self._int_cache = {}
1845
1934
        if track_external_parent_refs:
1846
 
            self._key_dependencies = knit._KeyRefs(
 
1935
            self._key_dependencies = _KeyRefs(
1847
1936
                track_new_keys=track_new_keys)
1848
1937
        else:
1849
1938
            self._key_dependencies = None
2009
2098
                parents = None
2010
2099
            else:
2011
2100
                parents = entry[3][0]
2012
 
            method = 'group'
2013
 
            result[key] = (self._node_to_position(entry),
2014
 
                                  None, parents, (method, None))
 
2101
            details = _GCBuildDetails(parents, self._node_to_position(entry))
 
2102
            result[key] = details
2015
2103
        return result
2016
2104
 
2017
2105
    def keys(self):
2033
2121
        # each, or about 7MB. Note that it might be even more when you consider
2034
2122
        # how PyInt is allocated in separate slabs. And you can't return a slab
2035
2123
        # to the OS if even 1 int on it is in use. Note though that Python uses
2036
 
        # a LIFO when re-using PyInt slots, which probably causes more
 
2124
        # a LIFO when re-using PyInt slots, which might cause more
2037
2125
        # fragmentation.
2038
2126
        start = int(bits[0])
2039
2127
        start = self._int_cache.setdefault(start, start)