1
# Copyright (C) 2008-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the python and pyrex extensions of groupcompress"""
23
from bzrlib.tests.scenarios import (
24
load_tests_apply_scenarios,
28
def module_scenarios():
# Builds (name, attributes) scenario entries whose '_gc_module' attribute
# names the groupcompress implementation to test: the pure-python module
# always, plus the compiled module when its feature is available.
# NOTE(review): in this extract `scenarios` is appended to but the statement
# that creates it, and any `return`, are not visible (embedded source lines
# 29, 31 and 36 are absent) -- confirm against the complete file.
30
('python', {'_gc_module': _groupcompress_py}),
32
# The compiled implementation is optional; only list it when importable.
if compiled_groupcompress_feature.available():
33
gc_module = compiled_groupcompress_feature.module
34
scenarios.append(('C',
35
{'_gc_module': gc_module}))
39
def two_way_scenarios():
# Builds scenario entries that pair a 'make_delta' with an 'apply_delta'.
# In each two-letter name the first letter is the make_delta implementation
# and the second the apply_delta implementation: P = _groupcompress_py,
# C = the compiled module.
# NOTE(review): the statements that open the list, wrap the compiled entries
# and return the result are not visible in this extract (embedded source
# lines 40, 43, 46, 53-54 are absent) -- confirm against the complete file.
41
('PP', {'make_delta': _groupcompress_py.make_delta,
42
'apply_delta': _groupcompress_py.apply_delta})
44
# Mixed and compiled-only pairings need the optional compiled module.
if compiled_groupcompress_feature.available():
45
gc_module = compiled_groupcompress_feature.module
47
('CC', {'make_delta': gc_module.make_delta,
48
'apply_delta': gc_module.apply_delta}),
49
('PC', {'make_delta': _groupcompress_py.make_delta,
50
'apply_delta': gc_module.apply_delta}),
51
('CP', {'make_delta': gc_module.make_delta,
52
'apply_delta': _groupcompress_py.apply_delta}),
57
# Module-level `load_tests` name, bound to load_tests_apply_scenarios from
# bzrlib.tests.scenarios. Test classes in this file declare a `scenarios`
# attribute and mark the attributes that vary as "Set by load_tests".
load_tests = load_tests_apply_scenarios
60
# Feature object for the optional 'bzrlib._groupcompress_pyx' module. The
# scenario builders call .available() on it and read .module; TestDeltaIndex
# passes it to requireFeature().
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
61
'bzrlib._groupcompress_pyx')
66
which is meant to be matched
73
which is meant to differ from
80
which is meant to be matched
84
at the end of the file
90
common with the next text
94
some more bit of text, that
96
common with the previous text
97
and has some extra text
103
has some in common with the previous text
104
and has some extra text
106
common with the next text
120
class TestMakeAndApplyDelta(tests.TestCase):
122
scenarios = module_scenarios()
123
_gc_module = None # Set by load_tests
126
super(TestMakeAndApplyDelta, self).setUp()
127
self.make_delta = self._gc_module.make_delta
128
self.apply_delta = self._gc_module.apply_delta
129
self.apply_delta_to_source = self._gc_module.apply_delta_to_source
131
def test_make_delta_is_typesafe(self):
132
self.make_delta('a string', 'another string')
134
def _check_make_delta(string1, string2):
135
self.assertRaises(TypeError, self.make_delta, string1, string2)
137
_check_make_delta('a string', object())
138
_check_make_delta('a string', u'not a string')
139
_check_make_delta(object(), 'a string')
140
_check_make_delta(u'not a string', 'a string')
142
def test_make_noop_delta(self):
    # Each fixture text is diffed against itself; the resulting delta must
    # equal the exact bytes listed beside it.
    expectations = (
        (_text1, 'M\x90M'),
        (_text2, 'N\x90N'),
        (_text3, '\x87\x01\x90\x87'),
    )
    for text, expected in expectations:
        self.assertEqual(expected, self.make_delta(text, text))
150
def assertDeltaIn(self, delta1, delta2, delta):
151
"""Make sure that the delta bytes match one of the expectations."""
# delta1, delta2: the two acceptable delta byte strings.
# delta: the delta actually produced; the test fails unless it equals one
# of the two.
152
# In general, the python delta matcher gives different results than the
153
# pyrex delta matcher. Both should be valid deltas, though.
154
if delta not in (delta1, delta2):
155
self.fail("Delta bytes:\n"
# NOTE(review): the rest of the failure-message format string (embedded
# source lines 156-158) is not visible in this extract.
159
% (delta, delta1, delta2))
161
def test_make_delta(self):
# Computes deltas between the fixture texts in four directions and checks
# them against expected byte strings.
# NOTE(review): the pairs of expected strings below are dangling in this
# extract -- the calls that consume them (embedded source lines 163, 166,
# 168, 171, 175, 178) are not visible. Presumably they are arguments to
# assertDeltaIn, which accepts either of two expectations; confirm against
# the complete file.
162
delta = self.make_delta(_text1, _text2)
164
'N\x90/\x1fdiffer from\nagainst other text\n',
165
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
167
delta = self.make_delta(_text2, _text1)
169
'M\x90/\x1ebe matched\nagainst other text\n',
170
'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
172
delta = self.make_delta(_text3, _text1)
173
# This direction has a single accepted result, so it is compared exactly.
self.assertEqual('M\x90M', delta)
174
delta = self.make_delta(_text3, _text2)
176
'N\x90/\x1fdiffer from\nagainst other text\n',
177
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
180
def test_make_delta_with_large_copies(self):
# NOTE(review): the call that consumes the expected bytes below (embedded
# source lines 185 and 191) is not visible in this extract; confirm against
# the complete file.
181
# We want to have a copy that is larger than 64kB, which forces us to
182
# issue multiple copy instructions.
183
big_text = _text3 * 1220
184
# Diffing the text against itself, so the delta consists of copies only.
delta = self.make_delta(big_text, big_text)
186
'\xdc\x86\x0a' # Encoding the length of the uncompressed text
187
'\x80' # Copy 64kB, starting at byte 0
188
'\x84\x01' # and another 64kB starting at 64kB
189
'\xb4\x02\x5c\x83', # And the bit of tail.
190
None, # Both implementations should be identical
193
def test_apply_delta_is_typesafe(self):
    valid_delta = 'M\x90M'
    # A plain-string source with a plain-string delta is accepted.
    self.apply_delta(_text1, valid_delta)

    def rejects(source, delta):
        # apply_delta must raise TypeError for this argument pair.
        self.assertRaises(TypeError, self.apply_delta, source, delta)

    # First a bad source (arbitrary object, unicode), then a bad delta
    # (unicode, arbitrary object).
    rejects(object(), valid_delta)
    rejects(unicode(_text1), valid_delta)
    rejects(_text1, u'M\x90M')
    rejects(_text1, object())
201
def test_apply_delta(self):
    # Known-good deltas in both directions between _text1 and _text2.
    text1_to_text2 = 'N\x90/\x1fdiffer from\nagainst other text\n'
    text2_to_text1 = 'M\x90/\x1ebe matched\nagainst other text\n'

    rebuilt = self.apply_delta(_text1, text1_to_text2)
    self.assertEqual(_text2, rebuilt)

    rebuilt = self.apply_delta(_text2, text2_to_text1)
    self.assertEqual(_text1, rebuilt)
209
def test_apply_delta_to_source_is_safe(self):
210
self.assertRaises(TypeError,
211
self.apply_delta_to_source, object(), 0, 1)
212
self.assertRaises(TypeError,
213
self.apply_delta_to_source, u'unicode str', 0, 1)
215
self.assertRaises(ValueError,
216
self.apply_delta_to_source, 'foo', 1, 4)
218
self.assertRaises(ValueError,
219
self.apply_delta_to_source, 'foo', 5, 3)
221
self.assertRaises(ValueError,
222
self.apply_delta_to_source, 'foo', 3, 2)
224
def test_apply_delta_to_source(self):
    # The delta is appended directly after the source text in one buffer;
    # the call is told where the delta starts and where the buffer ends.
    delta = 'N\x90/\x1fdiffer from\nagainst other text\n'
    combined = _text1 + delta
    rebuilt = self.apply_delta_to_source(combined,
                                         len(_text1), len(combined))
    self.assertEqual(_text2, rebuilt)
231
class TestMakeAndApplyCompatible(tests.TestCase):
    """Round-trip deltas through a make_delta / apply_delta pairing.

    The pairings come from two_way_scenarios(), so the function producing a
    delta and the function applying it may be different implementations.
    """

    scenarios = two_way_scenarios()

    make_delta = None  # Set by load_tests
    apply_delta = None  # Set by load_tests

    def assertMakeAndApply(self, source, target):
        """Assert that a delta from source to target rebuilds target."""
        rebuilt = self.apply_delta(source, self.make_delta(source, target))
        self.assertEqualDiff(target, rebuilt)

    def test_direct(self):
        # Every ordered pair of distinct fixture texts, in a fixed order.
        ordered_pairs = (
            (_text1, _text2),
            (_text2, _text1),
            (_text1, _text3),
            (_text3, _text1),
            (_text2, _text3),
            (_text3, _text2),
        )
        for source, target in ordered_pairs:
            self.assertMakeAndApply(source, target)
253
class TestDeltaIndex(tests.TestCase):
256
super(TestDeltaIndex, self).setUp()
257
# This test isn't multiplied, because we only have DeltaIndex for the
259
# We call this here, because _test_needs_features happens after setUp
260
self.requireFeature(compiled_groupcompress_feature)
261
self._gc_module = compiled_groupcompress_feature.module
264
di = self._gc_module.DeltaIndex('test text\n')
265
self.assertEqual('DeltaIndex(1, 10)', repr(di))
267
def test_first_add_source_doesnt_index_until_make_delta(self):
    index = self._gc_module.DeltaIndex()
    # Neither the empty index nor the index holding one source reports
    # having built anything yet.
    self.assertFalse(index._has_index())
    index.add_source(_text1, 0)
    self.assertFalse(index._has_index())
    # Requesting a delta is what makes the index appear, and the delta it
    # produces must be the expected one.
    expected = 'N\x90/\x1fdiffer from\nagainst other text\n'
    produced = index.make_delta(_text2)
    self.assertTrue(index._has_index())
    self.assertEqual(expected, produced)
278
def test_second_add_source_triggers_make_index(self):
    index = self._gc_module.DeltaIndex()
    has_index = index._has_index
    # Nothing is indexed before, or right after, the first source.
    self.assertFalse(has_index())
    index.add_source(_text1, 0)
    self.assertFalse(has_index())
    # Adding a second source is what makes the index exist.
    index.add_source(_text2, 0)
    self.assertTrue(has_index())
286
def test_make_delta(self):
    # The source text is handed straight to the DeltaIndex constructor
    # instead of going through add_source().
    expected = 'N\x90/\x1fdiffer from\nagainst other text\n'
    index = self._gc_module.DeltaIndex(_text1)
    self.assertEqual(expected, index.make_delta(_text2))
291
def test_delta_against_multiple_sources(self):
# Adds two sources to one DeltaIndex, checks _source_offset after each, then
# checks that the delta for _third_text applies against the concatenated
# sources and has the exact expected bytes.
# NOTE(review): the second _source_offset assertion is cut off in this
# extract (embedded source line 297 is absent), leaving its call unclosed.
292
di = self._gc_module.DeltaIndex()
293
di.add_source(_first_text, 0)
294
self.assertEqual(len(_first_text), di._source_offset)
295
di.add_source(_second_text, 0)
296
self.assertEqual(len(_first_text) + len(_second_text),
298
delta = di.make_delta(_third_text)
299
# The delta is applied to both sources concatenated in insertion order.
result = self._gc_module.apply_delta(_first_text + _second_text, delta)
300
self.assertEqualDiff(_third_text, result)
301
self.assertEqual('\x85\x01\x90\x14\x0chas some in '
302
'\x91v6\x03and\x91d"\x91:\n', delta)
304
def test_delta_with_offsets(self):
# Same shape as the multiple-sources test, but each add_source() passes a
# non-zero second argument (5, then 10). The assertions show that value
# being added to _source_offset, and the delta is applied to a buffer with
# 5 and 10 filler bytes placed before the respective sources.
# NOTE(review): the second _source_offset assertion is cut off in this
# extract (embedded source line 310 is absent), leaving its call unclosed.
305
di = self._gc_module.DeltaIndex()
306
di.add_source(_first_text, 5)
307
self.assertEqual(len(_first_text) + 5, di._source_offset)
308
di.add_source(_second_text, 10)
309
self.assertEqual(len(_first_text) + len(_second_text) + 15,
311
delta = di.make_delta(_third_text)
312
self.assertIsNot(None, delta)
313
result = self._gc_module.apply_delta(
314
'12345' + _first_text + '1234567890' + _second_text, delta)
315
self.assertIsNot(None, result)
316
self.assertEqualDiff(_third_text, result)
317
self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
318
'\x91\x856\x03and\x91s"\x91?\n', delta)
320
def test_delta_with_delta_bytes(self):
# Feeds previously generated deltas back into the index with
# add_delta_source() and checks the exact bytes of each later delta.
# NOTE(review): `source` is read and extended below, but the statements that
# first bind it and first extend it are not visible in this extract
# (embedded source lines 322 and 329 are absent) -- confirm against the
# complete file before relying on this block.
321
di = self._gc_module.DeltaIndex()
323
di.add_source(_first_text, 0)
324
self.assertEqual(len(_first_text), di._source_offset)
325
delta = di.make_delta(_second_text)
326
self.assertEqual('h\tsome more\x91\x019'
327
'&previous text\nand has some extra text\n', delta)
328
di.add_delta_source(delta, 0)
330
self.assertEqual(len(_first_text) + len(delta), di._source_offset)
331
second_delta = di.make_delta(_third_text)
332
result = self._gc_module.apply_delta(source, second_delta)
333
self.assertEqualDiff(_third_text, result)
334
# We should be able to match against the
335
# 'previous text\nand has some...' that was part of the delta bytes
336
# Note that we don't match the 'common with the', because it isn't long
337
# enough to match in the original text, and those bytes are not present
338
# in the delta for the second text.
339
self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
340
'\x91S&\x03and\x91\x18,', second_delta)
341
# Add this delta, and create a new delta for the same text. We should
342
# find the remaining text, and only insert the short 'and' text.
343
di.add_delta_source(second_delta, 0)
344
source += second_delta
345
third_delta = di.make_delta(_third_text)
346
result = self._gc_module.apply_delta(source, third_delta)
347
self.assertEqualDiff(_third_text, result)
348
self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
349
'\x91S&\x03and\x91\x18,', third_delta)
350
# Now create a delta, which we know won't be able to be 'fit' into the
# NOTE(review): the end of the sentence above (embedded source line 351) is
# not visible in this extract.
352
fourth_delta = di.make_delta(_fourth_text)
353
self.assertEqual(_fourth_text,
354
self._gc_module.apply_delta(source, fourth_delta))
355
self.assertEqual('\x80\x01'
356
'\x7f123456789012345\nsame rabin hash\n'
357
'123456789012345\nsame rabin hash\n'
358
'123456789012345\nsame rabin hash\n'
359
'123456789012345\nsame rabin hash'
360
'\x01\n', fourth_delta)
361
di.add_delta_source(fourth_delta, 0)
362
source += fourth_delta
363
# With the next delta, everything should be found
364
fifth_delta = di.make_delta(_fourth_text)
365
self.assertEqual(_fourth_text,
366
self._gc_module.apply_delta(source, fifth_delta))
367
self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
370
class TestCopyInstruction(tests.TestCase):
372
def assertEncode(self, expected, offset, length):
    """Assert that encoding a copy of (offset, length) yields `expected`.

    The instruction is produced by
    _groupcompress_py.encode_copy_instruction(offset, length).
    """
    encoded = _groupcompress_py.encode_copy_instruction(offset, length)
    if encoded == expected:
        return
    # On a mismatch, compare per-byte hex values instead of the raw strings
    # (presumably so the failure output stays readable for binary data).
    expected_hex = [hex(ord(ch)) for ch in expected]
    encoded_hex = [hex(ord(ch)) for ch in encoded]
    self.assertEqual(expected_hex, encoded_hex)
378
def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
# Decodes the copy instruction found at index `pos` of `bytes` with
# _groupcompress_py.decode_copy_instruction and asserts that the result
# equals (exp_offset, exp_length, exp_newpos).
379
# The byte at `pos` is passed to the decoder as the command value.
cmd = ord(bytes[pos])
# NOTE(review): the embedded source numbering skips line 380 here. It may
# only be a blank line, but verify that no statement (for example one
# advancing `pos` past the command byte) was lost in this extract.
381
out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
382
self.assertEqual((exp_offset, exp_length, exp_newpos), out)
384
def test_encode_no_length(self):
385
self.assertEncode('\x80', 0, 64*1024)
386
self.assertEncode('\x81\x01', 1, 64*1024)
387
self.assertEncode('\x81\x0a', 10, 64*1024)
388
self.assertEncode('\x81\xff', 255, 64*1024)
389
self.assertEncode('\x82\x01', 256, 64*1024)
390
self.assertEncode('\x83\x01\x01', 257, 64*1024)
391
self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
392
self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
393
self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
394
self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
395
self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
396
self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
398
def test_encode_no_offset(self):
399
self.assertEncode('\x90\x01', 0, 1)
400
self.assertEncode('\x90\x0a', 0, 10)
401
self.assertEncode('\x90\xff', 0, 255)
402
self.assertEncode('\xA0\x01', 0, 256)
403
self.assertEncode('\xB0\x01\x01', 0, 257)
404
self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
405
# Special case, if copy == 64KiB, then we store exactly 0
406
# Note that this puns with a copy of exactly 0 bytes, but we don't care
407
# about that, as we would never actually copy 0 bytes
408
self.assertEncode('\x80', 0, 64*1024)
410
def test_encode(self):
411
self.assertEncode('\x91\x01\x01', 1, 1)
412
self.assertEncode('\x91\x09\x0a', 9, 10)
413
self.assertEncode('\x91\xfe\xff', 254, 255)
414
self.assertEncode('\xA2\x02\x01', 512, 256)
415
self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
416
self.assertEncode('\xB0\x01\x01', 0, 257)
417
# Special case, if copy == 64KiB, then we store exactly 0
418
# Note that this puns with a copy of exactly 0 bytes, but we don't care
419
# about that, as we would never actually copy 0 bytes
420
self.assertEncode('\x81\x0a', 10, 64*1024)
422
def test_decode_no_length(self):
423
# If length is 0, it is interpreted as 64KiB
424
# The shortest possible instruction is a copy of 64KiB from offset 0
425
self.assertDecode(0, 65536, 1, '\x80', 0)
426
self.assertDecode(1, 65536, 2, '\x81\x01', 0)
427
self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
428
self.assertDecode(255, 65536, 2, '\x81\xff', 0)
429
self.assertDecode(256, 65536, 2, '\x82\x01', 0)
430
self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
431
self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
432
self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
433
self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
434
self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
435
self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
436
self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
438
def test_decode_no_offset(self):
439
self.assertDecode(0, 1, 2, '\x90\x01', 0)
440
self.assertDecode(0, 10, 2, '\x90\x0a', 0)
441
self.assertDecode(0, 255, 2, '\x90\xff', 0)
442
self.assertDecode(0, 256, 2, '\xA0\x01', 0)
443
self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
444
self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
445
# Special case, if copy == 64KiB, then we store exactly 0
446
# Note that this puns with a copy of exactly 0 bytes, but we don't care
447
# about that, as we would never actually copy 0 bytes
448
self.assertDecode(0, 65536, 1, '\x80', 0)
450
def test_decode(self):
451
self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
452
self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
453
self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
454
self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
455
self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
456
self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
458
def test_decode_not_start(self):
459
self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
460
self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
461
self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
464
class TestBase128Int(tests.TestCase):
    """Checks encode_base128_int / decode_base128_int of a gc module.

    The module under test is taken from `_gc_module`, which varies per
    scenario from module_scenarios().
    """

    scenarios = module_scenarios()

    _gc_module = None  # Set by load_tests

    def assertEqualEncode(self, bytes, val):
        """Assert that encoding the integer `val` yields `bytes`."""
        encoded = self._gc_module.encode_base128_int(val)
        self.assertEqual(bytes, encoded)

    def assertEqualDecode(self, val, num_decode, bytes):
        """Assert that decoding `bytes` yields the pair (val, num_decode)."""
        decoded = self._gc_module.decode_base128_int(bytes)
        self.assertEqual((val, num_decode), decoded)

    def test_encode(self):
        # (expected encoded bytes, integer value)
        cases = (
            ('\x01', 1),
            ('\x02', 2),
            ('\x7f', 127),
            ('\x80\x01', 128),
            ('\xff\x01', 255),
            ('\x80\x02', 256),
            ('\xff\xff\xff\xff\x0f', 0xFFFFFFFF),
        )
        for encoded, value in cases:
            self.assertEqualEncode(encoded, value)

    def test_decode(self):
        # (expected value, expected second result element, encoded bytes)
        cases = (
            (1, 1, '\x01'),
            (2, 1, '\x02'),
            (127, 1, '\x7f'),
            (128, 2, '\x80\x01'),
            (255, 2, '\xff\x01'),
            (256, 2, '\x80\x02'),
            (0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f'),
        )
        for value, num_decode, encoded in cases:
            self.assertEqualDecode(value, num_decode, encoded)

    def test_decode_with_trailing_bytes(self):
        # Extra bytes after the encoded integer must leave both expected
        # result elements as they would be without them.
        cases = (
            (1, 1, '\x01abcdef'),
            (127, 1, '\x7f\x01'),
            (128, 2, '\x80\x01abcdef'),
            (255, 2, '\xff\x01\xff'),
        )
        for value, num_decode, encoded in cases:
            self.assertEqualDecode(value, num_decode, encoded)