# Copyright (C) 2008, 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for the python and pyrex extensions of groupcompress"""
26
def load_tests(standard_tests, module, loader):
    """Parameterize tests for all versions of groupcompress.

    Single-implementation test classes (TestMakeAndApplyDelta and
    TestBase128Int) are multiplied once per available groupcompress module,
    which is supplied to them as ``_gc_module``.  TestMakeAndApplyCompatible
    is multiplied once per (make_delta, apply_delta) pairing so that deltas
    produced by one implementation are applied by the other.

    :param standard_tests: The suite of tests loaded from this module.
    :param module: Not used by this function.
    :param loader: Not used by this function.
    :return: The suite produced by the final tests.multiply_tests() call.
    """
    # Pairings of (who makes the delta, who applies it); 'P' is the python
    # module, 'C' is the compiled one.
    two_way_scenarios = [
        ('PP', {'make_delta': _groupcompress_py.make_delta,
                'apply_delta': _groupcompress_py.apply_delta}),
        ]
    scenarios = [
        ('python', {'_gc_module': _groupcompress_py}),
        ]
    if compiled_groupcompress_feature.available():
        # Only add the compiled variants when the extension is present.
        gc_module = compiled_groupcompress_feature.module
        scenarios.append(('C',
            {'_gc_module': gc_module}))
        two_way_scenarios.extend([
            ('CC', {'make_delta': gc_module.make_delta,
                    'apply_delta': gc_module.apply_delta}),
            ('PC', {'make_delta': _groupcompress_py.make_delta,
                    'apply_delta': gc_module.apply_delta}),
            ('CP', {'make_delta': gc_module.make_delta,
                    'apply_delta': _groupcompress_py.apply_delta}),
            ])
    # NOTE(review): the second class in this tuple was lost from the source;
    # TestBase128Int is assumed because its _gc_module is documented as
    # "Set by load_tests" -- confirm.
    to_adapt, result = tests.split_suite_by_condition(
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
                                                    TestBase128Int)))
    result = tests.multiply_tests(to_adapt, scenarios, result)
    to_adapt, result = tests.split_suite_by_condition(result,
        tests.condition_isinstance(TestMakeAndApplyCompatible))
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
    return result
57
# Feature object for the 'bzrlib._groupcompress_pyx' module.  Its
# .available() / .module are consulted by load_tests, and tests that cannot
# run without that module require this feature.
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
    'bzrlib._groupcompress_pyx')
63
which is meant to be matched
70
which is meant to differ from
77
which is meant to be matched
81
at the end of the file
87
common with the next text
91
some more bit of text, that
93
common with the previous text
94
and has some extra text
100
has some in common with the previous text
101
and has some extra text
103
common with the next text
117
class TestMakeAndApplyDelta(tests.TestCase):
    """Exercise make_delta / apply_delta of a single groupcompress module.

    load_tests multiplies this class, supplying the module under test as
    ``_gc_module``.  Test methods deliberately carry comments rather than
    docstrings so that test reporting keeps showing the test ids.
    """

    _gc_module = None # Set by load_tests

    def setUp(self):
        super(TestMakeAndApplyDelta, self).setUp()
        # Bind the functions under test once, so each test reads the same
        # regardless of which implementation the scenario supplied.
        self.make_delta = self._gc_module.make_delta
        self.apply_delta = self._gc_module.apply_delta
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source

    def test_make_delta_is_typesafe(self):
        # Two plain strings must be accepted...
        self.make_delta('a string', 'another string')

        def _check_make_delta(string1, string2):
            self.assertRaises(TypeError, self.make_delta, string1, string2)

        # ...while arbitrary objects or unicode in either position must not.
        _check_make_delta('a string', object())
        _check_make_delta('a string', u'not a string')
        _check_make_delta(object(), 'a string')
        _check_make_delta(u'not a string', 'a string')

    def test_make_noop_delta(self):
        # A delta of a text against itself is just the length header and a
        # single copy instruction.
        ident_delta = self.make_delta(_text1, _text1)
        self.assertEqual('M\x90M', ident_delta)
        ident_delta = self.make_delta(_text2, _text2)
        self.assertEqual('N\x90N', ident_delta)
        ident_delta = self.make_delta(_text3, _text3)
        self.assertEqual('\x87\x01\x90\x87', ident_delta)

    def assertDeltaIn(self, delta1, delta2, delta):
        """Make sure that the delta bytes match one of the expectations.

        :param delta1: First acceptable delta.
        :param delta2: Second acceptable delta (may be None when only one
            result is acceptable).
        :param delta: The delta actually produced.
        """
        # In general, the python delta matcher gives different results than the
        # pyrex delta matcher. Both should be valid deltas, though.
        if delta not in (delta1, delta2):
            # One %r per argument; without them the '%' itself would raise
            # TypeError and hide the real failure.
            self.fail("Delta bytes:\n"
                      "       %r\n"
                      "not in %r\n"
                      "    or %r"
                      % (delta, delta1, delta2))

    def test_make_delta(self):
        delta = self.make_delta(_text1, _text2)
        self.assertDeltaIn(
            'N\x90/\x1fdiffer from\nagainst other text\n',
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
            delta)
        delta = self.make_delta(_text2, _text1)
        self.assertDeltaIn(
            'M\x90/\x1ebe matched\nagainst other text\n',
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
            delta)
        delta = self.make_delta(_text3, _text1)
        self.assertEqual('M\x90M', delta)
        delta = self.make_delta(_text3, _text2)
        self.assertDeltaIn(
            'N\x90/\x1fdiffer from\nagainst other text\n',
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
            delta)

    def test_make_delta_with_large_copies(self):
        # We want to have a copy that is larger than 64kB, which forces us to
        # issue multiple copy instructions.
        big_text = _text3 * 1220
        delta = self.make_delta(big_text, big_text)
        self.assertDeltaIn(
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
            '\x80'              # Copy 64kB, starting at byte 0
            '\x84\x01'          # and another 64kB starting at 64kB
            '\xb4\x02\x5c\x83', # And the bit of tail.
            None,   # Both implementations should be identical
            delta)

    def test_apply_delta_is_typesafe(self):
        # Plain strings are fine; objects and unicode are rejected for both
        # the source and the delta argument.
        self.apply_delta(_text1, 'M\x90M')
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
        self.assertRaises(TypeError, self.apply_delta,
                          unicode(_text1), 'M\x90M')
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
        self.assertRaises(TypeError, self.apply_delta, _text1, object())

    def test_apply_delta(self):
        target = self.apply_delta(_text1,
                    'N\x90/\x1fdiffer from\nagainst other text\n')
        self.assertEqual(_text2, target)
        target = self.apply_delta(_text2,
                    'M\x90/\x1ebe matched\nagainst other text\n')
        self.assertEqual(_text1, target)

    def test_apply_delta_to_source_is_safe(self):
        self.assertRaises(TypeError,
            self.apply_delta_to_source, object(), 0, 1)
        self.assertRaises(TypeError,
            self.apply_delta_to_source, u'unicode str', 0, 1)
        # end (4) is past the end of the 3-byte string
        self.assertRaises(ValueError,
            self.apply_delta_to_source, 'foo', 1, 4)
        # start (5) is past the end of the 3-byte string
        self.assertRaises(ValueError,
            self.apply_delta_to_source, 'foo', 5, 3)
        # start (3) is greater than end (2)
        self.assertRaises(ValueError,
            self.apply_delta_to_source, 'foo', 3, 2)

    def test_apply_delta_to_source(self):
        # The delta is stored directly after its source in one string, and is
        # located by its start and end offsets.
        source_and_delta = (_text1
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
                                    len(_text1), len(source_and_delta)))
227
class TestMakeAndApplyCompatible(tests.TestCase):
    """Round-trip texts through a make_delta / apply_delta pairing.

    load_tests supplies ``make_delta`` and ``apply_delta``, possibly taken
    from different groupcompress implementations.
    """

    make_delta = None # Set by load_tests
    apply_delta = None # Set by load_tests

    def assertMakeAndApply(self, source, target):
        """Assert that a delta from source to target rebuilds target.

        :param source: The text the delta is made against.
        :param target: The text the delta should reproduce.
        """
        rebuilt = self.apply_delta(source, self.make_delta(source, target))
        self.assertEqualDiff(target, rebuilt)

    def test_direct(self):
        # Every ordered pair of distinct texts, in both directions.
        text_pairs = [
            (_text1, _text2),
            (_text2, _text1),
            (_text1, _text3),
            (_text3, _text1),
            (_text2, _text3),
            (_text3, _text2),
            ]
        for source, target in text_pairs:
            self.assertMakeAndApply(source, target)
247
class TestDeltaIndex(tests.TestCase):
    """Tests for the DeltaIndex type of the compiled groupcompress module."""

    def setUp(self):
        super(TestDeltaIndex, self).setUp()
        # This test isn't multiplied, because we only have DeltaIndex for the
        # compiled module.
        # We call this here, because _test_needs_features happens after setUp
        self.requireFeature(compiled_groupcompress_feature)
        self._gc_module = compiled_groupcompress_feature.module

    # NOTE(review): the original 'def' line of this test was lost; the name
    # test_repr is assumed from what the body checks.
    def test_repr(self):
        di = self._gc_module.DeltaIndex('test text\n')
        self.assertEqual('DeltaIndex(1, 10)', repr(di))

    def test_first_add_source_doesnt_index_until_make_delta(self):
        di = self._gc_module.DeltaIndex()
        self.assertFalse(di._has_index())
        di.add_source(_text1, 0)
        self.assertFalse(di._has_index())
        # However, asking to make a delta will trigger the index to be
        # generated, and will generate a proper delta
        delta = di.make_delta(_text2)
        self.assertTrue(di._has_index())
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)

    def test_second_add_source_triggers_make_index(self):
        di = self._gc_module.DeltaIndex()
        self.assertFalse(di._has_index())
        di.add_source(_text1, 0)
        self.assertFalse(di._has_index())
        di.add_source(_text2, 0)
        self.assertTrue(di._has_index())

    def test_make_delta(self):
        di = self._gc_module.DeltaIndex(_text1)
        delta = di.make_delta(_text2)
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)

    def test_delta_against_multiple_sources(self):
        di = self._gc_module.DeltaIndex()
        di.add_source(_first_text, 0)
        self.assertEqual(len(_first_text), di._source_offset)
        di.add_source(_second_text, 0)
        self.assertEqual(len(_first_text) + len(_second_text),
                         di._source_offset)
        delta = di.make_delta(_third_text)
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
        self.assertEqualDiff(_third_text, result)
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
                         '\x91v6\x03and\x91d"\x91:\n', delta)

    def test_delta_with_offsets(self):
        di = self._gc_module.DeltaIndex()
        di.add_source(_first_text, 5)
        self.assertEqual(len(_first_text) + 5, di._source_offset)
        di.add_source(_second_text, 10)
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
                         di._source_offset)
        delta = di.make_delta(_third_text)
        self.assertIsNot(None, delta)
        # The padding strings stand in for the 5 and 10 unindexed bytes that
        # were declared in front of each source above.
        result = self._gc_module.apply_delta(
            '12345' + _first_text + '1234567890' + _second_text, delta)
        self.assertIsNot(None, result)
        self.assertEqualDiff(_third_text, result)
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
                         '\x91\x856\x03and\x91s"\x91?\n', delta)

    def test_delta_with_delta_bytes(self):
        di = self._gc_module.DeltaIndex()
        # 'source' mirrors, byte for byte, everything added to the index so
        # the produced deltas can be applied against it.
        source = _first_text
        di.add_source(_first_text, 0)
        self.assertEqual(len(_first_text), di._source_offset)
        delta = di.make_delta(_second_text)
        self.assertEqual('h\tsome more\x91\x019'
                         '&previous text\nand has some extra text\n', delta)
        di.add_delta_source(delta, 0)
        source += delta
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
        second_delta = di.make_delta(_third_text)
        result = self._gc_module.apply_delta(source, second_delta)
        self.assertEqualDiff(_third_text, result)
        # We should be able to match against the
        # 'previous text\nand has some...'  that was part of the delta bytes
        # Note that we don't match the 'common with the', because it isn't long
        # enough to match in the original text, and those bytes are not present
        # in the delta for the second text.
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
                         '\x91S&\x03and\x91\x18,', second_delta)
        # Add this delta, and create a new delta for the same text. We should
        # find the remaining text, and only insert the short 'and' text.
        di.add_delta_source(second_delta, 0)
        source += second_delta
        third_delta = di.make_delta(_third_text)
        result = self._gc_module.apply_delta(source, third_delta)
        self.assertEqualDiff(_third_text, result)
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
                         '\x91S&\x03and\x91\x18,', third_delta)
        # Now create a delta, which we know won't be able to be 'fit' into the
        # existing index
        # NOTE(review): the tail of the comment above was lost from the
        # source; "existing index" is an assumed completion.
        fourth_delta = di.make_delta(_fourth_text)
        self.assertEqual(_fourth_text,
                         self._gc_module.apply_delta(source, fourth_delta))
        self.assertEqual('\x80\x01'
                         '\x7f123456789012345\nsame rabin hash\n'
                         '123456789012345\nsame rabin hash\n'
                         '123456789012345\nsame rabin hash\n'
                         '123456789012345\nsame rabin hash'
                         '\x01\n', fourth_delta)
        di.add_delta_source(fourth_delta, 0)
        source += fourth_delta
        # With the next delta, everything should be found
        fifth_delta = di.make_delta(_fourth_text)
        self.assertEqual(_fourth_text,
                         self._gc_module.apply_delta(source, fifth_delta))
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
364
class TestCopyInstruction(tests.TestCase):
    """Tests for the copy-instruction helpers of _groupcompress_py."""

    def assertEncode(self, expected, offset, length):
        """Assert that (offset, length) encodes to the expected bytes.

        :param expected: The expected encoded copy instruction.
        :param offset: Offset passed to encode_copy_instruction.
        :param length: Length passed to encode_copy_instruction.
        """
        encoded = _groupcompress_py.encode_copy_instruction(offset, length)
        if expected != encoded:
            # Compare as lists of hex values so that a failure shows which
            # byte differs rather than raw unprintable characters.
            self.assertEqual([hex(ord(e)) for e in expected],
                             [hex(ord(b)) for b in encoded])

    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
        """Assert that the copy instruction at bytes[pos] decodes as expected.

        :param exp_offset: Expected copy offset.
        :param exp_length: Expected copy length.
        :param exp_newpos: Expected position returned by the decoder.
        :param bytes: String containing the encoded instruction.
        :param pos: Index of the instruction's command byte within bytes.
        """
        cmd = ord(bytes[pos])
        # The command byte is handed to the decoder separately, so step past
        # it before decoding the offset/length bytes.
        # NOTE(review): assumes decode_copy_instruction expects the position
        # just after the command byte -- confirm against _groupcompress_py.
        pos += 1
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)

    def test_encode_no_length(self):
        self.assertEncode('\x80', 0, 64*1024)
        self.assertEncode('\x81\x01', 1, 64*1024)
        self.assertEncode('\x81\x0a', 10, 64*1024)
        self.assertEncode('\x81\xff', 255, 64*1024)
        self.assertEncode('\x82\x01', 256, 64*1024)
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)

    def test_encode_no_offset(self):
        self.assertEncode('\x90\x01', 0, 1)
        self.assertEncode('\x90\x0a', 0, 10)
        self.assertEncode('\x90\xff', 0, 255)
        self.assertEncode('\xA0\x01', 0, 256)
        self.assertEncode('\xB0\x01\x01', 0, 257)
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertEncode('\x80', 0, 64*1024)

    def test_encode(self):
        self.assertEncode('\x91\x01\x01', 1, 1)
        self.assertEncode('\x91\x09\x0a', 9, 10)
        self.assertEncode('\x91\xfe\xff', 254, 255)
        self.assertEncode('\xA2\x02\x01', 512, 256)
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
        self.assertEncode('\xB0\x01\x01', 0, 257)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertEncode('\x81\x0a', 10, 64*1024)

    def test_decode_no_length(self):
        # If length is 0, it is interpreted as 64KiB
        # The shortest possible instruction is a copy of 64KiB from offset 0
        self.assertDecode(0, 65536, 1, '\x80', 0)
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)

    def test_decode_no_offset(self):
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertDecode(0, 65536, 1, '\x80', 0)

    def test_decode(self):
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)

    def test_decode_not_start(self):
        # The instruction sits in the middle of other bytes; the returned
        # position must be relative to the whole string.
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
458
class TestBase128Int(tests.TestCase):
    """Tests for encode_base128_int / decode_base128_int of ``_gc_module``."""

    _gc_module = None # Set by load_tests

    def assertEqualEncode(self, bytes, val):
        """Assert that val encodes to exactly the given bytes."""
        encoded = self._gc_module.encode_base128_int(val)
        self.assertEqual(bytes, encoded)

    def assertEqualDecode(self, val, num_decode, bytes):
        """Assert that bytes decodes to the (val, num_decode) pair."""
        decoded = self._gc_module.decode_base128_int(bytes)
        self.assertEqual((val, num_decode), decoded)

    def test_encode(self):
        # (expected bytes, value to encode)
        cases = [
            ('\x01', 1),
            ('\x02', 2),
            ('\x7f', 127),
            ('\x80\x01', 128),
            ('\xff\x01', 255),
            ('\x80\x02', 256),
            ('\xff\xff\xff\xff\x0f', 0xFFFFFFFF),
            ]
        for expected, value in cases:
            self.assertEqualEncode(expected, value)

    def test_decode(self):
        # (expected value, expected second element of the result, input)
        cases = [
            (1, 1, '\x01'),
            (2, 1, '\x02'),
            (127, 1, '\x7f'),
            (128, 2, '\x80\x01'),
            (255, 2, '\xff\x01'),
            (256, 2, '\x80\x02'),
            (0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f'),
            ]
        for value, count, encoded in cases:
            self.assertEqualDecode(value, count, encoded)

    def test_decode_with_trailing_bytes(self):
        # Bytes following the encoded integer must not affect the result.
        cases = [
            (1, 1, '\x01abcdef'),
            (127, 1, '\x7f\x01'),
            (128, 2, '\x80\x01abcdef'),
            (255, 2, '\xff\x01\xff'),
            ]
        for value, count, encoded in cases:
            self.assertEqualDecode(value, count, encoded)