~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2011-05-01 21:02:50 UTC
  • mto: This revision was merged to the branch mainline in revision 5842.
  • Revision ID: jelmer@samba.org-20110501210250-24jq6hrxxc9psvzf
Actually use branch format 5 in branch format 5 test.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
23
23
except ImportError:
24
24
    pylzma = None
25
25
 
 
26
from bzrlib.lazy_import import lazy_import
 
27
lazy_import(globals(), """
26
28
from bzrlib import (
27
29
    annotate,
28
30
    debug,
29
31
    errors,
30
32
    graph as _mod_graph,
31
 
    knit,
32
33
    osutils,
33
34
    pack,
34
35
    static_tuple,
35
36
    trace,
 
37
    tsort,
36
38
    )
 
39
 
 
40
from bzrlib.repofmt import pack_repo
 
41
""")
 
42
 
37
43
from bzrlib.btree_index import BTreeBuilder
38
44
from bzrlib.lru_cache import LRUSizeCache
39
 
from bzrlib.tsort import topo_sort
40
45
from bzrlib.versionedfile import (
 
46
    _KeyRefs,
41
47
    adapter_registry,
42
48
    AbsentContentFactory,
43
49
    ChunkedContentFactory,
77
83
 
78
84
    present_keys = []
79
85
    for prefix in sorted(per_prefix_map):
80
 
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
 
86
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
81
87
    return present_keys
82
88
 
83
89
 
101
107
    def __init__(self):
102
108
        # map by key? or just order in file?
103
109
        self._compressor_name = None
104
 
        self._z_content = None
 
110
        self._z_content_chunks = None
105
111
        self._z_content_decompressor = None
106
112
        self._z_content_length = None
107
113
        self._content_length = None
135
141
                self._content = ''.join(self._content_chunks)
136
142
                self._content_chunks = None
137
143
        if self._content is None:
138
 
            if self._z_content is None:
 
144
            # We join self._z_content_chunks here, because if we are
 
145
            # decompressing, then it is *very* likely that we have a single
 
146
            # chunk
 
147
            if self._z_content_chunks is None:
139
148
                raise AssertionError('No content to decompress')
140
 
            if self._z_content == '':
 
149
            z_content = ''.join(self._z_content_chunks)
 
150
            if z_content == '':
141
151
                self._content = ''
142
152
            elif self._compressor_name == 'lzma':
143
153
                # We don't do partial lzma decomp yet
144
 
                self._content = pylzma.decompress(self._z_content)
 
154
                self._content = pylzma.decompress(z_content)
145
155
            elif self._compressor_name == 'zlib':
146
156
                # Start a zlib decompressor
147
157
                if num_bytes * 4 > self._content_length * 3:
148
158
                    # If we are requesting more that 3/4ths of the content,
149
159
                    # just extract the whole thing in a single pass
150
160
                    num_bytes = self._content_length
151
 
                    self._content = zlib.decompress(self._z_content)
 
161
                    self._content = zlib.decompress(z_content)
152
162
                else:
153
163
                    self._z_content_decompressor = zlib.decompressobj()
154
164
                    # Seed the decompressor with the uncompressed bytes, so
155
165
                    # that the rest of the code is simplified
156
166
                    self._content = self._z_content_decompressor.decompress(
157
 
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
167
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
158
168
                    if not self._z_content_decompressor.unconsumed_tail:
159
169
                        self._z_content_decompressor = None
160
170
            else:
207
217
            # XXX: Define some GCCorrupt error ?
208
218
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
209
219
                                 (len(bytes), pos, self._z_content_length))
210
 
        self._z_content = bytes[pos:]
 
220
        self._z_content_chunks = (bytes[pos:],)
 
221
 
 
222
    @property
 
223
    def _z_content(self):
 
224
        """Return z_content_chunks as a simple string.
 
225
 
 
226
        Meant only to be used by the test suite.
 
227
        """
 
228
        if self._z_content_chunks is not None:
 
229
            return ''.join(self._z_content_chunks)
 
230
        return None
211
231
 
212
232
    @classmethod
213
233
    def from_bytes(cls, bytes):
269
289
        self._content_length = length
270
290
        self._content_chunks = content_chunks
271
291
        self._content = None
272
 
        self._z_content = None
 
292
        self._z_content_chunks = None
273
293
 
274
294
    def set_content(self, content):
275
295
        """Set the content of this block."""
276
296
        self._content_length = len(content)
277
297
        self._content = content
278
 
        self._z_content = None
 
298
        self._z_content_chunks = None
279
299
 
280
300
    def _create_z_content_using_lzma(self):
281
301
        if self._content_chunks is not None:
283
303
            self._content_chunks = None
284
304
        if self._content is None:
285
305
            raise AssertionError('Nothing to compress')
286
 
        self._z_content = pylzma.compress(self._content)
287
 
        self._z_content_length = len(self._z_content)
 
306
        z_content = pylzma.compress(self._content)
 
307
        self._z_content_chunks = (z_content,)
 
308
        self._z_content_length = len(z_content)
288
309
 
289
 
    def _create_z_content_from_chunks(self):
 
310
    def _create_z_content_from_chunks(self, chunks):
290
311
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
291
 
        compressed_chunks = map(compressor.compress, self._content_chunks)
 
312
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
 
313
        # (measured peak is maybe 30MB over the above...)
 
314
        compressed_chunks = map(compressor.compress, chunks)
292
315
        compressed_chunks.append(compressor.flush())
293
 
        self._z_content = ''.join(compressed_chunks)
294
 
        self._z_content_length = len(self._z_content)
 
316
        # Ignore empty chunks
 
317
        self._z_content_chunks = [c for c in compressed_chunks if c]
 
318
        self._z_content_length = sum(map(len, self._z_content_chunks))
295
319
 
296
320
    def _create_z_content(self):
297
 
        if self._z_content is not None:
 
321
        if self._z_content_chunks is not None:
298
322
            return
299
323
        if _USE_LZMA:
300
324
            self._create_z_content_using_lzma()
301
325
            return
302
326
        if self._content_chunks is not None:
303
 
            self._create_z_content_from_chunks()
304
 
            return
305
 
        self._z_content = zlib.compress(self._content)
306
 
        self._z_content_length = len(self._z_content)
 
327
            chunks = self._content_chunks
 
328
        else:
 
329
            chunks = (self._content,)
 
330
        self._create_z_content_from_chunks(chunks)
307
331
 
308
 
    def to_bytes(self):
309
 
        """Encode the information into a byte stream."""
 
332
    def to_chunks(self):
 
333
        """Create the byte stream as a series of 'chunks'"""
310
334
        self._create_z_content()
311
335
        if _USE_LZMA:
312
336
            header = self.GCB_LZ_HEADER
313
337
        else:
314
338
            header = self.GCB_HEADER
315
 
        chunks = [header,
316
 
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
317
 
                  self._z_content,
 
339
        chunks = ['%s%d\n%d\n'
 
340
                  % (header, self._z_content_length, self._content_length),
318
341
                 ]
 
342
        chunks.extend(self._z_content_chunks)
 
343
        total_len = sum(map(len, chunks))
 
344
        return total_len, chunks
 
345
 
 
346
    def to_bytes(self):
 
347
        """Encode the information into a byte stream."""
 
348
        total_len, chunks = self.to_chunks()
319
349
        return ''.join(chunks)
320
350
 
321
351
    def _dump(self, include_text=False):
679
709
        z_header_bytes = zlib.compress(header_bytes)
680
710
        del header_bytes
681
711
        z_header_bytes_len = len(z_header_bytes)
682
 
        block_bytes = self._block.to_bytes()
 
712
        block_bytes_len, block_chunks = self._block.to_chunks()
683
713
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
684
 
                                       len(block_bytes)))
 
714
                                       block_bytes_len))
685
715
        lines.append(z_header_bytes)
686
 
        lines.append(block_bytes)
687
 
        del z_header_bytes, block_bytes
 
716
        lines.extend(block_chunks)
 
717
        del z_header_bytes, block_chunks
 
718
        # TODO: This is a point where we will double the memory consumption. To
 
719
        #       avoid this, we probably have to switch to a 'chunked' api
688
720
        return ''.join(lines)
689
721
 
690
722
    @classmethod
691
723
    def from_bytes(cls, bytes):
692
724
        # TODO: This does extra string copying, probably better to do it a
693
 
        #       different way
 
725
        #       different way. At a minimum this creates 2 copies of the
 
726
        #       compressed content
694
727
        (storage_kind, z_header_len, header_len,
695
728
         block_len, rest) = bytes.split('\n', 4)
696
729
        del bytes
854
887
 
855
888
        After calling this, the compressor should no longer be used
856
889
        """
857
 
        # TODO: this causes us to 'bloat' to 2x the size of content in the
858
 
        #       group. This has an impact for 'commit' of large objects.
859
 
        #       One possibility is to use self._content_chunks, and be lazy and
860
 
        #       only fill out self._content as a full string when we actually
861
 
        #       need it. That would at least drop the peak memory consumption
862
 
        #       for 'commit' down to ~1x the size of the largest file, at a
863
 
        #       cost of increased complexity within this code. 2x is still <<
864
 
        #       3x the size of the largest file, so we are doing ok.
865
890
        self._block.set_chunked_content(self.chunks, self.endpoint)
866
891
        self.chunks = None
867
892
        self._delta_index = None
1027
1052
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1028
1053
            add_callback=graph_index.add_nodes,
1029
1054
            inconsistency_fatal=inconsistency_fatal)
1030
 
        access = knit._DirectPackAccess({})
 
1055
        access = pack_repo._DirectPackAccess({})
1031
1056
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1032
1057
        result = GroupCompressVersionedFiles(index, access, delta)
1033
1058
        result.stream = stream
1167
1192
            _unadded_refs = {}
1168
1193
        self._unadded_refs = _unadded_refs
1169
1194
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1170
 
        self._fallback_vfs = []
 
1195
        self._immediate_fallback_vfs = []
1171
1196
 
1172
1197
    def without_fallbacks(self):
1173
1198
        """Return a clone of this object without any fallbacks configured."""
1247
1272
 
1248
1273
        :param a_versioned_files: A VersionedFiles object.
1249
1274
        """
1250
 
        self._fallback_vfs.append(a_versioned_files)
 
1275
        self._immediate_fallback_vfs.append(a_versioned_files)
1251
1276
 
1252
1277
    def annotate(self, key):
1253
1278
        """See VersionedFiles.annotate."""
1293
1318
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1294
1319
        # ancestry.
1295
1320
        parent_map, missing_keys = self._index.find_ancestry(keys)
1296
 
        for fallback in self._fallback_vfs:
 
1321
        for fallback in self._transitive_fallbacks():
1297
1322
            if not missing_keys:
1298
1323
                break
1299
1324
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1323
1348
            and so on.
1324
1349
        """
1325
1350
        result = {}
1326
 
        sources = [self._index] + self._fallback_vfs
 
1351
        sources = [self._index] + self._immediate_fallback_vfs
1327
1352
        source_results = []
1328
1353
        missing = set(keys)
1329
1354
        for source in sources:
1430
1455
        parent_map = {}
1431
1456
        key_to_source_map = {}
1432
1457
        source_results = []
1433
 
        for source in self._fallback_vfs:
 
1458
        for source in self._immediate_fallback_vfs:
1434
1459
            if not missing:
1435
1460
                break
1436
1461
            source_parents = source.get_parent_map(missing)
1451
1476
            the defined order, regardless of source.
1452
1477
        """
1453
1478
        if ordering == 'topological':
1454
 
            present_keys = topo_sort(parent_map)
 
1479
            present_keys = tsort.topo_sort(parent_map)
1455
1480
        else:
1456
1481
            # ordering == 'groupcompress'
1457
1482
            # XXX: This only optimizes for the target ordering. We may need
1630
1655
        self._unadded_refs = {}
1631
1656
        keys_to_add = []
1632
1657
        def flush():
1633
 
            bytes = self._compressor.flush().to_bytes()
 
1658
            bytes_len, chunks = self._compressor.flush().to_chunks()
1634
1659
            self._compressor = GroupCompressor()
 
1660
            # Note: At this point we still have 1 copy of the fulltext (in
 
1661
            #       record and the var 'bytes'), and this generates 2 copies of
 
1662
            #       the compressed text (one for bytes, one in chunks)
 
1663
            # TODO: Push 'chunks' down into the _access api, so that we don't
 
1664
            #       have to double compressed memory here
 
1665
            # TODO: Figure out how to indicate that we would be happy to free
 
1666
            #       the fulltext content at this point. Note that sometimes we
 
1667
            #       will want it later (streaming CHK pages), but most of the
 
1668
            #       time we won't (everything else)
 
1669
            bytes = ''.join(chunks)
 
1670
            del chunks
1635
1671
            index, start, length = self._access.add_raw_records(
1636
1672
                [(None, len(bytes))], bytes)[0]
1637
1673
            nodes = []
1802
1838
        """See VersionedFiles.keys."""
1803
1839
        if 'evil' in debug.debug_flags:
1804
1840
            trace.mutter_callsite(2, "keys scales with size of history")
1805
 
        sources = [self._index] + self._fallback_vfs
 
1841
        sources = [self._index] + self._immediate_fallback_vfs
1806
1842
        result = set()
1807
1843
        for source in sources:
1808
1844
            result.update(source.keys())
1809
1845
        return result
1810
1846
 
1811
1847
 
 
1848
class _GCBuildDetails(object):
 
1849
    """A blob of data about the build details.
 
1850
 
 
1851
    This stores the minimal data, which then allows compatibility with the old
 
1852
    api, without taking as much memory.
 
1853
    """
 
1854
 
 
1855
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
 
1856
                 '_delta_end', '_parents')
 
1857
 
 
1858
    method = 'group'
 
1859
    compression_parent = None
 
1860
 
 
1861
    def __init__(self, parents, position_info):
 
1862
        self._parents = parents
 
1863
        (self._index, self._group_start, self._group_end, self._basis_end,
 
1864
         self._delta_end) = position_info
 
1865
 
 
1866
    def __repr__(self):
 
1867
        return '%s(%s, %s)' % (self.__class__.__name__,
 
1868
            self.index_memo, self._parents)
 
1869
 
 
1870
    @property
 
1871
    def index_memo(self):
 
1872
        return (self._index, self._group_start, self._group_end,
 
1873
                self._basis_end, self._delta_end)
 
1874
 
 
1875
    @property
 
1876
    def record_details(self):
 
1877
        return static_tuple.StaticTuple(self.method, None)
 
1878
 
 
1879
    def __getitem__(self, offset):
 
1880
        """Compatibility thunk to act like a tuple."""
 
1881
        if offset == 0:
 
1882
            return self.index_memo
 
1883
        elif offset == 1:
 
1884
            return self.compression_parent # Always None
 
1885
        elif offset == 2:
 
1886
            return self._parents
 
1887
        elif offset == 3:
 
1888
            return self.record_details
 
1889
        else:
 
1890
            raise IndexError('offset out of range')
 
1891
            
 
1892
    def __len__(self):
 
1893
        return 4
 
1894
 
 
1895
 
1812
1896
class _GCGraphIndex(object):
1813
1897
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1814
1898
 
1843
1927
        # repeated over and over, this creates a surplus of ints
1844
1928
        self._int_cache = {}
1845
1929
        if track_external_parent_refs:
1846
 
            self._key_dependencies = knit._KeyRefs(
 
1930
            self._key_dependencies = _KeyRefs(
1847
1931
                track_new_keys=track_new_keys)
1848
1932
        else:
1849
1933
            self._key_dependencies = None
2009
2093
                parents = None
2010
2094
            else:
2011
2095
                parents = entry[3][0]
2012
 
            method = 'group'
2013
 
            result[key] = (self._node_to_position(entry),
2014
 
                                  None, parents, (method, None))
 
2096
            details = _GCBuildDetails(parents, self._node_to_position(entry))
 
2097
            result[key] = details
2015
2098
        return result
2016
2099
 
2017
2100
    def keys(self):
2033
2116
        # each, or about 7MB. Note that it might be even more when you consider
2034
2117
        # how PyInt is allocated in separate slabs. And you can't return a slab
2035
2118
        # to the OS if even 1 int on it is in use. Note though that Python uses
2036
 
        # a LIFO when re-using PyInt slots, which probably causes more
 
2119
        # a LIFO when re-using PyInt slots, which might cause more
2037
2120
        # fragmentation.
2038
2121
        start = int(bits[0])
2039
2122
        start = self._int_cache.setdefault(start, start)