~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

  • Committer: Martin Pool
  • Date: 2009-09-14 01:19:11 UTC
  • mto: This revision was merged to the branch mainline in revision 4688.
  • Revision ID: mbp@sourcefrog.net-20090914011911-llu9ujul97k8f8s7
News for fix of 406113

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the python and pyrex extensions of groupcompress"""
 
18
 
 
19
from bzrlib import (
 
20
    groupcompress,
 
21
    _groupcompress_py,
 
22
    tests,
 
23
    )
 
24
 
 
25
 
 
26
def load_tests(standard_tests, module, loader):
 
27
    """Parameterize tests for all versions of groupcompress."""
 
28
    two_way_scenarios = [
 
29
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
30
                'apply_delta': _groupcompress_py.apply_delta})
 
31
        ]
 
32
    scenarios = [
 
33
        ('python', {'_gc_module': _groupcompress_py}),
 
34
        ]
 
35
    if CompiledGroupCompressFeature.available():
 
36
        from bzrlib import _groupcompress_pyx
 
37
        scenarios.append(('C',
 
38
            {'_gc_module': _groupcompress_pyx}))
 
39
        two_way_scenarios.extend([
 
40
            ('CC', {'make_delta': _groupcompress_pyx.make_delta,
 
41
                    'apply_delta': _groupcompress_pyx.apply_delta}),
 
42
            ('PC', {'make_delta': _groupcompress_py.make_delta,
 
43
                    'apply_delta': _groupcompress_pyx.apply_delta}),
 
44
            ('CP', {'make_delta': _groupcompress_pyx.make_delta,
 
45
                    'apply_delta': _groupcompress_py.apply_delta}),
 
46
            ])
 
47
    to_adapt, result = tests.split_suite_by_condition(
 
48
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
 
49
                                                    TestBase128Int)))
 
50
    result = tests.multiply_tests(to_adapt, scenarios, result)
 
51
    to_adapt, result = tests.split_suite_by_condition(result,
 
52
        tests.condition_isinstance(TestMakeAndApplyCompatible))
 
53
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
 
54
    return result
 
55
 
 
56
 
 
57
class _CompiledGroupCompressFeature(tests.Feature):
 
58
 
 
59
    def _probe(self):
 
60
        try:
 
61
            import bzrlib._groupcompress_pyx
 
62
        except ImportError:
 
63
            return False
 
64
        else:
 
65
            return True
 
66
 
 
67
    def feature_name(self):
 
68
        return 'bzrlib._groupcompress_pyx'
 
69
 
 
70
 
 
71
CompiledGroupCompressFeature = _CompiledGroupCompressFeature()
 
72
 
 
73
_text1 = """\
 
74
This is a bit
 
75
of source text
 
76
which is meant to be matched
 
77
against other text
 
78
"""
 
79
 
 
80
_text2 = """\
 
81
This is a bit
 
82
of source text
 
83
which is meant to differ from
 
84
against other text
 
85
"""
 
86
 
 
87
_text3 = """\
 
88
This is a bit
 
89
of source text
 
90
which is meant to be matched
 
91
against other text
 
92
except it also
 
93
has a lot more data
 
94
at the end of the file
 
95
"""
 
96
 
 
97
_first_text = """\
 
98
a bit of text, that
 
99
does not have much in
 
100
common with the next text
 
101
"""
 
102
 
 
103
_second_text = """\
 
104
some more bit of text, that
 
105
does not have much in
 
106
common with the previous text
 
107
and has some extra text
 
108
"""
 
109
 
 
110
 
 
111
_third_text = """\
 
112
a bit of text, that
 
113
has some in common with the previous text
 
114
and has some extra text
 
115
and not have much in
 
116
common with the next text
 
117
"""
 
118
 
 
119
_fourth_text = """\
 
120
123456789012345
 
121
same rabin hash
 
122
123456789012345
 
123
same rabin hash
 
124
123456789012345
 
125
same rabin hash
 
126
123456789012345
 
127
same rabin hash
 
128
"""
 
129
 
 
130
class TestMakeAndApplyDelta(tests.TestCase):
 
131
 
 
132
    _gc_module = None # Set by load_tests
 
133
 
 
134
    def setUp(self):
 
135
        super(TestMakeAndApplyDelta, self).setUp()
 
136
        self.make_delta = self._gc_module.make_delta
 
137
        self.apply_delta = self._gc_module.apply_delta
 
138
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
 
139
 
 
140
    def test_make_delta_is_typesafe(self):
 
141
        self.make_delta('a string', 'another string')
 
142
 
 
143
        def _check_make_delta(string1, string2):
 
144
            self.assertRaises(TypeError, self.make_delta, string1, string2)
 
145
 
 
146
        _check_make_delta('a string', object())
 
147
        _check_make_delta('a string', u'not a string')
 
148
        _check_make_delta(object(), 'a string')
 
149
        _check_make_delta(u'not a string', 'a string')
 
150
 
 
151
    def test_make_noop_delta(self):
 
152
        ident_delta = self.make_delta(_text1, _text1)
 
153
        self.assertEqual('M\x90M', ident_delta)
 
154
        ident_delta = self.make_delta(_text2, _text2)
 
155
        self.assertEqual('N\x90N', ident_delta)
 
156
        ident_delta = self.make_delta(_text3, _text3)
 
157
        self.assertEqual('\x87\x01\x90\x87', ident_delta)
 
158
 
 
159
    def assertDeltaIn(self, delta1, delta2, delta):
 
160
        """Make sure that the delta bytes match one of the expectations."""
 
161
        # In general, the python delta matcher gives different results than the
 
162
        # pyrex delta matcher. Both should be valid deltas, though.
 
163
        if delta not in (delta1, delta2):
 
164
            self.fail("Delta bytes:\n"
 
165
                      "       %r\n"
 
166
                      "not in %r\n"
 
167
                      "    or %r"
 
168
                      % (delta, delta1, delta2))
 
169
 
 
170
    def test_make_delta(self):
 
171
        delta = self.make_delta(_text1, _text2)
 
172
        self.assertDeltaIn(
 
173
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
174
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
175
            delta)
 
176
        delta = self.make_delta(_text2, _text1)
 
177
        self.assertDeltaIn(
 
178
            'M\x90/\x1ebe matched\nagainst other text\n',
 
179
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
 
180
            delta)
 
181
        delta = self.make_delta(_text3, _text1)
 
182
        self.assertEqual('M\x90M', delta)
 
183
        delta = self.make_delta(_text3, _text2)
 
184
        self.assertDeltaIn(
 
185
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
186
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
187
            delta)
 
188
 
 
189
    def test_make_delta_with_large_copies(self):
 
190
        # We want to have a copy that is larger than 64kB, which forces us to
 
191
        # issue multiple copy instructions.
 
192
        big_text = _text3 * 1220
 
193
        delta = self.make_delta(big_text, big_text)
 
194
        self.assertDeltaIn(
 
195
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
196
            '\x80'              # Copy 64kB, starting at byte 0
 
197
            '\x84\x01'          # and another 64kB starting at 64kB
 
198
            '\xb4\x02\x5c\x83', # And the bit of tail.
 
199
            None,   # Both implementations should be identical
 
200
            delta)
 
201
 
 
202
    def test_apply_delta_is_typesafe(self):
 
203
        self.apply_delta(_text1, 'M\x90M')
 
204
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
 
205
        self.assertRaises(TypeError, self.apply_delta,
 
206
                          unicode(_text1), 'M\x90M')
 
207
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
 
208
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
 
209
 
 
210
    def test_apply_delta(self):
 
211
        target = self.apply_delta(_text1,
 
212
                    'N\x90/\x1fdiffer from\nagainst other text\n')
 
213
        self.assertEqual(_text2, target)
 
214
        target = self.apply_delta(_text2,
 
215
                    'M\x90/\x1ebe matched\nagainst other text\n')
 
216
        self.assertEqual(_text1, target)
 
217
 
 
218
    def test_apply_delta_to_source_is_safe(self):
 
219
        self.assertRaises(TypeError,
 
220
            self.apply_delta_to_source, object(), 0, 1)
 
221
        self.assertRaises(TypeError,
 
222
            self.apply_delta_to_source, u'unicode str', 0, 1)
 
223
        # end > length
 
224
        self.assertRaises(ValueError,
 
225
            self.apply_delta_to_source, 'foo', 1, 4)
 
226
        # start > length
 
227
        self.assertRaises(ValueError,
 
228
            self.apply_delta_to_source, 'foo', 5, 3)
 
229
        # start > end
 
230
        self.assertRaises(ValueError,
 
231
            self.apply_delta_to_source, 'foo', 3, 2)
 
232
 
 
233
    def test_apply_delta_to_source(self):
 
234
        source_and_delta = (_text1
 
235
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
 
236
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
 
237
                                    len(_text1), len(source_and_delta)))
 
238
 
 
239
 
 
240
class TestMakeAndApplyCompatible(tests.TestCase):
 
241
 
 
242
    make_delta = None # Set by load_tests
 
243
    apply_delta = None # Set by load_tests
 
244
 
 
245
    def assertMakeAndApply(self, source, target):
 
246
        """Assert that generating a delta and applying gives success."""
 
247
        delta = self.make_delta(source, target)
 
248
        bytes = self.apply_delta(source, delta)
 
249
        self.assertEqualDiff(target, bytes)
 
250
 
 
251
    def test_direct(self):
 
252
        self.assertMakeAndApply(_text1, _text2)
 
253
        self.assertMakeAndApply(_text2, _text1)
 
254
        self.assertMakeAndApply(_text1, _text3)
 
255
        self.assertMakeAndApply(_text3, _text1)
 
256
        self.assertMakeAndApply(_text2, _text3)
 
257
        self.assertMakeAndApply(_text3, _text2)
 
258
 
 
259
 
 
260
class TestDeltaIndex(tests.TestCase):
 
261
 
 
262
    def setUp(self):
 
263
        super(TestDeltaIndex, self).setUp()
 
264
        # This test isn't multiplied, because we only have DeltaIndex for the
 
265
        # compiled form
 
266
        # We call this here, because _test_needs_features happens after setUp
 
267
        self.requireFeature(CompiledGroupCompressFeature)
 
268
        from bzrlib import _groupcompress_pyx
 
269
        self._gc_module = _groupcompress_pyx
 
270
 
 
271
    def test_repr(self):
 
272
        di = self._gc_module.DeltaIndex('test text\n')
 
273
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
 
274
 
 
275
    def test_first_add_source_doesnt_index_until_make_delta(self):
 
276
        di = self._gc_module.DeltaIndex()
 
277
        self.assertFalse(di._has_index())
 
278
        di.add_source(_text1, 0)
 
279
        self.assertFalse(di._has_index())
 
280
        # However, asking to make a delta will trigger the index to be
 
281
        # generated, and will generate a proper delta
 
282
        delta = di.make_delta(_text2)
 
283
        self.assertTrue(di._has_index())
 
284
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
285
 
 
286
    def test_second_add_source_triggers_make_index(self):
 
287
        di = self._gc_module.DeltaIndex()
 
288
        self.assertFalse(di._has_index())
 
289
        di.add_source(_text1, 0)
 
290
        self.assertFalse(di._has_index())
 
291
        di.add_source(_text2, 0)
 
292
        self.assertTrue(di._has_index())
 
293
 
 
294
    def test_make_delta(self):
 
295
        di = self._gc_module.DeltaIndex(_text1)
 
296
        delta = di.make_delta(_text2)
 
297
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
298
 
 
299
    def test_delta_against_multiple_sources(self):
 
300
        di = self._gc_module.DeltaIndex()
 
301
        di.add_source(_first_text, 0)
 
302
        self.assertEqual(len(_first_text), di._source_offset)
 
303
        di.add_source(_second_text, 0)
 
304
        self.assertEqual(len(_first_text) + len(_second_text),
 
305
                         di._source_offset)
 
306
        delta = di.make_delta(_third_text)
 
307
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
 
308
        self.assertEqualDiff(_third_text, result)
 
309
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
 
310
                         '\x91v6\x03and\x91d"\x91:\n', delta)
 
311
 
 
312
    def test_delta_with_offsets(self):
 
313
        di = self._gc_module.DeltaIndex()
 
314
        di.add_source(_first_text, 5)
 
315
        self.assertEqual(len(_first_text) + 5, di._source_offset)
 
316
        di.add_source(_second_text, 10)
 
317
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
 
318
                         di._source_offset)
 
319
        delta = di.make_delta(_third_text)
 
320
        self.assertIsNot(None, delta)
 
321
        result = self._gc_module.apply_delta(
 
322
            '12345' + _first_text + '1234567890' + _second_text, delta)
 
323
        self.assertIsNot(None, result)
 
324
        self.assertEqualDiff(_third_text, result)
 
325
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
 
326
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
 
327
 
 
328
    def test_delta_with_delta_bytes(self):
 
329
        di = self._gc_module.DeltaIndex()
 
330
        source = _first_text
 
331
        di.add_source(_first_text, 0)
 
332
        self.assertEqual(len(_first_text), di._source_offset)
 
333
        delta = di.make_delta(_second_text)
 
334
        self.assertEqual('h\tsome more\x91\x019'
 
335
                         '&previous text\nand has some extra text\n', delta)
 
336
        di.add_delta_source(delta, 0)
 
337
        source += delta
 
338
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
 
339
        second_delta = di.make_delta(_third_text)
 
340
        result = self._gc_module.apply_delta(source, second_delta)
 
341
        self.assertEqualDiff(_third_text, result)
 
342
        # We should be able to match against the
 
343
        # 'previous text\nand has some...'  that was part of the delta bytes
 
344
        # Note that we don't match the 'common with the', because it isn't long
 
345
        # enough to match in the original text, and those bytes are not present
 
346
        # in the delta for the second text.
 
347
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
 
348
                         '\x91S&\x03and\x91\x18,', second_delta)
 
349
        # Add this delta, and create a new delta for the same text. We should
 
350
        # find the remaining text, and only insert the short 'and' text.
 
351
        di.add_delta_source(second_delta, 0)
 
352
        source += second_delta
 
353
        third_delta = di.make_delta(_third_text)
 
354
        result = self._gc_module.apply_delta(source, third_delta)
 
355
        self.assertEqualDiff(_third_text, result)
 
356
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
 
357
                         '\x91S&\x03and\x91\x18,', third_delta)
 
358
        # Now create a delta, which we know won't be able to be 'fit' into the
 
359
        # existing index
 
360
        fourth_delta = di.make_delta(_fourth_text)
 
361
        self.assertEqual(_fourth_text,
 
362
                         self._gc_module.apply_delta(source, fourth_delta))
 
363
        self.assertEqual('\x80\x01'
 
364
                         '\x7f123456789012345\nsame rabin hash\n'
 
365
                         '123456789012345\nsame rabin hash\n'
 
366
                         '123456789012345\nsame rabin hash\n'
 
367
                         '123456789012345\nsame rabin hash'
 
368
                         '\x01\n', fourth_delta)
 
369
        di.add_delta_source(fourth_delta, 0)
 
370
        source += fourth_delta
 
371
        # With the next delta, everything should be found
 
372
        fifth_delta = di.make_delta(_fourth_text)
 
373
        self.assertEqual(_fourth_text,
 
374
                         self._gc_module.apply_delta(source, fifth_delta))
 
375
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
376
 
 
377
 
 
378
class TestCopyInstruction(tests.TestCase):
 
379
 
 
380
    def assertEncode(self, expected, offset, length):
 
381
        bytes = _groupcompress_py.encode_copy_instruction(offset, length)
 
382
        if expected != bytes:
 
383
            self.assertEqual([hex(ord(e)) for e in expected],
 
384
                             [hex(ord(b)) for b in bytes])
 
385
 
 
386
    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
 
387
        cmd = ord(bytes[pos])
 
388
        pos += 1
 
389
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
 
390
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
 
391
 
 
392
    def test_encode_no_length(self):
 
393
        self.assertEncode('\x80', 0, 64*1024)
 
394
        self.assertEncode('\x81\x01', 1, 64*1024)
 
395
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
396
        self.assertEncode('\x81\xff', 255, 64*1024)
 
397
        self.assertEncode('\x82\x01', 256, 64*1024)
 
398
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
 
399
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
 
400
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
 
401
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
 
402
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
 
403
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
 
404
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
 
405
 
 
406
    def test_encode_no_offset(self):
 
407
        self.assertEncode('\x90\x01', 0, 1)
 
408
        self.assertEncode('\x90\x0a', 0, 10)
 
409
        self.assertEncode('\x90\xff', 0, 255)
 
410
        self.assertEncode('\xA0\x01', 0, 256)
 
411
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
412
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
 
413
        # Special case, if copy == 64KiB, then we store exactly 0
 
414
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
415
        # about that, as we would never actually copy 0 bytes
 
416
        self.assertEncode('\x80', 0, 64*1024)
 
417
 
 
418
    def test_encode(self):
 
419
        self.assertEncode('\x91\x01\x01', 1, 1)
 
420
        self.assertEncode('\x91\x09\x0a', 9, 10)
 
421
        self.assertEncode('\x91\xfe\xff', 254, 255)
 
422
        self.assertEncode('\xA2\x02\x01', 512, 256)
 
423
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
 
424
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
425
        # Special case, if copy == 64KiB, then we store exactly 0
 
426
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
427
        # about that, as we would never actually copy 0 bytes
 
428
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
429
 
 
430
    def test_decode_no_length(self):
 
431
        # If length is 0, it is interpreted as 64KiB
 
432
        # The shortest possible instruction is a copy of 64KiB from offset 0
 
433
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
434
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
 
435
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
 
436
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
 
437
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
 
438
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
 
439
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
 
440
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
 
441
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
 
442
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
 
443
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
 
444
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
 
445
 
 
446
    def test_decode_no_offset(self):
 
447
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
 
448
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
 
449
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
 
450
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
 
451
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
452
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
 
453
        # Special case, if copy == 64KiB, then we store exactly 0
 
454
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
455
        # about that, as we would never actually copy 0 bytes
 
456
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
457
 
 
458
    def test_decode(self):
 
459
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
 
460
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
 
461
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
 
462
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
 
463
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
 
464
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
465
 
 
466
    def test_decode_not_start(self):
 
467
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
 
468
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
 
469
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
 
470
 
 
471
 
 
472
class TestBase128Int(tests.TestCase):
 
473
 
 
474
    _gc_module = None # Set by load_tests
 
475
 
 
476
    def assertEqualEncode(self, bytes, val):
 
477
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
 
478
 
 
479
    def assertEqualDecode(self, val, num_decode, bytes):
 
480
        self.assertEqual((val, num_decode),
 
481
                         self._gc_module.decode_base128_int(bytes))
 
482
 
 
483
    def test_encode(self):
 
484
        self.assertEqualEncode('\x01', 1)
 
485
        self.assertEqualEncode('\x02', 2)
 
486
        self.assertEqualEncode('\x7f', 127)
 
487
        self.assertEqualEncode('\x80\x01', 128)
 
488
        self.assertEqualEncode('\xff\x01', 255)
 
489
        self.assertEqualEncode('\x80\x02', 256)
 
490
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
 
491
 
 
492
    def test_decode(self):
 
493
        self.assertEqualDecode(1, 1, '\x01')
 
494
        self.assertEqualDecode(2, 1, '\x02')
 
495
        self.assertEqualDecode(127, 1, '\x7f')
 
496
        self.assertEqualDecode(128, 2, '\x80\x01')
 
497
        self.assertEqualDecode(255, 2, '\xff\x01')
 
498
        self.assertEqualDecode(256, 2, '\x80\x02')
 
499
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
 
500
 
 
501
    def test_decode_with_trailing_bytes(self):
 
502
        self.assertEqualDecode(1, 1, '\x01abcdef')
 
503
        self.assertEqualDecode(127, 1, '\x7f\x01')
 
504
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
 
505
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
 
506
 
 
507