~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/chunk_writer.py

  • Committer: John Arbash Meinel
  • Date: 2008-08-22 05:40:12 UTC
  • mto: This revision was merged to the branch mainline in revision 3653.
  • Revision ID: john@arbash-meinel.com-20080822054012-ikrwmq9nm2q4h6q8
(broken, but hopeful) Change the compact logic.
Instead of only paying attention to the total bytes read,
use the fact that we *know* some of that is already compressed well.
So instead, we just pay attention to the bytes that are added since
the last sync. This means we Z_SYNC_FLUSH much less often.
(After syncing, we end up with more room to add without syncing.)
This improves both the time to compress and the final compressed
size. Need to update tests with the new offsets.
Also, we seem to have found a case where using Z_SYNC_FLUSH
in the middle of a stream will actually generate *better*
compression than compressing the whole stream in one pass.
test_too_much_data_does_not_exceed_size triggers this.
It *can* be packed in more than 100 bytes less than the
amount given by a full compress.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
import zlib
21
21
from zlib import Z_FINISH, Z_SYNC_FLUSH
22
22
 
23
 
_stats = [0, 0, 0]
 
23
# [max_repack, buffer_full, repacks_with_space, min_compression,
 
24
#  total_bytes_in, total_bytes_out, avg_comp]
 
25
_stats = [0, 0, 0, 999, 0, 0, 0]
24
26
 
25
27
class ChunkWriter(object):
26
28
    """ChunkWriter allows writing of compressed data with a fixed size.
54
56
             3      60.3  13.5  3407            197
55
57
             4      66.7  13.4  3203            2154
56
58
            20      69.3  13.4  0               3380
57
 
 
58
 
    :cvar _default_min_compression_size: The expected minimum compression.
59
 
        While packing nodes into the page, we won't Z_SYNC_FLUSH until we have
60
 
        received this much input data. This saves time, because we don't bloat
61
 
        the result with SYNC entries (and then need to repack), but if it is
62
 
        set too high we will accept data that will never fit and trigger a
63
 
        fault later.
64
59
    """
65
60
 
66
61
    _max_repack = 2
67
 
    _default_min_compression_size = 1.8
68
62
 
69
63
    def __init__(self, chunk_size, reserved=0):
70
64
        """Create a ChunkWriter to write chunk_size chunks.
81
75
        self.bytes_out_len = 0
82
76
        self.compressed = None
83
77
        self.seen_bytes = 0
 
78
        # bytes that have been seen, but not included in a flush to out yet
 
79
        self.unflushed_in_bytes = 0
84
80
        self.num_repack = 0
 
81
        self.done = False # We will accept no more bytes
85
82
        self.unused_bytes = None
86
83
        self.reserved_size = reserved
87
 
        self.min_compress_size = self._default_min_compression_size
88
84
 
89
85
    def finish(self):
90
86
        """Finish the chunk.
96
92
        out = self.compressor.flush(Z_FINISH)
97
93
        self.bytes_list.append(out)
98
94
        self.bytes_out_len += len(out)
 
95
        if self.num_repack > 0 and self.bytes_out_len > 0:
 
96
            comp = float(self.seen_bytes) / self.bytes_out_len
 
97
            if comp < _stats[3]:
 
98
                _stats[3] = comp
 
99
        _stats[4] += self.seen_bytes
 
100
        _stats[5] += self.bytes_out_len
 
101
        _stats[6] = float(_stats[4]) / _stats[5]
 
102
 
99
103
        if self.bytes_out_len > self.chunk_size:
100
104
            raise AssertionError('Somehow we ended up with too much'
101
105
                                 ' compressed data, %d > %d'
132
136
                append(out)
133
137
        if extra_bytes:
134
138
            out = compress(extra_bytes)
135
 
            out += compressor.flush(Z_SYNC_FLUSH)
136
139
            if out:
137
140
                append(out)
 
141
        append(compressor.flush(Z_SYNC_FLUSH))
138
142
        bytes_out_len = sum(map(len, bytes_out))
139
143
        return bytes_out, bytes_out_len, compressor
140
144
 
144
148
        If the bytes fit, False is returned. Otherwise True is returned
145
149
        and the bytes have not been added to the chunk.
146
150
        """
 
151
        if self.num_repack > self._max_repack and not reserved:
 
152
            self.unused_bytes = bytes
 
153
            return True
147
154
        if reserved:
148
155
            capacity = self.chunk_size
149
156
        else:
150
157
            capacity = self.chunk_size - self.reserved_size
151
 
        # Check quickly to see if this is likely to put us outside of our
152
 
        # budget:
153
 
        next_seen_size = self.seen_bytes + len(bytes)
154
158
        comp = self.compressor
155
 
        if (next_seen_size < self.min_compress_size * capacity):
156
 
            # No need, we assume this will "just fit"
 
159
        # Check to see if the currently unflushed bytes would fit with a bit of
 
160
        # room to spare, assuming no compression.
 
161
        next_unflushed = self.unflushed_in_bytes + len(bytes)
 
162
        remaining_capacity = capacity - self.bytes_out_len - 10
 
163
        if (next_unflushed < remaining_capacity):
 
164
            # Yes, just push it in, assuming it will fit
157
165
            out = comp.compress(bytes)
158
166
            if out:
159
167
                self.bytes_list.append(out)
160
168
                self.bytes_out_len += len(out)
161
169
            self.bytes_in.append(bytes)
162
 
            self.seen_bytes = next_seen_size
 
170
            self.seen_bytes += len(bytes)
 
171
            self.unflushed_in_bytes += len(bytes)
163
172
        else:
164
 
            if self.num_repack > self._max_repack and not reserved:
165
 
                self.unused_bytes = bytes
166
 
                return True
167
173
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
168
174
            out = comp.compress(bytes)
169
175
            out += comp.flush(Z_SYNC_FLUSH)
 
176
            self.unflushed_in_bytes = 0
170
177
            if out:
171
178
                self.bytes_list.append(out)
172
179
                self.bytes_out_len += len(out)
173
180
            if self.bytes_out_len + 10 <= capacity:
174
181
                # It fit, so mark it added
175
182
                self.bytes_in.append(bytes)
176
 
                self.seen_bytes = next_seen_size
 
183
                self.seen_bytes += len(bytes)
177
184
            else:
178
185
                # We are over budget, try to squeeze this in without any
179
186
                # Z_SYNC_FLUSH calls
186
193
                    self.num_repack += 1
187
194
                    _stats[0] += 1
188
195
                if this_len + 10 > capacity:
189
 
                    # In real-world testing, this only happens when _max_repack
190
 
                    # is set >2, and even then rarely (46 out of 1022)
191
196
                    (bytes_out, this_len,
192
197
                     compressor) = self._recompress_all_bytes_in()
193
198
                    _stats[1] += 1
194
199
                    self.compressor = compressor
 
200
                    # Force us to not allow more data
 
201
                    self.num_repack = self._max_repack + 1
195
202
                    self.bytes_list = bytes_out
196
203
                    self.bytes_out_len = this_len
197
204
                    self.unused_bytes = bytes