~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

Merge description into dont-add-conflict-helpers

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the python and pyrex extensions of groupcompress"""
 
18
 
 
19
from bzrlib import (
 
20
    groupcompress,
 
21
    _groupcompress_py,
 
22
    tests,
 
23
    )
 
24
 
 
25
 
 
26
def load_tests(standard_tests, module, loader):
 
27
    """Parameterize tests for all versions of groupcompress."""
 
28
    two_way_scenarios = [
 
29
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
30
                'apply_delta': _groupcompress_py.apply_delta})
 
31
        ]
 
32
    scenarios = [
 
33
        ('python', {'_gc_module': _groupcompress_py}),
 
34
        ]
 
35
    if compiled_groupcompress_feature.available():
 
36
        gc_module = compiled_groupcompress_feature.module
 
37
        scenarios.append(('C',
 
38
            {'_gc_module': gc_module}))
 
39
        two_way_scenarios.extend([
 
40
            ('CC', {'make_delta': gc_module.make_delta,
 
41
                    'apply_delta': gc_module.apply_delta}),
 
42
            ('PC', {'make_delta': _groupcompress_py.make_delta,
 
43
                    'apply_delta': gc_module.apply_delta}),
 
44
            ('CP', {'make_delta': gc_module.make_delta,
 
45
                    'apply_delta': _groupcompress_py.apply_delta}),
 
46
            ])
 
47
    to_adapt, result = tests.split_suite_by_condition(
 
48
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
 
49
                                                    TestBase128Int)))
 
50
    result = tests.multiply_tests(to_adapt, scenarios, result)
 
51
    to_adapt, result = tests.split_suite_by_condition(result,
 
52
        tests.condition_isinstance(TestMakeAndApplyCompatible))
 
53
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
 
54
    return result
 
55
 
 
56
 
 
57
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
 
58
                                    'bzrlib._groupcompress_pyx')
 
59
 
 
60
_text1 = """\
 
61
This is a bit
 
62
of source text
 
63
which is meant to be matched
 
64
against other text
 
65
"""
 
66
 
 
67
_text2 = """\
 
68
This is a bit
 
69
of source text
 
70
which is meant to differ from
 
71
against other text
 
72
"""
 
73
 
 
74
_text3 = """\
 
75
This is a bit
 
76
of source text
 
77
which is meant to be matched
 
78
against other text
 
79
except it also
 
80
has a lot more data
 
81
at the end of the file
 
82
"""
 
83
 
 
84
_first_text = """\
 
85
a bit of text, that
 
86
does not have much in
 
87
common with the next text
 
88
"""
 
89
 
 
90
_second_text = """\
 
91
some more bit of text, that
 
92
does not have much in
 
93
common with the previous text
 
94
and has some extra text
 
95
"""
 
96
 
 
97
 
 
98
_third_text = """\
 
99
a bit of text, that
 
100
has some in common with the previous text
 
101
and has some extra text
 
102
and not have much in
 
103
common with the next text
 
104
"""
 
105
 
 
106
_fourth_text = """\
 
107
123456789012345
 
108
same rabin hash
 
109
123456789012345
 
110
same rabin hash
 
111
123456789012345
 
112
same rabin hash
 
113
123456789012345
 
114
same rabin hash
 
115
"""
 
116
 
 
117
class TestMakeAndApplyDelta(tests.TestCase):
 
118
 
 
119
    _gc_module = None # Set by load_tests
 
120
 
 
121
    def setUp(self):
 
122
        super(TestMakeAndApplyDelta, self).setUp()
 
123
        self.make_delta = self._gc_module.make_delta
 
124
        self.apply_delta = self._gc_module.apply_delta
 
125
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
 
126
 
 
127
    def test_make_delta_is_typesafe(self):
 
128
        self.make_delta('a string', 'another string')
 
129
 
 
130
        def _check_make_delta(string1, string2):
 
131
            self.assertRaises(TypeError, self.make_delta, string1, string2)
 
132
 
 
133
        _check_make_delta('a string', object())
 
134
        _check_make_delta('a string', u'not a string')
 
135
        _check_make_delta(object(), 'a string')
 
136
        _check_make_delta(u'not a string', 'a string')
 
137
 
 
138
    def test_make_noop_delta(self):
 
139
        ident_delta = self.make_delta(_text1, _text1)
 
140
        self.assertEqual('M\x90M', ident_delta)
 
141
        ident_delta = self.make_delta(_text2, _text2)
 
142
        self.assertEqual('N\x90N', ident_delta)
 
143
        ident_delta = self.make_delta(_text3, _text3)
 
144
        self.assertEqual('\x87\x01\x90\x87', ident_delta)
 
145
 
 
146
    def assertDeltaIn(self, delta1, delta2, delta):
 
147
        """Make sure that the delta bytes match one of the expectations."""
 
148
        # In general, the python delta matcher gives different results than the
 
149
        # pyrex delta matcher. Both should be valid deltas, though.
 
150
        if delta not in (delta1, delta2):
 
151
            self.fail("Delta bytes:\n"
 
152
                      "       %r\n"
 
153
                      "not in %r\n"
 
154
                      "    or %r"
 
155
                      % (delta, delta1, delta2))
 
156
 
 
157
    def test_make_delta(self):
 
158
        delta = self.make_delta(_text1, _text2)
 
159
        self.assertDeltaIn(
 
160
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
161
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
162
            delta)
 
163
        delta = self.make_delta(_text2, _text1)
 
164
        self.assertDeltaIn(
 
165
            'M\x90/\x1ebe matched\nagainst other text\n',
 
166
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
 
167
            delta)
 
168
        delta = self.make_delta(_text3, _text1)
 
169
        self.assertEqual('M\x90M', delta)
 
170
        delta = self.make_delta(_text3, _text2)
 
171
        self.assertDeltaIn(
 
172
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
173
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
174
            delta)
 
175
 
 
176
    def test_make_delta_with_large_copies(self):
 
177
        # We want to have a copy that is larger than 64kB, which forces us to
 
178
        # issue multiple copy instructions.
 
179
        big_text = _text3 * 1220
 
180
        delta = self.make_delta(big_text, big_text)
 
181
        self.assertDeltaIn(
 
182
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
183
            '\x80'              # Copy 64kB, starting at byte 0
 
184
            '\x84\x01'          # and another 64kB starting at 64kB
 
185
            '\xb4\x02\x5c\x83', # And the bit of tail.
 
186
            None,   # Both implementations should be identical
 
187
            delta)
 
188
 
 
189
    def test_apply_delta_is_typesafe(self):
 
190
        self.apply_delta(_text1, 'M\x90M')
 
191
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
 
192
        self.assertRaises(TypeError, self.apply_delta,
 
193
                          unicode(_text1), 'M\x90M')
 
194
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
 
195
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
 
196
 
 
197
    def test_apply_delta(self):
 
198
        target = self.apply_delta(_text1,
 
199
                    'N\x90/\x1fdiffer from\nagainst other text\n')
 
200
        self.assertEqual(_text2, target)
 
201
        target = self.apply_delta(_text2,
 
202
                    'M\x90/\x1ebe matched\nagainst other text\n')
 
203
        self.assertEqual(_text1, target)
 
204
 
 
205
    def test_apply_delta_to_source_is_safe(self):
 
206
        self.assertRaises(TypeError,
 
207
            self.apply_delta_to_source, object(), 0, 1)
 
208
        self.assertRaises(TypeError,
 
209
            self.apply_delta_to_source, u'unicode str', 0, 1)
 
210
        # end > length
 
211
        self.assertRaises(ValueError,
 
212
            self.apply_delta_to_source, 'foo', 1, 4)
 
213
        # start > length
 
214
        self.assertRaises(ValueError,
 
215
            self.apply_delta_to_source, 'foo', 5, 3)
 
216
        # start > end
 
217
        self.assertRaises(ValueError,
 
218
            self.apply_delta_to_source, 'foo', 3, 2)
 
219
 
 
220
    def test_apply_delta_to_source(self):
 
221
        source_and_delta = (_text1
 
222
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
 
223
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
 
224
                                    len(_text1), len(source_and_delta)))
 
225
 
 
226
 
 
227
class TestMakeAndApplyCompatible(tests.TestCase):
 
228
 
 
229
    make_delta = None # Set by load_tests
 
230
    apply_delta = None # Set by load_tests
 
231
 
 
232
    def assertMakeAndApply(self, source, target):
 
233
        """Assert that generating a delta and applying gives success."""
 
234
        delta = self.make_delta(source, target)
 
235
        bytes = self.apply_delta(source, delta)
 
236
        self.assertEqualDiff(target, bytes)
 
237
 
 
238
    def test_direct(self):
 
239
        self.assertMakeAndApply(_text1, _text2)
 
240
        self.assertMakeAndApply(_text2, _text1)
 
241
        self.assertMakeAndApply(_text1, _text3)
 
242
        self.assertMakeAndApply(_text3, _text1)
 
243
        self.assertMakeAndApply(_text2, _text3)
 
244
        self.assertMakeAndApply(_text3, _text2)
 
245
 
 
246
 
 
247
class TestDeltaIndex(tests.TestCase):
 
248
 
 
249
    def setUp(self):
 
250
        super(TestDeltaIndex, self).setUp()
 
251
        # This test isn't multiplied, because we only have DeltaIndex for the
 
252
        # compiled form
 
253
        # We call this here, because _test_needs_features happens after setUp
 
254
        self.requireFeature(compiled_groupcompress_feature)
 
255
        self._gc_module = compiled_groupcompress_feature.module
 
256
 
 
257
    def test_repr(self):
 
258
        di = self._gc_module.DeltaIndex('test text\n')
 
259
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
 
260
 
 
261
    def test_first_add_source_doesnt_index_until_make_delta(self):
 
262
        di = self._gc_module.DeltaIndex()
 
263
        self.assertFalse(di._has_index())
 
264
        di.add_source(_text1, 0)
 
265
        self.assertFalse(di._has_index())
 
266
        # However, asking to make a delta will trigger the index to be
 
267
        # generated, and will generate a proper delta
 
268
        delta = di.make_delta(_text2)
 
269
        self.assertTrue(di._has_index())
 
270
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
271
 
 
272
    def test_second_add_source_triggers_make_index(self):
 
273
        di = self._gc_module.DeltaIndex()
 
274
        self.assertFalse(di._has_index())
 
275
        di.add_source(_text1, 0)
 
276
        self.assertFalse(di._has_index())
 
277
        di.add_source(_text2, 0)
 
278
        self.assertTrue(di._has_index())
 
279
 
 
280
    def test_make_delta(self):
 
281
        di = self._gc_module.DeltaIndex(_text1)
 
282
        delta = di.make_delta(_text2)
 
283
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
284
 
 
285
    def test_delta_against_multiple_sources(self):
 
286
        di = self._gc_module.DeltaIndex()
 
287
        di.add_source(_first_text, 0)
 
288
        self.assertEqual(len(_first_text), di._source_offset)
 
289
        di.add_source(_second_text, 0)
 
290
        self.assertEqual(len(_first_text) + len(_second_text),
 
291
                         di._source_offset)
 
292
        delta = di.make_delta(_third_text)
 
293
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
 
294
        self.assertEqualDiff(_third_text, result)
 
295
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
 
296
                         '\x91v6\x03and\x91d"\x91:\n', delta)
 
297
 
 
298
    def test_delta_with_offsets(self):
 
299
        di = self._gc_module.DeltaIndex()
 
300
        di.add_source(_first_text, 5)
 
301
        self.assertEqual(len(_first_text) + 5, di._source_offset)
 
302
        di.add_source(_second_text, 10)
 
303
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
 
304
                         di._source_offset)
 
305
        delta = di.make_delta(_third_text)
 
306
        self.assertIsNot(None, delta)
 
307
        result = self._gc_module.apply_delta(
 
308
            '12345' + _first_text + '1234567890' + _second_text, delta)
 
309
        self.assertIsNot(None, result)
 
310
        self.assertEqualDiff(_third_text, result)
 
311
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
 
312
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
 
313
 
 
314
    def test_delta_with_delta_bytes(self):
 
315
        di = self._gc_module.DeltaIndex()
 
316
        source = _first_text
 
317
        di.add_source(_first_text, 0)
 
318
        self.assertEqual(len(_first_text), di._source_offset)
 
319
        delta = di.make_delta(_second_text)
 
320
        self.assertEqual('h\tsome more\x91\x019'
 
321
                         '&previous text\nand has some extra text\n', delta)
 
322
        di.add_delta_source(delta, 0)
 
323
        source += delta
 
324
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
 
325
        second_delta = di.make_delta(_third_text)
 
326
        result = self._gc_module.apply_delta(source, second_delta)
 
327
        self.assertEqualDiff(_third_text, result)
 
328
        # We should be able to match against the
 
329
        # 'previous text\nand has some...'  that was part of the delta bytes
 
330
        # Note that we don't match the 'common with the', because it isn't long
 
331
        # enough to match in the original text, and those bytes are not present
 
332
        # in the delta for the second text.
 
333
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
 
334
                         '\x91S&\x03and\x91\x18,', second_delta)
 
335
        # Add this delta, and create a new delta for the same text. We should
 
336
        # find the remaining text, and only insert the short 'and' text.
 
337
        di.add_delta_source(second_delta, 0)
 
338
        source += second_delta
 
339
        third_delta = di.make_delta(_third_text)
 
340
        result = self._gc_module.apply_delta(source, third_delta)
 
341
        self.assertEqualDiff(_third_text, result)
 
342
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
 
343
                         '\x91S&\x03and\x91\x18,', third_delta)
 
344
        # Now create a delta, which we know won't be able to be 'fit' into the
 
345
        # existing index
 
346
        fourth_delta = di.make_delta(_fourth_text)
 
347
        self.assertEqual(_fourth_text,
 
348
                         self._gc_module.apply_delta(source, fourth_delta))
 
349
        self.assertEqual('\x80\x01'
 
350
                         '\x7f123456789012345\nsame rabin hash\n'
 
351
                         '123456789012345\nsame rabin hash\n'
 
352
                         '123456789012345\nsame rabin hash\n'
 
353
                         '123456789012345\nsame rabin hash'
 
354
                         '\x01\n', fourth_delta)
 
355
        di.add_delta_source(fourth_delta, 0)
 
356
        source += fourth_delta
 
357
        # With the next delta, everything should be found
 
358
        fifth_delta = di.make_delta(_fourth_text)
 
359
        self.assertEqual(_fourth_text,
 
360
                         self._gc_module.apply_delta(source, fifth_delta))
 
361
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
362
 
 
363
 
 
364
class TestCopyInstruction(tests.TestCase):
 
365
 
 
366
    def assertEncode(self, expected, offset, length):
 
367
        bytes = _groupcompress_py.encode_copy_instruction(offset, length)
 
368
        if expected != bytes:
 
369
            self.assertEqual([hex(ord(e)) for e in expected],
 
370
                             [hex(ord(b)) for b in bytes])
 
371
 
 
372
    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
 
373
        cmd = ord(bytes[pos])
 
374
        pos += 1
 
375
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
 
376
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
 
377
 
 
378
    def test_encode_no_length(self):
 
379
        self.assertEncode('\x80', 0, 64*1024)
 
380
        self.assertEncode('\x81\x01', 1, 64*1024)
 
381
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
382
        self.assertEncode('\x81\xff', 255, 64*1024)
 
383
        self.assertEncode('\x82\x01', 256, 64*1024)
 
384
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
 
385
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
 
386
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
 
387
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
 
388
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
 
389
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
 
390
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
 
391
 
 
392
    def test_encode_no_offset(self):
 
393
        self.assertEncode('\x90\x01', 0, 1)
 
394
        self.assertEncode('\x90\x0a', 0, 10)
 
395
        self.assertEncode('\x90\xff', 0, 255)
 
396
        self.assertEncode('\xA0\x01', 0, 256)
 
397
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
398
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
 
399
        # Special case, if copy == 64KiB, then we store exactly 0
 
400
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
401
        # about that, as we would never actually copy 0 bytes
 
402
        self.assertEncode('\x80', 0, 64*1024)
 
403
 
 
404
    def test_encode(self):
 
405
        self.assertEncode('\x91\x01\x01', 1, 1)
 
406
        self.assertEncode('\x91\x09\x0a', 9, 10)
 
407
        self.assertEncode('\x91\xfe\xff', 254, 255)
 
408
        self.assertEncode('\xA2\x02\x01', 512, 256)
 
409
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
 
410
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
411
        # Special case, if copy == 64KiB, then we store exactly 0
 
412
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
413
        # about that, as we would never actually copy 0 bytes
 
414
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
415
 
 
416
    def test_decode_no_length(self):
 
417
        # If length is 0, it is interpreted as 64KiB
 
418
        # The shortest possible instruction is a copy of 64KiB from offset 0
 
419
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
420
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
 
421
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
 
422
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
 
423
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
 
424
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
 
425
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
 
426
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
 
427
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
 
428
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
 
429
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
 
430
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
 
431
 
 
432
    def test_decode_no_offset(self):
 
433
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
 
434
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
 
435
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
 
436
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
 
437
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
438
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
 
439
        # Special case, if copy == 64KiB, then we store exactly 0
 
440
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
441
        # about that, as we would never actually copy 0 bytes
 
442
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
443
 
 
444
    def test_decode(self):
 
445
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
 
446
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
 
447
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
 
448
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
 
449
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
 
450
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
451
 
 
452
    def test_decode_not_start(self):
 
453
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
 
454
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
 
455
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
 
456
 
 
457
 
 
458
class TestBase128Int(tests.TestCase):
 
459
 
 
460
    _gc_module = None # Set by load_tests
 
461
 
 
462
    def assertEqualEncode(self, bytes, val):
 
463
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
 
464
 
 
465
    def assertEqualDecode(self, val, num_decode, bytes):
 
466
        self.assertEqual((val, num_decode),
 
467
                         self._gc_module.decode_base128_int(bytes))
 
468
 
 
469
    def test_encode(self):
 
470
        self.assertEqualEncode('\x01', 1)
 
471
        self.assertEqualEncode('\x02', 2)
 
472
        self.assertEqualEncode('\x7f', 127)
 
473
        self.assertEqualEncode('\x80\x01', 128)
 
474
        self.assertEqualEncode('\xff\x01', 255)
 
475
        self.assertEqualEncode('\x80\x02', 256)
 
476
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
 
477
 
 
478
    def test_decode(self):
 
479
        self.assertEqualDecode(1, 1, '\x01')
 
480
        self.assertEqualDecode(2, 1, '\x02')
 
481
        self.assertEqualDecode(127, 1, '\x7f')
 
482
        self.assertEqualDecode(128, 2, '\x80\x01')
 
483
        self.assertEqualDecode(255, 2, '\xff\x01')
 
484
        self.assertEqualDecode(256, 2, '\x80\x02')
 
485
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
 
486
 
 
487
    def test_decode_with_trailing_bytes(self):
 
488
        self.assertEqualDecode(1, 1, '\x01abcdef')
 
489
        self.assertEqualDecode(127, 1, '\x7f\x01')
 
490
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
 
491
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
 
492
 
 
493