~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

  • Committer: Martin Pool
  • Date: 2010-02-25 06:17:27 UTC
  • mfrom: (5055 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5057.
  • Revision ID: mbp@sourcefrog.net-20100225061727-4sd9lt0qmdc6087t
merge news

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
23
23
except ImportError:
24
24
    pylzma = None
25
25
 
26
 
from bzrlib.lazy_import import lazy_import
27
 
lazy_import(globals(), """
28
26
from bzrlib import (
29
27
    annotate,
30
28
    debug,
31
29
    errors,
32
30
    graph as _mod_graph,
 
31
    knit,
33
32
    osutils,
34
33
    pack,
35
34
    static_tuple,
36
35
    trace,
37
 
    tsort,
38
36
    )
39
 
 
40
 
from bzrlib.repofmt import pack_repo
41
 
""")
42
 
 
43
37
from bzrlib.btree_index import BTreeBuilder
44
38
from bzrlib.lru_cache import LRUSizeCache
 
39
from bzrlib.tsort import topo_sort
45
40
from bzrlib.versionedfile import (
46
 
    _KeyRefs,
47
41
    adapter_registry,
48
42
    AbsentContentFactory,
49
43
    ChunkedContentFactory,
83
77
 
84
78
    present_keys = []
85
79
    for prefix in sorted(per_prefix_map):
86
 
        present_keys.extend(reversed(tsort.topo_sort(per_prefix_map[prefix])))
 
80
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
87
81
    return present_keys
88
82
 
89
83
 
107
101
    def __init__(self):
108
102
        # map by key? or just order in file?
109
103
        self._compressor_name = None
110
 
        self._z_content_chunks = None
 
104
        self._z_content = None
111
105
        self._z_content_decompressor = None
112
106
        self._z_content_length = None
113
107
        self._content_length = None
141
135
                self._content = ''.join(self._content_chunks)
142
136
                self._content_chunks = None
143
137
        if self._content is None:
144
 
            # We join self._z_content_chunks here, because if we are
145
 
            # decompressing, then it is *very* likely that we have a single
146
 
            # chunk
147
 
            if self._z_content_chunks is None:
 
138
            if self._z_content is None:
148
139
                raise AssertionError('No content to decompress')
149
 
            z_content = ''.join(self._z_content_chunks)
150
 
            if z_content == '':
 
140
            if self._z_content == '':
151
141
                self._content = ''
152
142
            elif self._compressor_name == 'lzma':
153
143
                # We don't do partial lzma decomp yet
154
 
                self._content = pylzma.decompress(z_content)
 
144
                self._content = pylzma.decompress(self._z_content)
155
145
            elif self._compressor_name == 'zlib':
156
146
                # Start a zlib decompressor
157
147
                if num_bytes * 4 > self._content_length * 3:
158
148
                    # If we are requesting more that 3/4ths of the content,
159
149
                    # just extract the whole thing in a single pass
160
150
                    num_bytes = self._content_length
161
 
                    self._content = zlib.decompress(z_content)
 
151
                    self._content = zlib.decompress(self._z_content)
162
152
                else:
163
153
                    self._z_content_decompressor = zlib.decompressobj()
164
154
                    # Seed the decompressor with the uncompressed bytes, so
165
155
                    # that the rest of the code is simplified
166
156
                    self._content = self._z_content_decompressor.decompress(
167
 
                        z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
157
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
168
158
                    if not self._z_content_decompressor.unconsumed_tail:
169
159
                        self._z_content_decompressor = None
170
160
            else:
217
207
            # XXX: Define some GCCorrupt error ?
218
208
            raise AssertionError('Invalid bytes: (%d) != %d + %d' %
219
209
                                 (len(bytes), pos, self._z_content_length))
220
 
        self._z_content_chunks = (bytes[pos:],)
221
 
 
222
 
    @property
223
 
    def _z_content(self):
224
 
        """Return z_content_chunks as a simple string.
225
 
 
226
 
        Meant only to be used by the test suite.
227
 
        """
228
 
        if self._z_content_chunks is not None:
229
 
            return ''.join(self._z_content_chunks)
230
 
        return None
 
210
        self._z_content = bytes[pos:]
231
211
 
232
212
    @classmethod
233
213
    def from_bytes(cls, bytes):
289
269
        self._content_length = length
290
270
        self._content_chunks = content_chunks
291
271
        self._content = None
292
 
        self._z_content_chunks = None
 
272
        self._z_content = None
293
273
 
294
274
    def set_content(self, content):
295
275
        """Set the content of this block."""
296
276
        self._content_length = len(content)
297
277
        self._content = content
298
 
        self._z_content_chunks = None
 
278
        self._z_content = None
299
279
 
300
280
    def _create_z_content_using_lzma(self):
301
281
        if self._content_chunks is not None:
303
283
            self._content_chunks = None
304
284
        if self._content is None:
305
285
            raise AssertionError('Nothing to compress')
306
 
        z_content = pylzma.compress(self._content)
307
 
        self._z_content_chunks = (z_content,)
308
 
        self._z_content_length = len(z_content)
 
286
        self._z_content = pylzma.compress(self._content)
 
287
        self._z_content_length = len(self._z_content)
309
288
 
310
 
    def _create_z_content_from_chunks(self, chunks):
 
289
    def _create_z_content_from_chunks(self):
311
290
        compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION)
312
 
        # Peak in this point is 1 fulltext, 1 compressed text, + zlib overhead
313
 
        # (measured peak is maybe 30MB over the above...)
314
 
        compressed_chunks = map(compressor.compress, chunks)
 
291
        compressed_chunks = map(compressor.compress, self._content_chunks)
315
292
        compressed_chunks.append(compressor.flush())
316
 
        # Ignore empty chunks
317
 
        self._z_content_chunks = [c for c in compressed_chunks if c]
318
 
        self._z_content_length = sum(map(len, self._z_content_chunks))
 
293
        self._z_content = ''.join(compressed_chunks)
 
294
        self._z_content_length = len(self._z_content)
319
295
 
320
296
    def _create_z_content(self):
321
 
        if self._z_content_chunks is not None:
 
297
        if self._z_content is not None:
322
298
            return
323
299
        if _USE_LZMA:
324
300
            self._create_z_content_using_lzma()
325
301
            return
326
302
        if self._content_chunks is not None:
327
 
            chunks = self._content_chunks
328
 
        else:
329
 
            chunks = (self._content,)
330
 
        self._create_z_content_from_chunks(chunks)
 
303
            self._create_z_content_from_chunks()
 
304
            return
 
305
        self._z_content = zlib.compress(self._content)
 
306
        self._z_content_length = len(self._z_content)
331
307
 
332
 
    def to_chunks(self):
333
 
        """Create the byte stream as a series of 'chunks'"""
 
308
    def to_bytes(self):
 
309
        """Encode the information into a byte stream."""
334
310
        self._create_z_content()
335
311
        if _USE_LZMA:
336
312
            header = self.GCB_LZ_HEADER
337
313
        else:
338
314
            header = self.GCB_HEADER
339
 
        chunks = ['%s%d\n%d\n'
340
 
                  % (header, self._z_content_length, self._content_length),
 
315
        chunks = [header,
 
316
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
 
317
                  self._z_content,
341
318
                 ]
342
 
        chunks.extend(self._z_content_chunks)
343
 
        total_len = sum(map(len, chunks))
344
 
        return total_len, chunks
345
 
 
346
 
    def to_bytes(self):
347
 
        """Encode the information into a byte stream."""
348
 
        total_len, chunks = self.to_chunks()
349
319
        return ''.join(chunks)
350
320
 
351
321
    def _dump(self, include_text=False):
709
679
        z_header_bytes = zlib.compress(header_bytes)
710
680
        del header_bytes
711
681
        z_header_bytes_len = len(z_header_bytes)
712
 
        block_bytes_len, block_chunks = self._block.to_chunks()
 
682
        block_bytes = self._block.to_bytes()
713
683
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
714
 
                                       block_bytes_len))
 
684
                                       len(block_bytes)))
715
685
        lines.append(z_header_bytes)
716
 
        lines.extend(block_chunks)
717
 
        del z_header_bytes, block_chunks
718
 
        # TODO: This is a point where we will double the memory consumption. To
719
 
        #       avoid this, we probably have to switch to a 'chunked' api
 
686
        lines.append(block_bytes)
 
687
        del z_header_bytes, block_bytes
720
688
        return ''.join(lines)
721
689
 
722
690
    @classmethod
723
691
    def from_bytes(cls, bytes):
724
692
        # TODO: This does extra string copying, probably better to do it a
725
 
        #       different way. At a minimum this creates 2 copies of the
726
 
        #       compressed content
 
693
        #       different way
727
694
        (storage_kind, z_header_len, header_len,
728
695
         block_len, rest) = bytes.split('\n', 4)
729
696
        del bytes
887
854
 
888
855
        After calling this, the compressor should no longer be used
889
856
        """
 
857
        # TODO: this causes us to 'bloat' to 2x the size of content in the
 
858
        #       group. This has an impact for 'commit' of large objects.
 
859
        #       One possibility is to use self._content_chunks, and be lazy and
 
860
        #       only fill out self._content as a full string when we actually
 
861
        #       need it. That would at least drop the peak memory consumption
 
862
        #       for 'commit' down to ~1x the size of the largest file, at a
 
863
        #       cost of increased complexity within this code. 2x is still <<
 
864
        #       3x the size of the largest file, so we are doing ok.
890
865
        self._block.set_chunked_content(self.chunks, self.endpoint)
891
866
        self.chunks = None
892
867
        self._delta_index = None
1052
1027
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
1053
1028
            add_callback=graph_index.add_nodes,
1054
1029
            inconsistency_fatal=inconsistency_fatal)
1055
 
        access = pack_repo._DirectPackAccess({})
 
1030
        access = knit._DirectPackAccess({})
1056
1031
        access.set_writer(writer, graph_index, (transport, 'newpack'))
1057
1032
        result = GroupCompressVersionedFiles(index, access, delta)
1058
1033
        result.stream = stream
1192
1167
            _unadded_refs = {}
1193
1168
        self._unadded_refs = _unadded_refs
1194
1169
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
1195
 
        self._immediate_fallback_vfs = []
 
1170
        self._fallback_vfs = []
1196
1171
 
1197
1172
    def without_fallbacks(self):
1198
1173
        """Return a clone of this object without any fallbacks configured."""
1272
1247
 
1273
1248
        :param a_versioned_files: A VersionedFiles object.
1274
1249
        """
1275
 
        self._immediate_fallback_vfs.append(a_versioned_files)
 
1250
        self._fallback_vfs.append(a_versioned_files)
1276
1251
 
1277
1252
    def annotate(self, key):
1278
1253
        """See VersionedFiles.annotate."""
1318
1293
        # KnitVersionedFiles.get_known_graph_ancestry, but they don't share
1319
1294
        # ancestry.
1320
1295
        parent_map, missing_keys = self._index.find_ancestry(keys)
1321
 
        for fallback in self._transitive_fallbacks():
 
1296
        for fallback in self._fallback_vfs:
1322
1297
            if not missing_keys:
1323
1298
                break
1324
1299
            (f_parent_map, f_missing_keys) = fallback._index.find_ancestry(
1348
1323
            and so on.
1349
1324
        """
1350
1325
        result = {}
1351
 
        sources = [self._index] + self._immediate_fallback_vfs
 
1326
        sources = [self._index] + self._fallback_vfs
1352
1327
        source_results = []
1353
1328
        missing = set(keys)
1354
1329
        for source in sources:
1455
1430
        parent_map = {}
1456
1431
        key_to_source_map = {}
1457
1432
        source_results = []
1458
 
        for source in self._immediate_fallback_vfs:
 
1433
        for source in self._fallback_vfs:
1459
1434
            if not missing:
1460
1435
                break
1461
1436
            source_parents = source.get_parent_map(missing)
1476
1451
            the defined order, regardless of source.
1477
1452
        """
1478
1453
        if ordering == 'topological':
1479
 
            present_keys = tsort.topo_sort(parent_map)
 
1454
            present_keys = topo_sort(parent_map)
1480
1455
        else:
1481
1456
            # ordering == 'groupcompress'
1482
1457
            # XXX: This only optimizes for the target ordering. We may need
1655
1630
        self._unadded_refs = {}
1656
1631
        keys_to_add = []
1657
1632
        def flush():
1658
 
            bytes_len, chunks = self._compressor.flush().to_chunks()
1659
 
            self._compressor = GroupCompressor()
1660
 
            # Note: At this point we still have 1 copy of the fulltext (in
1661
 
            #       record and the var 'bytes'), and this generates 2 copies of
1662
 
            #       the compressed text (one for bytes, one in chunks)
1663
 
            # TODO: Push 'chunks' down into the _access api, so that we don't
1664
 
            #       have to double compressed memory here
1665
 
            # TODO: Figure out how to indicate that we would be happy to free
1666
 
            #       the fulltext content at this point. Note that sometimes we
1667
 
            #       will want it later (streaming CHK pages), but most of the
1668
 
            #       time we won't (everything else)
1669
 
            bytes = ''.join(chunks)
1670
 
            del chunks
 
1633
            bytes = self._compressor.flush().to_bytes()
1671
1634
            index, start, length = self._access.add_raw_records(
1672
1635
                [(None, len(bytes))], bytes)[0]
1673
1636
            nodes = []
1676
1639
            self._index.add_records(nodes, random_id=random_id)
1677
1640
            self._unadded_refs = {}
1678
1641
            del keys_to_add[:]
 
1642
            self._compressor = GroupCompressor()
1679
1643
 
1680
1644
        last_prefix = None
1681
1645
        max_fulltext_len = 0
1838
1802
        """See VersionedFiles.keys."""
1839
1803
        if 'evil' in debug.debug_flags:
1840
1804
            trace.mutter_callsite(2, "keys scales with size of history")
1841
 
        sources = [self._index] + self._immediate_fallback_vfs
 
1805
        sources = [self._index] + self._fallback_vfs
1842
1806
        result = set()
1843
1807
        for source in sources:
1844
1808
            result.update(source.keys())
1845
1809
        return result
1846
1810
 
1847
1811
 
1848
 
class _GCBuildDetails(object):
1849
 
    """A blob of data about the build details.
1850
 
 
1851
 
    This stores the minimal data, which then allows compatibility with the old
1852
 
    api, without taking as much memory.
1853
 
    """
1854
 
 
1855
 
    __slots__ = ('_index', '_group_start', '_group_end', '_basis_end',
1856
 
                 '_delta_end', '_parents')
1857
 
 
1858
 
    method = 'group'
1859
 
    compression_parent = None
1860
 
 
1861
 
    def __init__(self, parents, position_info):
1862
 
        self._parents = parents
1863
 
        (self._index, self._group_start, self._group_end, self._basis_end,
1864
 
         self._delta_end) = position_info
1865
 
 
1866
 
    def __repr__(self):
1867
 
        return '%s(%s, %s)' % (self.__class__.__name__,
1868
 
            self.index_memo, self._parents)
1869
 
 
1870
 
    @property
1871
 
    def index_memo(self):
1872
 
        return (self._index, self._group_start, self._group_end,
1873
 
                self._basis_end, self._delta_end)
1874
 
 
1875
 
    @property
1876
 
    def record_details(self):
1877
 
        return static_tuple.StaticTuple(self.method, None)
1878
 
 
1879
 
    def __getitem__(self, offset):
1880
 
        """Compatibility thunk to act like a tuple."""
1881
 
        if offset == 0:
1882
 
            return self.index_memo
1883
 
        elif offset == 1:
1884
 
            return self.compression_parent # Always None
1885
 
        elif offset == 2:
1886
 
            return self._parents
1887
 
        elif offset == 3:
1888
 
            return self.record_details
1889
 
        else:
1890
 
            raise IndexError('offset out of range')
1891
 
            
1892
 
    def __len__(self):
1893
 
        return 4
1894
 
 
1895
 
 
1896
1812
class _GCGraphIndex(object):
1897
1813
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1898
1814
 
1927
1843
        # repeated over and over, this creates a surplus of ints
1928
1844
        self._int_cache = {}
1929
1845
        if track_external_parent_refs:
1930
 
            self._key_dependencies = _KeyRefs(
 
1846
            self._key_dependencies = knit._KeyRefs(
1931
1847
                track_new_keys=track_new_keys)
1932
1848
        else:
1933
1849
            self._key_dependencies = None
2093
2009
                parents = None
2094
2010
            else:
2095
2011
                parents = entry[3][0]
2096
 
            details = _GCBuildDetails(parents, self._node_to_position(entry))
2097
 
            result[key] = details
 
2012
            method = 'group'
 
2013
            result[key] = (self._node_to_position(entry),
 
2014
                                  None, parents, (method, None))
2098
2015
        return result
2099
2016
 
2100
2017
    def keys(self):
2116
2033
        # each, or about 7MB. Note that it might be even more when you consider
2117
2034
        # how PyInt is allocated in separate slabs. And you can't return a slab
2118
2035
        # to the OS if even 1 int on it is in use. Note though that Python uses
2119
 
        # a LIFO when re-using PyInt slots, which might cause more
 
2036
        # a LIFO when re-using PyInt slots, which probably causes more
2120
2037
        # fragmentation.
2121
2038
        start = int(bits[0])
2122
2039
        start = self._int_cache.setdefault(start, start)