~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/chunk_writer.py

  • Committer: John Arbash Meinel
  • Date: 2008-08-19 23:12:01 UTC
  • mto: This revision was merged to the branch mainline in revision 3644.
  • Revision ID: john@arbash-meinel.com-20080819231201-hc361se0f7okf08y
Bring in the btree_index and chunk_writer code and their tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# index2, a bzr plugin providing experimental index types.
 
2
# Copyright (C) 2008 Canonical Limited.
 
3
#
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License version 2 as published
 
6
# by the Free Software Foundation.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
 
16
#
 
17
 
 
18
"""ChunkWriter: write compressed data out with a fixed upper bound."""
 
19
 
 
20
import zlib
 
21
from zlib import Z_FINISH, Z_SYNC_FLUSH
 
22
 
 
23
 
 
24
class ChunkWriter(object):
 
25
    """ChunkWriter allows writing of compressed data with a fixed size.
 
26
 
 
27
    If less data is supplied than fills a chunk, the chunk is padded with
 
28
    NULL bytes. If more data is supplied, then the writer packs as much
 
29
    in as it can, but never splits any item it was given.
 
30
 
 
31
    The algorithm for packing is open to improvement! Current it is:
 
32
     - write the bytes given
 
33
     - if the total seen bytes so far exceeds the chunk size, flush.
 
34
    """
 
35
 
 
36
    _max_repack = 2
 
37
 
 
38
    def __init__(self, chunk_size, reserved=0):
 
39
        """Create a ChunkWriter to write chunk_size chunks.
 
40
 
 
41
        :param chunk_size: The total byte count to emit at the end of the
 
42
            chunk.
 
43
        :param reserved: How many bytes to allow for reserved data. reserved
 
44
            data space can only be written to via the write_reserved method.
 
45
        """
 
46
        self.chunk_size = chunk_size
 
47
        self.compressor = zlib.compressobj()
 
48
        self.bytes_in = []
 
49
        self.bytes_list = []
 
50
        self.compressed = None
 
51
        self.seen_bytes = 0
 
52
        self.num_repack = 0
 
53
        self.unused_bytes = None
 
54
        self.reserved_size = reserved
 
55
 
 
56
    def finish(self):
 
57
        """Finish the chunk.
 
58
 
 
59
        This returns the final compressed chunk, and either None, or the
 
60
        bytes that did not fit in the chunk.
 
61
        """
 
62
        self.bytes_in = None # Free the data cached so far, we don't need it
 
63
        self.bytes_list.append(self.compressor.flush(Z_FINISH))
 
64
        total_len = sum(len(b) for b in self.bytes_list)
 
65
        if total_len > self.chunk_size:
 
66
            raise AssertionError('Somehow we ended up with too much'
 
67
                                 ' compressed data, %d > %d'
 
68
                                 % (total_len, self.chunk_size))
 
69
        nulls_needed = self.chunk_size - total_len % self.chunk_size
 
70
        if nulls_needed:
 
71
            self.bytes_list.append("\x00" * nulls_needed)
 
72
        return self.bytes_list, self.unused_bytes, nulls_needed
 
73
 
 
74
    def _recompress_all_bytes_in(self, extra_bytes=None):
 
75
        compressor = zlib.compressobj()
 
76
        bytes_out = []
 
77
        for accepted_bytes in self.bytes_in:
 
78
            out = compressor.compress(accepted_bytes)
 
79
            if out:
 
80
                bytes_out.append(out)
 
81
        if extra_bytes:
 
82
            out = compressor.compress(extra_bytes)
 
83
            if out:
 
84
                bytes_out.append(out)
 
85
            out = compressor.flush(Z_SYNC_FLUSH)
 
86
            if out:
 
87
                bytes_out.append(out)
 
88
        return bytes_out, compressor
 
89
 
 
90
    def write(self, bytes):
 
91
        """Write some bytes to the chunk.
 
92
 
 
93
        If the bytes fit, False is returned. Otherwise True is returned
 
94
        and the bytes have not been added to the chunk.
 
95
        """
 
96
        return self._write(bytes, False)
 
97
 
 
98
    def write_reserved(self, bytes):
 
99
        """Write some bytes to the chunk bypassing the reserved check.
 
100
 
 
101
        If the bytes fit, False is returned. Otherwise True is returned
 
102
        and the bytes have not been added to the chunk.
 
103
        """
 
104
        return self._write(bytes, True)
 
105
 
 
106
    def _write(self, bytes, reserved):
 
107
        if reserved:
 
108
            capacity = self.chunk_size
 
109
        else:
 
110
            capacity = self.chunk_size - self.reserved_size
 
111
        # Check quickly to see if this is likely to put us outside of our
 
112
        # budget:
 
113
        next_seen_size = self.seen_bytes + len(bytes)
 
114
        if (next_seen_size < 1.8 * capacity):
 
115
            # No need, we assume this will "just fit"
 
116
            out = self.compressor.compress(bytes)
 
117
            self.bytes_in.append(bytes)
 
118
            self.seen_bytes = next_seen_size
 
119
            if out:
 
120
                self.bytes_list.append(out)
 
121
        else:
 
122
            if not reserved and self.num_repack >= self._max_repack:
 
123
                # We have packed too many times already.
 
124
                return True
 
125
            # This may or may not fit, try to add it with Z_SYNC_FLUSH
 
126
            out = self.compressor.compress(bytes)
 
127
            if out:
 
128
                self.bytes_list.append(out)
 
129
            out = self.compressor.flush(Z_SYNC_FLUSH)
 
130
            if out:
 
131
                self.bytes_list.append(out)
 
132
            total_len = sum(len(b) for b in self.bytes_list)
 
133
            # Give us some extra room for a final Z_FINISH call.
 
134
            if total_len + 10 > capacity:
 
135
                # We are over budget, try to squeeze this in without any
 
136
                # Z_SYNC_FLUSH calls
 
137
                self.num_repack += 1
 
138
                bytes_out, compressor = self._recompress_all_bytes_in(bytes)
 
139
                this_len = sum(len(b) for b in bytes_out)
 
140
                if this_len + 10 > capacity:
 
141
                    # No way we can add anymore, we need to re-pack because our
 
142
                    # compressor is now out of sync
 
143
                    bytes_out, compressor = self._recompress_all_bytes_in()
 
144
                    self.compressor = compressor
 
145
                    self.bytes_list = bytes_out
 
146
                    self.unused_bytes = bytes
 
147
                    return True
 
148
                else:
 
149
                    # This fits when we pack it tighter, so use the new packing
 
150
                    self.compressor = compressor
 
151
                    self.bytes_in.append(bytes)
 
152
                    self.bytes_list = bytes_out
 
153
            else:
 
154
                # It fit, so mark it added
 
155
                self.bytes_in.append(bytes)
 
156
                self.seen_bytes = next_seen_size
 
157
        return False
 
158