# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

"""ChunkWriter: write compressed data out with a fixed upper bound."""

import zlib
from zlib import Z_FINISH, Z_SYNC_FLUSH


class ChunkWriter(object):
    """ChunkWriter allows writing of compressed data with a fixed size.

    If less data is supplied than fills a chunk, the chunk is padded with
    NULL bytes. If more data is supplied, then the writer packs as much
    in as it can, but never splits any item it was given.

    The algorithm for packing is open to improvement! Currently it is:
     - write the bytes given
     - if the total seen bytes so far exceeds the chunk size, flush.

    :cvar _max_repack: To fit the maximum number of entries into a node, we
        will sometimes start over and compress the whole list to get tighter
        packing. We get diminishing returns after a while, so this limits the
        number of times we will try.
        The default is to avoid recompressing entirely, but setting this
        to something like 20 will give maximum compression.

    :cvar _max_zsync: Another tunable knob. If _max_repack is set to 0, then
        you can limit the number of times we will try to pack more data into
        a node. This allows us to do a single compression pass, rather than
        trying until we overflow, and then recompressing again.
    """
    #    In testing, some values for bzr.dev::
    #
    #        repack  time   MB   max   full  time w/ add_node
    #          2      8.4   4.2  1036     1   6.8
    #
    #        zsync   time   MB   repack  max_z  time w/ add_node
    #          0      6.7  24.7       0   6270   5.0
    #          1      6.5  13.2       0   3342   4.3
    #          2      6.6   9.6       0   2414   4.9
    #          5      6.5   6.2       0   1549   4.8
    #          6      6.5   5.8       1   1435   4.8
    #          7      6.6   5.5      19   1337   4.8
    #          8      6.7   5.3      81   1220   4.4
    #         10      6.8   5.0     260    967   5.3
    #         11      6.8   4.9     366    839   5.3
    #         12      6.9   4.8     454    731   5.1
    #         15      7.2   4.7     704    450   5.8
    #         20      7.7   4.6    1133      7   5.8
    #
    #    In testing, some values for mysql-unpacked::
    #
    #        repack  time    MB   hit_max  full  time w/ add_node
    #          2     54.4  13.7      3467     0  35.4
    #         20     67.0  13.4         0  3380  46.7
    #
    #        zsync   time    MB   repack  max_z  time w/ add_node
    #          0     47.7  116.5       0  29782  29.5
    #          1     48.5   60.2       0  15356  27.8
    #          2     48.1   42.4       0  10822  27.8
    #          5     48.3   25.5       0   6491  26.8
    #          6     48.0   23.2      13   5896  27.3
    #          7     48.1   21.6      29   5451  27.5
    #          8     48.1   20.3      52   5108  27.1
    #         10     46.9   18.6     195   4526  29.4
    #         11     48.8   18.0     421   4143  29.2
    #         12     47.4   17.5     702   3738  28.0
    #         15     49.6   16.5    1223   2969  28.9
    #         20     48.9   15.7    2182   1810  29.6
    #         30       --   15.4    3891     23  31.4

    _max_repack = 0
    _max_zsync = 8
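
    # Editor's sketch (illustration only, not bzrlib API): both knobs are
    # class attributes, so a caller wanting maximum compression at the time
    # cost shown in the tables above could override them in a subclass::
    #
    #     class TightChunkWriter(ChunkWriter):  # hypothetical name
    #         _max_repack = 20
    #
    # The defaults above instead favour a single compression pass.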

    def __init__(self, chunk_size, reserved=0):
        """Create a ChunkWriter to write chunk_size chunks.

        :param chunk_size: The total byte count to emit at the end of the
            chunk.
        :param reserved: How many bytes to allow for reserved data. Reserved
            data space can only be written to via write(..., reserved=True).
        """
        self.chunk_size = chunk_size
        self.compressor = zlib.compressobj()
        self.bytes_in = []
        self.bytes_list = []
        self.bytes_out_len = 0
        # bytes that have been seen, but not included in a flush to out yet
        self.unflushed_in_bytes = 0
        self.num_repack = 0
        self.num_zsync = 0
        self.unused_bytes = None
        self.reserved_size = reserved

    def finish(self):
        """Finish the chunk.

        This returns the final compressed chunk, and either None, or the
        bytes that did not fit in the chunk.

        :return: (compressed_bytes, unused_bytes, num_nulls_needed)
            compressed_bytes    a list of bytes that were output from the
                                compressor. If the compressed length was not
                                exactly chunk_size, the final string will be a
                                string of all null bytes to pad this to
                                chunk_size.
            unused_bytes        None, or the last bytes that were added, which
                                we could not fit.
            num_nulls_needed    How many nulls are padded at the end.
        """
        self.bytes_in = None  # Free the data cached so far; we don't need it
        out = self.compressor.flush(Z_FINISH)
        self.bytes_list.append(out)
        self.bytes_out_len += len(out)

        if self.bytes_out_len > self.chunk_size:
            raise AssertionError('Somehow we ended up with too much'
                                 ' compressed data, %d > %d'
                                 % (self.bytes_out_len, self.chunk_size))
        nulls_needed = self.chunk_size - self.bytes_out_len
        if nulls_needed:
            self.bytes_list.append("\x00" * nulls_needed)
        return self.bytes_list, self.unused_bytes, nulls_needed
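
    # Editor's sketch of the caller side of finish() (an assumption about
    # typical use, mirroring the docstring above; 'next_writer' is a
    # hypothetical second ChunkWriter, not bzrlib API)::
    #
    #     chunks, unused, padding = writer.finish()
    #     data = ''.join(chunks)        # always exactly chunk_size bytes
    #     if unused is not None:
    #         next_writer.write(unused) # carry the overflow to the next chunk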

    def _recompress_all_bytes_in(self, extra_bytes=None):
        """Recompress the current bytes_in, and optionally more.

        :param extra_bytes: Optional, if supplied we will add it with
            Z_SYNC_FLUSH
        :return: (bytes_out, bytes_out_len, compressor)
            bytes_out       the compressed bytes returned from the compressor
            bytes_out_len   the length of the compressed output
            compressor      an object with everything packed in so far, and
                            Z_SYNC_FLUSH called.
        """
        compressor = zlib.compressobj()
        bytes_out = []
        append = bytes_out.append
        compress = compressor.compress
        for accepted_bytes in self.bytes_in:
            out = compress(accepted_bytes)
            if out:
                append(out)
        if extra_bytes:
            out = compress(extra_bytes)
            out += compressor.flush(Z_SYNC_FLUSH)
            append(out)
        bytes_out_len = sum(map(len, bytes_out))
        return bytes_out, bytes_out_len, compressor
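
    # Editor's note on the zlib flush modes used here (standard zlib
    # semantics): Z_SYNC_FLUSH aligns the deflate stream to a byte boundary
    # without ending it, so the output length so far is exact and more data
    # can still be compressed; Z_FINISH (see finish()) ends the stream for
    # good. A tiny standalone check::
    #
    #     c = zlib.compressobj()
    #     part = c.compress('abc') + c.flush(Z_SYNC_FLUSH)  # measurable now
    #     rest = c.compress('def') + c.flush(Z_FINISH)      # stream closed
    #     assert zlib.decompress(part + rest) == 'abcdef'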

    def write(self, bytes, reserved=False):
        """Write some bytes to the chunk.

        If the bytes fit, False is returned. Otherwise True is returned
        and the bytes have not been added to the chunk.

        :param bytes: The bytes to include
        :param reserved: If True, we can use the space reserved in the
            constructor.
        """
        if self.num_repack > self._max_repack and not reserved:
            self.unused_bytes = bytes
            return True
        if reserved:
            capacity = self.chunk_size
        else:
            capacity = self.chunk_size - self.reserved_size
        comp = self.compressor

        # Check to see if the currently unflushed bytes would fit with a bit
        # of room to spare, assuming no compression.
        next_unflushed = self.unflushed_in_bytes + len(bytes)
        remaining_capacity = capacity - self.bytes_out_len - 10
        if next_unflushed < remaining_capacity:
            # looks like it will fit
            out = comp.compress(bytes)
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)
            self.bytes_in.append(bytes)
            self.unflushed_in_bytes += len(bytes)
        else:
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
            # Note: It is tempting to do this as a look-ahead pass, and to
            #       'copy()' the compressor before flushing. However, that
            #       amounts to the same thing as increasing repack: similar
            #       cost, same benefit. And this way we still have the
            #       'repack' knob that can be adjusted, and we do not depend
            #       on a platform-specific 'copy()' function.
            self.num_zsync += 1
            if self._max_repack == 0 and self.num_zsync > self._max_zsync:
                self.num_repack += 1
                self.unused_bytes = bytes
                return True
            out = comp.compress(bytes)
            out += comp.flush(Z_SYNC_FLUSH)
            self.unflushed_in_bytes = 0
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)

            # We are a bit extra conservative, because it seems that you *can*
            # get better compression with Z_SYNC_FLUSH than a full compress.
            # It is probably very rare, but we were able to trigger it.
            if self.num_repack == 0:
                safety_margin = 100
            else:
                safety_margin = 10
            if self.bytes_out_len + safety_margin <= capacity:
                # It fit, so mark it added
                self.bytes_in.append(bytes)
            else:
                # We are over budget; try to squeeze this in without any
                # Z_SYNC_FLUSH calls
                self.num_repack += 1
                (bytes_out, this_len,
                 compressor) = self._recompress_all_bytes_in(bytes)
                if self.num_repack >= self._max_repack:
                    # When we get *to* _max_repack, bump over so that the
                    # earlier > _max_repack will be triggered.
                    self.num_repack += 1
                if this_len + 10 > capacity:
                    # No room for this item even when recompressed; repack
                    # without it and refuse further writes.
                    (bytes_out, this_len,
                     compressor) = self._recompress_all_bytes_in()
                    self.compressor = compressor
                    # Force us to not allow more data
                    self.num_repack = self._max_repack + 1
                    self.bytes_list = bytes_out
                    self.bytes_out_len = this_len
                    self.unused_bytes = bytes
                    return True
                else:
                    # This fits when we pack it tighter, so use the new
                    # packing.
                    self.compressor = compressor
                    self.bytes_in.append(bytes)
                    self.bytes_list = bytes_out
                    self.bytes_out_len = this_len
        return False
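

if __name__ == '__main__':
    # A minimal usage sketch, not part of the original module. It assumes
    # the Python 2 runtime this bzrlib-era file targets (zlib works on str).
    writer = ChunkWriter(4096)
    for i in range(1000):
        # write() returns False while items fit; True means this item was
        # rejected and will come back via finish()'s unused_bytes.
        if writer.write('row-%08d some compressible payload\n' % i):
            break
    chunks, unused, padding = writer.finish()
    total = sum(map(len, chunks))
    assert total == 4096  # the output is always padded to exactly chunk_size
    print 'chunk bytes: %d, padding: %d, first unfit item: %r' % (
        total, padding, unused and unused[:20])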