1
# Copyright (C) 2008 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
"""ChunkWriter: write compressed data out with a fixed upper bound."""
20
from __future__ import absolute_import
23
import zlib
from zlib import Z_FINISH, Z_SYNC_FLUSH
26
class ChunkWriter(object):
    """ChunkWriter allows writing of compressed data with a fixed size.

    If less data is supplied than fills a chunk, the chunk is padded with
    NULL bytes. If more data is supplied, then the writer packs as much
    in as it can, but never splits any item it was given.

    The algorithm for packing is open to improvement! Currently it is:

     - write the bytes given
     - if the total seen bytes so far exceeds the chunk size, flush.

    :cvar _max_repack: To fit the maximum number of entries into a node, we
        will sometimes start over and compress the whole list to get tighter
        packing. We get diminishing returns after a while, so this limits the
        number of times we will try.
        The default is to try to avoid recompressing entirely, but setting this
        to something like 20 will give maximum compression.

    :cvar _max_zsync: Another tunable knob. If _max_repack is set to 0, then
        you can limit the number of times we will try to pack more data into a
        node. This allows us to do a single compression pass, rather than
        trying until we overflow, and then recompressing again.
    """
    #    In testing, some values for bzr.dev::
    #        repack  time  MB   max   full
    #        zsync   time  MB   repack  stop_for_z
    #    In testing, some values for mysql-unpacked::
    #        repack  time  MB     full  stop_for_repack
    #         0      29.5  116.5  0     29782
    #        10      29.4  18.6   195   4526
    #        11      29.2  18.0   421   4143
    #        12      28.0  17.5   702   3738
    #        15      28.9  16.5   1223  2969
    #        20      29.6  15.7   2182  1810
    #        30      31.4  15.4   3891  23

    # Tuple of (num_repack_attempts, num_zsync_attempts)
    # num_zsync_attempts only has meaning if num_repack_attempts is 0.
    _repack_opts_for_speed = (0, 8)
    _repack_opts_for_size = (20, 0)

    def __init__(self, chunk_size, reserved=0, optimize_for_size=False):
        """Create a ChunkWriter to write chunk_size chunks.

        :param chunk_size: The total byte count to emit at the end of the
            chunk.
        :param reserved: How many bytes to allow for reserved data. reserved
            data space can only be written to via the write(..., reserved=True).
        :param optimize_for_size: Passed to set_optimize(); if True the writer
            uses the repack settings that favour size over speed.
        """
        self.chunk_size = chunk_size
        self.compressor = zlib.compressobj()
        # Every item accepted so far; kept so the chunk can be recompressed
        # from scratch by _recompress_all_bytes_in().
        self.bytes_in = []
        # Compressed output emitted so far, and its total length.
        self.bytes_list = []
        self.bytes_out_len = 0
        # bytes that have been seen, but not included in a flush to out yet
        self.unflushed_in_bytes = 0
        # How many full recompressions / Z_SYNC_FLUSH attempts were made.
        self.num_repack = 0
        self.num_zsync = 0
        # The item that was refused by write(), if any.
        self.unused_bytes = None
        self.reserved_size = reserved
        # Default is to make building fast rather than compact
        self.set_optimize(for_size=optimize_for_size)

    # NOTE(review): the header line of this method is absent from the
    # reviewed listing; the name ``finish`` is assumed -- confirm against the
    # callers.
    def finish(self):
        """Finish the chunk.

        This returns the final compressed chunk, and either None, or the
        bytes that did not fit in the chunk.

        :return: (compressed_bytes, unused_bytes, num_nulls_needed)

            * compressed_bytes: a list of bytes that were output from the
              compressor. If the compressed length was not exactly chunk_size,
              the final string will be a string of all null bytes to pad this
              to chunk_size
            * unused_bytes: None, or the last bytes that were added, which we
              could not fit.
            * num_nulls_needed: How many nulls are padded at the end
        :raises AssertionError: if the compressed output is larger than
            chunk_size.
        """
        self.bytes_in = None  # Free the data cached so far, we don't need it
        out = self.compressor.flush(Z_FINISH)
        self.bytes_list.append(out)
        self.bytes_out_len += len(out)

        if self.bytes_out_len > self.chunk_size:
            raise AssertionError('Somehow we ended up with too much'
                                 ' compressed data, %d > %d'
                                 % (self.bytes_out_len, self.chunk_size))
        nulls_needed = self.chunk_size - self.bytes_out_len
        if nulls_needed:
            # A bytes literal keeps the list homogeneous on Python 3 as well;
            # on Python 2 it is the same object type as before.
            self.bytes_list.append(b"\x00" * nulls_needed)
        return self.bytes_list, self.unused_bytes, nulls_needed

    def set_optimize(self, for_size=True):
        """Change how we optimize our writes.

        :param for_size: If True, optimize for minimum space usage, otherwise
            optimize for fastest writing speed.
        :return: None
        """
        if for_size:
            opts = ChunkWriter._repack_opts_for_size
        else:
            opts = ChunkWriter._repack_opts_for_speed
        self._max_repack, self._max_zsync = opts

    def _recompress_all_bytes_in(self, extra_bytes=None):
        """Recompress the current bytes_in, and optionally more.

        :param extra_bytes: Optional, if supplied we will add it with
            Z_SYNC_FLUSH
        :return: (bytes_out, bytes_out_len, compressor)

            * bytes_out: is the compressed bytes returned from the compressor
            * bytes_out_len: the length of the compressed output
            * compressor: An object with everything packed in so far, and
              Z_SYNC_FLUSH called (only when extra_bytes was supplied).
        """
        compressor = zlib.compressobj()
        bytes_out = []
        append = bytes_out.append
        compress = compressor.compress
        for accepted_bytes in self.bytes_in:
            out = compress(accepted_bytes)
            if out:
                append(out)
        if extra_bytes:
            out = compress(extra_bytes)
            out += compressor.flush(Z_SYNC_FLUSH)
            append(out)
        bytes_out_len = sum(map(len, bytes_out))
        return bytes_out, bytes_out_len, compressor

    def write(self, bytes, reserved=False):
        """Write some bytes to the chunk.

        If the bytes fit, False is returned. Otherwise True is returned
        and the bytes have not been added to the chunk.

        :param bytes: The bytes to include
        :param reserved: If True, we can use the space reserved in the
            constructor.
        :return: False if the bytes were added, True if they were refused (in
            which case they are remembered in self.unused_bytes).
        """
        if self.num_repack > self._max_repack and not reserved:
            # We already gave up on this chunk; only reserved writes may
            # still get in.
            self.unused_bytes = bytes
            return True
        if reserved:
            capacity = self.chunk_size
        else:
            capacity = self.chunk_size - self.reserved_size
        comp = self.compressor

        # Check to see if the currently unflushed bytes would fit with a bit of
        # room to spare, assuming no compression.
        next_unflushed = self.unflushed_in_bytes + len(bytes)
        remaining_capacity = capacity - self.bytes_out_len - 10
        if next_unflushed < remaining_capacity:
            # looks like it will fit
            out = comp.compress(bytes)
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)
            self.bytes_in.append(bytes)
            self.unflushed_in_bytes += len(bytes)
        else:
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
            # Note: It is tempting to do this as a look-ahead pass, and to
            #       'copy()' the compressor before flushing. However, it seems
            #       that this is the same thing as increasing repack: similar
            #       cost, same benefit. And this way we still have the
            #       'repack' knob that can be adjusted, and not depend on a
            #       platform-specific 'copy()' function.
            self.num_zsync += 1
            if self._max_repack == 0 and self.num_zsync > self._max_zsync:
                # Out of Z_SYNC_FLUSH attempts and repacking is disabled:
                # bump num_repack so later unreserved writes are refused too.
                self.num_repack += 1
                self.unused_bytes = bytes
                return True
            out = comp.compress(bytes)
            out += comp.flush(Z_SYNC_FLUSH)
            self.unflushed_in_bytes = 0
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)

            # We are a bit extra conservative, because it seems that you *can*
            # get better compression with Z_SYNC_FLUSH than a full compress. It
            # is probably very rare, but we were able to trigger it.
            # NOTE(review): the two margin values sit on lines absent from
            # the reviewed listing; 100 and 10 are assumed -- confirm.
            if self.num_repack == 0:
                safety_margin = 100
            else:
                safety_margin = 10
            if self.bytes_out_len + safety_margin <= capacity:
                # It fit, so mark it added
                self.bytes_in.append(bytes)
            else:
                # We are over budget, try to squeeze this in without any
                # Z_SYNC_FLUSH calls
                self.num_repack += 1
                (bytes_out, this_len,
                 compressor) = self._recompress_all_bytes_in(bytes)
                if self.num_repack >= self._max_repack:
                    # When we get *to* _max_repack, bump over so that the
                    # earlier > _max_repack will be triggered.
                    self.num_repack += 1
                if this_len + 10 > capacity:
                    # Still too big: repack only what was already accepted
                    # and refuse the new bytes.
                    (bytes_out, this_len,
                     compressor) = self._recompress_all_bytes_in()
                    self.compressor = compressor
                    # Force us to not allow more data
                    self.num_repack = self._max_repack + 1
                    self.bytes_list = bytes_out
                    self.bytes_out_len = this_len
                    self.unused_bytes = bytes
                    return True
                else:
                    # This fits when we pack it tighter, so use the new packing
                    self.compressor = compressor
                    self.bytes_in.append(bytes)
                    self.bytes_list = bytes_out
                    self.bytes_out_len = this_len
        return False