1
# Copyright (C) 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the python and pyrex extensions of groupcompress"""
26
def load_tests(standard_tests, module, loader):
27
"""Parameterize tests for all versions of groupcompress."""
29
('PP', {'make_delta': _groupcompress_py.make_delta,
30
'apply_delta': _groupcompress_py.apply_delta})
33
('python', {'_gc_module': _groupcompress_py}),
35
if CompiledGroupCompressFeature.available():
36
from bzrlib import _groupcompress_pyx
37
scenarios.append(('C',
38
{'_gc_module': _groupcompress_pyx}))
39
two_way_scenarios.extend([
40
('CC', {'make_delta': _groupcompress_pyx.make_delta,
41
'apply_delta': _groupcompress_pyx.apply_delta}),
42
('PC', {'make_delta': _groupcompress_py.make_delta,
43
'apply_delta': _groupcompress_pyx.apply_delta}),
44
('CP', {'make_delta': _groupcompress_pyx.make_delta,
45
'apply_delta': _groupcompress_py.apply_delta}),
47
to_adapt, result = tests.split_suite_by_condition(
48
standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
50
result = tests.multiply_tests(to_adapt, scenarios, result)
51
to_adapt, result = tests.split_suite_by_condition(result,
52
tests.condition_isinstance(TestMakeAndApplyCompatible))
53
result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
57
class _CompiledGroupCompressFeature(tests.Feature):
61
import bzrlib._groupcompress_pyx
67
def feature_name(self):
68
return 'bzrlib._groupcompress_pyx'
71
CompiledGroupCompressFeature = _CompiledGroupCompressFeature()
76
which is meant to be matched
83
which is meant to differ from
90
which is meant to be matched
94
at the end of the file
100
common with the next text
104
some more bit of text, that
105
does not have much in
106
common with the previous text
107
and has some extra text
113
has some in common with the previous text
114
and has some extra text
116
common with the next text
130
class TestMakeAndApplyDelta(tests.TestCase):
132
_gc_module = None # Set by load_tests
135
super(TestMakeAndApplyDelta, self).setUp()
136
self.make_delta = self._gc_module.make_delta
137
self.apply_delta = self._gc_module.apply_delta
138
self.apply_delta_to_source = self._gc_module.apply_delta_to_source
140
def test_make_delta_is_typesafe(self):
    """make_delta takes two plain strings; anything else is a TypeError."""
    # Two plain strings must be accepted without raising.
    self.make_delta('a string', 'another string')

    # A non-string or a unicode object on either side must be rejected.
    rejected_pairs = [
        ('a string', object()),
        ('a string', u'not a string'),
        (object(), 'a string'),
        (u'not a string', 'a string'),
    ]
    for source, target in rejected_pairs:
        self.assertRaises(TypeError, self.make_delta, source, target)
151
def test_make_noop_delta(self):
    """Diffing each sample text against itself gives a fixed, known delta."""
    # (text, delta bytes expected when the text is diffed against itself)
    identity_cases = [
        (_text1, 'M\x90M'),
        (_text2, 'N\x90N'),
        (_text3, '\x87\x01\x90\x87'),
    ]
    for text, expected_delta in identity_cases:
        self.assertEqual(expected_delta, self.make_delta(text, text))
159
def assertDeltaIn(self, delta1, delta2, delta):
160
"""Make sure that the delta bytes match one of the expectations."""
161
# In general, the python delta matcher gives different results than the
162
# pyrex delta matcher. Both should be valid deltas, though.
163
if delta not in (delta1, delta2):
164
self.fail("Delta bytes:\n"
168
% (delta, delta1, delta2))
170
def test_make_delta(self):
171
delta = self.make_delta(_text1, _text2)
173
'N\x90/\x1fdiffer from\nagainst other text\n',
174
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
176
delta = self.make_delta(_text2, _text1)
178
'M\x90/\x1ebe matched\nagainst other text\n',
179
'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
181
delta = self.make_delta(_text3, _text1)
182
self.assertEqual('M\x90M', delta)
183
delta = self.make_delta(_text3, _text2)
185
'N\x90/\x1fdiffer from\nagainst other text\n',
186
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
189
def test_apply_delta_is_typesafe(self):
    """apply_delta takes plain strings; other argument types raise TypeError."""
    noop_delta = 'M\x90M'
    # Plain string source and plain string delta must be accepted.
    self.apply_delta(_text1, noop_delta)

    # A non-string or a unicode object in either position must be rejected.
    rejected_arguments = [
        (object(), noop_delta),
        (unicode(_text1), noop_delta),
        (_text1, u'M\x90M'),
        (_text1, object()),
    ]
    for source, delta in rejected_arguments:
        self.assertRaises(TypeError, self.apply_delta, source, delta)
197
def test_apply_delta(self):
    """Applying known deltas turns _text1 into _text2 and back again."""
    text1_to_text2 = 'N\x90/\x1fdiffer from\nagainst other text\n'
    self.assertEqual(_text2, self.apply_delta(_text1, text1_to_text2))

    text2_to_text1 = 'M\x90/\x1ebe matched\nagainst other text\n'
    self.assertEqual(_text1, self.apply_delta(_text2, text2_to_text1))
205
def test_apply_delta_to_source_is_safe(self):
    """apply_delta_to_source rejects bad source types and bad ranges."""
    # The source must be a plain string.
    for bad_source in (object(), u'unicode str'):
        self.assertRaises(TypeError,
                          self.apply_delta_to_source, bad_source, 0, 1)

    # 'foo' is three bytes long, so each of these ranges is invalid:
    #   (1, 4) ends past the end of the string
    #   (5, 3) starts past the end of the string
    #   (3, 2) starts after it ends
    for start, end in ((1, 4), (5, 3), (3, 2)):
        self.assertRaises(ValueError,
                          self.apply_delta_to_source, 'foo', start, end)
220
def test_apply_delta_to_source(self):
    """A delta appended to its own source can be applied in place."""
    delta = 'N\x90/\x1fdiffer from\nagainst other text\n'
    source_and_delta = _text1 + delta
    # The delta occupies everything after the source text.
    delta_start = len(_text1)
    delta_end = len(source_and_delta)
    rebuilt = self.apply_delta_to_source(source_and_delta,
                                         delta_start, delta_end)
    self.assertEqual(_text2, rebuilt)
227
class TestMakeAndApplyCompatible(tests.TestCase):
    """Round-trip texts through a make_delta/apply_delta pairing.

    Both callables are injected by load_tests, so the two halves may come
    from different implementations.
    """

    make_delta = None # Set by load_tests
    apply_delta = None # Set by load_tests

    def assertMakeAndApply(self, source, target):
        """Assert that the delta source->target rebuilds target from source."""
        rebuilt = self.apply_delta(source, self.make_delta(source, target))
        self.assertEqualDiff(target, rebuilt)

    def test_direct(self):
        """Every ordered pair of the three sample texts must round-trip."""
        ordered_pairs = [
            (_text1, _text2),
            (_text2, _text1),
            (_text1, _text3),
            (_text3, _text1),
            (_text2, _text3),
            (_text3, _text2),
        ]
        for source, target in ordered_pairs:
            self.assertMakeAndApply(source, target)
247
class TestDeltaIndex(tests.TestCase):
250
super(TestDeltaIndex, self).setUp()
251
# This test isn't multiplied, because we only have DeltaIndex for the
253
# We call this here, because _test_needs_features happens after setUp
254
self.requireFeature(CompiledGroupCompressFeature)
255
from bzrlib import _groupcompress_pyx
256
self._gc_module = _groupcompress_pyx
259
di = self._gc_module.DeltaIndex('test text\n')
260
self.assertEqual('DeltaIndex(1, 10)', repr(di))
262
def test_make_delta(self):
    """A DeltaIndex built from _text1 yields the known delta to _text2."""
    index = self._gc_module.DeltaIndex(_text1)
    expected_delta = 'N\x90/\x1fdiffer from\nagainst other text\n'
    self.assertEqual(expected_delta, index.make_delta(_text2))
267
def test_delta_against_multiple_sources(self):
268
di = self._gc_module.DeltaIndex()
269
di.add_source(_first_text, 0)
270
self.assertEqual(len(_first_text), di._source_offset)
271
di.add_source(_second_text, 0)
272
self.assertEqual(len(_first_text) + len(_second_text),
274
delta = di.make_delta(_third_text)
275
result = self._gc_module.apply_delta(_first_text + _second_text, delta)
276
self.assertEqualDiff(_third_text, result)
277
self.assertEqual('\x85\x01\x90\x14\x0chas some in '
278
'\x91v6\x03and\x91d"\x91:\n', delta)
280
def test_delta_with_offsets(self):
281
di = self._gc_module.DeltaIndex()
282
di.add_source(_first_text, 5)
283
self.assertEqual(len(_first_text) + 5, di._source_offset)
284
di.add_source(_second_text, 10)
285
self.assertEqual(len(_first_text) + len(_second_text) + 15,
287
delta = di.make_delta(_third_text)
288
self.assertIsNot(None, delta)
289
result = self._gc_module.apply_delta(
290
'12345' + _first_text + '1234567890' + _second_text, delta)
291
self.assertIsNot(None, result)
292
self.assertEqualDiff(_third_text, result)
293
self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
294
'\x91\x856\x03and\x91s"\x91?\n', delta)
296
def test_delta_with_delta_bytes(self):
297
di = self._gc_module.DeltaIndex()
299
di.add_source(_first_text, 0)
300
self.assertEqual(len(_first_text), di._source_offset)
301
delta = di.make_delta(_second_text)
302
self.assertEqual('h\tsome more\x91\x019'
303
'&previous text\nand has some extra text\n', delta)
304
di.add_delta_source(delta, 0)
306
self.assertEqual(len(_first_text) + len(delta), di._source_offset)
307
second_delta = di.make_delta(_third_text)
308
result = self._gc_module.apply_delta(source, second_delta)
309
self.assertEqualDiff(_third_text, result)
310
# We should be able to match against the
311
# 'previous text\nand has some...' that was part of the delta bytes
312
# Note that we don't match the 'common with the', because it isn't long
313
# enough to match in the original text, and those bytes are not present
314
# in the delta for the second text.
315
self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
316
'\x91S&\x03and\x91\x18,', second_delta)
317
# Add this delta, and create a new delta for the same text. We should
318
# find the remaining text, and only insert the short 'and' text.
319
di.add_delta_source(second_delta, 0)
320
source += second_delta
321
third_delta = di.make_delta(_third_text)
322
result = self._gc_module.apply_delta(source, third_delta)
323
self.assertEqualDiff(_third_text, result)
324
self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
325
'\x91S&\x03and\x91\x18,', third_delta)
326
# Now create a delta, which we know won't be able to be 'fit' into the
328
fourth_delta = di.make_delta(_fourth_text)
329
self.assertEqual(_fourth_text,
330
self._gc_module.apply_delta(source, fourth_delta))
331
self.assertEqual('\x80\x01'
332
'\x7f123456789012345\nsame rabin hash\n'
333
'123456789012345\nsame rabin hash\n'
334
'123456789012345\nsame rabin hash\n'
335
'123456789012345\nsame rabin hash'
336
'\x01\n', fourth_delta)
337
di.add_delta_source(fourth_delta, 0)
338
source += fourth_delta
339
# With the next delta, everything should be found
340
fifth_delta = di.make_delta(_fourth_text)
341
self.assertEqual(_fourth_text,
342
self._gc_module.apply_delta(source, fifth_delta))
343
self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
346
class TestCopyInstruction(tests.TestCase):
    """Check _groupcompress_py copy-instruction encoding and decoding.

    The expectations below pin the exact bytes: the first byte is the
    command, and the bytes after it carry only those offset/length bytes
    that are non-zero.
    """

    def assertEncode(self, expected, offset, length):
        """Assert that (offset, length) encodes to exactly ``expected``.

        :param expected: The byte string the encoder should produce.
        :param offset: Copy offset handed to encode_copy_instruction.
        :param length: Copy length handed to encode_copy_instruction; the
            tests pass None when no length is to be encoded.
        """
        encoded = _groupcompress_py.encode_copy_instruction(offset, length)
        if expected != encoded:
            # Compare per-byte hex values rather than the raw strings, so a
            # failure shows readable byte values instead of control chars.
            self.assertEqual([hex(ord(e)) for e in expected],
                             [hex(ord(b)) for b in encoded])

    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
        """Assert the result of decoding the copy instruction at ``pos``.

        :param exp_offset: Expected copy offset.
        :param exp_length: Expected copy length.
        :param exp_newpos: Expected position just after the instruction.
        :param bytes: Byte string containing the instruction (the name
            shadows the builtin, but is kept for existing callers).
        :param pos: Index of the instruction's command byte in ``bytes``.
        """
        cmd = ord(bytes[pos])
        # The command byte is handed over separately as ``cmd``, so the
        # decoder must start at the byte after it.  The expectations in the
        # decode tests depend on this: '\x80' at position 0 has to report a
        # new position of 1.
        pos += 1
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)

    def test_encode_no_length(self):
        """Encode offsets with no length; zero offset bytes are omitted."""
        self.assertEncode('\x80', 0, None)
        self.assertEncode('\x81\x01', 1, None)
        self.assertEncode('\x81\x0a', 10, None)
        self.assertEncode('\x81\xff', 255, None)
        self.assertEncode('\x82\x01', 256, None)
        self.assertEncode('\x83\x01\x01', 257, None)
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, None)
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, None)
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, None)
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, None)
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, None)
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, None)

    def test_encode_no_offset(self):
        """Encode lengths at offset 0; zero length bytes are omitted."""
        self.assertEncode('\x90\x01', 0, 1)
        self.assertEncode('\x90\x0a', 0, 10)
        self.assertEncode('\x90\xff', 0, 255)
        self.assertEncode('\xA0\x01', 0, 256)
        self.assertEncode('\xB0\x01\x01', 0, 257)
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertEncode('\x80', 0, 64*1024)

    def test_encode(self):
        """Encode instructions that carry both an offset and a length."""
        self.assertEncode('\x91\x01\x01', 1, 1)
        self.assertEncode('\x91\x09\x0a', 9, 10)
        self.assertEncode('\x91\xfe\xff', 254, 255)
        self.assertEncode('\xA2\x02\x01', 512, 256)
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
        self.assertEncode('\xB0\x01\x01', 0, 257)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertEncode('\x81\x0a', 10, 64*1024)

    def test_decode_no_length(self):
        """Decode instructions without length bytes; length reads as 64KiB."""
        # If length is 0, it is interpreted as 64KiB
        # The shortest possible instruction is a copy of 64KiB from offset 0
        self.assertDecode(0, 65536, 1, '\x80', 0)
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)

    def test_decode_no_offset(self):
        """Decode instructions without offset bytes; offset reads as 0."""
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
        # about that, as we would never actually copy 0 bytes
        self.assertDecode(0, 65536, 1, '\x80', 0)

    def test_decode(self):
        """Decode instructions that carry both an offset and a length."""
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)

    def test_decode_not_start(self):
        """Decode an instruction that sits in the middle of other bytes."""
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
440
class TestBase128Int(tests.TestCase):
    """Check encode_base128_int / decode_base128_int of the module under test."""

    _gc_module = None # Set by load_tests

    def assertEqualEncode(self, bytes, val):
        """Assert that ``val`` encodes to exactly the string ``bytes``."""
        encoded = self._gc_module.encode_base128_int(val)
        self.assertEqual(bytes, encoded)

    def assertEqualDecode(self, val, num_decode, bytes):
        """Assert that decoding ``bytes`` returns (val, num_decode)."""
        decoded = self._gc_module.decode_base128_int(bytes)
        self.assertEqual((val, num_decode), decoded)

    def test_encode(self):
        """Known integers encode to their expected byte strings."""
        # (expected encoding, integer value)
        encode_cases = [
            ('\x01', 1),
            ('\x02', 2),
            ('\x7f', 127),
            ('\x80\x01', 128),
            ('\xff\x01', 255),
            ('\x80\x02', 256),
            ('\xff\xff\xff\xff\x0f', 0xFFFFFFFF),
        ]
        for expected, value in encode_cases:
            self.assertEqualEncode(expected, value)

    def test_decode(self):
        """Exact encodings decode to their value and full byte count."""
        # (expected value, expected byte count, encoded input)
        decode_cases = [
            (1, 1, '\x01'),
            (2, 1, '\x02'),
            (127, 1, '\x7f'),
            (128, 2, '\x80\x01'),
            (255, 2, '\xff\x01'),
            (256, 2, '\x80\x02'),
            (0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f'),
        ]
        for value, consumed, encoded in decode_cases:
            self.assertEqualDecode(value, consumed, encoded)

    def test_decode_with_trailing_bytes(self):
        """Bytes after the encoded integer are not counted as decoded."""
        # (expected value, expected byte count, encoded input plus extras)
        trailing_cases = [
            (1, 1, '\x01abcdef'),
            (127, 1, '\x7f\x01'),
            (128, 2, '\x80\x01abcdef'),
            (255, 2, '\xff\x01\xff'),
        ]
        for value, consumed, encoded in trailing_cases:
            self.assertEqualDecode(value, consumed, encoded)