~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/chunk_writer.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-08-28 03:01:04 UTC
  • mfrom: (3641.5.19 btree)
  • Revision ID: pqm@pqm.ubuntu.com-20080828030104-6a87mmhafprj1prs
(jam) Updates to the btree indexing code.

Show diffs side-by-side

added added

removed removed

Lines of Context:
36
36
        will sometimes start over and compress the whole list to get tighter
37
37
        packing. We get diminishing returns after a while, so this limits the
38
38
        number of times we will try.
39
 
        In testing, some values for bzr.dev::
40
 
 
41
 
                    w/o copy    w/ copy     w/ copy ins w/ copy & save
42
 
            repack  time  MB    time  MB    time  MB    time  MB
43
 
             1       8.8  5.1    8.9  5.1    9.6  4.4   12.5  4.1
44
 
             2       9.6  4.4   10.1  4.3   10.4  4.2   11.1  4.1
45
 
             3      10.6  4.2   11.1  4.1   11.2  4.1   11.3  4.1
46
 
             4      12.0  4.1
47
 
             5      12.6  4.1
48
 
            20      12.9  4.1   12.2  4.1   12.3  4.1
49
 
 
50
 
        In testing, some values for mysql-unpacked::
51
 
 
52
 
                    w/o copy    w/ copy     w/ copy ins w/ copy & save
53
 
            repack  time  MB    time  MB    time  MB    time  MB
54
 
             1      56.6  16.9              60.7  14.2
55
 
             2      59.3  14.1              62.6  13.5  64.3  13.4
56
 
             3      64.4  13.5
57
 
            20      73.4  13.4
58
 
 
59
 
    :cvar _default_min_compression_size: The expected minimum compression.
60
 
        While packing nodes into the page, we won't Z_SYNC_FLUSH until we have
61
 
        received this much input data. This saves time, because we don't bloat
62
 
        the result with SYNC entries (and then need to repack), but if it is
63
 
        set too high we will accept data that will never fit and trigger a
64
 
        fault later.
 
39
        The default is to try to avoid recompressing entirely, but setting this
 
40
        to something like 20 will give maximum compression.
 
41
 
 
42
    :cvar _max_zsync: Another tunable nob. If _max_repack is set to 0, then you
 
43
        can limit the number of times we will try to pack more data into a
 
44
        node. This allows us to do a single compression pass, rather than
 
45
        trying until we overflow, and then recompressing again.
65
46
    """
66
 
 
67
 
    _max_repack = 2
68
 
    _default_min_compression_size = 1.8
 
47
    #    In testing, some values for bzr.dev::
 
48
    #        repack  time  MB   max   full
 
49
    #         1       7.5  4.6  1140  0
 
50
    #         2       8.4  4.2  1036  1          6.8
 
51
    #         3       9.8  4.1  1012  278
 
52
    #         4      10.8  4.1  728   945
 
53
    #        20      11.1  4.1  0     1012
 
54
    #        repack = 0
 
55
    #        zsync   time  MB    repack  max_z   time w/ add_node
 
56
    #         0       6.7  24.7  0       6270    5.0
 
57
    #         1       6.5  13.2  0       3342    4.3
 
58
    #         2       6.6   9.6  0       2414    4.9
 
59
    #         5       6.5   6.2  0       1549    4.8
 
60
    #         6       6.5   5.8  1       1435    4.8
 
61
    #         7       6.6   5.5  19      1337    4.8
 
62
    #         8       6.7   5.3  81      1220    4.4
 
63
    #        10       6.8   5.0  260     967     5.3
 
64
    #        11       6.8   4.9  366     839     5.3
 
65
    #        12       6.9   4.8  454     731     5.1
 
66
    #        15       7.2   4.7  704     450     5.8
 
67
    #        20       7.7   4.6  1133    7       5.8
 
68
 
 
69
    #    In testing, some values for mysql-unpacked::
 
70
    #                next_bytes estim
 
71
    #        repack  time  MB    hit_max full
 
72
    #         1      51.7  15.4  3913  0
 
73
    #         2      54.4  13.7  3467  0         35.4
 
74
    #        20      67.0  13.4  0     3380      46.7
 
75
    #        repack=0
 
76
    #        zsync                               time w/ add_node
 
77
    #         0      47.7 116.5  0       29782   29.5
 
78
    #         1      48.5  60.2  0       15356   27.8
 
79
    #         2      48.1  42.4  0       10822   27.8
 
80
    #         5      48.3  25.5  0       6491    26.8
 
81
    #         6      48.0  23.2  13      5896    27.3
 
82
    #         7      48.1  21.6  29      5451    27.5
 
83
    #         8      48.1  20.3  52      5108    27.1
 
84
    #        10      46.9  18.6  195     4526    29.4
 
85
    #        11      48.8  18.0  421     4143    29.2
 
86
    #        12      47.4  17.5  702     3738    28.0
 
87
    #        15      49.6  16.5  1223    2969    28.9
 
88
    #        20      48.9  15.7  2182    1810    29.6
 
89
    #        30            15.4  3891    23      31.4
 
90
 
 
91
    _max_repack = 0
 
92
    _max_zsync = 8
69
93
 
70
94
    def __init__(self, chunk_size, reserved=0):
71
95
        """Create a ChunkWriter to write chunk_size chunks.
73
97
        :param chunk_size: The total byte count to emit at the end of the
74
98
            chunk.
75
99
        :param reserved: How many bytes to allow for reserved data. reserved
76
 
            data space can only be written to via the write_reserved method.
 
100
            data space can only be written to via the write(..., reserved=True).
77
101
        """
78
102
        self.chunk_size = chunk_size
79
103
        self.compressor = zlib.compressobj()
80
104
        self.bytes_in = []
81
105
        self.bytes_list = []
82
106
        self.bytes_out_len = 0
83
 
        self.compressed = None
84
 
        self.seen_bytes = 0
 
107
        # bytes that have been seen, but not included in a flush to out yet
 
108
        self.unflushed_in_bytes = 0
85
109
        self.num_repack = 0
 
110
        self.num_zsync = 0
86
111
        self.unused_bytes = None
87
112
        self.reserved_size = reserved
88
 
        self.min_compress_size = self._default_min_compression_size
89
113
 
90
114
    def finish(self):
91
115
        """Finish the chunk.
92
116
 
93
117
        This returns the final compressed chunk, and either None, or the
94
118
        bytes that did not fit in the chunk.
 
119
 
 
120
        :return: (compressed_bytes, unused_bytes, num_nulls_needed)
 
121
            compressed_bytes    a list of bytes that were output from the
 
122
                                compressor. If the compressed length was not
 
123
                                exactly chunk_size, the final string will be a
 
124
                                string of all null bytes to pad this to
 
125
                                chunk_size
 
126
            unused_bytes        None, or the last bytes that were added, which
 
127
                                we could not fit.
 
128
            num_nulls_needed    How many nulls are padded at the end
95
129
        """
96
130
        self.bytes_in = None # Free the data cached so far, we don't need it
97
131
        out = self.compressor.flush(Z_FINISH)
98
132
        self.bytes_list.append(out)
99
133
        self.bytes_out_len += len(out)
 
134
 
100
135
        if self.bytes_out_len > self.chunk_size:
101
136
            raise AssertionError('Somehow we ended up with too much'
102
137
                                 ' compressed data, %d > %d'
103
138
                                 % (self.bytes_out_len, self.chunk_size))
104
 
        nulls_needed = self.chunk_size - self.bytes_out_len % self.chunk_size
 
139
        nulls_needed = self.chunk_size - self.bytes_out_len
105
140
        if nulls_needed:
106
141
            self.bytes_list.append("\x00" * nulls_needed)
107
142
        return self.bytes_list, self.unused_bytes, nulls_needed
109
144
    def _recompress_all_bytes_in(self, extra_bytes=None):
110
145
        """Recompress the current bytes_in, and optionally more.
111
146
 
112
 
        :param extra_bytes: Optional, if supplied we will try to add it with
 
147
        :param extra_bytes: Optional, if supplied we will add it with
113
148
            Z_SYNC_FLUSH
114
 
        :return: (bytes_out, compressor, alt_compressed)
 
149
        :return: (bytes_out, bytes_out_len, alt_compressed)
115
150
            bytes_out   is the compressed bytes returned from the compressor
 
151
            bytes_out_len the length of the compressed output
116
152
            compressor  An object with everything packed in so far, and
117
153
                        Z_SYNC_FLUSH called.
118
 
            alt_compressed  If the compressor supports copy(), then this is a
119
 
                            snapshot just before extra_bytes is added.
120
 
                            It is (bytes_out, compressor) as well.
121
 
                            The idea is if you find you cannot fit the new
122
 
                            bytes, you don't have to start over.
123
 
                            And if you *can* you don't have to Z_SYNC_FLUSH
124
 
                            yet.
125
154
        """
126
155
        compressor = zlib.compressobj()
127
156
        bytes_out = []
134
163
        if extra_bytes:
135
164
            out = compress(extra_bytes)
136
165
            out += compressor.flush(Z_SYNC_FLUSH)
137
 
            if out:
138
 
                append(out)
 
166
            append(out)
139
167
        bytes_out_len = sum(map(len, bytes_out))
140
168
        return bytes_out, bytes_out_len, compressor
141
169
 
144
172
 
145
173
        If the bytes fit, False is returned. Otherwise True is returned
146
174
        and the bytes have not been added to the chunk.
 
175
 
 
176
        :param bytes: The bytes to include
 
177
        :param reserved: If True, we can use the space reserved in the
 
178
            constructor.
147
179
        """
 
180
        if self.num_repack > self._max_repack and not reserved:
 
181
            self.unused_bytes = bytes
 
182
            return True
148
183
        if reserved:
149
184
            capacity = self.chunk_size
150
185
        else:
151
186
            capacity = self.chunk_size - self.reserved_size
152
 
        # Check quickly to see if this is likely to put us outside of our
153
 
        # budget:
154
 
        next_seen_size = self.seen_bytes + len(bytes)
155
187
        comp = self.compressor
156
 
        if (next_seen_size < self.min_compress_size * capacity):
157
 
            # No need, we assume this will "just fit"
 
188
 
 
189
        # Check to see if the currently unflushed bytes would fit with a bit of
 
190
        # room to spare, assuming no compression.
 
191
        next_unflushed = self.unflushed_in_bytes + len(bytes)
 
192
        remaining_capacity = capacity - self.bytes_out_len - 10
 
193
        if (next_unflushed < remaining_capacity):
 
194
            # looks like it will fit
158
195
            out = comp.compress(bytes)
159
196
            if out:
160
197
                self.bytes_list.append(out)
161
198
                self.bytes_out_len += len(out)
162
199
            self.bytes_in.append(bytes)
163
 
            self.seen_bytes = next_seen_size
 
200
            self.unflushed_in_bytes += len(bytes)
164
201
        else:
165
 
            if self.num_repack >= self._max_repack and not reserved:
166
 
                # We already know we don't want to try to fit more
 
202
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
 
203
            # Note: It is tempting to do this as a look-ahead pass, and to
 
204
            #       'copy()' the compressor before flushing. However, it seems
 
205
            #       that Which means that it is the same thing as increasing
 
206
            #       repack, similar cost, same benefit. And this way we still
 
207
            #       have the 'repack' knob that can be adjusted, and not depend
 
208
            #       on a platform-specific 'copy()' function.
 
209
            self.num_zsync += 1
 
210
            if self._max_repack == 0 and self.num_zsync > self._max_zsync:
 
211
                self.num_repack += 1
 
212
                self.unused_bytes = bytes
167
213
                return True
168
 
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
169
214
            out = comp.compress(bytes)
170
215
            out += comp.flush(Z_SYNC_FLUSH)
 
216
            self.unflushed_in_bytes = 0
171
217
            if out:
172
218
                self.bytes_list.append(out)
173
219
                self.bytes_out_len += len(out)
174
 
            if self.bytes_out_len + 10 > capacity:
 
220
 
 
221
            # We are a bit extra conservative, because it seems that you *can*
 
222
            # get better compression with Z_SYNC_FLUSH than a full compress. It
 
223
            # is probably very rare, but we were able to trigger it.
 
224
            if self.num_repack == 0:
 
225
                safety_margin = 100
 
226
            else:
 
227
                safety_margin = 10
 
228
            if self.bytes_out_len + safety_margin <= capacity:
 
229
                # It fit, so mark it added
 
230
                self.bytes_in.append(bytes)
 
231
            else:
175
232
                # We are over budget, try to squeeze this in without any
176
233
                # Z_SYNC_FLUSH calls
177
234
                self.num_repack += 1
178
 
                bytes_out, this_len, compressor = self._recompress_all_bytes_in(bytes)
 
235
                (bytes_out, this_len,
 
236
                 compressor) = self._recompress_all_bytes_in(bytes)
 
237
                if self.num_repack >= self._max_repack:
 
238
                    # When we get *to* _max_repack, bump over so that the
 
239
                    # earlier > _max_repack will be triggered.
 
240
                    self.num_repack += 1
179
241
                if this_len + 10 > capacity:
180
 
                    # No way we can add anymore, we need to re-pack because our
181
 
                    # compressor is now out of sync.
182
 
                    # This seems to be rarely triggered over
183
 
                    #   num_repack > _max_repack
184
 
                    bytes_out, this_len, compressor = self._recompress_all_bytes_in()
 
242
                    (bytes_out, this_len,
 
243
                     compressor) = self._recompress_all_bytes_in()
185
244
                    self.compressor = compressor
 
245
                    # Force us to not allow more data
 
246
                    self.num_repack = self._max_repack + 1
186
247
                    self.bytes_list = bytes_out
187
248
                    self.bytes_out_len = this_len
188
249
                    self.unused_bytes = bytes
189
250
                    return True
190
251
                else:
191
252
                    # This fits when we pack it tighter, so use the new packing
192
 
                    # There is one Z_SYNC_FLUSH call in
193
 
                    # _recompress_all_bytes_in
194
253
                    self.compressor = compressor
195
254
                    self.bytes_in.append(bytes)
196
255
                    self.bytes_list = bytes_out
197
256
                    self.bytes_out_len = this_len
198
 
            else:
199
 
                # It fit, so mark it added
200
 
                self.bytes_in.append(bytes)
201
 
                self.seen_bytes = next_seen_size
202
257
        return False
203
258