~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_groupcompress.py

Bring the groupcompress code into brisbane-core.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for group compression."""
 
18
 
 
19
import zlib
 
20
 
 
21
from bzrlib import (
 
22
    groupcompress,
 
23
    tests,
 
24
    )
 
25
from bzrlib.osutils import sha_string
 
26
from bzrlib.tests import (
 
27
    TestCaseWithTransport,
 
28
    multiply_tests,
 
29
    )
 
30
 
 
31
 
 
32
 
 
33
 
 
34
class TestGroupCompressor(tests.TestCase):
    """Tests for GroupCompressor"""

    def test_empty_delta(self):
        """A newly created compressor starts with an empty ``lines`` list."""
        compressor = groupcompress.GroupCompressor(True)
        self.assertEqual([], compressor.lines)

    def test_one_nosha_delta(self):
        """The first text compressed is emitted whole, not as a delta."""
        # diff against NULL: nothing has been added to the compressor yet
        compressor = groupcompress.GroupCompressor(True)
        sha1, end_point, _, _ = compressor.compress(('label',),
            'strange\ncommon\n', None)
        self.assertEqual(sha_string('strange\ncommon\n'), sha1)
        expected_lines = [
            # 'f' marker, length byte (0x0f == 15 == len of the text), text
            'f', '\x0f', 'strange\ncommon\n',
            ]
        self.assertEqual(expected_lines, compressor.lines)
        # end_point is the total number of bytes emitted so far
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def _chunks_to_repr_lines(self, chunks):
        """Join ``chunks``, split on newlines and repr() each piece.

        :param chunks: An iterable of strings.
        :return: A single string with one repr()'d line per input line.
        """
        return '\n'.join(map(repr, ''.join(chunks).split('\n')))

    def assertEqualDiffEncoded(self, expected, actual):
        """Compare the actual content to the expected content.

        :param expected: A group of chunks that we expect to see
        :param actual: The measured 'chunks'

        We will transform the chunks back into lines, and then run 'repr()'
        over them to handle non-ascii characters.
        """
        self.assertEqualDiff(self._chunks_to_repr_lines(expected),
                             self._chunks_to_repr_lines(actual))

    def test_two_nosha_delta(self):
        """A second text sharing content with the first is stored as a delta."""
        compressor = groupcompress.GroupCompressor(True)
        sha1_1, _, _, _ = compressor.compress(('label',),
            'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        # Snapshot the output of the first text; the second compress() call
        # is expected to append to it.
        expected_lines = list(compressor.lines)
        sha1_2, end_point, _, _ = compressor.compress(('newlabel',),
            'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        self.assertEqual(sha_string('common long line\n'
                                    'that needs a 16 byte match\n'
                                    'different\n'), sha1_2)
        expected_lines.extend([
            # 'delta', delta length
            'd\x10',
            # source and target length
            '\x36\x36',
            # copy the line common
            '\x91\x0a\x2c', #copy, offset 0x0a, len 0x2c
            # add the line different, and the trailing newline
            '\x0adifferent\n', # insert 10 bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.lines)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_three_nosha_delta(self):
        """A third text can copy ranges from both earlier texts."""
        # The first interesting test: make a change that should use lines from
        # both parents.
        compressor = groupcompress.GroupCompressor(True)
        sha1_1, _, _, _ = compressor.compress(('label',),
            'strange\ncommon very very long line\nwith some extra text\n', None)
        sha1_2, _, _, _ = compressor.compress(('newlabel',),
            'different\nmoredifferent\nand then some more\n', None)
        # Snapshot the output of the first two texts; only the third text's
        # delta is spelled out below.
        expected_lines = list(compressor.lines)
        sha1_3, end_point, _, _ = compressor.compress(('label3',),
            'new\ncommon very very long line\nwith some extra text\n'
            'different\nmoredifferent\nand then some more\n',
            None)
        self.assertEqual(
            sha_string('new\ncommon very very long line\nwith some extra text\n'
                       'different\nmoredifferent\nand then some more\n'),
            sha1_3)
        # Each chunk is a separate list element (explicit commas); the
        # comparison joins the chunks, so only the concatenated bytes matter.
        expected_lines.extend([
            # 'delta', delta length
            'd\x0c',
            # source and target length
            '\x67\x5f',
            # insert new
            '\x03new',
            # Copy of first parent 'common' range
            '\x91\x09\x31', # copy, offset 0x09, 0x31 bytes
            # Copy of second parent 'different' range
            '\x91\x3c\x2b', # copy, offset 0x3c, 0x2b bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.lines)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_stats(self):
        """ratio() is roughly 1.4 after compressing three overlapping texts."""
        compressor = groupcompress.GroupCompressor(True)
        compressor.compress(('label',), 'strange\ncommon long line\n'
                                        'plus more text\n', None)
        compressor.compress(('newlabel',),
                            'common long line\nplus more text\n'
                            'different\nmoredifferent\n', None)
        compressor.compress(('label3',),
                            'new\ncommon long line\nplus more text\n'
                            '\ndifferent\nmoredifferent\n', None)
        # Compared to one decimal place only.
        self.assertAlmostEqual(1.4, compressor.ratio(), 1)

    def test_extract_from_compressor(self):
        """extract() returns (text, sha1) for keys already compressed."""
        # Knit fetching will try to reconstruct texts locally which results in
        # reading something that is in the compressor stream already.
        compressor = groupcompress.GroupCompressor(True)
        sha1_1, _, _, _ = compressor.compress(('label',),
            'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        sha1_2, _, _, _ = compressor.compress(('newlabel',),
            'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        # get the first out
        self.assertEqual(('strange\ncommon long line\n'
                          'that needs a 16 byte match\n', sha1_1),
                         compressor.extract(('label',)))
        # and the second
        self.assertEqual(('common long line\nthat needs a 16 byte match\n'
                          'different\n', sha1_2),
                         compressor.extract(('newlabel',)))
 
152
 
 
153
 
 
154
class TestBase128Int(tests.TestCase):
    """Tests for groupcompress.encode_base128_int / decode_base128_int."""

    def assertEqualEncode(self, bytes, val):
        """Assert that encoding the integer ``val`` produces ``bytes``."""
        encoded = groupcompress.encode_base128_int(val)
        self.assertEqual(bytes, encoded)

    def assertEqualDecode(self, val, num_decode, bytes):
        """Assert that decoding ``bytes`` returns ``(val, num_decode)``.

        Judging by the cases in this class, ``num_decode`` is the number of
        leading bytes consumed -- confirm against decode_base128_int.
        """
        decoded = groupcompress.decode_base128_int(bytes)
        self.assertEqual((val, num_decode), decoded)

    def test_encode(self):
        """Integers encode to the expected byte strings."""
        cases = [
            ('\x01', 1),
            ('\x02', 2),
            ('\x7f', 127),
            ('\x80\x01', 128),
            ('\xff\x01', 255),
            ('\x80\x02', 256),
            ('\xff\xff\xff\xff\x0f', 0xFFFFFFFF),
            ]
        for encoded, value in cases:
            self.assertEqualEncode(encoded, value)

    def test_decode(self):
        """Exact encodings decode to (value, length of the encoding)."""
        cases = [
            (1, 1, '\x01'),
            (2, 1, '\x02'),
            (127, 1, '\x7f'),
            (128, 2, '\x80\x01'),
            (255, 2, '\xff\x01'),
            (256, 2, '\x80\x02'),
            (0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f'),
            ]
        for value, consumed, encoded in cases:
            self.assertEqualDecode(value, consumed, encoded)

    def test_decode_with_trailing_bytes(self):
        """Bytes following a complete encoding do not affect the result."""
        cases = [
            (1, 1, '\x01abcdef'),
            (127, 1, '\x7f\x01'),
            (128, 2, '\x80\x01abcdef'),
            (255, 2, '\xff\x01\xff'),
            ]
        for value, consumed, encoded in cases:
            self.assertEqualDecode(value, consumed, encoded)
 
186
 
 
187
 
 
188
class TestGroupCompressBlock(tests.TestCase):
    """Tests for GroupCompressBlock parsing and serialisation."""

    def test_from_empty_bytes(self):
        """from_bytes raises ValueError when given an empty string."""
        self.assertRaises(ValueError,
                          groupcompress.GroupCompressBlock.from_bytes, '')

    def test_from_minimal_bytes(self):
        """A header with zero lengths parses to a block with no entries."""
        block = groupcompress.GroupCompressBlock.from_bytes('gcb1z\n0\n0\n')
        self.assertIsInstance(block, groupcompress.GroupCompressBlock)
        self.assertEqual({}, block._entries)

    def test_from_bytes(self):
        """from_bytes parses the zlib-compressed meta-info into entries."""
        z_header_bytes = (
            'gcb1z\n' # group compress block v1 zlib
            '76\n' # Length of zlib bytes
            '183\n' # Length of all meta-info
            + zlib.compress(
            'key:bing\n'
            'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
            'type:fulltext\n'
            'start:100\n'
            'length:100\n'
            '\n'
            'key:foo\x00bar\n'
            'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
            'type:fulltext\n'
            'start:0\n'
            'length:100\n'
            '\n'))
        block = groupcompress.GroupCompressBlock.from_bytes(
            z_header_bytes)
        # Only meta-info was supplied, so no content is attached to the block.
        self.assertIs(None, block._content)
        self.assertIsInstance(block, groupcompress.GroupCompressBlock)
        # A '\x00' inside a key line separates the elements of the key tuple.
        self.assertEqual([('bing',), ('foo', 'bar')], sorted(block._entries))
        bing = block._entries[('bing',)]
        self.assertEqual(('bing',), bing.key)
        self.assertEqual('fulltext', bing.type)
        self.assertEqual('abcd'*10, bing.sha1)
        self.assertEqual(100, bing.start)
        self.assertEqual(100, bing.length)
        foobar = block._entries[('foo', 'bar')]
        self.assertEqual(('foo', 'bar'), foobar.key)
        self.assertEqual('fulltext', foobar.type)
        self.assertEqual('abcd'*10, foobar.sha1)
        self.assertEqual(0, foobar.start)
        self.assertEqual(100, foobar.length)

    def test_add_entry(self):
        """add_entry returns an entry object exposing the values passed in."""
        gcb = groupcompress.GroupCompressBlock()
        e = gcb.add_entry(('foo', 'bar'), 'fulltext', 'abcd'*10, 0, 100)
        self.assertIsInstance(e, groupcompress.GroupCompressBlockEntry)
        self.assertEqual(('foo', 'bar'), e.key)
        self.assertEqual('fulltext', e.type)
        self.assertEqual('abcd'*10, e.sha1)
        self.assertEqual(0, e.start)
        self.assertEqual(100, e.length)

    def test_to_bytes(self):
        """to_bytes emits the header followed by zlib-compressed meta-info."""
        gcb = groupcompress.GroupCompressBlock()
        gcb.add_entry(('foo', 'bar'), 'fulltext', 'abcd'*10, 0, 100)
        gcb.add_entry(('bing',), 'fulltext', 'abcd'*10, 100, 100)
        block_bytes = gcb.to_bytes()
        header = (
            'gcb1z\n' # group compress block v1 zlib
            '76\n' # Length of compressed bytes
            '183\n' # Length of all meta-info
            )
        self.assertStartsWith(block_bytes, header)
        # Everything after the header is the compressed meta-info; slice by
        # the header's own length rather than a hard-coded offset.
        remaining_bytes = block_bytes[len(header):]
        raw_bytes = zlib.decompress(remaining_bytes)
        # Entries come out ordered by key ('bing' before 'foo'), not in the
        # order they were added.
        self.assertEqualDiff('key:bing\n'
                             'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
                             'type:fulltext\n'
                             'start:100\n'
                             'length:100\n'
                             '\n'
                             'key:foo\x00bar\n'
                             'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
                             'type:fulltext\n'
                             'start:0\n'
                             'length:100\n'
                             '\n', raw_bytes)