21
21
from zlib import Z_FINISH, Z_SYNC_FLUSH
23
# [max_repack, buffer_full, repacks_with_space, min_compression,
24
# total_bytes_in, total_bytes_out, avg_comp]
25
# Module-level statistics list, mutated in place by ChunkWriter.
# In the code visible here, slot 4 accumulates the input bytes seen, slot 5
# accumulates the compressed bytes emitted, and slot 6 is recomputed as
# slot 4 / slot 5 (the running average compression ratio).
# NOTE(review): slot 3 starts at 999, presumably as a "larger than any real
# value" seed for a running minimum -- confirm against the code that updates it.
_stats = [0, 0, 0, 999, 0, 0, 0]
25
27
class ChunkWriter(object):
26
28
"""ChunkWriter allows writing of compressed data with a fixed size.
54
56
3 60.3 13.5 3407 197
55
57
4 66.7 13.4 3203 2154
56
58
20 69.3 13.4 0 3380
58
:cvar _default_min_compression_size: The expected minimum compression.
59
While packing nodes into the page, we won't Z_SYNC_FLUSH until we have
60
received this much input data. This saves time, because we don't bloat
61
the result with SYNC entries (and then need to repack), but if it is
62
set too high we will accept data that will never fit and trigger a
67
_default_min_compression_size = 1.8
69
63
def __init__(self, chunk_size, reserved=0):
70
64
"""Create a ChunkWriter to write chunk_size chunks.
81
75
self.bytes_out_len = 0
82
76
self.compressed = None
83
77
self.seen_bytes = 0
78
# bytes that have been seen, but not included in a flush to out yet
79
self.unflushed_in_bytes = 0
84
80
self.num_repack = 0
81
self.done = False # We will accept no more bytes
85
82
self.unused_bytes = None
86
83
self.reserved_size = reserved
87
self.min_compress_size = self._default_min_compression_size
90
86
"""Finish the chunk.
96
92
out = self.compressor.flush(Z_FINISH)
97
93
self.bytes_list.append(out)
98
94
self.bytes_out_len += len(out)
95
if self.num_repack > 0 and self.bytes_out_len > 0:
96
comp = float(self.seen_bytes) / self.bytes_out_len
99
_stats[4] += self.seen_bytes
100
_stats[5] += self.bytes_out_len
101
_stats[6] = float(_stats[4]) / _stats[5]
99
103
if self.bytes_out_len > self.chunk_size:
100
104
raise AssertionError('Somehow we ended up with too much'
101
105
' compressed data, %d > %d'
134
138
out = compress(extra_bytes)
135
out += compressor.flush(Z_SYNC_FLUSH)
141
append(compressor.flush(Z_SYNC_FLUSH))
138
142
bytes_out_len = sum(map(len, bytes_out))
139
143
return bytes_out, bytes_out_len, compressor
144
148
If the bytes fit, False is returned. Otherwise True is returned
145
149
and the bytes have not been added to the chunk.
151
if self.num_repack > self._max_repack and not reserved:
152
self.unused_bytes = bytes
148
155
capacity = self.chunk_size
150
157
capacity = self.chunk_size - self.reserved_size
151
# Check quickly to see if this is likely to put us outside of our
153
next_seen_size = self.seen_bytes + len(bytes)
154
158
comp = self.compressor
155
if (next_seen_size < self.min_compress_size * capacity):
156
# No need, we assume this will "just fit"
159
# Check to see if the currently unflushed bytes would fit with a bit of
160
# room to spare, assuming no compression.
161
next_unflushed = self.unflushed_in_bytes + len(bytes)
162
remaining_capacity = capacity - self.bytes_out_len - 10
163
if (next_unflushed < remaining_capacity):
164
# Yes, just push it in, assuming it will fit
157
165
out = comp.compress(bytes)
159
167
self.bytes_list.append(out)
160
168
self.bytes_out_len += len(out)
161
169
self.bytes_in.append(bytes)
162
self.seen_bytes = next_seen_size
170
self.seen_bytes += len(bytes)
171
self.unflushed_in_bytes += len(bytes)
164
if self.num_repack > self._max_repack and not reserved:
165
self.unused_bytes = bytes
167
173
# This may or may not fit, try to add it with Z_SYNC_FLUSH
168
174
out = comp.compress(bytes)
169
175
out += comp.flush(Z_SYNC_FLUSH)
176
self.unflushed_in_bytes = 0
171
178
self.bytes_list.append(out)
172
179
self.bytes_out_len += len(out)
173
180
if self.bytes_out_len + 10 <= capacity:
174
181
# It fit, so mark it added
175
182
self.bytes_in.append(bytes)
176
self.seen_bytes = next_seen_size
183
self.seen_bytes += len(bytes)
178
185
# We are over budget, try to squeeze this in without any
179
186
# Z_SYNC_FLUSH calls
186
193
self.num_repack += 1
188
195
if this_len + 10 > capacity:
189
# In real-world testing, this only happens when _max_repack
190
# is set >2, and even then rarely (46 out of 1022)
191
196
(bytes_out, this_len,
192
197
compressor) = self._recompress_all_bytes_in()
194
199
self.compressor = compressor
200
# Force us to not allow more data
201
self.num_repack = self._max_repack + 1
195
202
self.bytes_list = bytes_out
196
203
self.bytes_out_len = this_len
197
204
self.unused_bytes = bytes