~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

  • Committer: Vincent Ladeuil
  • Date: 2008-09-11 19:36:38 UTC
  • mfrom: (3703 +trunk)
  • mto: (3705.1.1 trunk2)
  • mto: This revision was merged to the branch mainline in revision 3708.
  • Revision ID: v.ladeuil+lp@free.fr-20080911193638-wtjyc1kcmacc6t1f
merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for the python and pyrex extensions of groupcompress"""
18
 
 
19
 
from bzrlib import (
20
 
    _groupcompress_py,
21
 
    tests,
22
 
    )
23
 
from bzrlib.tests.scenarios import (
24
 
    load_tests_apply_scenarios,
25
 
    )
26
 
from bzrlib.tests import (
27
 
    features,
28
 
    )
29
 
 
30
 
 
31
 
def module_scenarios():
32
 
    scenarios = [
33
 
        ('python', {'_gc_module': _groupcompress_py}),
34
 
        ]
35
 
    if compiled_groupcompress_feature.available():
36
 
        gc_module = compiled_groupcompress_feature.module
37
 
        scenarios.append(('C',
38
 
            {'_gc_module': gc_module}))
39
 
    return scenarios
40
 
 
41
 
 
42
 
def two_way_scenarios():
43
 
    scenarios = [
44
 
        ('PP', {'make_delta': _groupcompress_py.make_delta,
45
 
                'apply_delta': _groupcompress_py.apply_delta})
46
 
        ]
47
 
    if compiled_groupcompress_feature.available():
48
 
        gc_module = compiled_groupcompress_feature.module
49
 
        scenarios.extend([
50
 
            ('CC', {'make_delta': gc_module.make_delta,
51
 
                    'apply_delta': gc_module.apply_delta}),
52
 
            ('PC', {'make_delta': _groupcompress_py.make_delta,
53
 
                    'apply_delta': gc_module.apply_delta}),
54
 
            ('CP', {'make_delta': gc_module.make_delta,
55
 
                    'apply_delta': _groupcompress_py.apply_delta}),
56
 
            ])
57
 
    return scenarios
58
 
 
59
 
 
60
 
load_tests = load_tests_apply_scenarios
61
 
 
62
 
 
63
 
compiled_groupcompress_feature = features.ModuleAvailableFeature(
64
 
    'bzrlib._groupcompress_pyx')
65
 
 
66
 
_text1 = """\
67
 
This is a bit
68
 
of source text
69
 
which is meant to be matched
70
 
against other text
71
 
"""
72
 
 
73
 
_text2 = """\
74
 
This is a bit
75
 
of source text
76
 
which is meant to differ from
77
 
against other text
78
 
"""
79
 
 
80
 
_text3 = """\
81
 
This is a bit
82
 
of source text
83
 
which is meant to be matched
84
 
against other text
85
 
except it also
86
 
has a lot more data
87
 
at the end of the file
88
 
"""
89
 
 
90
 
_first_text = """\
91
 
a bit of text, that
92
 
does not have much in
93
 
common with the next text
94
 
"""
95
 
 
96
 
_second_text = """\
97
 
some more bit of text, that
98
 
does not have much in
99
 
common with the previous text
100
 
and has some extra text
101
 
"""
102
 
 
103
 
 
104
 
_third_text = """\
105
 
a bit of text, that
106
 
has some in common with the previous text
107
 
and has some extra text
108
 
and not have much in
109
 
common with the next text
110
 
"""
111
 
 
112
 
_fourth_text = """\
113
 
123456789012345
114
 
same rabin hash
115
 
123456789012345
116
 
same rabin hash
117
 
123456789012345
118
 
same rabin hash
119
 
123456789012345
120
 
same rabin hash
121
 
"""
122
 
 
123
 
class TestMakeAndApplyDelta(tests.TestCase):
124
 
 
125
 
    scenarios = module_scenarios()
126
 
    _gc_module = None # Set by load_tests
127
 
 
128
 
    def setUp(self):
129
 
        super(TestMakeAndApplyDelta, self).setUp()
130
 
        self.make_delta = self._gc_module.make_delta
131
 
        self.apply_delta = self._gc_module.apply_delta
132
 
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
133
 
 
134
 
    def test_make_delta_is_typesafe(self):
135
 
        self.make_delta('a string', 'another string')
136
 
 
137
 
        def _check_make_delta(string1, string2):
138
 
            self.assertRaises(TypeError, self.make_delta, string1, string2)
139
 
 
140
 
        _check_make_delta('a string', object())
141
 
        _check_make_delta('a string', u'not a string')
142
 
        _check_make_delta(object(), 'a string')
143
 
        _check_make_delta(u'not a string', 'a string')
144
 
 
145
 
    def test_make_noop_delta(self):
146
 
        ident_delta = self.make_delta(_text1, _text1)
147
 
        self.assertEqual('M\x90M', ident_delta)
148
 
        ident_delta = self.make_delta(_text2, _text2)
149
 
        self.assertEqual('N\x90N', ident_delta)
150
 
        ident_delta = self.make_delta(_text3, _text3)
151
 
        self.assertEqual('\x87\x01\x90\x87', ident_delta)
152
 
 
153
 
    def assertDeltaIn(self, delta1, delta2, delta):
154
 
        """Make sure that the delta bytes match one of the expectations."""
155
 
        # In general, the python delta matcher gives different results than the
156
 
        # pyrex delta matcher. Both should be valid deltas, though.
157
 
        if delta not in (delta1, delta2):
158
 
            self.fail("Delta bytes:\n"
159
 
                      "       %r\n"
160
 
                      "not in %r\n"
161
 
                      "    or %r"
162
 
                      % (delta, delta1, delta2))
163
 
 
164
 
    def test_make_delta(self):
165
 
        delta = self.make_delta(_text1, _text2)
166
 
        self.assertDeltaIn(
167
 
            'N\x90/\x1fdiffer from\nagainst other text\n',
168
 
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
169
 
            delta)
170
 
        delta = self.make_delta(_text2, _text1)
171
 
        self.assertDeltaIn(
172
 
            'M\x90/\x1ebe matched\nagainst other text\n',
173
 
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
174
 
            delta)
175
 
        delta = self.make_delta(_text3, _text1)
176
 
        self.assertEqual('M\x90M', delta)
177
 
        delta = self.make_delta(_text3, _text2)
178
 
        self.assertDeltaIn(
179
 
            'N\x90/\x1fdiffer from\nagainst other text\n',
180
 
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
181
 
            delta)
182
 
 
183
 
    def test_make_delta_with_large_copies(self):
184
 
        # We want to have a copy that is larger than 64kB, which forces us to
185
 
        # issue multiple copy instructions.
186
 
        big_text = _text3 * 1220
187
 
        delta = self.make_delta(big_text, big_text)
188
 
        self.assertDeltaIn(
189
 
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
190
 
            '\x80'              # Copy 64kB, starting at byte 0
191
 
            '\x84\x01'          # and another 64kB starting at 64kB
192
 
            '\xb4\x02\x5c\x83', # And the bit of tail.
193
 
            None,   # Both implementations should be identical
194
 
            delta)
195
 
 
196
 
    def test_apply_delta_is_typesafe(self):
197
 
        self.apply_delta(_text1, 'M\x90M')
198
 
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
199
 
        self.assertRaises(TypeError, self.apply_delta,
200
 
                          unicode(_text1), 'M\x90M')
201
 
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
202
 
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
203
 
 
204
 
    def test_apply_delta(self):
205
 
        target = self.apply_delta(_text1,
206
 
                    'N\x90/\x1fdiffer from\nagainst other text\n')
207
 
        self.assertEqual(_text2, target)
208
 
        target = self.apply_delta(_text2,
209
 
                    'M\x90/\x1ebe matched\nagainst other text\n')
210
 
        self.assertEqual(_text1, target)
211
 
 
212
 
    def test_apply_delta_to_source_is_safe(self):
213
 
        self.assertRaises(TypeError,
214
 
            self.apply_delta_to_source, object(), 0, 1)
215
 
        self.assertRaises(TypeError,
216
 
            self.apply_delta_to_source, u'unicode str', 0, 1)
217
 
        # end > length
218
 
        self.assertRaises(ValueError,
219
 
            self.apply_delta_to_source, 'foo', 1, 4)
220
 
        # start > length
221
 
        self.assertRaises(ValueError,
222
 
            self.apply_delta_to_source, 'foo', 5, 3)
223
 
        # start > end
224
 
        self.assertRaises(ValueError,
225
 
            self.apply_delta_to_source, 'foo', 3, 2)
226
 
 
227
 
    def test_apply_delta_to_source(self):
228
 
        source_and_delta = (_text1
229
 
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
230
 
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
231
 
                                    len(_text1), len(source_and_delta)))
232
 
 
233
 
 
234
 
class TestMakeAndApplyCompatible(tests.TestCase):
235
 
 
236
 
    scenarios = two_way_scenarios()
237
 
 
238
 
    make_delta = None # Set by load_tests
239
 
    apply_delta = None # Set by load_tests
240
 
 
241
 
    def assertMakeAndApply(self, source, target):
242
 
        """Assert that generating a delta and applying gives success."""
243
 
        delta = self.make_delta(source, target)
244
 
        bytes = self.apply_delta(source, delta)
245
 
        self.assertEqualDiff(target, bytes)
246
 
 
247
 
    def test_direct(self):
248
 
        self.assertMakeAndApply(_text1, _text2)
249
 
        self.assertMakeAndApply(_text2, _text1)
250
 
        self.assertMakeAndApply(_text1, _text3)
251
 
        self.assertMakeAndApply(_text3, _text1)
252
 
        self.assertMakeAndApply(_text2, _text3)
253
 
        self.assertMakeAndApply(_text3, _text2)
254
 
 
255
 
 
256
 
class TestDeltaIndex(tests.TestCase):
257
 
 
258
 
    def setUp(self):
259
 
        super(TestDeltaIndex, self).setUp()
260
 
        # This test isn't multiplied, because we only have DeltaIndex for the
261
 
        # compiled form
262
 
        # We call this here, because _test_needs_features happens after setUp
263
 
        self.requireFeature(compiled_groupcompress_feature)
264
 
        self._gc_module = compiled_groupcompress_feature.module
265
 
 
266
 
    def test_repr(self):
267
 
        di = self._gc_module.DeltaIndex('test text\n')
268
 
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
269
 
 
270
 
    def test__dump_no_index(self):
271
 
        di = self._gc_module.DeltaIndex()
272
 
        self.assertEqual(None, di._dump_index())
273
 
 
274
 
    def test__dump_index_simple(self):
275
 
        di = self._gc_module.DeltaIndex()
276
 
        di.add_source(_text1, 0)
277
 
        self.assertFalse(di._has_index())
278
 
        self.assertEqual(None, di._dump_index())
279
 
        _ = di.make_delta(_text1)
280
 
        self.assertTrue(di._has_index())
281
 
        hash_list, entry_list = di._dump_index()
282
 
        self.assertEqual(16, len(hash_list))
283
 
        self.assertEqual(68, len(entry_list))
284
 
        just_entries = [(idx, text_offset, hash_val)
285
 
                        for idx, (text_offset, hash_val)
286
 
                         in enumerate(entry_list)
287
 
                         if text_offset != 0 or hash_val != 0]
288
 
        rabin_hash = self._gc_module._rabin_hash
289
 
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
290
 
                          (25, 48, rabin_hash(_text1[33:49])),
291
 
                          (34, 32, rabin_hash(_text1[17:33])),
292
 
                          (47, 64, rabin_hash(_text1[49:65])),
293
 
                         ], just_entries)
294
 
        # This ensures that the hash map points to the location we expect it to
295
 
        for entry_idx, text_offset, hash_val in just_entries:
296
 
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
297
 
 
298
 
    def test__dump_index_two_sources(self):
299
 
        di = self._gc_module.DeltaIndex()
300
 
        di.add_source(_text1, 0)
301
 
        di.add_source(_text2, 2)
302
 
        start2 = len(_text1) + 2
303
 
        self.assertTrue(di._has_index())
304
 
        hash_list, entry_list = di._dump_index()
305
 
        self.assertEqual(16, len(hash_list))
306
 
        self.assertEqual(68, len(entry_list))
307
 
        just_entries = [(idx, text_offset, hash_val)
308
 
                        for idx, (text_offset, hash_val)
309
 
                         in enumerate(entry_list)
310
 
                         if text_offset != 0 or hash_val != 0]
311
 
        rabin_hash = self._gc_module._rabin_hash
312
 
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
313
 
                          (9, start2+16, rabin_hash(_text2[1:17])),
314
 
                          (25, 48, rabin_hash(_text1[33:49])),
315
 
                          (30, start2+64, rabin_hash(_text2[49:65])),
316
 
                          (34, 32, rabin_hash(_text1[17:33])),
317
 
                          (35, start2+32, rabin_hash(_text2[17:33])),
318
 
                          (43, start2+48, rabin_hash(_text2[33:49])),
319
 
                          (47, 64, rabin_hash(_text1[49:65])),
320
 
                         ], just_entries)
321
 
        # Each entry should be in the appropriate hash bucket.
322
 
        for entry_idx, text_offset, hash_val in just_entries:
323
 
            hash_idx = hash_val & 0xf
324
 
            self.assertTrue(
325
 
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
326
 
 
327
 
    def test_first_add_source_doesnt_index_until_make_delta(self):
328
 
        di = self._gc_module.DeltaIndex()
329
 
        self.assertFalse(di._has_index())
330
 
        di.add_source(_text1, 0)
331
 
        self.assertFalse(di._has_index())
332
 
        # However, asking to make a delta will trigger the index to be
333
 
        # generated, and will generate a proper delta
334
 
        delta = di.make_delta(_text2)
335
 
        self.assertTrue(di._has_index())
336
 
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
337
 
 
338
 
    def test_add_source_max_bytes_to_index(self):
339
 
        di = self._gc_module.DeltaIndex()
340
 
        di._max_bytes_to_index = 3*16
341
 
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
342
 
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
343
 
        start2 = len(_text1) + 3
344
 
        hash_list, entry_list = di._dump_index()
345
 
        self.assertEqual(16, len(hash_list))
346
 
        self.assertEqual(67, len(entry_list))
347
 
        just_entries = sorted([(text_offset, hash_val)
348
 
                               for text_offset, hash_val in entry_list
349
 
                                if text_offset != 0 or hash_val != 0])
350
 
        rabin_hash = self._gc_module._rabin_hash
351
 
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
352
 
                          (50, rabin_hash(_text1[35:51])),
353
 
                          (75, rabin_hash(_text1[60:76])),
354
 
                          (start2+44, rabin_hash(_text3[29:45])),
355
 
                          (start2+88, rabin_hash(_text3[73:89])),
356
 
                          (start2+132, rabin_hash(_text3[117:133])),
357
 
                         ], just_entries)
358
 
 
359
 
    def test_second_add_source_triggers_make_index(self):
360
 
        di = self._gc_module.DeltaIndex()
361
 
        self.assertFalse(di._has_index())
362
 
        di.add_source(_text1, 0)
363
 
        self.assertFalse(di._has_index())
364
 
        di.add_source(_text2, 0)
365
 
        self.assertTrue(di._has_index())
366
 
 
367
 
    def test_make_delta(self):
368
 
        di = self._gc_module.DeltaIndex(_text1)
369
 
        delta = di.make_delta(_text2)
370
 
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
371
 
 
372
 
    def test_delta_against_multiple_sources(self):
373
 
        di = self._gc_module.DeltaIndex()
374
 
        di.add_source(_first_text, 0)
375
 
        self.assertEqual(len(_first_text), di._source_offset)
376
 
        di.add_source(_second_text, 0)
377
 
        self.assertEqual(len(_first_text) + len(_second_text),
378
 
                         di._source_offset)
379
 
        delta = di.make_delta(_third_text)
380
 
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
381
 
        self.assertEqualDiff(_third_text, result)
382
 
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
383
 
                         '\x91v6\x03and\x91d"\x91:\n', delta)
384
 
 
385
 
    def test_delta_with_offsets(self):
386
 
        di = self._gc_module.DeltaIndex()
387
 
        di.add_source(_first_text, 5)
388
 
        self.assertEqual(len(_first_text) + 5, di._source_offset)
389
 
        di.add_source(_second_text, 10)
390
 
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
391
 
                         di._source_offset)
392
 
        delta = di.make_delta(_third_text)
393
 
        self.assertIsNot(None, delta)
394
 
        result = self._gc_module.apply_delta(
395
 
            '12345' + _first_text + '1234567890' + _second_text, delta)
396
 
        self.assertIsNot(None, result)
397
 
        self.assertEqualDiff(_third_text, result)
398
 
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
399
 
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
400
 
 
401
 
    def test_delta_with_delta_bytes(self):
402
 
        di = self._gc_module.DeltaIndex()
403
 
        source = _first_text
404
 
        di.add_source(_first_text, 0)
405
 
        self.assertEqual(len(_first_text), di._source_offset)
406
 
        delta = di.make_delta(_second_text)
407
 
        self.assertEqual('h\tsome more\x91\x019'
408
 
                         '&previous text\nand has some extra text\n', delta)
409
 
        di.add_delta_source(delta, 0)
410
 
        source += delta
411
 
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
412
 
        second_delta = di.make_delta(_third_text)
413
 
        result = self._gc_module.apply_delta(source, second_delta)
414
 
        self.assertEqualDiff(_third_text, result)
415
 
        # We should be able to match against the
416
 
        # 'previous text\nand has some...'  that was part of the delta bytes
417
 
        # Note that we don't match the 'common with the', because it isn't long
418
 
        # enough to match in the original text, and those bytes are not present
419
 
        # in the delta for the second text.
420
 
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
421
 
                         '\x91S&\x03and\x91\x18,', second_delta)
422
 
        # Add this delta, and create a new delta for the same text. We should
423
 
        # find the remaining text, and only insert the short 'and' text.
424
 
        di.add_delta_source(second_delta, 0)
425
 
        source += second_delta
426
 
        third_delta = di.make_delta(_third_text)
427
 
        result = self._gc_module.apply_delta(source, third_delta)
428
 
        self.assertEqualDiff(_third_text, result)
429
 
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
430
 
                         '\x91S&\x03and\x91\x18,', third_delta)
431
 
        # Now create a delta, which we know won't be able to be 'fit' into the
432
 
        # existing index
433
 
        fourth_delta = di.make_delta(_fourth_text)
434
 
        self.assertEqual(_fourth_text,
435
 
                         self._gc_module.apply_delta(source, fourth_delta))
436
 
        self.assertEqual('\x80\x01'
437
 
                         '\x7f123456789012345\nsame rabin hash\n'
438
 
                         '123456789012345\nsame rabin hash\n'
439
 
                         '123456789012345\nsame rabin hash\n'
440
 
                         '123456789012345\nsame rabin hash'
441
 
                         '\x01\n', fourth_delta)
442
 
        di.add_delta_source(fourth_delta, 0)
443
 
        source += fourth_delta
444
 
        # With the next delta, everything should be found
445
 
        fifth_delta = di.make_delta(_fourth_text)
446
 
        self.assertEqual(_fourth_text,
447
 
                         self._gc_module.apply_delta(source, fifth_delta))
448
 
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
449
 
 
450
 
 
451
 
class TestCopyInstruction(tests.TestCase):
452
 
 
453
 
    def assertEncode(self, expected, offset, length):
454
 
        bytes = _groupcompress_py.encode_copy_instruction(offset, length)
455
 
        if expected != bytes:
456
 
            self.assertEqual([hex(ord(e)) for e in expected],
457
 
                             [hex(ord(b)) for b in bytes])
458
 
 
459
 
    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
460
 
        cmd = ord(bytes[pos])
461
 
        pos += 1
462
 
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
463
 
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
464
 
 
465
 
    def test_encode_no_length(self):
466
 
        self.assertEncode('\x80', 0, 64*1024)
467
 
        self.assertEncode('\x81\x01', 1, 64*1024)
468
 
        self.assertEncode('\x81\x0a', 10, 64*1024)
469
 
        self.assertEncode('\x81\xff', 255, 64*1024)
470
 
        self.assertEncode('\x82\x01', 256, 64*1024)
471
 
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
472
 
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
473
 
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
474
 
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
475
 
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
476
 
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
477
 
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
478
 
 
479
 
    def test_encode_no_offset(self):
480
 
        self.assertEncode('\x90\x01', 0, 1)
481
 
        self.assertEncode('\x90\x0a', 0, 10)
482
 
        self.assertEncode('\x90\xff', 0, 255)
483
 
        self.assertEncode('\xA0\x01', 0, 256)
484
 
        self.assertEncode('\xB0\x01\x01', 0, 257)
485
 
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
486
 
        # Special case, if copy == 64KiB, then we store exactly 0
487
 
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
488
 
        # about that, as we would never actually copy 0 bytes
489
 
        self.assertEncode('\x80', 0, 64*1024)
490
 
 
491
 
    def test_encode(self):
492
 
        self.assertEncode('\x91\x01\x01', 1, 1)
493
 
        self.assertEncode('\x91\x09\x0a', 9, 10)
494
 
        self.assertEncode('\x91\xfe\xff', 254, 255)
495
 
        self.assertEncode('\xA2\x02\x01', 512, 256)
496
 
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
497
 
        self.assertEncode('\xB0\x01\x01', 0, 257)
498
 
        # Special case, if copy == 64KiB, then we store exactly 0
499
 
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
500
 
        # about that, as we would never actually copy 0 bytes
501
 
        self.assertEncode('\x81\x0a', 10, 64*1024)
502
 
 
503
 
    def test_decode_no_length(self):
504
 
        # If length is 0, it is interpreted as 64KiB
505
 
        # The shortest possible instruction is a copy of 64KiB from offset 0
506
 
        self.assertDecode(0, 65536, 1, '\x80', 0)
507
 
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
508
 
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
509
 
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
510
 
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
511
 
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
512
 
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
513
 
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
514
 
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
515
 
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
516
 
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
517
 
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
518
 
 
519
 
    def test_decode_no_offset(self):
520
 
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
521
 
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
522
 
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
523
 
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
524
 
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
525
 
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
526
 
        # Special case, if copy == 64KiB, then we store exactly 0
527
 
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
528
 
        # about that, as we would never actually copy 0 bytes
529
 
        self.assertDecode(0, 65536, 1, '\x80', 0)
530
 
 
531
 
    def test_decode(self):
532
 
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
533
 
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
534
 
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
535
 
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
536
 
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
537
 
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
538
 
 
539
 
    def test_decode_not_start(self):
540
 
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
541
 
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
542
 
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
543
 
 
544
 
 
545
 
class TestBase128Int(tests.TestCase):
546
 
 
547
 
    scenarios = module_scenarios()
548
 
 
549
 
    _gc_module = None # Set by load_tests
550
 
 
551
 
    def assertEqualEncode(self, bytes, val):
552
 
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
553
 
 
554
 
    def assertEqualDecode(self, val, num_decode, bytes):
555
 
        self.assertEqual((val, num_decode),
556
 
                         self._gc_module.decode_base128_int(bytes))
557
 
 
558
 
    def test_encode(self):
559
 
        self.assertEqualEncode('\x01', 1)
560
 
        self.assertEqualEncode('\x02', 2)
561
 
        self.assertEqualEncode('\x7f', 127)
562
 
        self.assertEqualEncode('\x80\x01', 128)
563
 
        self.assertEqualEncode('\xff\x01', 255)
564
 
        self.assertEqualEncode('\x80\x02', 256)
565
 
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
566
 
 
567
 
    def test_decode(self):
568
 
        self.assertEqualDecode(1, 1, '\x01')
569
 
        self.assertEqualDecode(2, 1, '\x02')
570
 
        self.assertEqualDecode(127, 1, '\x7f')
571
 
        self.assertEqualDecode(128, 2, '\x80\x01')
572
 
        self.assertEqualDecode(255, 2, '\xff\x01')
573
 
        self.assertEqualDecode(256, 2, '\x80\x02')
574
 
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
575
 
 
576
 
    def test_decode_with_trailing_bytes(self):
577
 
        self.assertEqualDecode(1, 1, '\x01abcdef')
578
 
        self.assertEqualDecode(127, 1, '\x7f\x01')
579
 
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
580
 
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
581
 
 
582