# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#

"""ChunkWriter: write compressed data out with a fixed upper bound."""
21
import zlib
from zlib import Z_FINISH, Z_SYNC_FLUSH
# [max_repack, buffer_full, repacks_with_space, min_compression,
#  total_bytes_in, total_bytes_out, avg_comp]
# Module-level tuning counters shared by every ChunkWriter; updated by
# ChunkWriter.write() and ChunkWriter.finish().
_stats = [0, 0, 0, 999, 0, 0, 0]


class ChunkWriter(object):
    """ChunkWriter allows writing of compressed data with a fixed size.

    If less data is supplied than fills a chunk, the chunk is padded with
    NULL bytes. If more data is supplied, then the writer packs as much
    in as it can, but never splits any item it was given.

    The algorithm for packing is open to improvement! Currently it is:
     - write the bytes given
     - if the total seen bytes so far exceeds the chunk size, flush.

    :cvar _max_repack: To fit the maximum number of entries into a node, we
        will sometimes start over and compress the whole list to get tighter
        packing. We get diminishing returns after a while, so this limits the
        number of times we will try.
        The limit was tuned on bzr.dev and mysql-unpacked by comparing, for
        each repack setting, the run time, output size (MB), and the
        hit_max_repack and buffer_full counters (see the module _stats list).
        NOTE(review): the measurement tables that documented this were lost
        from this copy of the file; restore them from version control.
    """

    # NOTE(review): the literal value of this limit was lost from this copy
    # of the file; 2 is assumed here -- confirm against version control.
    _max_repack = 2

    def __init__(self, chunk_size, reserved=0):
        """Create a ChunkWriter to write chunk_size chunks.

        :param chunk_size: The total byte count to emit at the end of the
            chunk.
        :param reserved: How many bytes to allow for reserved data. Reserved
            data space can only be written to via write(..., reserved=True).
        """
        self.chunk_size = chunk_size
        self.compressor = zlib.compressobj()
        # Every item accepted so far; kept so the whole chunk can be
        # recompressed from scratch when a tighter packing is needed.
        self.bytes_in = []
        # Compressed output produced so far, and its total length.
        self.bytes_list = []
        self.bytes_out_len = 0
        # NOTE(review): not read by any method of this class as visible
        # here; kept in case external callers inspect it.
        self.compressed = None
        # Total length of all accepted items (used for statistics).
        self.seen_bytes = 0
        # bytes that have been seen, but not included in a flush to out yet
        self.unflushed_in_bytes = 0
        # How many full recompressions have been done; see _max_repack.
        self.num_repack = 0
        self.done = False # We will accept no more bytes
        self.unused_bytes = None
        self.reserved_size = reserved

    def finish(self):
        """Finish the chunk.

        This returns the final compressed chunk, and either None, or the
        bytes that did not fit in the chunk.

        :return: (compressed_bytes, unused_bytes, num_nulls_needed)
            compressed_bytes  The list of strings emitted by the compressor.
                              If their total length is short of chunk_size,
                              a final string of NUL bytes pads it out.
            unused_bytes      None, or the bytes passed to the write() call
                              that could not be fitted.
            num_nulls_needed  How many NUL bytes were appended as padding.
        :raises AssertionError: if the compressed data exceeds chunk_size.
        """
        self.bytes_in = None # Free the data cached so far, we don't need it
        out = self.compressor.flush(Z_FINISH)
        self.bytes_list.append(out)
        self.bytes_out_len += len(out)
        if self.num_repack > 0 and self.bytes_out_len > 0:
            # Track the worst compression ratio seen on repacked chunks.
            comp = float(self.seen_bytes) / self.bytes_out_len
            if comp < _stats[3]:
                _stats[3] = comp
        _stats[4] += self.seen_bytes
        _stats[5] += self.bytes_out_len
        _stats[6] = float(_stats[4]) / _stats[5]

        if self.bytes_out_len > self.chunk_size:
            raise AssertionError('Somehow we ended up with too much'
                                 ' compressed data, %d > %d'
                                 % (self.bytes_out_len, self.chunk_size))
        # bytes_out_len <= chunk_size is guaranteed above, so padding is a
        # plain difference; an exactly-full chunk needs no padding at all
        # (a modulo here would request a whole extra chunk of NULs).
        nulls_needed = self.chunk_size - self.bytes_out_len
        if nulls_needed:
            self.bytes_list.append("\x00" * nulls_needed)
        return self.bytes_list, self.unused_bytes, nulls_needed

    def _recompress_all_bytes_in(self, extra_bytes=None):
        """Recompress the current bytes_in, and optionally more.

        All accepted items are fed to a fresh compressor with a single
        Z_SYNC_FLUSH at the end, which can pack tighter than the incremental
        path in write() after it has issued several sync flushes.

        :param extra_bytes: Optional, if supplied we will try to add it with
            Z_SYNC_FLUSH
        :return: (bytes_out, bytes_out_len, compressor)
            bytes_out      The list of compressed strings produced.
            bytes_out_len  The total length of bytes_out.
            compressor     The compressor holding everything packed so far,
                           with Z_SYNC_FLUSH already called, so more data
                           can be appended to it.
        """
        compressor = zlib.compressobj()
        bytes_out = []
        append = bytes_out.append
        compress = compressor.compress
        for accepted_bytes in self.bytes_in:
            out = compress(accepted_bytes)
            if out:
                append(out)
        if extra_bytes:
            out = compress(extra_bytes)
            if out:
                append(out)
        append(compressor.flush(Z_SYNC_FLUSH))
        bytes_out_len = sum(map(len, bytes_out))
        return bytes_out, bytes_out_len, compressor

    def write(self, bytes, reserved=False):
        """Write some bytes to the chunk.

        If the bytes fit, False is returned. Otherwise True is returned
        and the bytes have not been added to the chunk.

        :param bytes: The item to add; it is never split across chunks.
        :param reserved: If True, the item may also use the reserved_size
            bytes set aside at construction time, and it is attempted even
            after the repack limit has been exceeded.
        """
        if self.num_repack > self._max_repack and not reserved:
            self.unused_bytes = bytes
            return True
        if reserved:
            capacity = self.chunk_size
        else:
            capacity = self.chunk_size - self.reserved_size
        comp = self.compressor
        # Check to see if the currently unflushed bytes would fit with a bit of
        # room to spare, assuming no compression.
        next_unflushed = self.unflushed_in_bytes + len(bytes)
        remaining_capacity = capacity - self.bytes_out_len - 10
        if next_unflushed < remaining_capacity:
            # Yes, just push it in, assuming it will fit
            out = comp.compress(bytes)
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)
            self.bytes_in.append(bytes)
            self.seen_bytes += len(bytes)
            self.unflushed_in_bytes += len(bytes)
            return False

        # This may or may not fit, try to add it with Z_SYNC_FLUSH
        out = comp.compress(bytes)
        out += comp.flush(Z_SYNC_FLUSH)
        self.unflushed_in_bytes = 0
        if out:
            self.bytes_list.append(out)
            self.bytes_out_len += len(out)

        # We are a bit extra conservative, because it seems that you *can*
        # get better compression with Z_SYNC_FLUSH than a full compress. It
        # is probably very rare, but we were able to trigger it.
        if self.bytes_out_len + 100 <= capacity:
            # It fit, so mark it added
            self.bytes_in.append(bytes)
            self.seen_bytes += len(bytes)
            return False

        # We are over budget, try to squeeze this in without any
        # Z_SYNC_FLUSH calls
        self.num_repack += 1
        (bytes_out, this_len,
         compressor) = self._recompress_all_bytes_in(bytes)
        if self.num_repack >= self._max_repack:
            # When we get *to* _max_repack, bump over so that the
            # earlier > _max_repack will be triggered.
            self.num_repack += 1
            _stats[0] += 1
        if this_len + 10 > capacity:
            # Even the tight packing overflows: rebuild without the new item
            # and refuse all further non-reserved writes.
            (bytes_out, this_len,
             compressor) = self._recompress_all_bytes_in()
            _stats[1] += 1
            self.compressor = compressor
            # Force us to not allow more data
            self.num_repack = self._max_repack + 1
            self.bytes_list = bytes_out
            self.bytes_out_len = this_len
            self.unused_bytes = bytes
            return True

        # This fits when we pack it tighter, so use the new packing
        # There is one Z_SYNC_FLUSH call in
        # _recompress_all_bytes_in
        _stats[2] += 1
        self.compressor = compressor
        self.bytes_in.append(bytes)
        # Count the item like the other accepting paths do, so the
        # compression statistics reported by finish() are not understated.
        self.seen_bytes += len(bytes)
        self.bytes_list = bytes_out
        self.bytes_out_len = this_len
        return False