~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2011-05-11 11:35:28 UTC
  • mto: This revision was merged to the branch mainline in revision 5851.
  • Revision ID: john@arbash-meinel.com-20110511113528-qepibuwxicjrbb2h
Break compatibility with python <2.6.

This includes auditing the code for places where we were doing
explicit 'sys.version' checks and removing them as appropriate.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the python and pyrex extensions of groupcompress"""
 
18
 
 
19
from bzrlib import (
 
20
    _groupcompress_py,
 
21
    tests,
 
22
    )
 
23
from bzrlib.tests.scenarios import (
 
24
    load_tests_apply_scenarios,
 
25
    )
 
26
 
 
27
 
 
28
def module_scenarios():
 
29
    scenarios = [
 
30
        ('python', {'_gc_module': _groupcompress_py}),
 
31
        ]
 
32
    if compiled_groupcompress_feature.available():
 
33
        gc_module = compiled_groupcompress_feature.module
 
34
        scenarios.append(('C',
 
35
            {'_gc_module': gc_module}))
 
36
    return scenarios
 
37
 
 
38
 
 
39
def two_way_scenarios():
 
40
    scenarios = [
 
41
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
42
                'apply_delta': _groupcompress_py.apply_delta})
 
43
        ]
 
44
    if compiled_groupcompress_feature.available():
 
45
        gc_module = compiled_groupcompress_feature.module
 
46
        scenarios.extend([
 
47
            ('CC', {'make_delta': gc_module.make_delta,
 
48
                    'apply_delta': gc_module.apply_delta}),
 
49
            ('PC', {'make_delta': _groupcompress_py.make_delta,
 
50
                    'apply_delta': gc_module.apply_delta}),
 
51
            ('CP', {'make_delta': gc_module.make_delta,
 
52
                    'apply_delta': _groupcompress_py.apply_delta}),
 
53
            ])
 
54
    return scenarios
 
55
 
 
56
 
 
57
load_tests = load_tests_apply_scenarios
 
58
 
 
59
 
 
60
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
 
61
                                    'bzrlib._groupcompress_pyx')
 
62
 
 
63
_text1 = """\
 
64
This is a bit
 
65
of source text
 
66
which is meant to be matched
 
67
against other text
 
68
"""
 
69
 
 
70
_text2 = """\
 
71
This is a bit
 
72
of source text
 
73
which is meant to differ from
 
74
against other text
 
75
"""
 
76
 
 
77
_text3 = """\
 
78
This is a bit
 
79
of source text
 
80
which is meant to be matched
 
81
against other text
 
82
except it also
 
83
has a lot more data
 
84
at the end of the file
 
85
"""
 
86
 
 
87
_first_text = """\
 
88
a bit of text, that
 
89
does not have much in
 
90
common with the next text
 
91
"""
 
92
 
 
93
_second_text = """\
 
94
some more bit of text, that
 
95
does not have much in
 
96
common with the previous text
 
97
and has some extra text
 
98
"""
 
99
 
 
100
 
 
101
_third_text = """\
 
102
a bit of text, that
 
103
has some in common with the previous text
 
104
and has some extra text
 
105
and not have much in
 
106
common with the next text
 
107
"""
 
108
 
 
109
_fourth_text = """\
 
110
123456789012345
 
111
same rabin hash
 
112
123456789012345
 
113
same rabin hash
 
114
123456789012345
 
115
same rabin hash
 
116
123456789012345
 
117
same rabin hash
 
118
"""
 
119
 
 
120
class TestMakeAndApplyDelta(tests.TestCase):
 
121
 
 
122
    scenarios = module_scenarios()
 
123
    _gc_module = None # Set by load_tests
 
124
 
 
125
    def setUp(self):
 
126
        super(TestMakeAndApplyDelta, self).setUp()
 
127
        self.make_delta = self._gc_module.make_delta
 
128
        self.apply_delta = self._gc_module.apply_delta
 
129
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
 
130
 
 
131
    def test_make_delta_is_typesafe(self):
 
132
        self.make_delta('a string', 'another string')
 
133
 
 
134
        def _check_make_delta(string1, string2):
 
135
            self.assertRaises(TypeError, self.make_delta, string1, string2)
 
136
 
 
137
        _check_make_delta('a string', object())
 
138
        _check_make_delta('a string', u'not a string')
 
139
        _check_make_delta(object(), 'a string')
 
140
        _check_make_delta(u'not a string', 'a string')
 
141
 
 
142
    def test_make_noop_delta(self):
 
143
        ident_delta = self.make_delta(_text1, _text1)
 
144
        self.assertEqual('M\x90M', ident_delta)
 
145
        ident_delta = self.make_delta(_text2, _text2)
 
146
        self.assertEqual('N\x90N', ident_delta)
 
147
        ident_delta = self.make_delta(_text3, _text3)
 
148
        self.assertEqual('\x87\x01\x90\x87', ident_delta)
 
149
 
 
150
    def assertDeltaIn(self, delta1, delta2, delta):
 
151
        """Make sure that the delta bytes match one of the expectations."""
 
152
        # In general, the python delta matcher gives different results than the
 
153
        # pyrex delta matcher. Both should be valid deltas, though.
 
154
        if delta not in (delta1, delta2):
 
155
            self.fail("Delta bytes:\n"
 
156
                      "       %r\n"
 
157
                      "not in %r\n"
 
158
                      "    or %r"
 
159
                      % (delta, delta1, delta2))
 
160
 
 
161
    def test_make_delta(self):
 
162
        delta = self.make_delta(_text1, _text2)
 
163
        self.assertDeltaIn(
 
164
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
165
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
166
            delta)
 
167
        delta = self.make_delta(_text2, _text1)
 
168
        self.assertDeltaIn(
 
169
            'M\x90/\x1ebe matched\nagainst other text\n',
 
170
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
 
171
            delta)
 
172
        delta = self.make_delta(_text3, _text1)
 
173
        self.assertEqual('M\x90M', delta)
 
174
        delta = self.make_delta(_text3, _text2)
 
175
        self.assertDeltaIn(
 
176
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
177
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
178
            delta)
 
179
 
 
180
    def test_make_delta_with_large_copies(self):
 
181
        # We want to have a copy that is larger than 64kB, which forces us to
 
182
        # issue multiple copy instructions.
 
183
        big_text = _text3 * 1220
 
184
        delta = self.make_delta(big_text, big_text)
 
185
        self.assertDeltaIn(
 
186
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
187
            '\x80'              # Copy 64kB, starting at byte 0
 
188
            '\x84\x01'          # and another 64kB starting at 64kB
 
189
            '\xb4\x02\x5c\x83', # And the bit of tail.
 
190
            None,   # Both implementations should be identical
 
191
            delta)
 
192
 
 
193
    def test_apply_delta_is_typesafe(self):
 
194
        self.apply_delta(_text1, 'M\x90M')
 
195
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
 
196
        self.assertRaises(TypeError, self.apply_delta,
 
197
                          unicode(_text1), 'M\x90M')
 
198
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
 
199
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
 
200
 
 
201
    def test_apply_delta(self):
 
202
        target = self.apply_delta(_text1,
 
203
                    'N\x90/\x1fdiffer from\nagainst other text\n')
 
204
        self.assertEqual(_text2, target)
 
205
        target = self.apply_delta(_text2,
 
206
                    'M\x90/\x1ebe matched\nagainst other text\n')
 
207
        self.assertEqual(_text1, target)
 
208
 
 
209
    def test_apply_delta_to_source_is_safe(self):
 
210
        self.assertRaises(TypeError,
 
211
            self.apply_delta_to_source, object(), 0, 1)
 
212
        self.assertRaises(TypeError,
 
213
            self.apply_delta_to_source, u'unicode str', 0, 1)
 
214
        # end > length
 
215
        self.assertRaises(ValueError,
 
216
            self.apply_delta_to_source, 'foo', 1, 4)
 
217
        # start > length
 
218
        self.assertRaises(ValueError,
 
219
            self.apply_delta_to_source, 'foo', 5, 3)
 
220
        # start > end
 
221
        self.assertRaises(ValueError,
 
222
            self.apply_delta_to_source, 'foo', 3, 2)
 
223
 
 
224
    def test_apply_delta_to_source(self):
 
225
        source_and_delta = (_text1
 
226
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
 
227
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
 
228
                                    len(_text1), len(source_and_delta)))
 
229
 
 
230
 
 
231
class TestMakeAndApplyCompatible(tests.TestCase):
 
232
 
 
233
    scenarios = two_way_scenarios()
 
234
 
 
235
    make_delta = None # Set by load_tests
 
236
    apply_delta = None # Set by load_tests
 
237
 
 
238
    def assertMakeAndApply(self, source, target):
 
239
        """Assert that generating a delta and applying gives success."""
 
240
        delta = self.make_delta(source, target)
 
241
        bytes = self.apply_delta(source, delta)
 
242
        self.assertEqualDiff(target, bytes)
 
243
 
 
244
    def test_direct(self):
 
245
        self.assertMakeAndApply(_text1, _text2)
 
246
        self.assertMakeAndApply(_text2, _text1)
 
247
        self.assertMakeAndApply(_text1, _text3)
 
248
        self.assertMakeAndApply(_text3, _text1)
 
249
        self.assertMakeAndApply(_text2, _text3)
 
250
        self.assertMakeAndApply(_text3, _text2)
 
251
 
 
252
 
 
253
class TestDeltaIndex(tests.TestCase):
 
254
 
 
255
    def setUp(self):
 
256
        super(TestDeltaIndex, self).setUp()
 
257
        # This test isn't multiplied, because we only have DeltaIndex for the
 
258
        # compiled form
 
259
        # We call this here, because _test_needs_features happens after setUp
 
260
        self.requireFeature(compiled_groupcompress_feature)
 
261
        self._gc_module = compiled_groupcompress_feature.module
 
262
 
 
263
    def test_repr(self):
 
264
        di = self._gc_module.DeltaIndex('test text\n')
 
265
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
 
266
 
 
267
    def test_first_add_source_doesnt_index_until_make_delta(self):
 
268
        di = self._gc_module.DeltaIndex()
 
269
        self.assertFalse(di._has_index())
 
270
        di.add_source(_text1, 0)
 
271
        self.assertFalse(di._has_index())
 
272
        # However, asking to make a delta will trigger the index to be
 
273
        # generated, and will generate a proper delta
 
274
        delta = di.make_delta(_text2)
 
275
        self.assertTrue(di._has_index())
 
276
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
277
 
 
278
    def test_second_add_source_triggers_make_index(self):
 
279
        di = self._gc_module.DeltaIndex()
 
280
        self.assertFalse(di._has_index())
 
281
        di.add_source(_text1, 0)
 
282
        self.assertFalse(di._has_index())
 
283
        di.add_source(_text2, 0)
 
284
        self.assertTrue(di._has_index())
 
285
 
 
286
    def test_make_delta(self):
 
287
        di = self._gc_module.DeltaIndex(_text1)
 
288
        delta = di.make_delta(_text2)
 
289
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
290
 
 
291
    def test_delta_against_multiple_sources(self):
 
292
        di = self._gc_module.DeltaIndex()
 
293
        di.add_source(_first_text, 0)
 
294
        self.assertEqual(len(_first_text), di._source_offset)
 
295
        di.add_source(_second_text, 0)
 
296
        self.assertEqual(len(_first_text) + len(_second_text),
 
297
                         di._source_offset)
 
298
        delta = di.make_delta(_third_text)
 
299
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
 
300
        self.assertEqualDiff(_third_text, result)
 
301
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
 
302
                         '\x91v6\x03and\x91d"\x91:\n', delta)
 
303
 
 
304
    def test_delta_with_offsets(self):
 
305
        di = self._gc_module.DeltaIndex()
 
306
        di.add_source(_first_text, 5)
 
307
        self.assertEqual(len(_first_text) + 5, di._source_offset)
 
308
        di.add_source(_second_text, 10)
 
309
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
 
310
                         di._source_offset)
 
311
        delta = di.make_delta(_third_text)
 
312
        self.assertIsNot(None, delta)
 
313
        result = self._gc_module.apply_delta(
 
314
            '12345' + _first_text + '1234567890' + _second_text, delta)
 
315
        self.assertIsNot(None, result)
 
316
        self.assertEqualDiff(_third_text, result)
 
317
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
 
318
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
 
319
 
 
320
    def test_delta_with_delta_bytes(self):
 
321
        di = self._gc_module.DeltaIndex()
 
322
        source = _first_text
 
323
        di.add_source(_first_text, 0)
 
324
        self.assertEqual(len(_first_text), di._source_offset)
 
325
        delta = di.make_delta(_second_text)
 
326
        self.assertEqual('h\tsome more\x91\x019'
 
327
                         '&previous text\nand has some extra text\n', delta)
 
328
        di.add_delta_source(delta, 0)
 
329
        source += delta
 
330
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
 
331
        second_delta = di.make_delta(_third_text)
 
332
        result = self._gc_module.apply_delta(source, second_delta)
 
333
        self.assertEqualDiff(_third_text, result)
 
334
        # We should be able to match against the
 
335
        # 'previous text\nand has some...'  that was part of the delta bytes
 
336
        # Note that we don't match the 'common with the', because it isn't long
 
337
        # enough to match in the original text, and those bytes are not present
 
338
        # in the delta for the second text.
 
339
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
 
340
                         '\x91S&\x03and\x91\x18,', second_delta)
 
341
        # Add this delta, and create a new delta for the same text. We should
 
342
        # find the remaining text, and only insert the short 'and' text.
 
343
        di.add_delta_source(second_delta, 0)
 
344
        source += second_delta
 
345
        third_delta = di.make_delta(_third_text)
 
346
        result = self._gc_module.apply_delta(source, third_delta)
 
347
        self.assertEqualDiff(_third_text, result)
 
348
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
 
349
                         '\x91S&\x03and\x91\x18,', third_delta)
 
350
        # Now create a delta, which we know won't be able to be 'fit' into the
 
351
        # existing index
 
352
        fourth_delta = di.make_delta(_fourth_text)
 
353
        self.assertEqual(_fourth_text,
 
354
                         self._gc_module.apply_delta(source, fourth_delta))
 
355
        self.assertEqual('\x80\x01'
 
356
                         '\x7f123456789012345\nsame rabin hash\n'
 
357
                         '123456789012345\nsame rabin hash\n'
 
358
                         '123456789012345\nsame rabin hash\n'
 
359
                         '123456789012345\nsame rabin hash'
 
360
                         '\x01\n', fourth_delta)
 
361
        di.add_delta_source(fourth_delta, 0)
 
362
        source += fourth_delta
 
363
        # With the next delta, everything should be found
 
364
        fifth_delta = di.make_delta(_fourth_text)
 
365
        self.assertEqual(_fourth_text,
 
366
                         self._gc_module.apply_delta(source, fifth_delta))
 
367
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
368
 
 
369
 
 
370
class TestCopyInstruction(tests.TestCase):
 
371
 
 
372
    def assertEncode(self, expected, offset, length):
 
373
        bytes = _groupcompress_py.encode_copy_instruction(offset, length)
 
374
        if expected != bytes:
 
375
            self.assertEqual([hex(ord(e)) for e in expected],
 
376
                             [hex(ord(b)) for b in bytes])
 
377
 
 
378
    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
 
379
        cmd = ord(bytes[pos])
 
380
        pos += 1
 
381
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
 
382
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
 
383
 
 
384
    def test_encode_no_length(self):
 
385
        self.assertEncode('\x80', 0, 64*1024)
 
386
        self.assertEncode('\x81\x01', 1, 64*1024)
 
387
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
388
        self.assertEncode('\x81\xff', 255, 64*1024)
 
389
        self.assertEncode('\x82\x01', 256, 64*1024)
 
390
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
 
391
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
 
392
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
 
393
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
 
394
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
 
395
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
 
396
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
 
397
 
 
398
    def test_encode_no_offset(self):
 
399
        self.assertEncode('\x90\x01', 0, 1)
 
400
        self.assertEncode('\x90\x0a', 0, 10)
 
401
        self.assertEncode('\x90\xff', 0, 255)
 
402
        self.assertEncode('\xA0\x01', 0, 256)
 
403
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
404
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
 
405
        # Special case, if copy == 64KiB, then we store exactly 0
 
406
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
407
        # about that, as we would never actually copy 0 bytes
 
408
        self.assertEncode('\x80', 0, 64*1024)
 
409
 
 
410
    def test_encode(self):
 
411
        self.assertEncode('\x91\x01\x01', 1, 1)
 
412
        self.assertEncode('\x91\x09\x0a', 9, 10)
 
413
        self.assertEncode('\x91\xfe\xff', 254, 255)
 
414
        self.assertEncode('\xA2\x02\x01', 512, 256)
 
415
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
 
416
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
417
        # Special case, if copy == 64KiB, then we store exactly 0
 
418
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
419
        # about that, as we would never actually copy 0 bytes
 
420
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
421
 
 
422
    def test_decode_no_length(self):
 
423
        # If length is 0, it is interpreted as 64KiB
 
424
        # The shortest possible instruction is a copy of 64KiB from offset 0
 
425
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
426
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
 
427
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
 
428
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
 
429
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
 
430
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
 
431
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
 
432
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
 
433
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
 
434
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
 
435
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
 
436
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
 
437
 
 
438
    def test_decode_no_offset(self):
 
439
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
 
440
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
 
441
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
 
442
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
 
443
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
444
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
 
445
        # Special case, if copy == 64KiB, then we store exactly 0
 
446
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
447
        # about that, as we would never actually copy 0 bytes
 
448
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
449
 
 
450
    def test_decode(self):
 
451
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
 
452
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
 
453
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
 
454
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
 
455
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
 
456
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
457
 
 
458
    def test_decode_not_start(self):
 
459
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
 
460
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
 
461
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
 
462
 
 
463
 
 
464
class TestBase128Int(tests.TestCase):
 
465
 
 
466
    scenarios = module_scenarios()
 
467
 
 
468
    _gc_module = None # Set by load_tests
 
469
 
 
470
    def assertEqualEncode(self, bytes, val):
 
471
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
 
472
 
 
473
    def assertEqualDecode(self, val, num_decode, bytes):
 
474
        self.assertEqual((val, num_decode),
 
475
                         self._gc_module.decode_base128_int(bytes))
 
476
 
 
477
    def test_encode(self):
 
478
        self.assertEqualEncode('\x01', 1)
 
479
        self.assertEqualEncode('\x02', 2)
 
480
        self.assertEqualEncode('\x7f', 127)
 
481
        self.assertEqualEncode('\x80\x01', 128)
 
482
        self.assertEqualEncode('\xff\x01', 255)
 
483
        self.assertEqualEncode('\x80\x02', 256)
 
484
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
 
485
 
 
486
    def test_decode(self):
 
487
        self.assertEqualDecode(1, 1, '\x01')
 
488
        self.assertEqualDecode(2, 1, '\x02')
 
489
        self.assertEqualDecode(127, 1, '\x7f')
 
490
        self.assertEqualDecode(128, 2, '\x80\x01')
 
491
        self.assertEqualDecode(255, 2, '\xff\x01')
 
492
        self.assertEqualDecode(256, 2, '\x80\x02')
 
493
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
 
494
 
 
495
    def test_decode_with_trailing_bytes(self):
 
496
        self.assertEqualDecode(1, 1, '\x01abcdef')
 
497
        self.assertEqualDecode(127, 1, '\x7f\x01')
 
498
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
 
499
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
 
500
 
 
501