        will sometimes start over and compress the whole list to get tighter
        packing. We get diminishing returns after a while, so this limits the
        number of times we will try.
        The default is to try to avoid recompressing entirely, but setting this
        to something like 20 will give maximum compression.

    :cvar _max_zsync: Another tunable knob. If _max_repack is set to 0, then you
        can limit the number of times we will try to pack more data into a
        node. This allows us to do a single compression pass, rather than
        trying until we overflow, and then recompressing again.
    """
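    # A hedged tuning sketch (illustrative, not taken from this module): the
    # knobs above are plain class attributes, so a caller that prefers size
    # over speed could override them in a subclass, e.g.:
    #
    #     class TightChunkWriter(ChunkWriter):
    #         _max_repack = 20  # keep recompressing until the node overflows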
    #    In testing, some values for bzr.dev::
    #
    #                w/o copy    w/ copy     w/ copy ins  w/ copy & save
    #        repack  time  MB    time  MB    time  MB     time  MB
    #         1       8.8  5.1    8.9  5.1    9.6  4.4    12.5  4.1
    #         2       9.6  4.4   10.1  4.3   10.4  4.2    11.1  4.1
    #         3      10.6  4.2   11.1  4.1   11.2  4.1    11.3  4.1
    #        20      12.9  4.1   12.2  4.1   12.3  4.1
    #
    #        repack  time  MB     max  full
    #         2       8.4   4.2  1036     1    6.8
    #
    #        zsync   time  MB    repack  max_z  time w/ add_node
    #         0       6.7  24.7     0     6270   5.0
    #         1       6.5  13.2     0     3342   4.3
    #         2       6.6   9.6     0     2414   4.9
    #         5       6.5   6.2     0     1549   4.8
    #         6       6.5   5.8     1     1435   4.8
    #         7       6.6   5.5    19     1337   4.8
    #         8       6.7   5.3    81     1220   4.4
    #        10       6.8   5.0   260      967   5.3
    #        11       6.8   4.9   366      839   5.3
    #        12       6.9   4.8   454      731   5.1
    #        15       7.2   4.7   704      450   5.8
    #        20       7.7   4.6  1133        7   5.8
    #
    #    In testing, some values for mysql-unpacked::
    #
    #                w/o copy    w/ copy     w/ copy ins  w/ copy & save
    #        repack  time  MB    time  MB    time  MB     time  MB
    #         2      59.3 14.1   62.6 13.5   64.3 13.4
    #
    #        repack  time  MB    hit_max  full
    #         2      54.4  13.7   3467      0    35.4
    #        20      67.0  13.4      0   3380    46.7
    #
    #        zsync   time  MB    repack  max_z  time w/ add_node
    #         0      47.7 116.5     0    29782   29.5
    #         1      48.5  60.2     0    15356   27.8
    #         2      48.1  42.4     0    10822   27.8
    #         5      48.3  25.5     0     6491   26.8
    #         6      48.0  23.2    13     5896   27.3
    #         7      48.1  21.6    29     5451   27.5
    #         8      48.1  20.3    52     5108   27.1
    #        10      46.9  18.6   195     4526   29.4
    #        11      48.8  18.0   421     4143   29.2
    #        12      47.4  17.5   702     3738   28.0
    #        15      49.6  16.5  1223     2969   28.9
    #        20      48.9  15.7  2182     1810   29.6
    #        30            15.4  3891       23   31.4

    _max_repack = 0
    _max_zsync = 8

    def __init__(self, chunk_size, reserved=0):
        """Create a ChunkWriter to write chunk_size chunks.

        :param chunk_size: The total byte count to emit at the end of the
            chunk.
        :param reserved: How many bytes to allow for reserved data. reserved
            data space can only be written to via the write(..., reserved=True).
        """
        self.chunk_size = chunk_size
        self.compressor = zlib.compressobj()
        self.bytes_in = []
        self.bytes_list = []
        self.bytes_out_len = 0
        # bytes that have been seen, but not included in a flush to out yet
        self.unflushed_in_bytes = 0
        self.num_repack = 0
        self.num_zsync = 0
        self.unused_bytes = None
        self.reserved_size = reserved
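
    # Minimal usage sketch (illustrative; the sizes and data are made up, but
    # write() and finish() are the methods documented below):
    #
    #     writer = ChunkWriter(4096, reserved=10)
    #     for line in lines:
    #         if writer.write(line):
    #             break  # did not fit; this line must go into the next chunk
    #     byte_blocks, unused, num_nulls = writer.finish()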

    def finish(self):
        """Finish the chunk.

        This returns the final compressed chunk, and either None, or the
        bytes that did not fit in the chunk.

        :return: (compressed_bytes, unused_bytes, num_nulls_needed)
            compressed_bytes    a list of bytes that were output from the
                                compressor. If the compressed length was not
                                exactly chunk_size, the final string will be a
                                string of all null bytes to pad this to
                                chunk_size
            unused_bytes        None, or the last bytes that were added, which
                                we could not fit.
            num_nulls_needed    How many nulls are padded at the end
        """
        self.bytes_in = None # Free the data cached so far, we don't need it
        out = self.compressor.flush(Z_FINISH)
        self.bytes_list.append(out)
        self.bytes_out_len += len(out)

        if self.bytes_out_len > self.chunk_size:
            raise AssertionError('Somehow we ended up with too much'
                                 ' compressed data, %d > %d'
                                 % (self.bytes_out_len, self.chunk_size))
        nulls_needed = self.chunk_size - self.bytes_out_len
        if nulls_needed:
            self.bytes_list.append("\x00" * nulls_needed)
        return self.bytes_list, self.unused_bytes, nulls_needed
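
    # Worked example of the padding arithmetic above (numbers are illustrative
    # only): with chunk_size=4096 and bytes_out_len=4000 after the Z_FINISH
    # flush, nulls_needed is 96, so 96 NUL bytes are appended and
    # sum(map(len, bytes_list)) == 4096.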

    def _recompress_all_bytes_in(self, extra_bytes=None):
        """Recompress the current bytes_in, and optionally more.

        :param extra_bytes: Optional, if supplied we will add it with
            Z_SYNC_FLUSH
        :return: (bytes_out, bytes_out_len, compressor)
            bytes_out       is the compressed bytes returned from the compressor
            bytes_out_len   the length of the compressed output
            compressor      An object with everything packed in so far, and
                            Z_SYNC_FLUSH called.
        """
        compressor = zlib.compressobj()
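        # The loop below is a sketch consistent with the docstring above (an
        # assumption, not necessarily the original implementation): recompress
        # every already-accepted chunk, then add extra_bytes behind a
        # Z_SYNC_FLUSH so the output length can be measured.
        bytes_out = []
        for accepted_bytes in self.bytes_in:
            out = compressor.compress(accepted_bytes)
            if out:
                bytes_out.append(out)
        if extra_bytes:
            out = compressor.compress(extra_bytes)
            out += compressor.flush(Z_SYNC_FLUSH)
            bytes_out.append(out)
        bytes_out_len = sum(map(len, bytes_out))
        return bytes_out, bytes_out_len, compressor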

    def write(self, bytes, reserved=False):
        """Write some bytes to the chunk.

        If the bytes fit, False is returned. Otherwise True is returned
        and the bytes have not been added to the chunk.

        :param bytes: The bytes to include
        :param reserved: If True, we can use the space reserved in the
            constructor.
        """
        if self.num_repack > self._max_repack and not reserved:
            self.unused_bytes = bytes
            return True
        if reserved:
            capacity = self.chunk_size
        else:
            capacity = self.chunk_size - self.reserved_size
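        # For example (illustrative numbers): with chunk_size=4096 and
        # reserved=10, a normal write sees capacity == 4086, while a
        # reserved=True write may use the full 4096 bytes.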
        comp = self.compressor

        # Check to see if the currently unflushed bytes would fit with a bit
        # of room to spare, assuming no compression.
        next_unflushed = self.unflushed_in_bytes + len(bytes)
        remaining_capacity = capacity - self.bytes_out_len - 10
        if (next_unflushed < remaining_capacity):
            # looks like it will fit
            out = comp.compress(bytes)
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)
            self.bytes_in.append(bytes)
            self.unflushed_in_bytes += len(bytes)
        else:
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
            # Note: It is tempting to do this as a look-ahead pass, and to
            #       'copy()' the compressor before flushing. However, it seems
            #       to end up being the same thing as increasing repack:
            #       similar cost, same benefit. And this way we still have the
            #       'repack' knob that can be adjusted, and do not depend on a
            #       platform-specific 'copy()' function.
            self.num_zsync += 1
            if self._max_repack == 0 and self.num_zsync > self._max_zsync:
                self.num_repack += 1
                self.unused_bytes = bytes
                return True
            out = comp.compress(bytes)
            out += comp.flush(Z_SYNC_FLUSH)
            self.unflushed_in_bytes = 0
            if out:
                self.bytes_list.append(out)
                self.bytes_out_len += len(out)

            # We are a bit extra conservative, because it seems that you *can*
            # get better compression with Z_SYNC_FLUSH than a full compress.
            # It is probably very rare, but we were able to trigger it.
            if self.num_repack == 0:
                safety_margin = 100
            else:
                safety_margin = 10
            if self.bytes_out_len + safety_margin <= capacity:
                # It fit, so mark it added
                self.bytes_in.append(bytes)
            else:
                # We are over budget, try to squeeze this in without any
                # Z_SYNC_FLUSH calls
                self.num_repack += 1
                (bytes_out, this_len,
                 compressor) = self._recompress_all_bytes_in(bytes)
                if self.num_repack >= self._max_repack:
                    # When we get *to* _max_repack, bump over so that the
                    # earlier > _max_repack check will be triggered.
                    self.num_repack += 1
                if this_len + 10 > capacity:
                    # No way we can add any more: we need to re-pack, because
                    # our compressor is now out of sync. This is rarely
                    # triggered compared to num_repack > _max_repack.
                    (bytes_out, this_len,
                     compressor) = self._recompress_all_bytes_in()
                    self.compressor = compressor
                    # Force us to not allow more data
                    self.num_repack = self._max_repack + 1
                    self.bytes_list = bytes_out
                    self.bytes_out_len = this_len
                    self.unused_bytes = bytes
                    return True
                else:
                    # This fits when we pack it tighter, so use the new
                    # packing. There is one Z_SYNC_FLUSH call in
                    # _recompress_all_bytes_in.
                    self.compressor = compressor
                    self.bytes_in.append(bytes)
                    self.bytes_list = bytes_out
                    self.bytes_out_len = this_len
        return False
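
    # Round-trip sketch (illustrative, not from this module): a finished chunk
    # is an ordinary zlib stream followed by NUL padding, so it can be checked
    # directly with zlib:
    #
    #     writer = ChunkWriter(4096)
    #     writer.write("some text\n")
    #     blocks, unused, nulls = writer.finish()
    #     data = "".join(blocks)                    # len(data) == 4096
    #     assert zlib.decompressobj().decompress(data) == "some text\n"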