~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/chunk_writer.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-12-15 21:00:20 UTC
  • mfrom: (3113.5.1 test.174055)
  • Revision ID: pqm@pqm.ubuntu.com-20071215210020-m28kk1qmbcc9n6qs
(bialix) XFAIL test for #174055: can't run bzr info while dirstate
 is locked

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
#
17
 
 
18
 
"""ChunkWriter: write compressed data out with a fixed upper bound."""
19
 
 
20
 
import zlib
21
 
from zlib import Z_FINISH, Z_SYNC_FLUSH
22
 
 
23
 
# [max_repack, buffer_full, repacks_with_space, min_compression,
24
 
#  total_bytes_in, total_bytes_out, avg_comp]
25
 
_stats = [0, 0, 0, 999, 0, 0, 0]
26
 
 
27
 
class ChunkWriter(object):
28
 
    """ChunkWriter allows writing of compressed data with a fixed size.
29
 
 
30
 
    If less data is supplied than fills a chunk, the chunk is padded with
31
 
    NULL bytes. If more data is supplied, then the writer packs as much
32
 
    in as it can, but never splits any item it was given.
33
 
 
34
 
    The algorithm for packing is open to improvement! Current it is:
35
 
     - write the bytes given
36
 
     - if the total seen bytes so far exceeds the chunk size, flush.
37
 
 
38
 
    :cvar _max_repack: To fit the maximum number of entries into a node, we
39
 
        will sometimes start over and compress the whole list to get tighter
40
 
        packing. We get diminishing returns after a while, so this limits the
41
 
        number of times we will try.
42
 
        In testing, some values for bzr.dev::
43
 
 
44
 
            repack  time  MB    hit_max_repack  buffer_full
45
 
             1       7.9  5.1   1268            0
46
 
             2       8.8  4.4   1069            0
47
 
             3       9.7  4.2   1022            46
48
 
             4      11.1  4.1   974             619
49
 
            20      11.9  4.1   0               1012
50
 
 
51
 
        In testing, some values for mysql-unpacked::
52
 
 
53
 
            repack  time  MB    hit_max_repack  buffer_full
54
 
             1      52.4  16.9  4295            0
55
 
             2      55.8  14.1  3561            0
56
 
             3      60.3  13.5  3407            197
57
 
             4      66.7  13.4  3203            2154
58
 
            20      69.3  13.4  0               3380
59
 
    """
60
 
 
61
 
    _max_repack = 2
62
 
 
63
 
    def __init__(self, chunk_size, reserved=0):
64
 
        """Create a ChunkWriter to write chunk_size chunks.
65
 
 
66
 
        :param chunk_size: The total byte count to emit at the end of the
67
 
            chunk.
68
 
        :param reserved: How many bytes to allow for reserved data. reserved
69
 
            data space can only be written to via the write_reserved method.
70
 
        """
71
 
        self.chunk_size = chunk_size
72
 
        self.compressor = zlib.compressobj()
73
 
        self.bytes_in = []
74
 
        self.bytes_list = []
75
 
        self.bytes_out_len = 0
76
 
        self.compressed = None
77
 
        self.seen_bytes = 0
78
 
        # bytes that have been seen, but not included in a flush to out yet
79
 
        self.unflushed_in_bytes = 0
80
 
        self.num_repack = 0
81
 
        self.done = False # We will accept no more bytes
82
 
        self.unused_bytes = None
83
 
        self.reserved_size = reserved
84
 
 
85
 
    def finish(self):
86
 
        """Finish the chunk.
87
 
 
88
 
        This returns the final compressed chunk, and either None, or the
89
 
        bytes that did not fit in the chunk.
90
 
        """
91
 
        self.bytes_in = None # Free the data cached so far, we don't need it
92
 
        out = self.compressor.flush(Z_FINISH)
93
 
        self.bytes_list.append(out)
94
 
        self.bytes_out_len += len(out)
95
 
        if self.num_repack > 0 and self.bytes_out_len > 0:
96
 
            comp = float(self.seen_bytes) / self.bytes_out_len
97
 
            if comp < _stats[3]:
98
 
                _stats[3] = comp
99
 
        _stats[4] += self.seen_bytes
100
 
        _stats[5] += self.bytes_out_len
101
 
        _stats[6] = float(_stats[4]) / _stats[5]
102
 
 
103
 
        if self.bytes_out_len > self.chunk_size:
104
 
            raise AssertionError('Somehow we ended up with too much'
105
 
                                 ' compressed data, %d > %d'
106
 
                                 % (self.bytes_out_len, self.chunk_size))
107
 
        nulls_needed = self.chunk_size - self.bytes_out_len % self.chunk_size
108
 
        if nulls_needed:
109
 
            self.bytes_list.append("\x00" * nulls_needed)
110
 
        return self.bytes_list, self.unused_bytes, nulls_needed
111
 
 
112
 
    def _recompress_all_bytes_in(self, extra_bytes=None):
113
 
        """Recompress the current bytes_in, and optionally more.
114
 
 
115
 
        :param extra_bytes: Optional, if supplied we will try to add it with
116
 
            Z_SYNC_FLUSH
117
 
        :return: (bytes_out, compressor, alt_compressed)
118
 
            bytes_out   is the compressed bytes returned from the compressor
119
 
            compressor  An object with everything packed in so far, and
120
 
                        Z_SYNC_FLUSH called.
121
 
            alt_compressed  If the compressor supports copy(), then this is a
122
 
                            snapshot just before extra_bytes is added.
123
 
                            It is (bytes_out, compressor) as well.
124
 
                            The idea is if you find you cannot fit the new
125
 
                            bytes, you don't have to start over.
126
 
                            And if you *can* you don't have to Z_SYNC_FLUSH
127
 
                            yet.
128
 
        """
129
 
        compressor = zlib.compressobj()
130
 
        bytes_out = []
131
 
        append = bytes_out.append
132
 
        compress = compressor.compress
133
 
        for accepted_bytes in self.bytes_in:
134
 
            out = compress(accepted_bytes)
135
 
            if out:
136
 
                append(out)
137
 
        if extra_bytes:
138
 
            out = compress(extra_bytes)
139
 
            if out:
140
 
                append(out)
141
 
        append(compressor.flush(Z_SYNC_FLUSH))
142
 
        bytes_out_len = sum(map(len, bytes_out))
143
 
        return bytes_out, bytes_out_len, compressor
144
 
 
145
 
    def write(self, bytes, reserved=False):
146
 
        """Write some bytes to the chunk.
147
 
 
148
 
        If the bytes fit, False is returned. Otherwise True is returned
149
 
        and the bytes have not been added to the chunk.
150
 
        """
151
 
        if self.num_repack > self._max_repack and not reserved:
152
 
            self.unused_bytes = bytes
153
 
            return True
154
 
        if reserved:
155
 
            capacity = self.chunk_size
156
 
        else:
157
 
            capacity = self.chunk_size - self.reserved_size
158
 
        comp = self.compressor
159
 
        # Check to see if the currently unflushed bytes would fit with a bit of
160
 
        # room to spare, assuming no compression.
161
 
        next_unflushed = self.unflushed_in_bytes + len(bytes)
162
 
        remaining_capacity = capacity - self.bytes_out_len - 10
163
 
        if (next_unflushed < remaining_capacity):
164
 
            # Yes, just push it in, assuming it will fit
165
 
            out = comp.compress(bytes)
166
 
            if out:
167
 
                self.bytes_list.append(out)
168
 
                self.bytes_out_len += len(out)
169
 
            self.bytes_in.append(bytes)
170
 
            self.seen_bytes += len(bytes)
171
 
            self.unflushed_in_bytes += len(bytes)
172
 
        else:
173
 
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
174
 
            out = comp.compress(bytes)
175
 
            out += comp.flush(Z_SYNC_FLUSH)
176
 
            self.unflushed_in_bytes = 0
177
 
            if out:
178
 
                self.bytes_list.append(out)
179
 
                self.bytes_out_len += len(out)
180
 
 
181
 
            # We are a bit extra conservative, because it seems that you *can*
182
 
            # get better compression with Z_SYNC_FLUSH than a full compress. It
183
 
            # is probably very rare, but we were able to trigger it.
184
 
            if self.bytes_out_len + 100 <= capacity:
185
 
                # It fit, so mark it added
186
 
                self.bytes_in.append(bytes)
187
 
                self.seen_bytes += len(bytes)
188
 
            else:
189
 
                # We are over budget, try to squeeze this in without any
190
 
                # Z_SYNC_FLUSH calls
191
 
                self.num_repack += 1
192
 
                (bytes_out, this_len,
193
 
                 compressor) = self._recompress_all_bytes_in(bytes)
194
 
                if self.num_repack >= self._max_repack:
195
 
                    # When we get *to* _max_repack, bump over so that the
196
 
                    # earlier > _max_repack will be triggered.
197
 
                    self.num_repack += 1
198
 
                    _stats[0] += 1
199
 
                if this_len + 10 > capacity:
200
 
                    (bytes_out, this_len,
201
 
                     compressor) = self._recompress_all_bytes_in()
202
 
                    _stats[1] += 1
203
 
                    self.compressor = compressor
204
 
                    # Force us to not allow more data
205
 
                    self.num_repack = self._max_repack + 1
206
 
                    self.bytes_list = bytes_out
207
 
                    self.bytes_out_len = this_len
208
 
                    self.unused_bytes = bytes
209
 
                    return True
210
 
                else:
211
 
                    # This fits when we pack it tighter, so use the new packing
212
 
                    # There is one Z_SYNC_FLUSH call in
213
 
                    # _recompress_all_bytes_in
214
 
                    _stats[2] += 1
215
 
                    self.compressor = compressor
216
 
                    self.bytes_in.append(bytes)
217
 
                    self.bytes_list = bytes_out
218
 
                    self.bytes_out_len = this_len
219
 
        return False
220