~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

(vila) Open 2.4.3 for bug fixes (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the python and pyrex extensions of groupcompress"""
 
18
 
 
19
from bzrlib import (
 
20
    _groupcompress_py,
 
21
    tests,
 
22
    )
 
23
from bzrlib.tests.scenarios import (
 
24
    load_tests_apply_scenarios,
 
25
    )
 
26
 
 
27
 
 
28
def module_scenarios():
 
29
    scenarios = [
 
30
        ('python', {'_gc_module': _groupcompress_py}),
 
31
        ]
 
32
    if compiled_groupcompress_feature.available():
 
33
        gc_module = compiled_groupcompress_feature.module
 
34
        scenarios.append(('C',
 
35
            {'_gc_module': gc_module}))
 
36
    return scenarios
 
37
 
 
38
 
 
39
def two_way_scenarios():
 
40
    scenarios = [
 
41
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
42
                'apply_delta': _groupcompress_py.apply_delta})
 
43
        ]
 
44
    if compiled_groupcompress_feature.available():
 
45
        gc_module = compiled_groupcompress_feature.module
 
46
        scenarios.extend([
 
47
            ('CC', {'make_delta': gc_module.make_delta,
 
48
                    'apply_delta': gc_module.apply_delta}),
 
49
            ('PC', {'make_delta': _groupcompress_py.make_delta,
 
50
                    'apply_delta': gc_module.apply_delta}),
 
51
            ('CP', {'make_delta': gc_module.make_delta,
 
52
                    'apply_delta': _groupcompress_py.apply_delta}),
 
53
            ])
 
54
    return scenarios
 
55
 
 
56
 
 
57
load_tests = load_tests_apply_scenarios
 
58
 
 
59
 
 
60
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
 
61
                                    'bzrlib._groupcompress_pyx')
 
62
 
 
63
_text1 = """\
 
64
This is a bit
 
65
of source text
 
66
which is meant to be matched
 
67
against other text
 
68
"""
 
69
 
 
70
_text2 = """\
 
71
This is a bit
 
72
of source text
 
73
which is meant to differ from
 
74
against other text
 
75
"""
 
76
 
 
77
_text3 = """\
 
78
This is a bit
 
79
of source text
 
80
which is meant to be matched
 
81
against other text
 
82
except it also
 
83
has a lot more data
 
84
at the end of the file
 
85
"""
 
86
 
 
87
_first_text = """\
 
88
a bit of text, that
 
89
does not have much in
 
90
common with the next text
 
91
"""
 
92
 
 
93
_second_text = """\
 
94
some more bit of text, that
 
95
does not have much in
 
96
common with the previous text
 
97
and has some extra text
 
98
"""
 
99
 
 
100
 
 
101
_third_text = """\
 
102
a bit of text, that
 
103
has some in common with the previous text
 
104
and has some extra text
 
105
and not have much in
 
106
common with the next text
 
107
"""
 
108
 
 
109
_fourth_text = """\
 
110
123456789012345
 
111
same rabin hash
 
112
123456789012345
 
113
same rabin hash
 
114
123456789012345
 
115
same rabin hash
 
116
123456789012345
 
117
same rabin hash
 
118
"""
 
119
 
 
120
class TestMakeAndApplyDelta(tests.TestCase):
 
121
 
 
122
    scenarios = module_scenarios()
 
123
    _gc_module = None # Set by load_tests
 
124
 
 
125
    def setUp(self):
 
126
        super(TestMakeAndApplyDelta, self).setUp()
 
127
        self.make_delta = self._gc_module.make_delta
 
128
        self.apply_delta = self._gc_module.apply_delta
 
129
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
 
130
 
 
131
    def test_make_delta_is_typesafe(self):
 
132
        self.make_delta('a string', 'another string')
 
133
 
 
134
        def _check_make_delta(string1, string2):
 
135
            self.assertRaises(TypeError, self.make_delta, string1, string2)
 
136
 
 
137
        _check_make_delta('a string', object())
 
138
        _check_make_delta('a string', u'not a string')
 
139
        _check_make_delta(object(), 'a string')
 
140
        _check_make_delta(u'not a string', 'a string')
 
141
 
 
142
    def test_make_noop_delta(self):
 
143
        ident_delta = self.make_delta(_text1, _text1)
 
144
        self.assertEqual('M\x90M', ident_delta)
 
145
        ident_delta = self.make_delta(_text2, _text2)
 
146
        self.assertEqual('N\x90N', ident_delta)
 
147
        ident_delta = self.make_delta(_text3, _text3)
 
148
        self.assertEqual('\x87\x01\x90\x87', ident_delta)
 
149
 
 
150
    def assertDeltaIn(self, delta1, delta2, delta):
 
151
        """Make sure that the delta bytes match one of the expectations."""
 
152
        # In general, the python delta matcher gives different results than the
 
153
        # pyrex delta matcher. Both should be valid deltas, though.
 
154
        if delta not in (delta1, delta2):
 
155
            self.fail("Delta bytes:\n"
 
156
                      "       %r\n"
 
157
                      "not in %r\n"
 
158
                      "    or %r"
 
159
                      % (delta, delta1, delta2))
 
160
 
 
161
    def test_make_delta(self):
 
162
        delta = self.make_delta(_text1, _text2)
 
163
        self.assertDeltaIn(
 
164
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
165
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
166
            delta)
 
167
        delta = self.make_delta(_text2, _text1)
 
168
        self.assertDeltaIn(
 
169
            'M\x90/\x1ebe matched\nagainst other text\n',
 
170
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
 
171
            delta)
 
172
        delta = self.make_delta(_text3, _text1)
 
173
        self.assertEqual('M\x90M', delta)
 
174
        delta = self.make_delta(_text3, _text2)
 
175
        self.assertDeltaIn(
 
176
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
177
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
178
            delta)
 
179
 
 
180
    def test_make_delta_with_large_copies(self):
 
181
        # We want to have a copy that is larger than 64kB, which forces us to
 
182
        # issue multiple copy instructions.
 
183
        big_text = _text3 * 1220
 
184
        delta = self.make_delta(big_text, big_text)
 
185
        self.assertDeltaIn(
 
186
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
187
            '\x80'              # Copy 64kB, starting at byte 0
 
188
            '\x84\x01'          # and another 64kB starting at 64kB
 
189
            '\xb4\x02\x5c\x83', # And the bit of tail.
 
190
            None,   # Both implementations should be identical
 
191
            delta)
 
192
 
 
193
    def test_apply_delta_is_typesafe(self):
 
194
        self.apply_delta(_text1, 'M\x90M')
 
195
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
 
196
        self.assertRaises(TypeError, self.apply_delta,
 
197
                          unicode(_text1), 'M\x90M')
 
198
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
 
199
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
 
200
 
 
201
    def test_apply_delta(self):
 
202
        target = self.apply_delta(_text1,
 
203
                    'N\x90/\x1fdiffer from\nagainst other text\n')
 
204
        self.assertEqual(_text2, target)
 
205
        target = self.apply_delta(_text2,
 
206
                    'M\x90/\x1ebe matched\nagainst other text\n')
 
207
        self.assertEqual(_text1, target)
 
208
 
 
209
    def test_apply_delta_to_source_is_safe(self):
 
210
        self.assertRaises(TypeError,
 
211
            self.apply_delta_to_source, object(), 0, 1)
 
212
        self.assertRaises(TypeError,
 
213
            self.apply_delta_to_source, u'unicode str', 0, 1)
 
214
        # end > length
 
215
        self.assertRaises(ValueError,
 
216
            self.apply_delta_to_source, 'foo', 1, 4)
 
217
        # start > length
 
218
        self.assertRaises(ValueError,
 
219
            self.apply_delta_to_source, 'foo', 5, 3)
 
220
        # start > end
 
221
        self.assertRaises(ValueError,
 
222
            self.apply_delta_to_source, 'foo', 3, 2)
 
223
 
 
224
    def test_apply_delta_to_source(self):
 
225
        source_and_delta = (_text1
 
226
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
 
227
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
 
228
                                    len(_text1), len(source_and_delta)))
 
229
 
 
230
 
 
231
class TestMakeAndApplyCompatible(tests.TestCase):
 
232
 
 
233
    scenarios = two_way_scenarios()
 
234
 
 
235
    make_delta = None # Set by load_tests
 
236
    apply_delta = None # Set by load_tests
 
237
 
 
238
    def assertMakeAndApply(self, source, target):
 
239
        """Assert that generating a delta and applying gives success."""
 
240
        delta = self.make_delta(source, target)
 
241
        bytes = self.apply_delta(source, delta)
 
242
        self.assertEqualDiff(target, bytes)
 
243
 
 
244
    def test_direct(self):
 
245
        self.assertMakeAndApply(_text1, _text2)
 
246
        self.assertMakeAndApply(_text2, _text1)
 
247
        self.assertMakeAndApply(_text1, _text3)
 
248
        self.assertMakeAndApply(_text3, _text1)
 
249
        self.assertMakeAndApply(_text2, _text3)
 
250
        self.assertMakeAndApply(_text3, _text2)
 
251
 
 
252
 
 
253
class TestDeltaIndex(tests.TestCase):
 
254
 
 
255
    def setUp(self):
 
256
        super(TestDeltaIndex, self).setUp()
 
257
        # This test isn't multiplied, because we only have DeltaIndex for the
 
258
        # compiled form
 
259
        # We call this here, because _test_needs_features happens after setUp
 
260
        self.requireFeature(compiled_groupcompress_feature)
 
261
        self._gc_module = compiled_groupcompress_feature.module
 
262
 
 
263
    def test_repr(self):
 
264
        di = self._gc_module.DeltaIndex('test text\n')
 
265
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
 
266
 
 
267
    def test__dump_no_index(self):
 
268
        di = self._gc_module.DeltaIndex()
 
269
        self.assertEqual(None, di._dump_index())
 
270
 
 
271
    def test__dump_index_simple(self):
 
272
        di = self._gc_module.DeltaIndex()
 
273
        di.add_source(_text1, 0)
 
274
        self.assertFalse(di._has_index())
 
275
        self.assertEqual(None, di._dump_index())
 
276
        _ = di.make_delta(_text1)
 
277
        self.assertTrue(di._has_index())
 
278
        hash_list, entry_list = di._dump_index()
 
279
        self.assertEqual(16, len(hash_list))
 
280
        self.assertEqual(68, len(entry_list))
 
281
        just_entries = [(idx, text_offset, hash_val)
 
282
                        for idx, (text_offset, hash_val)
 
283
                         in enumerate(entry_list)
 
284
                         if text_offset != 0 or hash_val != 0]
 
285
        rabin_hash = self._gc_module._rabin_hash
 
286
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
287
                          (25, 48, rabin_hash(_text1[33:49])),
 
288
                          (34, 32, rabin_hash(_text1[17:33])),
 
289
                          (47, 64, rabin_hash(_text1[49:65])),
 
290
                         ], just_entries)
 
291
        # This ensures that the hash map points to the location we expect it to
 
292
        for entry_idx, text_offset, hash_val in just_entries:
 
293
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
 
294
 
 
295
    def test__dump_index_two_sources(self):
 
296
        di = self._gc_module.DeltaIndex()
 
297
        di.add_source(_text1, 0)
 
298
        di.add_source(_text2, 2)
 
299
        start2 = len(_text1) + 2
 
300
        self.assertTrue(di._has_index())
 
301
        hash_list, entry_list = di._dump_index()
 
302
        self.assertEqual(16, len(hash_list))
 
303
        self.assertEqual(68, len(entry_list))
 
304
        just_entries = [(idx, text_offset, hash_val)
 
305
                        for idx, (text_offset, hash_val)
 
306
                         in enumerate(entry_list)
 
307
                         if text_offset != 0 or hash_val != 0]
 
308
        rabin_hash = self._gc_module._rabin_hash
 
309
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
310
                          (9, start2+16, rabin_hash(_text2[1:17])),
 
311
                          (25, 48, rabin_hash(_text1[33:49])),
 
312
                          (30, start2+64, rabin_hash(_text2[49:65])),
 
313
                          (34, 32, rabin_hash(_text1[17:33])),
 
314
                          (35, start2+32, rabin_hash(_text2[17:33])),
 
315
                          (43, start2+48, rabin_hash(_text2[33:49])),
 
316
                          (47, 64, rabin_hash(_text1[49:65])),
 
317
                         ], just_entries)
 
318
        # Each entry should be in the appropriate hash bucket.
 
319
        for entry_idx, text_offset, hash_val in just_entries:
 
320
            hash_idx = hash_val & 0xf
 
321
            self.assertTrue(
 
322
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
 
323
 
 
324
    def test_first_add_source_doesnt_index_until_make_delta(self):
 
325
        di = self._gc_module.DeltaIndex()
 
326
        self.assertFalse(di._has_index())
 
327
        di.add_source(_text1, 0)
 
328
        self.assertFalse(di._has_index())
 
329
        # However, asking to make a delta will trigger the index to be
 
330
        # generated, and will generate a proper delta
 
331
        delta = di.make_delta(_text2)
 
332
        self.assertTrue(di._has_index())
 
333
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
334
 
 
335
    def test_add_source_max_bytes_to_index(self):
 
336
        di = self._gc_module.DeltaIndex()
 
337
        di._max_bytes_to_index = 3*16
 
338
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
 
339
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
 
340
        start2 = len(_text1) + 3
 
341
        hash_list, entry_list = di._dump_index()
 
342
        self.assertEqual(16, len(hash_list))
 
343
        self.assertEqual(67, len(entry_list))
 
344
        just_entries = sorted([(text_offset, hash_val)
 
345
                               for text_offset, hash_val in entry_list
 
346
                                if text_offset != 0 or hash_val != 0])
 
347
        rabin_hash = self._gc_module._rabin_hash
 
348
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
 
349
                          (50, rabin_hash(_text1[35:51])),
 
350
                          (75, rabin_hash(_text1[60:76])),
 
351
                          (start2+44, rabin_hash(_text3[29:45])),
 
352
                          (start2+88, rabin_hash(_text3[73:89])),
 
353
                          (start2+132, rabin_hash(_text3[117:133])),
 
354
                         ], just_entries)
 
355
 
 
356
    def test_second_add_source_triggers_make_index(self):
 
357
        di = self._gc_module.DeltaIndex()
 
358
        self.assertFalse(di._has_index())
 
359
        di.add_source(_text1, 0)
 
360
        self.assertFalse(di._has_index())
 
361
        di.add_source(_text2, 0)
 
362
        self.assertTrue(di._has_index())
 
363
 
 
364
    def test_make_delta(self):
 
365
        di = self._gc_module.DeltaIndex(_text1)
 
366
        delta = di.make_delta(_text2)
 
367
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
368
 
 
369
    def test_delta_against_multiple_sources(self):
 
370
        di = self._gc_module.DeltaIndex()
 
371
        di.add_source(_first_text, 0)
 
372
        self.assertEqual(len(_first_text), di._source_offset)
 
373
        di.add_source(_second_text, 0)
 
374
        self.assertEqual(len(_first_text) + len(_second_text),
 
375
                         di._source_offset)
 
376
        delta = di.make_delta(_third_text)
 
377
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
 
378
        self.assertEqualDiff(_third_text, result)
 
379
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
 
380
                         '\x91v6\x03and\x91d"\x91:\n', delta)
 
381
 
 
382
    def test_delta_with_offsets(self):
 
383
        di = self._gc_module.DeltaIndex()
 
384
        di.add_source(_first_text, 5)
 
385
        self.assertEqual(len(_first_text) + 5, di._source_offset)
 
386
        di.add_source(_second_text, 10)
 
387
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
 
388
                         di._source_offset)
 
389
        delta = di.make_delta(_third_text)
 
390
        self.assertIsNot(None, delta)
 
391
        result = self._gc_module.apply_delta(
 
392
            '12345' + _first_text + '1234567890' + _second_text, delta)
 
393
        self.assertIsNot(None, result)
 
394
        self.assertEqualDiff(_third_text, result)
 
395
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
 
396
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
 
397
 
 
398
    def test_delta_with_delta_bytes(self):
 
399
        di = self._gc_module.DeltaIndex()
 
400
        source = _first_text
 
401
        di.add_source(_first_text, 0)
 
402
        self.assertEqual(len(_first_text), di._source_offset)
 
403
        delta = di.make_delta(_second_text)
 
404
        self.assertEqual('h\tsome more\x91\x019'
 
405
                         '&previous text\nand has some extra text\n', delta)
 
406
        di.add_delta_source(delta, 0)
 
407
        source += delta
 
408
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
 
409
        second_delta = di.make_delta(_third_text)
 
410
        result = self._gc_module.apply_delta(source, second_delta)
 
411
        self.assertEqualDiff(_third_text, result)
 
412
        # We should be able to match against the
 
413
        # 'previous text\nand has some...'  that was part of the delta bytes
 
414
        # Note that we don't match the 'common with the', because it isn't long
 
415
        # enough to match in the original text, and those bytes are not present
 
416
        # in the delta for the second text.
 
417
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
 
418
                         '\x91S&\x03and\x91\x18,', second_delta)
 
419
        # Add this delta, and create a new delta for the same text. We should
 
420
        # find the remaining text, and only insert the short 'and' text.
 
421
        di.add_delta_source(second_delta, 0)
 
422
        source += second_delta
 
423
        third_delta = di.make_delta(_third_text)
 
424
        result = self._gc_module.apply_delta(source, third_delta)
 
425
        self.assertEqualDiff(_third_text, result)
 
426
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
 
427
                         '\x91S&\x03and\x91\x18,', third_delta)
 
428
        # Now create a delta, which we know won't be able to be 'fit' into the
 
429
        # existing index
 
430
        fourth_delta = di.make_delta(_fourth_text)
 
431
        self.assertEqual(_fourth_text,
 
432
                         self._gc_module.apply_delta(source, fourth_delta))
 
433
        self.assertEqual('\x80\x01'
 
434
                         '\x7f123456789012345\nsame rabin hash\n'
 
435
                         '123456789012345\nsame rabin hash\n'
 
436
                         '123456789012345\nsame rabin hash\n'
 
437
                         '123456789012345\nsame rabin hash'
 
438
                         '\x01\n', fourth_delta)
 
439
        di.add_delta_source(fourth_delta, 0)
 
440
        source += fourth_delta
 
441
        # With the next delta, everything should be found
 
442
        fifth_delta = di.make_delta(_fourth_text)
 
443
        self.assertEqual(_fourth_text,
 
444
                         self._gc_module.apply_delta(source, fifth_delta))
 
445
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
446
 
 
447
 
 
448
class TestCopyInstruction(tests.TestCase):
 
449
 
 
450
    def assertEncode(self, expected, offset, length):
 
451
        bytes = _groupcompress_py.encode_copy_instruction(offset, length)
 
452
        if expected != bytes:
 
453
            self.assertEqual([hex(ord(e)) for e in expected],
 
454
                             [hex(ord(b)) for b in bytes])
 
455
 
 
456
    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
 
457
        cmd = ord(bytes[pos])
 
458
        pos += 1
 
459
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
 
460
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
 
461
 
 
462
    def test_encode_no_length(self):
 
463
        self.assertEncode('\x80', 0, 64*1024)
 
464
        self.assertEncode('\x81\x01', 1, 64*1024)
 
465
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
466
        self.assertEncode('\x81\xff', 255, 64*1024)
 
467
        self.assertEncode('\x82\x01', 256, 64*1024)
 
468
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
 
469
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
 
470
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
 
471
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
 
472
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
 
473
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
 
474
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
 
475
 
 
476
    def test_encode_no_offset(self):
 
477
        self.assertEncode('\x90\x01', 0, 1)
 
478
        self.assertEncode('\x90\x0a', 0, 10)
 
479
        self.assertEncode('\x90\xff', 0, 255)
 
480
        self.assertEncode('\xA0\x01', 0, 256)
 
481
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
482
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
 
483
        # Special case, if copy == 64KiB, then we store exactly 0
 
484
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
485
        # about that, as we would never actually copy 0 bytes
 
486
        self.assertEncode('\x80', 0, 64*1024)
 
487
 
 
488
    def test_encode(self):
 
489
        self.assertEncode('\x91\x01\x01', 1, 1)
 
490
        self.assertEncode('\x91\x09\x0a', 9, 10)
 
491
        self.assertEncode('\x91\xfe\xff', 254, 255)
 
492
        self.assertEncode('\xA2\x02\x01', 512, 256)
 
493
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
 
494
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
495
        # Special case, if copy == 64KiB, then we store exactly 0
 
496
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
497
        # about that, as we would never actually copy 0 bytes
 
498
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
499
 
 
500
    def test_decode_no_length(self):
 
501
        # If length is 0, it is interpreted as 64KiB
 
502
        # The shortest possible instruction is a copy of 64KiB from offset 0
 
503
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
504
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
 
505
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
 
506
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
 
507
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
 
508
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
 
509
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
 
510
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
 
511
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
 
512
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
 
513
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
 
514
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
 
515
 
 
516
    def test_decode_no_offset(self):
 
517
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
 
518
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
 
519
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
 
520
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
 
521
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
522
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
 
523
        # Special case, if copy == 64KiB, then we store exactly 0
 
524
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
525
        # about that, as we would never actually copy 0 bytes
 
526
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
527
 
 
528
    def test_decode(self):
 
529
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
 
530
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
 
531
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
 
532
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
 
533
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
 
534
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
535
 
 
536
    def test_decode_not_start(self):
 
537
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
 
538
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
 
539
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
 
540
 
 
541
 
 
542
class TestBase128Int(tests.TestCase):
 
543
 
 
544
    scenarios = module_scenarios()
 
545
 
 
546
    _gc_module = None # Set by load_tests
 
547
 
 
548
    def assertEqualEncode(self, bytes, val):
 
549
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
 
550
 
 
551
    def assertEqualDecode(self, val, num_decode, bytes):
 
552
        self.assertEqual((val, num_decode),
 
553
                         self._gc_module.decode_base128_int(bytes))
 
554
 
 
555
    def test_encode(self):
 
556
        self.assertEqualEncode('\x01', 1)
 
557
        self.assertEqualEncode('\x02', 2)
 
558
        self.assertEqualEncode('\x7f', 127)
 
559
        self.assertEqualEncode('\x80\x01', 128)
 
560
        self.assertEqualEncode('\xff\x01', 255)
 
561
        self.assertEqualEncode('\x80\x02', 256)
 
562
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
 
563
 
 
564
    def test_decode(self):
 
565
        self.assertEqualDecode(1, 1, '\x01')
 
566
        self.assertEqualDecode(2, 1, '\x02')
 
567
        self.assertEqualDecode(127, 1, '\x7f')
 
568
        self.assertEqualDecode(128, 2, '\x80\x01')
 
569
        self.assertEqualDecode(255, 2, '\xff\x01')
 
570
        self.assertEqualDecode(256, 2, '\x80\x02')
 
571
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
 
572
 
 
573
    def test_decode_with_trailing_bytes(self):
 
574
        self.assertEqualDecode(1, 1, '\x01abcdef')
 
575
        self.assertEqualDecode(127, 1, '\x7f\x01')
 
576
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
 
577
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
 
578
 
 
579