        will sometimes start over and compress the whole list to get tighter
        packing. We get diminishing returns after a while, so this limits the
        number of times we will try.
        The default is to try to avoid recompressing entirely, but setting this
        to something like 20 will give maximum compression.
    :cvar _max_zsync: Another tunable knob. If _max_repack is set to 0, then you
        can limit the number of times we will try to pack more data into a
        node. This allows us to do a single compression pass, rather than
        trying until we overflow, and then recompressing again.
    """
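
    # Illustrative usage (a sketch, not part of the original file; `entries`
    # is a hypothetical iterable of byte strings):
    #
    #   writer = ChunkWriter(4096)          # default: fast packing
    #   writer.set_optimize(for_size=True)  # or: tightest packing
    #   for entry in entries:
    #       if writer.write(entry):
    #           # entry did not fit; close this chunk and start a new one
    #           bytes_list, unused, padding = writer.finish()
    #           break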
    # In testing, some values for bzr.dev::
    #     repack  time   MB     max    full
    # In testing, some values for mysql-unpacked::
    #     repack  time   MB     full   stop_for_repack
    #     zsync   time   MB     repack stop_for_z
    #      0      29.5   116.5     0   29782
    #     10      29.4    18.6   195    4526
    #     11      29.2    18.0   421    4143
    #     12      28.0    17.5   702    3738
    #     15      28.9    16.5  1223    2969
    #     20      29.6    15.7  2182    1810
    #     30      31.4    15.4  3891      23
    # Tuple of (num_repack_attempts, num_zsync_attempts)
    # num_zsync_attempts only has meaning if num_repack_attempts is 0.
    _repack_opts_for_speed = (0, 8)
    _repack_opts_for_size = (20, 0)
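
    # For example (illustrative): after set_optimize(for_size=True), the
    # size opts have been unpacked, so _max_repack == 20 and _max_zsync == 0;
    # the speed opts instead allow no repacks but up to 8 Z_SYNC_FLUSH
    # attempts.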

    def __init__(self, chunk_size, reserved=0, optimize_for_size=False):
        """Create a ChunkWriter to write chunk_size chunks.

        :param chunk_size: The total byte count to emit at the end of the
            process.
        :param reserved: How many bytes to allow for reserved data. Reserved
            data space can only be written to via write(..., reserved=True).
        """
        self.chunk_size = chunk_size
        self.compressor = zlib.compressobj()
        self.bytes_in = []
        self.bytes_list = []
        self.bytes_out_len = 0
        # bytes that have been seen, but not included in a flush to out yet
        self.unflushed_in_bytes = 0
        self.num_repack = 0
        self.num_zsync = 0
        self.unused_bytes = None
        self.reserved_size = reserved
        # Default is to make building fast rather than compact
        self.set_optimize(for_size=optimize_for_size)

    def finish(self):
        """Finish the chunk.

        This returns the final compressed chunk, and either None, or the
        bytes that did not fit in the chunk.

        :return: (compressed_bytes, unused_bytes, num_nulls_needed)

            compressed_bytes    a list of bytes that were output from the
                                compressor. If the compressed length was not
                                exactly chunk_size, the final string will be
                                a string of all null bytes to pad this to
                                chunk_size
            unused_bytes        None, or the last bytes that were added,
                                which we could not fit
            num_nulls_needed    how many nulls are padded at the end
        """
        self.bytes_in = None # Free the data cached so far, we don't need it
        out = self.compressor.flush(Z_FINISH)
        self.bytes_list.append(out)
        self.bytes_out_len += len(out)
        if self.bytes_out_len > self.chunk_size:
            raise AssertionError('Somehow we ended up with too much'
                                 ' compressed data, %d > %d'
                                 % (self.bytes_out_len, self.chunk_size))
        nulls_needed = self.chunk_size - self.bytes_out_len
        if nulls_needed:
            self.bytes_list.append("\x00" * nulls_needed)
        return self.bytes_list, self.unused_bytes, nulls_needed
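
    # Example of consuming finish() (a sketch; `writer` as in the class
    # comment above). The concatenated output is always exactly chunk_size
    # bytes, so chunks can be written to fixed-size pages directly:
    #
    #   bytes_list, unused, padding = writer.finish()
    #   chunk = "".join(bytes_list)
    #   assert len(chunk) == writer.chunk_size
    #   # if unused is not None, it must be handed to the next ChunkWriter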

    def set_optimize(self, for_size=True):
        """Change how we optimize our writes.

        :param for_size: If True, optimize for minimum space usage, otherwise
            optimize for fastest writing speed.
        """
        if for_size:
            opts = ChunkWriter._repack_opts_for_size
        else:
            opts = ChunkWriter._repack_opts_for_speed
        self._max_repack, self._max_zsync = opts

    def _recompress_all_bytes_in(self, extra_bytes=None):
        """Recompress the current bytes_in, and optionally more.

        :param extra_bytes: Optional, if supplied we will add it with
            Z_SYNC_FLUSH
        :return: (bytes_out, bytes_out_len, compressor)

            bytes_out       the compressed bytes returned from the compressor
            bytes_out_len   the length of the compressed output
            compressor      an object with everything packed in so far, with
                            Z_SYNC_FLUSH called on it if extra_bytes was
                            supplied
        """
        compressor = zlib.compressobj()
        bytes_out = []
        for accepted_bytes in self.bytes_in:
            out = compressor.compress(accepted_bytes)
            if out:
                bytes_out.append(out)
        if extra_bytes:
            out = compressor.compress(extra_bytes)
            out += compressor.flush(Z_SYNC_FLUSH)
            bytes_out.append(out)
        bytes_out_len = sum(len(b) for b in bytes_out)
        return bytes_out, bytes_out_len, compressor
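
    # Sketch of how the overflow path in write() below is expected to use
    # this helper (illustrative; the surrounding repack loop is elided in
    # this excerpt):
    #
    #   bytes_out, this_len, compressor = self._recompress_all_bytes_in(bytes)
    #   if this_len + 10 <= capacity:
    #       # everything fits after a full repack; adopt the new stream
    #       self.compressor = compressor
    #       self.bytes_list = bytes_out
    #       self.bytes_out_len = this_len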

    def write(self, bytes, reserved=False):
        """Write some bytes to the chunk.

        If the bytes fit, False is returned. Otherwise True is returned
        and the bytes have not been added to the chunk.
        """
        if reserved:
            capacity = self.chunk_size
        else:
            capacity = self.chunk_size - self.reserved_size
        comp = self.compressor
        # Check to see if the currently unflushed bytes would fit with a bit
        # of room to spare, assuming no compression.
        next_unflushed = self.unflushed_in_bytes + len(bytes)
        remaining_capacity = capacity - self.bytes_out_len - 10
        if next_unflushed < remaining_capacity:
            # Looks like it will fit
            out = comp.compress(bytes)
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)
            self.bytes_in.append(bytes)
            self.unflushed_in_bytes += len(bytes)
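            # Worked example with illustrative numbers: chunk_size=4096,
            # reserved=0, and 1000 compressed bytes already emitted give
            # remaining_capacity = 4096 - 1000 - 10 = 3086. The fast path
            # above is taken only while unflushed input stays below that,
            # i.e. it pessimistically assumes the new data does not compress
            # at all.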
        else:
            # This may or may not fit; try to add it with Z_SYNC_FLUSH.
            # Note: It is tempting to do this as a look-ahead pass and to
            #       'copy()' the compressor before flushing. However, that
            #       works out to be the same thing as increasing repack:
            #       similar cost, same benefit. And this way we still have
            #       the 'repack' knob that can be adjusted, and do not
            #       depend on a platform-specific 'copy()' function.
            self.num_zsync += 1
            if self._max_repack == 0 and self.num_zsync > self._max_zsync:
                self.num_repack += 1
                self.unused_bytes = bytes
                return True
            out = comp.compress(bytes)
            out += comp.flush(Z_SYNC_FLUSH)
            self.unflushed_in_bytes = 0
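            # A minimal standalone illustration of the Z_SYNC_FLUSH
            # accounting relied on here (assumes only the stdlib zlib):
            #
            #   import zlib
            #   c = zlib.compressobj()
            #   out = c.compress('some bytes') + c.flush(zlib.Z_SYNC_FLUSH)
            #   # len(out) now accounts for *all* input so far, so keeping a
            #   # running total of emitted lengths gives an exact fit check.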