# Copyright (C) 2008, 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the python and pyrex extensions of groupcompress"""
26
def load_tests(standard_tests, module, loader):
27
"""Parameterize tests for all versions of groupcompress."""
29
('PP', {'make_delta': _groupcompress_py.make_delta,
30
'apply_delta': _groupcompress_py.apply_delta})
33
('python', {'_gc_module': _groupcompress_py}),
35
if CompiledGroupCompressFeature.available():
36
from bzrlib import _groupcompress_pyx
37
scenarios.append(('C',
38
{'_gc_module': _groupcompress_pyx}))
39
two_way_scenarios.extend([
40
('CC', {'make_delta': _groupcompress_pyx.make_delta,
41
'apply_delta': _groupcompress_pyx.apply_delta}),
42
('PC', {'make_delta': _groupcompress_py.make_delta,
43
'apply_delta': _groupcompress_pyx.apply_delta}),
44
('CP', {'make_delta': _groupcompress_pyx.make_delta,
45
'apply_delta': _groupcompress_py.apply_delta}),
47
to_adapt, result = tests.split_suite_by_condition(
48
standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
50
result = tests.multiply_tests(to_adapt, scenarios, result)
51
to_adapt, result = tests.split_suite_by_condition(result,
52
tests.condition_isinstance(TestMakeAndApplyCompatible))
53
result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
57
class _CompiledGroupCompressFeature(tests.Feature):
61
import bzrlib._groupcompress_pyx
67
def feature_name(self):
68
return 'bzrlib._groupcompress_pyx'
71
CompiledGroupCompressFeature = _CompiledGroupCompressFeature()
76
which is meant to be matched
83
which is meant to differ from
90
which is meant to be matched
94
at the end of the file
100
common with the next text
104
some more bit of text, that
105
does not have much in
106
common with the previous text
107
and has some extra text
113
has some in common with the previous text
114
and has some extra text
116
common with the next text
130
class TestMakeAndApplyDelta(tests.TestCase):
132
_gc_module = None # Set by load_tests
135
super(TestMakeAndApplyDelta, self).setUp()
136
self.make_delta = self._gc_module.make_delta
137
self.apply_delta = self._gc_module.apply_delta
138
self.apply_delta_to_source = self._gc_module.apply_delta_to_source
def test_make_delta_is_typesafe(self):
    """make_delta takes two plain (byte) strings and rejects anything else.

    Any other object, including a unicode string, in either position must
    raise TypeError.
    """
    # Sanity check: the well-typed call must simply succeed.
    self.make_delta('a string', 'another string')

    def _check_make_delta(string1, string2):
        self.assertRaises(TypeError, self.make_delta, string1, string2)

    # Bad type as the second argument, then as the first.
    _check_make_delta('a string', object())
    _check_make_delta('a string', u'not a string')
    _check_make_delta(object(), 'a string')
    _check_make_delta(u'not a string', 'a string')
def test_make_noop_delta(self):
    """A delta from a text to itself has one fixed, known encoding.

    Each expected value is checked against ``make_delta(text, text)`` in the
    same order as before: _text1, _text2, then _text3.
    """
    expectations = (
        (_text1, 'M\x90M'),
        (_text2, 'N\x90N'),
        (_text3, '\x87\x01\x90\x87'),
    )
    for text, expected in expectations:
        ident_delta = self.make_delta(text, text)
        self.assertEqual(expected, ident_delta)
159
def assertDeltaIn(self, delta1, delta2, delta):
160
"""Make sure that the delta bytes match one of the expectations."""
161
# In general, the python delta matcher gives different results than the
162
# pyrex delta matcher. Both should be valid deltas, though.
163
if delta not in (delta1, delta2):
164
self.fail("Delta bytes:\n"
168
% (delta, delta1, delta2))
170
def test_make_delta(self):
171
delta = self.make_delta(_text1, _text2)
173
'N\x90/\x1fdiffer from\nagainst other text\n',
174
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
176
delta = self.make_delta(_text2, _text1)
178
'M\x90/\x1ebe matched\nagainst other text\n',
179
'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
181
delta = self.make_delta(_text3, _text1)
182
self.assertEqual('M\x90M', delta)
183
delta = self.make_delta(_text3, _text2)
185
'N\x90/\x1fdiffer from\nagainst other text\n',
186
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
189
def test_make_delta_with_large_copies(self):
190
# We want to have a copy that is larger than 64kB, which forces us to
191
# issue multiple copy instructions.
192
big_text = _text3 * 1220
193
delta = self.make_delta(big_text, big_text)
195
'\xdc\x86\x0a' # Encoding the length of the uncompressed text
196
'\x80' # Copy 64kB, starting at byte 0
197
'\x84\x01' # and another 64kB starting at 64kB
198
'\xb4\x02\x5c\x83', # And the bit of tail.
199
None, # Both implementations should be identical
def test_apply_delta_is_typesafe(self):
    """apply_delta takes two plain (byte) strings and rejects anything else.

    Both the source and the delta argument are probed with an arbitrary
    object and with a unicode string; each must raise TypeError.
    """
    # Sanity check: the well-typed call must simply succeed.
    self.apply_delta(_text1, 'M\x90M')
    # Bad source argument.
    self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
    self.assertRaises(TypeError, self.apply_delta,
                      unicode(_text1), 'M\x90M')
    # Bad delta argument.
    self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
    self.assertRaises(TypeError, self.apply_delta, _text1, object())
def test_apply_delta(self):
    """Applying a known delta to its source text yields the target text.

    Checked in both directions: _text1 -> _text2 and _text2 -> _text1.
    """
    target = self.apply_delta(
        _text1, 'N\x90/\x1fdiffer from\nagainst other text\n')
    self.assertEqual(_text2, target)
    target = self.apply_delta(
        _text2, 'M\x90/\x1ebe matched\nagainst other text\n')
    self.assertEqual(_text1, target)
def test_apply_delta_to_source_is_safe(self):
    """apply_delta_to_source validates both its types and its offsets.

    A source that is not a plain string raises TypeError; inconsistent
    offsets into the 3-byte source 'foo' raise ValueError.
    """
    self.assertRaises(TypeError,
                      self.apply_delta_to_source, object(), 0, 1)
    self.assertRaises(TypeError,
                      self.apply_delta_to_source, u'unicode str', 0, 1)
    # NOTE(review): the comments below assume the arguments are
    # (source, delta_start, delta_end), which is how
    # test_apply_delta_to_source passes them -- confirm.
    # End offset (4) beyond the source length (3).
    self.assertRaises(ValueError,
                      self.apply_delta_to_source, 'foo', 1, 4)
    # Start offset (5) beyond the source length (3).
    self.assertRaises(ValueError,
                      self.apply_delta_to_source, 'foo', 5, 3)
    # Start offset (3) greater than the end offset (2).
    self.assertRaises(ValueError,
                      self.apply_delta_to_source, 'foo', 3, 2)
def test_apply_delta_to_source(self):
    """A delta appended to its own source can be applied in place.

    The buffer holds _text1 immediately followed by the delta bytes; the
    two integer arguments are the offsets where the delta starts and ends
    inside that buffer.
    """
    source_and_delta = (_text1
                        + 'N\x90/\x1fdiffer from\nagainst other text\n')
    delta_start = len(_text1)
    delta_end = len(source_and_delta)
    self.assertEqual(
        _text2,
        self.apply_delta_to_source(source_and_delta, delta_start, delta_end))
class TestMakeAndApplyCompatible(tests.TestCase):
    """Round-trip texts through a make_delta / apply_delta pair.

    The two callables are class attributes that stay None until load_tests
    fills them in, so each scenario can pair a delta producer with a delta
    consumer.
    """

    make_delta = None  # Set by load_tests
    apply_delta = None  # Set by load_tests

    def assertMakeAndApply(self, source, target):
        """Assert that generating a delta and applying gives success."""
        delta = self.make_delta(source, target)
        # Named 'rebuilt' rather than 'bytes' to avoid shadowing the builtin.
        rebuilt = self.apply_delta(source, delta)
        self.assertEqualDiff(target, rebuilt)

    def test_direct(self):
        """Every ordered pair of the three sample texts must round-trip."""
        self.assertMakeAndApply(_text1, _text2)
        self.assertMakeAndApply(_text2, _text1)
        self.assertMakeAndApply(_text1, _text3)
        self.assertMakeAndApply(_text3, _text1)
        self.assertMakeAndApply(_text2, _text3)
        self.assertMakeAndApply(_text3, _text2)
260
class TestDeltaIndex(tests.TestCase):
263
super(TestDeltaIndex, self).setUp()
264
# This test isn't multiplied, because we only have DeltaIndex for the
266
# We call this here, because _test_needs_features happens after setUp
267
self.requireFeature(CompiledGroupCompressFeature)
268
from bzrlib import _groupcompress_pyx
269
self._gc_module = _groupcompress_pyx
272
di = self._gc_module.DeltaIndex('test text\n')
273
self.assertEqual('DeltaIndex(1, 10)', repr(di))
def test_first_add_source_doesnt_index_until_make_delta(self):
    """Adding the first source alone does not build the index.

    The index is reported as absent both before and after the first
    add_source call; it only appears once make_delta has been called.
    """
    di = self._gc_module.DeltaIndex()
    self.assertFalse(di._has_index())
    di.add_source(_text1, 0)
    self.assertFalse(di._has_index())
    # However, asking to make a delta will trigger the index to be
    # generated, and will generate a proper delta
    delta = di.make_delta(_text2)
    self.assertTrue(di._has_index())
    self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
def test_second_add_source_triggers_make_index(self):
    """Adding a second source builds the index without calling make_delta."""
    di = self._gc_module.DeltaIndex()
    self.assertFalse(di._has_index())
    di.add_source(_text1, 0)
    # One source is not enough to trigger indexing ...
    self.assertFalse(di._has_index())
    di.add_source(_text2, 0)
    # ... but the second one is.
    self.assertTrue(di._has_index())
def test_make_delta(self):
    """A DeltaIndex built directly from _text1 produces the known delta."""
    di = self._gc_module.DeltaIndex(_text1)
    delta = di.make_delta(_text2)
    self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
299
def test_delta_against_multiple_sources(self):
300
di = self._gc_module.DeltaIndex()
301
di.add_source(_first_text, 0)
302
self.assertEqual(len(_first_text), di._source_offset)
303
di.add_source(_second_text, 0)
304
self.assertEqual(len(_first_text) + len(_second_text),
306
delta = di.make_delta(_third_text)
307
result = self._gc_module.apply_delta(_first_text + _second_text, delta)
308
self.assertEqualDiff(_third_text, result)
309
self.assertEqual('\x85\x01\x90\x14\x0chas some in '
310
'\x91v6\x03and\x91d"\x91:\n', delta)
312
def test_delta_with_offsets(self):
313
di = self._gc_module.DeltaIndex()
314
di.add_source(_first_text, 5)
315
self.assertEqual(len(_first_text) + 5, di._source_offset)
316
di.add_source(_second_text, 10)
317
self.assertEqual(len(_first_text) + len(_second_text) + 15,
319
delta = di.make_delta(_third_text)
320
self.assertIsNot(None, delta)
321
result = self._gc_module.apply_delta(
322
'12345' + _first_text + '1234567890' + _second_text, delta)
323
self.assertIsNot(None, result)
324
self.assertEqualDiff(_third_text, result)
325
self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
326
'\x91\x856\x03and\x91s"\x91?\n', delta)
328
def test_delta_with_delta_bytes(self):
329
di = self._gc_module.DeltaIndex()
331
di.add_source(_first_text, 0)
332
self.assertEqual(len(_first_text), di._source_offset)
333
delta = di.make_delta(_second_text)
334
self.assertEqual('h\tsome more\x91\x019'
335
'&previous text\nand has some extra text\n', delta)
336
di.add_delta_source(delta, 0)
338
self.assertEqual(len(_first_text) + len(delta), di._source_offset)
339
second_delta = di.make_delta(_third_text)
340
result = self._gc_module.apply_delta(source, second_delta)
341
self.assertEqualDiff(_third_text, result)
342
# We should be able to match against the
343
# 'previous text\nand has some...' that was part of the delta bytes
344
# Note that we don't match the 'common with the', because it isn't long
345
# enough to match in the original text, and those bytes are not present
346
# in the delta for the second text.
347
self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
348
'\x91S&\x03and\x91\x18,', second_delta)
349
# Add this delta, and create a new delta for the same text. We should
350
# find the remaining text, and only insert the short 'and' text.
351
di.add_delta_source(second_delta, 0)
352
source += second_delta
353
third_delta = di.make_delta(_third_text)
354
result = self._gc_module.apply_delta(source, third_delta)
355
self.assertEqualDiff(_third_text, result)
356
self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
357
'\x91S&\x03and\x91\x18,', third_delta)
358
# Now create a delta, which we know won't be able to be 'fit' into the
360
fourth_delta = di.make_delta(_fourth_text)
361
self.assertEqual(_fourth_text,
362
self._gc_module.apply_delta(source, fourth_delta))
363
self.assertEqual('\x80\x01'
364
'\x7f123456789012345\nsame rabin hash\n'
365
'123456789012345\nsame rabin hash\n'
366
'123456789012345\nsame rabin hash\n'
367
'123456789012345\nsame rabin hash'
368
'\x01\n', fourth_delta)
369
di.add_delta_source(fourth_delta, 0)
370
source += fourth_delta
371
# With the next delta, everything should be found
372
fifth_delta = di.make_delta(_fourth_text)
373
self.assertEqual(_fourth_text,
374
self._gc_module.apply_delta(source, fifth_delta))
375
self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
class TestCopyInstruction(tests.TestCase):
    """Check the copy-instruction encoder and decoder of _groupcompress_py.

    Both helpers call the pure-python module directly, so this class does
    not depend on any attribute injected by load_tests.
    """

    def assertEncode(self, expected, offset, length):
        """Assert that (offset, length) encodes to exactly ``expected``."""
        # Named 'encoded' rather than 'bytes' to avoid shadowing the builtin.
        encoded = _groupcompress_py.encode_copy_instruction(offset, length)
        if expected != encoded:
            # Compare per-character hex codes so that a failure prints
            # readable values instead of raw binary strings.
            self.assertEqual([hex(ord(e)) for e in expected],
                             [hex(ord(b)) for b in encoded])

    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
        """Assert that decoding ``bytes`` at ``pos`` gives the expected triple.

        The decoder result is compared against
        ``(exp_offset, exp_length, exp_newpos)``.
        """
        cmd = ord(bytes[pos])
        # NOTE(review): the listing this was recovered from skips one line
        # between reading ``cmd`` and the decode call.  The expected new
        # positions used by the tests below count the command byte (e.g.
        # '\x80' at position 0 expects a new position of 1), so a statement
        # advancing ``pos`` past the command byte may belong here -- confirm
        # against the upstream file.
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)

    def test_encode_no_length(self):
        """A copy of exactly 64KiB is encoded without any length bytes."""
        self.assertEncode('\x80', 0, 64*1024)
        self.assertEncode('\x81\x01', 1, 64*1024)
        self.assertEncode('\x81\x0a', 10, 64*1024)
        self.assertEncode('\x81\xff', 255, 64*1024)
        self.assertEncode('\x82\x01', 256, 64*1024)
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)

    def test_encode_no_offset(self):
        """A copy from offset 0 is encoded without any offset bytes."""
        self.assertEncode('\x90\x01', 0, 1)
        self.assertEncode('\x90\x0a', 0, 10)
        self.assertEncode('\x90\xff', 0, 255)
        self.assertEncode('\xA0\x01', 0, 256)
        self.assertEncode('\xB0\x01\x01', 0, 257)
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertEncode('\x80', 0, 64*1024)

    def test_encode(self):
        """Encodings that carry both offset bytes and length bytes."""
        self.assertEncode('\x91\x01\x01', 1, 1)
        self.assertEncode('\x91\x09\x0a', 9, 10)
        self.assertEncode('\x91\xfe\xff', 254, 255)
        self.assertEncode('\xA2\x02\x01', 512, 256)
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
        self.assertEncode('\xB0\x01\x01', 0, 257)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertEncode('\x81\x0a', 10, 64*1024)

    def test_decode_no_length(self):
        """An instruction without length bytes decodes as a 64KiB copy."""
        # If length is 0, it is interpreted as 64KiB
        # The shortest possible instruction is a copy of 64KiB from offset 0
        self.assertDecode(0, 65536, 1, '\x80', 0)
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)

    def test_decode_no_offset(self):
        """An instruction without offset bytes decodes as a copy from 0."""
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertDecode(0, 65536, 1, '\x80', 0)

    def test_decode(self):
        """Decoding instructions that carry both an offset and a length."""
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)

    def test_decode_not_start(self):
        """Decoding works when the instruction is not at the buffer start."""
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
class TestBase128Int(tests.TestCase):
    """Check encode_base128_int / decode_base128_int of a groupcompress module.

    The module under test is read from ``_gc_module``, which stays None
    until load_tests fills it in.
    """

    _gc_module = None  # Set by load_tests

    def assertEqualEncode(self, bytes, val):
        """Assert that ``val`` encodes to exactly the string ``bytes``."""
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))

    def assertEqualDecode(self, val, num_decode, bytes):
        """Assert that decoding ``bytes`` returns ``(val, num_decode)``."""
        self.assertEqual((val, num_decode),
                         self._gc_module.decode_base128_int(bytes))

    def test_encode(self):
        """Known integers encode to their expected base-128 strings."""
        self.assertEqualEncode('\x01', 1)
        self.assertEqualEncode('\x02', 2)
        self.assertEqualEncode('\x7f', 127)
        self.assertEqualEncode('\x80\x01', 128)
        self.assertEqualEncode('\xff\x01', 255)
        self.assertEqualEncode('\x80\x02', 256)
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)

    def test_decode(self):
        """Known base-128 strings decode to the expected value and count."""
        self.assertEqualDecode(1, 1, '\x01')
        self.assertEqualDecode(2, 1, '\x02')
        self.assertEqualDecode(127, 1, '\x7f')
        self.assertEqualDecode(128, 2, '\x80\x01')
        self.assertEqualDecode(255, 2, '\xff\x01')
        self.assertEqualDecode(256, 2, '\x80\x02')
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')

    def test_decode_with_trailing_bytes(self):
        """Bytes after the encoded integer do not change value or count."""
        self.assertEqualDecode(1, 1, '\x01abcdef')
        self.assertEqualDecode(127, 1, '\x7f\x01')
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
        self.assertEqualDecode(255, 2, '\xff\x01\xff')