~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/chunk_writer.py

  • Committer: Vincent Ladeuil
  • Date: 2010-04-12 16:41:03 UTC
  • mto: (5148.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5151.
  • Revision ID: v.ladeuil+lp@free.fr-20100412164103-v157103xtwozjf7n
Failing tests for bug #519319.

* bzrlib/tests/blackbox/test_send.py:
(TestSendStrictMixin.assertSendSucceeds): We want to be able to
succeed with a warning.
(TestSendStrictWithChanges.test_send_default)
(TestSendStrictWithChanges.test_send_bogus_config_var_ignored):
Succeed with a warning.

* bzrlib/tests/blackbox/test_push.py:
(TestPushStrictMixin.assertPushSucceeds): We want to be able to
succeed with a warning.
(TestPushStrictWithChanges.test_push_default): By default we
succeed with a warning.
(TestPushStrictWithChanges.test_push_bogus_config_var_ignored):
Ignoring a conf variable also succeeds with a warning.

* bzrlib/tests/blackbox/test_dpush.py:
(TestDpushStrictMixin.assertPushSucceeds): We want to be able to
succeed with a warning.

Show diffs side-by-side

added added

removed removed

Lines of Context:
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
#
17
17
 
18
18
"""ChunkWriter: write compressed data out with a fixed upper bound."""
20
20
import zlib
21
21
from zlib import Z_FINISH, Z_SYNC_FLUSH
22
22
 
23
 
# [max_repack, buffer_full, repacks_with_space, min_compression,
24
 
#  total_bytes_in, total_bytes_out, avg_comp]
25
 
_stats = [0, 0, 0, 999, 0, 0, 0]
26
23
 
27
24
class ChunkWriter(object):
28
25
    """ChunkWriter allows writing of compressed data with a fixed size.
39
36
        will sometimes start over and compress the whole list to get tighter
40
37
        packing. We get diminishing returns after a while, so this limits the
41
38
        number of times we will try.
42
 
        In testing, some values for bzr.dev::
43
 
 
44
 
            repack  time  MB    hit_max_repack  buffer_full
45
 
             1       7.9  5.1   1268            0
46
 
             2       8.8  4.4   1069            0
47
 
             3       9.7  4.2   1022            46
48
 
             4      11.1  4.1   974             619
49
 
            20      11.9  4.1   0               1012
50
 
 
51
 
        In testing, some values for mysql-unpacked::
52
 
 
53
 
            repack  time  MB    hit_max_repack  buffer_full
54
 
             1      52.4  16.9  4295            0
55
 
             2      55.8  14.1  3561            0
56
 
             3      60.3  13.5  3407            197
57
 
             4      66.7  13.4  3203            2154
58
 
            20      69.3  13.4  0               3380
 
39
        The default is to try to avoid recompressing entirely, but setting this
 
40
        to something like 20 will give maximum compression.
 
41
 
 
42
    :cvar _max_zsync: Another tunable nob. If _max_repack is set to 0, then you
 
43
        can limit the number of times we will try to pack more data into a
 
44
        node. This allows us to do a single compression pass, rather than
 
45
        trying until we overflow, and then recompressing again.
59
46
    """
60
 
 
61
 
    _max_repack = 2
62
 
 
63
 
    def __init__(self, chunk_size, reserved=0):
 
47
    #    In testing, some values for bzr.dev::
 
48
    #        repack  time  MB   max   full
 
49
    #         1       7.5  4.6  1140  0
 
50
    #         2       8.4  4.2  1036  1
 
51
    #         3       9.8  4.1  1012  278
 
52
    #         4      10.8  4.1  728   945
 
53
    #        20      11.1  4.1  0     1012
 
54
    #        repack = 0
 
55
    #        zsync   time  MB    repack  stop_for_z
 
56
    #         0       5.0  24.7  0       6270
 
57
    #         1       4.3  13.2  0       3342
 
58
    #         2       4.9   9.6  0       2414
 
59
    #         5       4.8   6.2  0       1549
 
60
    #         6       4.8   5.8  1       1435
 
61
    #         7       4.8   5.5  19      1337
 
62
    #         8       4.4   5.3  81      1220
 
63
    #        10       5.3   5.0  260     967
 
64
    #        11       5.3   4.9  366     839
 
65
    #        12       5.1   4.8  454     731
 
66
    #        15       5.8   4.7  704     450
 
67
    #        20       5.8   4.6  1133    7
 
68
 
 
69
    #    In testing, some values for mysql-unpacked::
 
70
    #                next_bytes estim
 
71
    #        repack  time  MB    full    stop_for_repack
 
72
    #         1            15.4  0       3913
 
73
    #         2      35.4  13.7  0       346
 
74
    #        20      46.7  13.4  3380    0
 
75
    #        repack=0
 
76
    #        zsync                       stop_for_z
 
77
    #         0      29.5 116.5  0       29782
 
78
    #         1      27.8  60.2  0       15356
 
79
    #         2      27.8  42.4  0       10822
 
80
    #         5      26.8  25.5  0       6491
 
81
    #         6      27.3  23.2  13      5896
 
82
    #         7      27.5  21.6  29      5451
 
83
    #         8      27.1  20.3  52      5108
 
84
    #        10      29.4  18.6  195     4526
 
85
    #        11      29.2  18.0  421     4143
 
86
    #        12      28.0  17.5  702     3738
 
87
    #        15      28.9  16.5  1223    2969
 
88
    #        20      29.6  15.7  2182    1810
 
89
    #        30      31.4  15.4  3891    23
 
90
 
 
91
    # Tuple of (num_repack_attempts, num_zsync_attempts)
 
92
    # num_zsync_attempts only has meaning if num_repack_attempts is 0.
 
93
    _repack_opts_for_speed = (0, 8)
 
94
    _repack_opts_for_size = (20, 0)
 
95
 
 
96
    def __init__(self, chunk_size, reserved=0, optimize_for_size=False):
64
97
        """Create a ChunkWriter to write chunk_size chunks.
65
98
 
66
99
        :param chunk_size: The total byte count to emit at the end of the
67
100
            chunk.
68
101
        :param reserved: How many bytes to allow for reserved data. reserved
69
 
            data space can only be written to via the write_reserved method.
 
102
            data space can only be written to via the write(..., reserved=True).
70
103
        """
71
104
        self.chunk_size = chunk_size
72
105
        self.compressor = zlib.compressobj()
73
106
        self.bytes_in = []
74
107
        self.bytes_list = []
75
108
        self.bytes_out_len = 0
76
 
        self.compressed = None
77
 
        self.seen_bytes = 0
78
109
        # bytes that have been seen, but not included in a flush to out yet
79
110
        self.unflushed_in_bytes = 0
80
111
        self.num_repack = 0
81
 
        self.done = False # We will accept no more bytes
 
112
        self.num_zsync = 0
82
113
        self.unused_bytes = None
83
114
        self.reserved_size = reserved
 
115
        # Default is to make building fast rather than compact
 
116
        self.set_optimize(for_size=optimize_for_size)
84
117
 
85
118
    def finish(self):
86
119
        """Finish the chunk.
87
120
 
88
121
        This returns the final compressed chunk, and either None, or the
89
122
        bytes that did not fit in the chunk.
 
123
 
 
124
        :return: (compressed_bytes, unused_bytes, num_nulls_needed)
 
125
            compressed_bytes    a list of bytes that were output from the
 
126
                                compressor. If the compressed length was not
 
127
                                exactly chunk_size, the final string will be a
 
128
                                string of all null bytes to pad this to
 
129
                                chunk_size
 
130
            unused_bytes        None, or the last bytes that were added, which
 
131
                                we could not fit.
 
132
            num_nulls_needed    How many nulls are padded at the end
90
133
        """
91
134
        self.bytes_in = None # Free the data cached so far, we don't need it
92
135
        out = self.compressor.flush(Z_FINISH)
93
136
        self.bytes_list.append(out)
94
137
        self.bytes_out_len += len(out)
95
 
        if self.num_repack > 0 and self.bytes_out_len > 0:
96
 
            comp = float(self.seen_bytes) / self.bytes_out_len
97
 
            if comp < _stats[3]:
98
 
                _stats[3] = comp
99
 
        _stats[4] += self.seen_bytes
100
 
        _stats[5] += self.bytes_out_len
101
 
        _stats[6] = float(_stats[4]) / _stats[5]
102
138
 
103
139
        if self.bytes_out_len > self.chunk_size:
104
140
            raise AssertionError('Somehow we ended up with too much'
105
141
                                 ' compressed data, %d > %d'
106
142
                                 % (self.bytes_out_len, self.chunk_size))
107
 
        nulls_needed = self.chunk_size - self.bytes_out_len % self.chunk_size
 
143
        nulls_needed = self.chunk_size - self.bytes_out_len
108
144
        if nulls_needed:
109
145
            self.bytes_list.append("\x00" * nulls_needed)
110
146
        return self.bytes_list, self.unused_bytes, nulls_needed
111
147
 
 
148
    def set_optimize(self, for_size=True):
 
149
        """Change how we optimize our writes.
 
150
 
 
151
        :param for_size: If True, optimize for minimum space usage, otherwise
 
152
            optimize for fastest writing speed.
 
153
        :return: None
 
154
        """
 
155
        if for_size:
 
156
            opts = ChunkWriter._repack_opts_for_size
 
157
        else:
 
158
            opts = ChunkWriter._repack_opts_for_speed
 
159
        self._max_repack, self._max_zsync = opts
 
160
 
112
161
    def _recompress_all_bytes_in(self, extra_bytes=None):
113
162
        """Recompress the current bytes_in, and optionally more.
114
163
 
115
 
        :param extra_bytes: Optional, if supplied we will try to add it with
 
164
        :param extra_bytes: Optional, if supplied we will add it with
116
165
            Z_SYNC_FLUSH
117
 
        :return: (bytes_out, compressor, alt_compressed)
 
166
        :return: (bytes_out, bytes_out_len, alt_compressed)
118
167
            bytes_out   is the compressed bytes returned from the compressor
 
168
            bytes_out_len the length of the compressed output
119
169
            compressor  An object with everything packed in so far, and
120
170
                        Z_SYNC_FLUSH called.
121
 
            alt_compressed  If the compressor supports copy(), then this is a
122
 
                            snapshot just before extra_bytes is added.
123
 
                            It is (bytes_out, compressor) as well.
124
 
                            The idea is if you find you cannot fit the new
125
 
                            bytes, you don't have to start over.
126
 
                            And if you *can* you don't have to Z_SYNC_FLUSH
127
 
                            yet.
128
171
        """
129
172
        compressor = zlib.compressobj()
130
173
        bytes_out = []
136
179
                append(out)
137
180
        if extra_bytes:
138
181
            out = compress(extra_bytes)
139
 
            if out:
140
 
                append(out)
141
 
        append(compressor.flush(Z_SYNC_FLUSH))
 
182
            out += compressor.flush(Z_SYNC_FLUSH)
 
183
            append(out)
142
184
        bytes_out_len = sum(map(len, bytes_out))
143
185
        return bytes_out, bytes_out_len, compressor
144
186
 
147
189
 
148
190
        If the bytes fit, False is returned. Otherwise True is returned
149
191
        and the bytes have not been added to the chunk.
 
192
 
 
193
        :param bytes: The bytes to include
 
194
        :param reserved: If True, we can use the space reserved in the
 
195
            constructor.
150
196
        """
151
197
        if self.num_repack > self._max_repack and not reserved:
152
198
            self.unused_bytes = bytes
156
202
        else:
157
203
            capacity = self.chunk_size - self.reserved_size
158
204
        comp = self.compressor
 
205
 
159
206
        # Check to see if the currently unflushed bytes would fit with a bit of
160
207
        # room to spare, assuming no compression.
161
208
        next_unflushed = self.unflushed_in_bytes + len(bytes)
162
209
        remaining_capacity = capacity - self.bytes_out_len - 10
163
210
        if (next_unflushed < remaining_capacity):
164
 
            # Yes, just push it in, assuming it will fit
 
211
            # looks like it will fit
165
212
            out = comp.compress(bytes)
166
213
            if out:
167
214
                self.bytes_list.append(out)
168
215
                self.bytes_out_len += len(out)
169
216
            self.bytes_in.append(bytes)
170
 
            self.seen_bytes += len(bytes)
171
217
            self.unflushed_in_bytes += len(bytes)
172
218
        else:
173
219
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
 
220
            # Note: It is tempting to do this as a look-ahead pass, and to
 
221
            #       'copy()' the compressor before flushing. However, it seems
 
222
            #       that Which means that it is the same thing as increasing
 
223
            #       repack, similar cost, same benefit. And this way we still
 
224
            #       have the 'repack' knob that can be adjusted, and not depend
 
225
            #       on a platform-specific 'copy()' function.
 
226
            self.num_zsync += 1
 
227
            if self._max_repack == 0 and self.num_zsync > self._max_zsync:
 
228
                self.num_repack += 1
 
229
                self.unused_bytes = bytes
 
230
                return True
174
231
            out = comp.compress(bytes)
175
232
            out += comp.flush(Z_SYNC_FLUSH)
176
233
            self.unflushed_in_bytes = 0
181
238
            # We are a bit extra conservative, because it seems that you *can*
182
239
            # get better compression with Z_SYNC_FLUSH than a full compress. It
183
240
            # is probably very rare, but we were able to trigger it.
184
 
            if self.bytes_out_len + 100 <= capacity:
 
241
            if self.num_repack == 0:
 
242
                safety_margin = 100
 
243
            else:
 
244
                safety_margin = 10
 
245
            if self.bytes_out_len + safety_margin <= capacity:
185
246
                # It fit, so mark it added
186
247
                self.bytes_in.append(bytes)
187
 
                self.seen_bytes += len(bytes)
188
248
            else:
189
249
                # We are over budget, try to squeeze this in without any
190
250
                # Z_SYNC_FLUSH calls
195
255
                    # When we get *to* _max_repack, bump over so that the
196
256
                    # earlier > _max_repack will be triggered.
197
257
                    self.num_repack += 1
198
 
                    _stats[0] += 1
199
258
                if this_len + 10 > capacity:
200
259
                    (bytes_out, this_len,
201
260
                     compressor) = self._recompress_all_bytes_in()
202
 
                    _stats[1] += 1
203
261
                    self.compressor = compressor
204
262
                    # Force us to not allow more data
205
263
                    self.num_repack = self._max_repack + 1
209
267
                    return True
210
268
                else:
211
269
                    # This fits when we pack it tighter, so use the new packing
212
 
                    # There is one Z_SYNC_FLUSH call in
213
 
                    # _recompress_all_bytes_in
214
 
                    _stats[2] += 1
215
270
                    self.compressor = compressor
216
271
                    self.bytes_in.append(bytes)
217
272
                    self.bytes_list = bytes_out