~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2005-11-30 15:43:57 UTC
  • mto: (1185.50.1 jam-integration)
  • mto: This revision was merged to the branch mainline in revision 1518.
  • Revision ID: john@arbash-meinel.com-20051130154357-614206b3a7b83cd0
Refactored bzrlib/ui.py into a module with the possibility for multiple ui forms.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for the python and pyrex extensions of groupcompress"""
18
 
 
19
 
from bzrlib import (
20
 
    _groupcompress_py,
21
 
    tests,
22
 
    )
23
 
from bzrlib.tests.scenarios import (
24
 
    load_tests_apply_scenarios,
25
 
    )
26
 
 
27
 
 
28
 
def module_scenarios():
29
 
    scenarios = [
30
 
        ('python', {'_gc_module': _groupcompress_py}),
31
 
        ]
32
 
    if compiled_groupcompress_feature.available():
33
 
        gc_module = compiled_groupcompress_feature.module
34
 
        scenarios.append(('C',
35
 
            {'_gc_module': gc_module}))
36
 
    return scenarios
37
 
 
38
 
 
39
 
def two_way_scenarios():
40
 
    scenarios = [
41
 
        ('PP', {'make_delta': _groupcompress_py.make_delta,
42
 
                'apply_delta': _groupcompress_py.apply_delta})
43
 
        ]
44
 
    if compiled_groupcompress_feature.available():
45
 
        gc_module = compiled_groupcompress_feature.module
46
 
        scenarios.extend([
47
 
            ('CC', {'make_delta': gc_module.make_delta,
48
 
                    'apply_delta': gc_module.apply_delta}),
49
 
            ('PC', {'make_delta': _groupcompress_py.make_delta,
50
 
                    'apply_delta': gc_module.apply_delta}),
51
 
            ('CP', {'make_delta': gc_module.make_delta,
52
 
                    'apply_delta': _groupcompress_py.apply_delta}),
53
 
            ])
54
 
    return scenarios
55
 
 
56
 
 
57
 
load_tests = load_tests_apply_scenarios
58
 
 
59
 
 
60
 
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
61
 
                                    'bzrlib._groupcompress_pyx')
62
 
 
63
 
_text1 = """\
64
 
This is a bit
65
 
of source text
66
 
which is meant to be matched
67
 
against other text
68
 
"""
69
 
 
70
 
_text2 = """\
71
 
This is a bit
72
 
of source text
73
 
which is meant to differ from
74
 
against other text
75
 
"""
76
 
 
77
 
_text3 = """\
78
 
This is a bit
79
 
of source text
80
 
which is meant to be matched
81
 
against other text
82
 
except it also
83
 
has a lot more data
84
 
at the end of the file
85
 
"""
86
 
 
87
 
_first_text = """\
88
 
a bit of text, that
89
 
does not have much in
90
 
common with the next text
91
 
"""
92
 
 
93
 
_second_text = """\
94
 
some more bit of text, that
95
 
does not have much in
96
 
common with the previous text
97
 
and has some extra text
98
 
"""
99
 
 
100
 
 
101
 
_third_text = """\
102
 
a bit of text, that
103
 
has some in common with the previous text
104
 
and has some extra text
105
 
and not have much in
106
 
common with the next text
107
 
"""
108
 
 
109
 
_fourth_text = """\
110
 
123456789012345
111
 
same rabin hash
112
 
123456789012345
113
 
same rabin hash
114
 
123456789012345
115
 
same rabin hash
116
 
123456789012345
117
 
same rabin hash
118
 
"""
119
 
 
120
 
class TestMakeAndApplyDelta(tests.TestCase):
121
 
 
122
 
    scenarios = module_scenarios()
123
 
    _gc_module = None # Set by load_tests
124
 
 
125
 
    def setUp(self):
126
 
        super(TestMakeAndApplyDelta, self).setUp()
127
 
        self.make_delta = self._gc_module.make_delta
128
 
        self.apply_delta = self._gc_module.apply_delta
129
 
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
130
 
 
131
 
    def test_make_delta_is_typesafe(self):
132
 
        self.make_delta('a string', 'another string')
133
 
 
134
 
        def _check_make_delta(string1, string2):
135
 
            self.assertRaises(TypeError, self.make_delta, string1, string2)
136
 
 
137
 
        _check_make_delta('a string', object())
138
 
        _check_make_delta('a string', u'not a string')
139
 
        _check_make_delta(object(), 'a string')
140
 
        _check_make_delta(u'not a string', 'a string')
141
 
 
142
 
    def test_make_noop_delta(self):
143
 
        ident_delta = self.make_delta(_text1, _text1)
144
 
        self.assertEqual('M\x90M', ident_delta)
145
 
        ident_delta = self.make_delta(_text2, _text2)
146
 
        self.assertEqual('N\x90N', ident_delta)
147
 
        ident_delta = self.make_delta(_text3, _text3)
148
 
        self.assertEqual('\x87\x01\x90\x87', ident_delta)
149
 
 
150
 
    def assertDeltaIn(self, delta1, delta2, delta):
151
 
        """Make sure that the delta bytes match one of the expectations."""
152
 
        # In general, the python delta matcher gives different results than the
153
 
        # pyrex delta matcher. Both should be valid deltas, though.
154
 
        if delta not in (delta1, delta2):
155
 
            self.fail("Delta bytes:\n"
156
 
                      "       %r\n"
157
 
                      "not in %r\n"
158
 
                      "    or %r"
159
 
                      % (delta, delta1, delta2))
160
 
 
161
 
    def test_make_delta(self):
162
 
        delta = self.make_delta(_text1, _text2)
163
 
        self.assertDeltaIn(
164
 
            'N\x90/\x1fdiffer from\nagainst other text\n',
165
 
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
166
 
            delta)
167
 
        delta = self.make_delta(_text2, _text1)
168
 
        self.assertDeltaIn(
169
 
            'M\x90/\x1ebe matched\nagainst other text\n',
170
 
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
171
 
            delta)
172
 
        delta = self.make_delta(_text3, _text1)
173
 
        self.assertEqual('M\x90M', delta)
174
 
        delta = self.make_delta(_text3, _text2)
175
 
        self.assertDeltaIn(
176
 
            'N\x90/\x1fdiffer from\nagainst other text\n',
177
 
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
178
 
            delta)
179
 
 
180
 
    def test_make_delta_with_large_copies(self):
181
 
        # We want to have a copy that is larger than 64kB, which forces us to
182
 
        # issue multiple copy instructions.
183
 
        big_text = _text3 * 1220
184
 
        delta = self.make_delta(big_text, big_text)
185
 
        self.assertDeltaIn(
186
 
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
187
 
            '\x80'              # Copy 64kB, starting at byte 0
188
 
            '\x84\x01'          # and another 64kB starting at 64kB
189
 
            '\xb4\x02\x5c\x83', # And the bit of tail.
190
 
            None,   # Both implementations should be identical
191
 
            delta)
192
 
 
193
 
    def test_apply_delta_is_typesafe(self):
194
 
        self.apply_delta(_text1, 'M\x90M')
195
 
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
196
 
        self.assertRaises(TypeError, self.apply_delta,
197
 
                          unicode(_text1), 'M\x90M')
198
 
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
199
 
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
200
 
 
201
 
    def test_apply_delta(self):
202
 
        target = self.apply_delta(_text1,
203
 
                    'N\x90/\x1fdiffer from\nagainst other text\n')
204
 
        self.assertEqual(_text2, target)
205
 
        target = self.apply_delta(_text2,
206
 
                    'M\x90/\x1ebe matched\nagainst other text\n')
207
 
        self.assertEqual(_text1, target)
208
 
 
209
 
    def test_apply_delta_to_source_is_safe(self):
210
 
        self.assertRaises(TypeError,
211
 
            self.apply_delta_to_source, object(), 0, 1)
212
 
        self.assertRaises(TypeError,
213
 
            self.apply_delta_to_source, u'unicode str', 0, 1)
214
 
        # end > length
215
 
        self.assertRaises(ValueError,
216
 
            self.apply_delta_to_source, 'foo', 1, 4)
217
 
        # start > length
218
 
        self.assertRaises(ValueError,
219
 
            self.apply_delta_to_source, 'foo', 5, 3)
220
 
        # start > end
221
 
        self.assertRaises(ValueError,
222
 
            self.apply_delta_to_source, 'foo', 3, 2)
223
 
 
224
 
    def test_apply_delta_to_source(self):
225
 
        source_and_delta = (_text1
226
 
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
227
 
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
228
 
                                    len(_text1), len(source_and_delta)))
229
 
 
230
 
 
231
 
class TestMakeAndApplyCompatible(tests.TestCase):
232
 
 
233
 
    scenarios = two_way_scenarios()
234
 
 
235
 
    make_delta = None # Set by load_tests
236
 
    apply_delta = None # Set by load_tests
237
 
 
238
 
    def assertMakeAndApply(self, source, target):
239
 
        """Assert that generating a delta and applying gives success."""
240
 
        delta = self.make_delta(source, target)
241
 
        bytes = self.apply_delta(source, delta)
242
 
        self.assertEqualDiff(target, bytes)
243
 
 
244
 
    def test_direct(self):
245
 
        self.assertMakeAndApply(_text1, _text2)
246
 
        self.assertMakeAndApply(_text2, _text1)
247
 
        self.assertMakeAndApply(_text1, _text3)
248
 
        self.assertMakeAndApply(_text3, _text1)
249
 
        self.assertMakeAndApply(_text2, _text3)
250
 
        self.assertMakeAndApply(_text3, _text2)
251
 
 
252
 
 
253
 
class TestDeltaIndex(tests.TestCase):
254
 
 
255
 
    def setUp(self):
256
 
        super(TestDeltaIndex, self).setUp()
257
 
        # This test isn't multiplied, because we only have DeltaIndex for the
258
 
        # compiled form
259
 
        # We call this here, because _test_needs_features happens after setUp
260
 
        self.requireFeature(compiled_groupcompress_feature)
261
 
        self._gc_module = compiled_groupcompress_feature.module
262
 
 
263
 
    def test_repr(self):
264
 
        di = self._gc_module.DeltaIndex('test text\n')
265
 
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
266
 
 
267
 
    def test__dump_no_index(self):
268
 
        di = self._gc_module.DeltaIndex()
269
 
        self.assertEqual(None, di._dump_index())
270
 
 
271
 
    def test__dump_index_simple(self):
272
 
        di = self._gc_module.DeltaIndex()
273
 
        di.add_source(_text1, 0)
274
 
        self.assertFalse(di._has_index())
275
 
        self.assertEqual(None, di._dump_index())
276
 
        _ = di.make_delta(_text1)
277
 
        self.assertTrue(di._has_index())
278
 
        hash_list, entry_list = di._dump_index()
279
 
        self.assertEqual(16, len(hash_list))
280
 
        self.assertEqual(68, len(entry_list))
281
 
        just_entries = [(idx, text_offset, hash_val)
282
 
                        for idx, (text_offset, hash_val)
283
 
                         in enumerate(entry_list)
284
 
                         if text_offset != 0 or hash_val != 0]
285
 
        rabin_hash = self._gc_module._rabin_hash
286
 
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
287
 
                          (25, 48, rabin_hash(_text1[33:49])),
288
 
                          (34, 32, rabin_hash(_text1[17:33])),
289
 
                          (47, 64, rabin_hash(_text1[49:65])),
290
 
                         ], just_entries)
291
 
        # This ensures that the hash map points to the location we expect it to
292
 
        for entry_idx, text_offset, hash_val in just_entries:
293
 
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
294
 
 
295
 
    def test__dump_index_two_sources(self):
296
 
        di = self._gc_module.DeltaIndex()
297
 
        di.add_source(_text1, 0)
298
 
        di.add_source(_text2, 2)
299
 
        start2 = len(_text1) + 2
300
 
        self.assertTrue(di._has_index())
301
 
        hash_list, entry_list = di._dump_index()
302
 
        self.assertEqual(16, len(hash_list))
303
 
        self.assertEqual(68, len(entry_list))
304
 
        just_entries = [(idx, text_offset, hash_val)
305
 
                        for idx, (text_offset, hash_val)
306
 
                         in enumerate(entry_list)
307
 
                         if text_offset != 0 or hash_val != 0]
308
 
        rabin_hash = self._gc_module._rabin_hash
309
 
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
310
 
                          (9, start2+16, rabin_hash(_text2[1:17])),
311
 
                          (25, 48, rabin_hash(_text1[33:49])),
312
 
                          (30, start2+64, rabin_hash(_text2[49:65])),
313
 
                          (34, 32, rabin_hash(_text1[17:33])),
314
 
                          (35, start2+32, rabin_hash(_text2[17:33])),
315
 
                          (43, start2+48, rabin_hash(_text2[33:49])),
316
 
                          (47, 64, rabin_hash(_text1[49:65])),
317
 
                         ], just_entries)
318
 
        # Each entry should be in the appropriate hash bucket.
319
 
        for entry_idx, text_offset, hash_val in just_entries:
320
 
            hash_idx = hash_val & 0xf
321
 
            self.assertTrue(
322
 
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
323
 
 
324
 
    def test_first_add_source_doesnt_index_until_make_delta(self):
325
 
        di = self._gc_module.DeltaIndex()
326
 
        self.assertFalse(di._has_index())
327
 
        di.add_source(_text1, 0)
328
 
        self.assertFalse(di._has_index())
329
 
        # However, asking to make a delta will trigger the index to be
330
 
        # generated, and will generate a proper delta
331
 
        delta = di.make_delta(_text2)
332
 
        self.assertTrue(di._has_index())
333
 
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
334
 
 
335
 
    def test_add_source_max_bytes_to_index(self):
336
 
        di = self._gc_module.DeltaIndex()
337
 
        di._max_bytes_to_index = 3*16
338
 
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
339
 
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
340
 
        start2 = len(_text1) + 3
341
 
        hash_list, entry_list = di._dump_index()
342
 
        self.assertEqual(16, len(hash_list))
343
 
        self.assertEqual(67, len(entry_list))
344
 
        just_entries = sorted([(text_offset, hash_val)
345
 
                               for text_offset, hash_val in entry_list
346
 
                                if text_offset != 0 or hash_val != 0])
347
 
        rabin_hash = self._gc_module._rabin_hash
348
 
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
349
 
                          (50, rabin_hash(_text1[35:51])),
350
 
                          (75, rabin_hash(_text1[60:76])),
351
 
                          (start2+44, rabin_hash(_text3[29:45])),
352
 
                          (start2+88, rabin_hash(_text3[73:89])),
353
 
                          (start2+132, rabin_hash(_text3[117:133])),
354
 
                         ], just_entries)
355
 
 
356
 
    def test_second_add_source_triggers_make_index(self):
357
 
        di = self._gc_module.DeltaIndex()
358
 
        self.assertFalse(di._has_index())
359
 
        di.add_source(_text1, 0)
360
 
        self.assertFalse(di._has_index())
361
 
        di.add_source(_text2, 0)
362
 
        self.assertTrue(di._has_index())
363
 
 
364
 
    def test_make_delta(self):
365
 
        di = self._gc_module.DeltaIndex(_text1)
366
 
        delta = di.make_delta(_text2)
367
 
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
368
 
 
369
 
    def test_delta_against_multiple_sources(self):
370
 
        di = self._gc_module.DeltaIndex()
371
 
        di.add_source(_first_text, 0)
372
 
        self.assertEqual(len(_first_text), di._source_offset)
373
 
        di.add_source(_second_text, 0)
374
 
        self.assertEqual(len(_first_text) + len(_second_text),
375
 
                         di._source_offset)
376
 
        delta = di.make_delta(_third_text)
377
 
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
378
 
        self.assertEqualDiff(_third_text, result)
379
 
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
380
 
                         '\x91v6\x03and\x91d"\x91:\n', delta)
381
 
 
382
 
    def test_delta_with_offsets(self):
383
 
        di = self._gc_module.DeltaIndex()
384
 
        di.add_source(_first_text, 5)
385
 
        self.assertEqual(len(_first_text) + 5, di._source_offset)
386
 
        di.add_source(_second_text, 10)
387
 
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
388
 
                         di._source_offset)
389
 
        delta = di.make_delta(_third_text)
390
 
        self.assertIsNot(None, delta)
391
 
        result = self._gc_module.apply_delta(
392
 
            '12345' + _first_text + '1234567890' + _second_text, delta)
393
 
        self.assertIsNot(None, result)
394
 
        self.assertEqualDiff(_third_text, result)
395
 
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
396
 
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
397
 
 
398
 
    def test_delta_with_delta_bytes(self):
399
 
        di = self._gc_module.DeltaIndex()
400
 
        source = _first_text
401
 
        di.add_source(_first_text, 0)
402
 
        self.assertEqual(len(_first_text), di._source_offset)
403
 
        delta = di.make_delta(_second_text)
404
 
        self.assertEqual('h\tsome more\x91\x019'
405
 
                         '&previous text\nand has some extra text\n', delta)
406
 
        di.add_delta_source(delta, 0)
407
 
        source += delta
408
 
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
409
 
        second_delta = di.make_delta(_third_text)
410
 
        result = self._gc_module.apply_delta(source, second_delta)
411
 
        self.assertEqualDiff(_third_text, result)
412
 
        # We should be able to match against the
413
 
        # 'previous text\nand has some...'  that was part of the delta bytes
414
 
        # Note that we don't match the 'common with the', because it isn't long
415
 
        # enough to match in the original text, and those bytes are not present
416
 
        # in the delta for the second text.
417
 
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
418
 
                         '\x91S&\x03and\x91\x18,', second_delta)
419
 
        # Add this delta, and create a new delta for the same text. We should
420
 
        # find the remaining text, and only insert the short 'and' text.
421
 
        di.add_delta_source(second_delta, 0)
422
 
        source += second_delta
423
 
        third_delta = di.make_delta(_third_text)
424
 
        result = self._gc_module.apply_delta(source, third_delta)
425
 
        self.assertEqualDiff(_third_text, result)
426
 
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
427
 
                         '\x91S&\x03and\x91\x18,', third_delta)
428
 
        # Now create a delta, which we know won't be able to be 'fit' into the
429
 
        # existing index
430
 
        fourth_delta = di.make_delta(_fourth_text)
431
 
        self.assertEqual(_fourth_text,
432
 
                         self._gc_module.apply_delta(source, fourth_delta))
433
 
        self.assertEqual('\x80\x01'
434
 
                         '\x7f123456789012345\nsame rabin hash\n'
435
 
                         '123456789012345\nsame rabin hash\n'
436
 
                         '123456789012345\nsame rabin hash\n'
437
 
                         '123456789012345\nsame rabin hash'
438
 
                         '\x01\n', fourth_delta)
439
 
        di.add_delta_source(fourth_delta, 0)
440
 
        source += fourth_delta
441
 
        # With the next delta, everything should be found
442
 
        fifth_delta = di.make_delta(_fourth_text)
443
 
        self.assertEqual(_fourth_text,
444
 
                         self._gc_module.apply_delta(source, fifth_delta))
445
 
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
446
 
 
447
 
 
448
 
class TestCopyInstruction(tests.TestCase):
449
 
 
450
 
    def assertEncode(self, expected, offset, length):
451
 
        bytes = _groupcompress_py.encode_copy_instruction(offset, length)
452
 
        if expected != bytes:
453
 
            self.assertEqual([hex(ord(e)) for e in expected],
454
 
                             [hex(ord(b)) for b in bytes])
455
 
 
456
 
    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
457
 
        cmd = ord(bytes[pos])
458
 
        pos += 1
459
 
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
460
 
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
461
 
 
462
 
    def test_encode_no_length(self):
463
 
        self.assertEncode('\x80', 0, 64*1024)
464
 
        self.assertEncode('\x81\x01', 1, 64*1024)
465
 
        self.assertEncode('\x81\x0a', 10, 64*1024)
466
 
        self.assertEncode('\x81\xff', 255, 64*1024)
467
 
        self.assertEncode('\x82\x01', 256, 64*1024)
468
 
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
469
 
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
470
 
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
471
 
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
472
 
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
473
 
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
474
 
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
475
 
 
476
 
    def test_encode_no_offset(self):
477
 
        self.assertEncode('\x90\x01', 0, 1)
478
 
        self.assertEncode('\x90\x0a', 0, 10)
479
 
        self.assertEncode('\x90\xff', 0, 255)
480
 
        self.assertEncode('\xA0\x01', 0, 256)
481
 
        self.assertEncode('\xB0\x01\x01', 0, 257)
482
 
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
483
 
        # Special case, if copy == 64KiB, then we store exactly 0
484
 
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
485
 
        # about that, as we would never actually copy 0 bytes
486
 
        self.assertEncode('\x80', 0, 64*1024)
487
 
 
488
 
    def test_encode(self):
489
 
        self.assertEncode('\x91\x01\x01', 1, 1)
490
 
        self.assertEncode('\x91\x09\x0a', 9, 10)
491
 
        self.assertEncode('\x91\xfe\xff', 254, 255)
492
 
        self.assertEncode('\xA2\x02\x01', 512, 256)
493
 
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
494
 
        self.assertEncode('\xB0\x01\x01', 0, 257)
495
 
        # Special case, if copy == 64KiB, then we store exactly 0
496
 
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
497
 
        # about that, as we would never actually copy 0 bytes
498
 
        self.assertEncode('\x81\x0a', 10, 64*1024)
499
 
 
500
 
    def test_decode_no_length(self):
501
 
        # If length is 0, it is interpreted as 64KiB
502
 
        # The shortest possible instruction is a copy of 64KiB from offset 0
503
 
        self.assertDecode(0, 65536, 1, '\x80', 0)
504
 
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
505
 
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
506
 
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
507
 
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
508
 
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
509
 
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
510
 
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
511
 
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
512
 
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
513
 
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
514
 
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
515
 
 
516
 
    def test_decode_no_offset(self):
517
 
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
518
 
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
519
 
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
520
 
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
521
 
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
522
 
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
523
 
        # Special case, if copy == 64KiB, then we store exactly 0
524
 
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
525
 
        # about that, as we would never actually copy 0 bytes
526
 
        self.assertDecode(0, 65536, 1, '\x80', 0)
527
 
 
528
 
    def test_decode(self):
529
 
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
530
 
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
531
 
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
532
 
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
533
 
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
534
 
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
535
 
 
536
 
    def test_decode_not_start(self):
537
 
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
538
 
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
539
 
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
540
 
 
541
 
 
542
 
class TestBase128Int(tests.TestCase):
543
 
 
544
 
    scenarios = module_scenarios()
545
 
 
546
 
    _gc_module = None # Set by load_tests
547
 
 
548
 
    def assertEqualEncode(self, bytes, val):
549
 
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
550
 
 
551
 
    def assertEqualDecode(self, val, num_decode, bytes):
552
 
        self.assertEqual((val, num_decode),
553
 
                         self._gc_module.decode_base128_int(bytes))
554
 
 
555
 
    def test_encode(self):
556
 
        self.assertEqualEncode('\x01', 1)
557
 
        self.assertEqualEncode('\x02', 2)
558
 
        self.assertEqualEncode('\x7f', 127)
559
 
        self.assertEqualEncode('\x80\x01', 128)
560
 
        self.assertEqualEncode('\xff\x01', 255)
561
 
        self.assertEqualEncode('\x80\x02', 256)
562
 
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
563
 
 
564
 
    def test_decode(self):
565
 
        self.assertEqualDecode(1, 1, '\x01')
566
 
        self.assertEqualDecode(2, 1, '\x02')
567
 
        self.assertEqualDecode(127, 1, '\x7f')
568
 
        self.assertEqualDecode(128, 2, '\x80\x01')
569
 
        self.assertEqualDecode(255, 2, '\xff\x01')
570
 
        self.assertEqualDecode(256, 2, '\x80\x02')
571
 
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
572
 
 
573
 
    def test_decode_with_trailing_bytes(self):
574
 
        self.assertEqualDecode(1, 1, '\x01abcdef')
575
 
        self.assertEqualDecode(127, 1, '\x7f\x01')
576
 
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
577
 
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
578
 
 
579