~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/chunk_writer.py

Merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
#
17
 
 
18
 
"""ChunkWriter: write compressed data out with a fixed upper bound."""
19
 
 
20
 
import zlib
21
 
from zlib import Z_FINISH, Z_SYNC_FLUSH
22
 
 
23
 
 
24
 
class ChunkWriter(object):
25
 
    """ChunkWriter allows writing of compressed data with a fixed size.
26
 
 
27
 
    If less data is supplied than fills a chunk, the chunk is padded with
28
 
    NULL bytes. If more data is supplied, then the writer packs as much
29
 
    in as it can, but never splits any item it was given.
30
 
 
31
 
    The algorithm for packing is open to improvement! Current it is:
32
 
     - write the bytes given
33
 
     - if the total seen bytes so far exceeds the chunk size, flush.
34
 
 
35
 
    :cvar _max_repack: To fit the maximum number of entries into a node, we
36
 
        will sometimes start over and compress the whole list to get tighter
37
 
        packing. We get diminishing returns after a while, so this limits the
38
 
        number of times we will try.
39
 
        The default is to try to avoid recompressing entirely, but setting this
40
 
        to something like 20 will give maximum compression.
41
 
 
42
 
    :cvar _max_zsync: Another tunable nob. If _max_repack is set to 0, then you
43
 
        can limit the number of times we will try to pack more data into a
44
 
        node. This allows us to do a single compression pass, rather than
45
 
        trying until we overflow, and then recompressing again.
46
 
    """
47
 
    #    In testing, some values for bzr.dev::
48
 
    #        repack  time  MB   max   full
49
 
    #         1       7.5  4.6  1140  0
50
 
    #         2       8.4  4.2  1036  1
51
 
    #         3       9.8  4.1  1012  278
52
 
    #         4      10.8  4.1  728   945
53
 
    #        20      11.1  4.1  0     1012
54
 
    #        repack = 0
55
 
    #        zsync   time  MB    repack  stop_for_z
56
 
    #         0       5.0  24.7  0       6270
57
 
    #         1       4.3  13.2  0       3342
58
 
    #         2       4.9   9.6  0       2414
59
 
    #         5       4.8   6.2  0       1549
60
 
    #         6       4.8   5.8  1       1435
61
 
    #         7       4.8   5.5  19      1337
62
 
    #         8       4.4   5.3  81      1220
63
 
    #        10       5.3   5.0  260     967
64
 
    #        11       5.3   4.9  366     839
65
 
    #        12       5.1   4.8  454     731
66
 
    #        15       5.8   4.7  704     450
67
 
    #        20       5.8   4.6  1133    7
68
 
 
69
 
    #    In testing, some values for mysql-unpacked::
70
 
    #                next_bytes estim
71
 
    #        repack  time  MB    full    stop_for_repack
72
 
    #         1            15.4  0       3913
73
 
    #         2      35.4  13.7  0       346
74
 
    #        20      46.7  13.4  3380    0
75
 
    #        repack=0
76
 
    #        zsync                       stop_for_z
77
 
    #         0      29.5 116.5  0       29782
78
 
    #         1      27.8  60.2  0       15356
79
 
    #         2      27.8  42.4  0       10822
80
 
    #         5      26.8  25.5  0       6491
81
 
    #         6      27.3  23.2  13      5896
82
 
    #         7      27.5  21.6  29      5451
83
 
    #         8      27.1  20.3  52      5108
84
 
    #        10      29.4  18.6  195     4526
85
 
    #        11      29.2  18.0  421     4143
86
 
    #        12      28.0  17.5  702     3738
87
 
    #        15      28.9  16.5  1223    2969
88
 
    #        20      29.6  15.7  2182    1810
89
 
    #        30      31.4  15.4  3891    23
90
 
 
91
 
    # Tuple of (num_repack_attempts, num_zsync_attempts)
92
 
    # num_zsync_attempts only has meaning if num_repack_attempts is 0.
93
 
    _repack_opts_for_speed = (0, 8)
94
 
    _repack_opts_for_size = (20, 0)
95
 
 
96
 
    def __init__(self, chunk_size, reserved=0, optimize_for_size=False):
97
 
        """Create a ChunkWriter to write chunk_size chunks.
98
 
 
99
 
        :param chunk_size: The total byte count to emit at the end of the
100
 
            chunk.
101
 
        :param reserved: How many bytes to allow for reserved data. reserved
102
 
            data space can only be written to via the write(..., reserved=True).
103
 
        """
104
 
        self.chunk_size = chunk_size
105
 
        self.compressor = zlib.compressobj()
106
 
        self.bytes_in = []
107
 
        self.bytes_list = []
108
 
        self.bytes_out_len = 0
109
 
        # bytes that have been seen, but not included in a flush to out yet
110
 
        self.unflushed_in_bytes = 0
111
 
        self.num_repack = 0
112
 
        self.num_zsync = 0
113
 
        self.unused_bytes = None
114
 
        self.reserved_size = reserved
115
 
        # Default is to make building fast rather than compact
116
 
        self.set_optimize(for_size=optimize_for_size)
117
 
 
118
 
    def finish(self):
119
 
        """Finish the chunk.
120
 
 
121
 
        This returns the final compressed chunk, and either None, or the
122
 
        bytes that did not fit in the chunk.
123
 
 
124
 
        :return: (compressed_bytes, unused_bytes, num_nulls_needed)
125
 
 
126
 
            * compressed_bytes: a list of bytes that were output from the
127
 
              compressor. If the compressed length was not exactly chunk_size,
128
 
              the final string will be a string of all null bytes to pad this
129
 
              to chunk_size
130
 
            * unused_bytes: None, or the last bytes that were added, which we
131
 
              could not fit.
132
 
            * num_nulls_needed: How many nulls are padded at the end
133
 
        """
134
 
        self.bytes_in = None # Free the data cached so far, we don't need it
135
 
        out = self.compressor.flush(Z_FINISH)
136
 
        self.bytes_list.append(out)
137
 
        self.bytes_out_len += len(out)
138
 
 
139
 
        if self.bytes_out_len > self.chunk_size:
140
 
            raise AssertionError('Somehow we ended up with too much'
141
 
                                 ' compressed data, %d > %d'
142
 
                                 % (self.bytes_out_len, self.chunk_size))
143
 
        nulls_needed = self.chunk_size - self.bytes_out_len
144
 
        if nulls_needed:
145
 
            self.bytes_list.append("\x00" * nulls_needed)
146
 
        return self.bytes_list, self.unused_bytes, nulls_needed
147
 
 
148
 
    def set_optimize(self, for_size=True):
149
 
        """Change how we optimize our writes.
150
 
 
151
 
        :param for_size: If True, optimize for minimum space usage, otherwise
152
 
            optimize for fastest writing speed.
153
 
        :return: None
154
 
        """
155
 
        if for_size:
156
 
            opts = ChunkWriter._repack_opts_for_size
157
 
        else:
158
 
            opts = ChunkWriter._repack_opts_for_speed
159
 
        self._max_repack, self._max_zsync = opts
160
 
 
161
 
    def _recompress_all_bytes_in(self, extra_bytes=None):
162
 
        """Recompress the current bytes_in, and optionally more.
163
 
 
164
 
        :param extra_bytes: Optional, if supplied we will add it with
165
 
            Z_SYNC_FLUSH
166
 
        :return: (bytes_out, bytes_out_len, alt_compressed)
167
 
 
168
 
            * bytes_out: is the compressed bytes returned from the compressor
169
 
            * bytes_out_len: the length of the compressed output
170
 
            * compressor: An object with everything packed in so far, and
171
 
              Z_SYNC_FLUSH called.
172
 
        """
173
 
        compressor = zlib.compressobj()
174
 
        bytes_out = []
175
 
        append = bytes_out.append
176
 
        compress = compressor.compress
177
 
        for accepted_bytes in self.bytes_in:
178
 
            out = compress(accepted_bytes)
179
 
            if out:
180
 
                append(out)
181
 
        if extra_bytes:
182
 
            out = compress(extra_bytes)
183
 
            out += compressor.flush(Z_SYNC_FLUSH)
184
 
            append(out)
185
 
        bytes_out_len = sum(map(len, bytes_out))
186
 
        return bytes_out, bytes_out_len, compressor
187
 
 
188
 
    def write(self, bytes, reserved=False):
189
 
        """Write some bytes to the chunk.
190
 
 
191
 
        If the bytes fit, False is returned. Otherwise True is returned
192
 
        and the bytes have not been added to the chunk.
193
 
 
194
 
        :param bytes: The bytes to include
195
 
        :param reserved: If True, we can use the space reserved in the
196
 
            constructor.
197
 
        """
198
 
        if self.num_repack > self._max_repack and not reserved:
199
 
            self.unused_bytes = bytes
200
 
            return True
201
 
        if reserved:
202
 
            capacity = self.chunk_size
203
 
        else:
204
 
            capacity = self.chunk_size - self.reserved_size
205
 
        comp = self.compressor
206
 
 
207
 
        # Check to see if the currently unflushed bytes would fit with a bit of
208
 
        # room to spare, assuming no compression.
209
 
        next_unflushed = self.unflushed_in_bytes + len(bytes)
210
 
        remaining_capacity = capacity - self.bytes_out_len - 10
211
 
        if (next_unflushed < remaining_capacity):
212
 
            # looks like it will fit
213
 
            out = comp.compress(bytes)
214
 
            if out:
215
 
                self.bytes_list.append(out)
216
 
                self.bytes_out_len += len(out)
217
 
            self.bytes_in.append(bytes)
218
 
            self.unflushed_in_bytes += len(bytes)
219
 
        else:
220
 
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
221
 
            # Note: It is tempting to do this as a look-ahead pass, and to
222
 
            #       'copy()' the compressor before flushing. However, it seems
223
 
            #       that Which means that it is the same thing as increasing
224
 
            #       repack, similar cost, same benefit. And this way we still
225
 
            #       have the 'repack' knob that can be adjusted, and not depend
226
 
            #       on a platform-specific 'copy()' function.
227
 
            self.num_zsync += 1
228
 
            if self._max_repack == 0 and self.num_zsync > self._max_zsync:
229
 
                self.num_repack += 1
230
 
                self.unused_bytes = bytes
231
 
                return True
232
 
            out = comp.compress(bytes)
233
 
            out += comp.flush(Z_SYNC_FLUSH)
234
 
            self.unflushed_in_bytes = 0
235
 
            if out:
236
 
                self.bytes_list.append(out)
237
 
                self.bytes_out_len += len(out)
238
 
 
239
 
            # We are a bit extra conservative, because it seems that you *can*
240
 
            # get better compression with Z_SYNC_FLUSH than a full compress. It
241
 
            # is probably very rare, but we were able to trigger it.
242
 
            if self.num_repack == 0:
243
 
                safety_margin = 100
244
 
            else:
245
 
                safety_margin = 10
246
 
            if self.bytes_out_len + safety_margin <= capacity:
247
 
                # It fit, so mark it added
248
 
                self.bytes_in.append(bytes)
249
 
            else:
250
 
                # We are over budget, try to squeeze this in without any
251
 
                # Z_SYNC_FLUSH calls
252
 
                self.num_repack += 1
253
 
                (bytes_out, this_len,
254
 
                 compressor) = self._recompress_all_bytes_in(bytes)
255
 
                if self.num_repack >= self._max_repack:
256
 
                    # When we get *to* _max_repack, bump over so that the
257
 
                    # earlier > _max_repack will be triggered.
258
 
                    self.num_repack += 1
259
 
                if this_len + 10 > capacity:
260
 
                    (bytes_out, this_len,
261
 
                     compressor) = self._recompress_all_bytes_in()
262
 
                    self.compressor = compressor
263
 
                    # Force us to not allow more data
264
 
                    self.num_repack = self._max_repack + 1
265
 
                    self.bytes_list = bytes_out
266
 
                    self.bytes_out_len = this_len
267
 
                    self.unused_bytes = bytes
268
 
                    return True
269
 
                else:
270
 
                    # This fits when we pack it tighter, so use the new packing
271
 
                    self.compressor = compressor
272
 
                    self.bytes_in.append(bytes)
273
 
                    self.bytes_list = bytes_out
274
 
                    self.bytes_out_len = this_len
275
 
        return False
276