# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""ChunkWriter: write compressed data out with a fixed upper bound."""

import zlib
from zlib import Z_FINISH, Z_SYNC_FLUSH


class ChunkWriter(object):
    """ChunkWriter allows writing of compressed data with a fixed size.

    If less data is supplied than fills a chunk, the chunk is padded with
    NULL bytes. If more data is supplied, then the writer packs as much
    in as it can, but never splits any item it was given.

    The algorithm for packing is open to improvement! Currently it is:
     - write the bytes given
     - if the total seen bytes so far exceeds the chunk size, flush.

    :cvar _max_repack: To fit the maximum number of entries into a node, we
        will sometimes start over and compress the whole list to get tighter
        packing. We get diminishing returns after a while, so this limits the
        number of times we will try.
        The default is to try to avoid recompressing entirely, but setting this
        to something like 20 will give maximum compression.

    :cvar _max_zsync: Another tunable knob. If _max_repack is set to 0, then
        you can limit the number of times we will try to pack more data into a
        node. This allows us to do a single compression pass, rather than
        trying until we overflow, and then recompressing again.
    """
    # In testing, some values for bzr.dev::
    #   repack  time  MB    max   full
    #   zsync   time  MB    repack  stop_for_z
    #
    # In testing, some values for mysql-unpacked::
    #   repack  time  MB    full  stop_for_repack
    #    0      29.5  116.5    0  29782
    #   10      29.4   18.6  195   4526
    #   11      29.2   18.0  421   4143
    #   12      28.0   17.5  702   3738
    #   15      28.9   16.5 1223   2969
    #   20      29.6   15.7 2182   1810
    #   30      31.4   15.4 3891     23
    #
    # NOTE(review): most data rows (and some sub-headers) of these
    # measurement tables are missing from this copy of the file; recover
    # them from version control before relying on the numbers.

    # Tuple of (num_repack_attempts, num_zsync_attempts)
    # num_zsync_attempts only has meaning if num_repack_attempts is 0.
    _repack_opts_for_speed = (0, 8)
    _repack_opts_for_size = (20, 0)

    def __init__(self, chunk_size, reserved=0, optimize_for_size=False):
        """Create a ChunkWriter to write chunk_size chunks.

        :param chunk_size: The total byte count to emit at the end of the
            chunk.
        :param reserved: How many bytes to allow for reserved data. reserved
            data space can only be written to via the write(..., reserved=True).
        :param optimize_for_size: Passed to set_optimize(for_size=...); if
            True, favour tighter packing over writing speed.
        """
        self.chunk_size = chunk_size
        self.compressor = zlib.compressobj()
        # Every item accepted so far, kept so the chunk can be recompressed
        # from scratch when repacking.
        self.bytes_in = []
        # Compressed output produced so far.
        self.bytes_list = []
        self.bytes_out_len = 0
        # bytes that have been seen, but not included in a flush to out yet
        self.unflushed_in_bytes = 0
        # Attempt counters, compared against _max_repack / _max_zsync.
        self.num_repack = 0
        self.num_zsync = 0
        self.unused_bytes = None
        self.reserved_size = reserved
        # Default is to make building fast rather than compact
        self.set_optimize(for_size=optimize_for_size)

    def finish(self):
        """Finish the chunk.

        This returns the final compressed chunk, and either None, or the
        bytes that did not fit in the chunk.

        :return: (compressed_bytes, unused_bytes, num_nulls_needed)
            compressed_bytes    a list of bytes that were output from the
                                compressor. If the compressed length was not
                                exactly chunk_size, the final string will be a
                                string of all null bytes to pad this to
                                chunk_size
            unused_bytes        None, or the last bytes that were added, which
                                we could not fit.
            num_nulls_needed    How many nulls are padded at the end
        :raises AssertionError: if the compressed output exceeds chunk_size.
        """
        self.bytes_in = None # Free the data cached so far, we don't need it
        out = self.compressor.flush(Z_FINISH)
        self.bytes_list.append(out)
        self.bytes_out_len += len(out)

        if self.bytes_out_len > self.chunk_size:
            raise AssertionError('Somehow we ended up with too much'
                                 ' compressed data, %d > %d'
                                 % (self.bytes_out_len, self.chunk_size))
        nulls_needed = self.chunk_size - self.bytes_out_len
        if nulls_needed:
            # Bytes literal so the padding joins cleanly with zlib's output
            # on Python 3 (b"" is the same as "" on Python 2.6+).
            self.bytes_list.append(b"\x00" * nulls_needed)
        return self.bytes_list, self.unused_bytes, nulls_needed

    def set_optimize(self, for_size=True):
        """Change how we optimize our writes.

        :param for_size: If True, optimize for minimum space usage, otherwise
            optimize for fastest writing speed.
        :return: None
        """
        if for_size:
            opts = ChunkWriter._repack_opts_for_size
        else:
            opts = ChunkWriter._repack_opts_for_speed
        self._max_repack, self._max_zsync = opts

    def _recompress_all_bytes_in(self, extra_bytes=None):
        """Recompress the current bytes_in, and optionally more.

        :param extra_bytes: Optional, if supplied we will add it with
            Z_SYNC_FLUSH
        :return: (bytes_out, bytes_out_len, compressor)
            bytes_out       the compressed bytes returned from the compressor
            bytes_out_len   the length of the compressed output
            compressor      a fresh compressor with everything packed in so
                            far; Z_SYNC_FLUSH has only been called on it if
                            extra_bytes was supplied.
        """
        compressor = zlib.compressobj()
        bytes_out = []
        append = bytes_out.append
        compress = compressor.compress
        for accepted_bytes in self.bytes_in:
            out = compress(accepted_bytes)
            if out:
                append(out)
        if extra_bytes:
            out = compress(extra_bytes)
            out += compressor.flush(Z_SYNC_FLUSH)
            append(out)
        bytes_out_len = sum(map(len, bytes_out))
        return bytes_out, bytes_out_len, compressor

    def write(self, bytes, reserved=False):
        """Write some bytes to the chunk.

        If the bytes fit, False is returned. Otherwise True is returned
        and the bytes have not been added to the chunk.

        :param bytes: The bytes to include
        :param reserved: If True, we can use the space reserved in the
            constructor.
        :return: False if the bytes were added; True if they were rejected,
            in which case they are also recorded as unused_bytes.
        """
        if self.num_repack > self._max_repack and not reserved:
            # An earlier write already closed this chunk to normal data.
            self.unused_bytes = bytes
            return True
        if reserved:
            capacity = self.chunk_size
        else:
            capacity = self.chunk_size - self.reserved_size
        comp = self.compressor

        # Check to see if the currently unflushed bytes would fit with a bit of
        # room to spare, assuming no compression.
        next_unflushed = self.unflushed_in_bytes + len(bytes)
        remaining_capacity = capacity - self.bytes_out_len - 10
        if next_unflushed < remaining_capacity:
            # looks like it will fit
            out = comp.compress(bytes)
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)
            self.bytes_in.append(bytes)
            self.unflushed_in_bytes += len(bytes)
            return False

        # This may or may not fit, try to add it with Z_SYNC_FLUSH.
        # Note: It is tempting to do this as a look-ahead pass, and to
        #       'copy()' the compressor before flushing. However, that costs
        #       about the same as increasing repack, for the same benefit.
        #       This way we still have the 'repack' knob that can be
        #       adjusted, and do not depend on a platform-specific 'copy()'
        #       function.
        self.num_zsync += 1
        if self._max_repack == 0 and self.num_zsync > self._max_zsync:
            # Out of sync-flush attempts and repacking is disabled: bumping
            # num_repack past _max_repack closes the chunk to further
            # non-reserved writes.
            self.num_repack += 1
            self.unused_bytes = bytes
            return True
        out = comp.compress(bytes)
        out += comp.flush(Z_SYNC_FLUSH)
        self.unflushed_in_bytes = 0
        if out:
            self.bytes_list.append(out)
            self.bytes_out_len += len(out)

        # We are a bit extra conservative, because it seems that you *can*
        # get better compression with Z_SYNC_FLUSH than a full compress. It
        # is probably very rare, but we were able to trigger it.
        if self.num_repack == 0:
            safety_margin = 100
        else:
            safety_margin = 10
        if self.bytes_out_len + safety_margin <= capacity:
            # It fit, so mark it added
            self.bytes_in.append(bytes)
            return False

        # We are over budget, try to squeeze this in without any
        # Z_SYNC_FLUSH calls
        self.num_repack += 1
        (bytes_out, this_len,
         compressor) = self._recompress_all_bytes_in(bytes)
        if self.num_repack >= self._max_repack:
            # When we get *to* _max_repack, bump over so that the
            # earlier > _max_repack will be triggered.
            self.num_repack += 1
        if this_len + 10 > capacity:
            # Even a full recompression cannot fit this item: rebuild the
            # stream from the accepted items only, and reject this one.
            (bytes_out, this_len,
             compressor) = self._recompress_all_bytes_in()
            self.compressor = compressor
            # Force us to not allow more data
            self.num_repack = self._max_repack + 1
            self.bytes_list = bytes_out
            self.bytes_out_len = this_len
            self.unused_bytes = bytes
            return True
        # This fits when we pack it tighter, so use the new packing
        self.compressor = compressor
        self.bytes_in.append(bytes)
        self.bytes_list = bytes_out
        self.bytes_out_len = this_len
        return False