~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/chunk_writer.py

merge merge tweaks from aaron, which includes latest .dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
#
17
 
 
18
 
"""ChunkWriter: write compressed data out with a fixed upper bound."""
19
 
 
20
 
from __future__ import absolute_import
21
 
 
22
 
import zlib
23
 
from zlib import Z_FINISH, Z_SYNC_FLUSH
24
 
 
25
 
 
26
 
class ChunkWriter(object):
27
 
    """ChunkWriter allows writing of compressed data with a fixed size.
28
 
 
29
 
    If less data is supplied than fills a chunk, the chunk is padded with
30
 
    NULL bytes. If more data is supplied, then the writer packs as much
31
 
    in as it can, but never splits any item it was given.
32
 
 
33
 
    The algorithm for packing is open to improvement! Current it is:
34
 
     - write the bytes given
35
 
     - if the total seen bytes so far exceeds the chunk size, flush.
36
 
 
37
 
    :cvar _max_repack: To fit the maximum number of entries into a node, we
38
 
        will sometimes start over and compress the whole list to get tighter
39
 
        packing. We get diminishing returns after a while, so this limits the
40
 
        number of times we will try.
41
 
        The default is to try to avoid recompressing entirely, but setting this
42
 
        to something like 20 will give maximum compression.
43
 
 
44
 
    :cvar _max_zsync: Another tunable nob. If _max_repack is set to 0, then you
45
 
        can limit the number of times we will try to pack more data into a
46
 
        node. This allows us to do a single compression pass, rather than
47
 
        trying until we overflow, and then recompressing again.
48
 
    """
49
 
    #    In testing, some values for bzr.dev::
50
 
    #        repack  time  MB   max   full
51
 
    #         1       7.5  4.6  1140  0
52
 
    #         2       8.4  4.2  1036  1
53
 
    #         3       9.8  4.1  1012  278
54
 
    #         4      10.8  4.1  728   945
55
 
    #        20      11.1  4.1  0     1012
56
 
    #        repack = 0
57
 
    #        zsync   time  MB    repack  stop_for_z
58
 
    #         0       5.0  24.7  0       6270
59
 
    #         1       4.3  13.2  0       3342
60
 
    #         2       4.9   9.6  0       2414
61
 
    #         5       4.8   6.2  0       1549
62
 
    #         6       4.8   5.8  1       1435
63
 
    #         7       4.8   5.5  19      1337
64
 
    #         8       4.4   5.3  81      1220
65
 
    #        10       5.3   5.0  260     967
66
 
    #        11       5.3   4.9  366     839
67
 
    #        12       5.1   4.8  454     731
68
 
    #        15       5.8   4.7  704     450
69
 
    #        20       5.8   4.6  1133    7
70
 
 
71
 
    #    In testing, some values for mysql-unpacked::
72
 
    #                next_bytes estim
73
 
    #        repack  time  MB    full    stop_for_repack
74
 
    #         1            15.4  0       3913
75
 
    #         2      35.4  13.7  0       346
76
 
    #        20      46.7  13.4  3380    0
77
 
    #        repack=0
78
 
    #        zsync                       stop_for_z
79
 
    #         0      29.5 116.5  0       29782
80
 
    #         1      27.8  60.2  0       15356
81
 
    #         2      27.8  42.4  0       10822
82
 
    #         5      26.8  25.5  0       6491
83
 
    #         6      27.3  23.2  13      5896
84
 
    #         7      27.5  21.6  29      5451
85
 
    #         8      27.1  20.3  52      5108
86
 
    #        10      29.4  18.6  195     4526
87
 
    #        11      29.2  18.0  421     4143
88
 
    #        12      28.0  17.5  702     3738
89
 
    #        15      28.9  16.5  1223    2969
90
 
    #        20      29.6  15.7  2182    1810
91
 
    #        30      31.4  15.4  3891    23
92
 
 
93
 
    # Tuple of (num_repack_attempts, num_zsync_attempts)
94
 
    # num_zsync_attempts only has meaning if num_repack_attempts is 0.
95
 
    _repack_opts_for_speed = (0, 8)
96
 
    _repack_opts_for_size = (20, 0)
97
 
 
98
 
    def __init__(self, chunk_size, reserved=0, optimize_for_size=False):
99
 
        """Create a ChunkWriter to write chunk_size chunks.
100
 
 
101
 
        :param chunk_size: The total byte count to emit at the end of the
102
 
            chunk.
103
 
        :param reserved: How many bytes to allow for reserved data. reserved
104
 
            data space can only be written to via the write(..., reserved=True).
105
 
        """
106
 
        self.chunk_size = chunk_size
107
 
        self.compressor = zlib.compressobj()
108
 
        self.bytes_in = []
109
 
        self.bytes_list = []
110
 
        self.bytes_out_len = 0
111
 
        # bytes that have been seen, but not included in a flush to out yet
112
 
        self.unflushed_in_bytes = 0
113
 
        self.num_repack = 0
114
 
        self.num_zsync = 0
115
 
        self.unused_bytes = None
116
 
        self.reserved_size = reserved
117
 
        # Default is to make building fast rather than compact
118
 
        self.set_optimize(for_size=optimize_for_size)
119
 
 
120
 
    def finish(self):
121
 
        """Finish the chunk.
122
 
 
123
 
        This returns the final compressed chunk, and either None, or the
124
 
        bytes that did not fit in the chunk.
125
 
 
126
 
        :return: (compressed_bytes, unused_bytes, num_nulls_needed)
127
 
 
128
 
            * compressed_bytes: a list of bytes that were output from the
129
 
              compressor. If the compressed length was not exactly chunk_size,
130
 
              the final string will be a string of all null bytes to pad this
131
 
              to chunk_size
132
 
            * unused_bytes: None, or the last bytes that were added, which we
133
 
              could not fit.
134
 
            * num_nulls_needed: How many nulls are padded at the end
135
 
        """
136
 
        self.bytes_in = None # Free the data cached so far, we don't need it
137
 
        out = self.compressor.flush(Z_FINISH)
138
 
        self.bytes_list.append(out)
139
 
        self.bytes_out_len += len(out)
140
 
 
141
 
        if self.bytes_out_len > self.chunk_size:
142
 
            raise AssertionError('Somehow we ended up with too much'
143
 
                                 ' compressed data, %d > %d'
144
 
                                 % (self.bytes_out_len, self.chunk_size))
145
 
        nulls_needed = self.chunk_size - self.bytes_out_len
146
 
        if nulls_needed:
147
 
            self.bytes_list.append("\x00" * nulls_needed)
148
 
        return self.bytes_list, self.unused_bytes, nulls_needed
149
 
 
150
 
    def set_optimize(self, for_size=True):
151
 
        """Change how we optimize our writes.
152
 
 
153
 
        :param for_size: If True, optimize for minimum space usage, otherwise
154
 
            optimize for fastest writing speed.
155
 
        :return: None
156
 
        """
157
 
        if for_size:
158
 
            opts = ChunkWriter._repack_opts_for_size
159
 
        else:
160
 
            opts = ChunkWriter._repack_opts_for_speed
161
 
        self._max_repack, self._max_zsync = opts
162
 
 
163
 
    def _recompress_all_bytes_in(self, extra_bytes=None):
164
 
        """Recompress the current bytes_in, and optionally more.
165
 
 
166
 
        :param extra_bytes: Optional, if supplied we will add it with
167
 
            Z_SYNC_FLUSH
168
 
        :return: (bytes_out, bytes_out_len, alt_compressed)
169
 
 
170
 
            * bytes_out: is the compressed bytes returned from the compressor
171
 
            * bytes_out_len: the length of the compressed output
172
 
            * compressor: An object with everything packed in so far, and
173
 
              Z_SYNC_FLUSH called.
174
 
        """
175
 
        compressor = zlib.compressobj()
176
 
        bytes_out = []
177
 
        append = bytes_out.append
178
 
        compress = compressor.compress
179
 
        for accepted_bytes in self.bytes_in:
180
 
            out = compress(accepted_bytes)
181
 
            if out:
182
 
                append(out)
183
 
        if extra_bytes:
184
 
            out = compress(extra_bytes)
185
 
            out += compressor.flush(Z_SYNC_FLUSH)
186
 
            append(out)
187
 
        bytes_out_len = sum(map(len, bytes_out))
188
 
        return bytes_out, bytes_out_len, compressor
189
 
 
190
 
    def write(self, bytes, reserved=False):
191
 
        """Write some bytes to the chunk.
192
 
 
193
 
        If the bytes fit, False is returned. Otherwise True is returned
194
 
        and the bytes have not been added to the chunk.
195
 
 
196
 
        :param bytes: The bytes to include
197
 
        :param reserved: If True, we can use the space reserved in the
198
 
            constructor.
199
 
        """
200
 
        if self.num_repack > self._max_repack and not reserved:
201
 
            self.unused_bytes = bytes
202
 
            return True
203
 
        if reserved:
204
 
            capacity = self.chunk_size
205
 
        else:
206
 
            capacity = self.chunk_size - self.reserved_size
207
 
        comp = self.compressor
208
 
 
209
 
        # Check to see if the currently unflushed bytes would fit with a bit of
210
 
        # room to spare, assuming no compression.
211
 
        next_unflushed = self.unflushed_in_bytes + len(bytes)
212
 
        remaining_capacity = capacity - self.bytes_out_len - 10
213
 
        if (next_unflushed < remaining_capacity):
214
 
            # looks like it will fit
215
 
            out = comp.compress(bytes)
216
 
            if out:
217
 
                self.bytes_list.append(out)
218
 
                self.bytes_out_len += len(out)
219
 
            self.bytes_in.append(bytes)
220
 
            self.unflushed_in_bytes += len(bytes)
221
 
        else:
222
 
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
223
 
            # Note: It is tempting to do this as a look-ahead pass, and to
224
 
            #       'copy()' the compressor before flushing. However, it seems
225
 
            #       that Which means that it is the same thing as increasing
226
 
            #       repack, similar cost, same benefit. And this way we still
227
 
            #       have the 'repack' knob that can be adjusted, and not depend
228
 
            #       on a platform-specific 'copy()' function.
229
 
            self.num_zsync += 1
230
 
            if self._max_repack == 0 and self.num_zsync > self._max_zsync:
231
 
                self.num_repack += 1
232
 
                self.unused_bytes = bytes
233
 
                return True
234
 
            out = comp.compress(bytes)
235
 
            out += comp.flush(Z_SYNC_FLUSH)
236
 
            self.unflushed_in_bytes = 0
237
 
            if out:
238
 
                self.bytes_list.append(out)
239
 
                self.bytes_out_len += len(out)
240
 
 
241
 
            # We are a bit extra conservative, because it seems that you *can*
242
 
            # get better compression with Z_SYNC_FLUSH than a full compress. It
243
 
            # is probably very rare, but we were able to trigger it.
244
 
            if self.num_repack == 0:
245
 
                safety_margin = 100
246
 
            else:
247
 
                safety_margin = 10
248
 
            if self.bytes_out_len + safety_margin <= capacity:
249
 
                # It fit, so mark it added
250
 
                self.bytes_in.append(bytes)
251
 
            else:
252
 
                # We are over budget, try to squeeze this in without any
253
 
                # Z_SYNC_FLUSH calls
254
 
                self.num_repack += 1
255
 
                (bytes_out, this_len,
256
 
                 compressor) = self._recompress_all_bytes_in(bytes)
257
 
                if self.num_repack >= self._max_repack:
258
 
                    # When we get *to* _max_repack, bump over so that the
259
 
                    # earlier > _max_repack will be triggered.
260
 
                    self.num_repack += 1
261
 
                if this_len + 10 > capacity:
262
 
                    (bytes_out, this_len,
263
 
                     compressor) = self._recompress_all_bytes_in()
264
 
                    self.compressor = compressor
265
 
                    # Force us to not allow more data
266
 
                    self.num_repack = self._max_repack + 1
267
 
                    self.bytes_list = bytes_out
268
 
                    self.bytes_out_len = this_len
269
 
                    self.unused_bytes = bytes
270
 
                    return True
271
 
                else:
272
 
                    # This fits when we pack it tighter, so use the new packing
273
 
                    self.compressor = compressor
274
 
                    self.bytes_in.append(bytes)
275
 
                    self.bytes_list = bytes_out
276
 
                    self.bytes_out_len = this_len
277
 
        return False
278