1
# Copyright (C) 2008-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the python and pyrex extensions of groupcompress"""
23
from bzrlib.tests.scenarios import (
24
load_tests_apply_scenarios,
28
def module_scenarios():
30
('python', {'_gc_module': _groupcompress_py}),
32
if compiled_groupcompress_feature.available():
33
gc_module = compiled_groupcompress_feature.module
34
scenarios.append(('C',
35
{'_gc_module': gc_module}))
39
def two_way_scenarios():
41
('PP', {'make_delta': _groupcompress_py.make_delta,
42
'apply_delta': _groupcompress_py.apply_delta})
44
if compiled_groupcompress_feature.available():
45
gc_module = compiled_groupcompress_feature.module
47
('CC', {'make_delta': gc_module.make_delta,
48
'apply_delta': gc_module.apply_delta}),
49
('PC', {'make_delta': _groupcompress_py.make_delta,
50
'apply_delta': gc_module.apply_delta}),
51
('CP', {'make_delta': gc_module.make_delta,
52
'apply_delta': _groupcompress_py.apply_delta}),
57
# Module-level ``load_tests`` hook, bound to the helper imported from
# bzrlib.tests.scenarios.
# NOTE(review): presumably this multiplies each TestCase by its ``scenarios``
# attribute (the classes below mark attributes "Set by load_tests") -- confirm
# against bzrlib.tests.scenarios.
load_tests = load_tests_apply_scenarios
60
# Feature object for the compiled groupcompress extension; the code in this
# module consults its .available() and .module before using the extension.
compiled_groupcompress_feature = (
    tests.ModuleAvailableFeature('bzrlib._groupcompress_pyx'))
66
which is meant to be matched
73
which is meant to differ from
80
which is meant to be matched
84
at the end of the file
90
common with the next text
94
some more bit of text, that
96
common with the previous text
97
and has some extra text
103
has some in common with the previous text
104
and has some extra text
106
common with the next text
120
class TestMakeAndApplyDelta(tests.TestCase):
122
scenarios = module_scenarios()
123
_gc_module = None # Set by load_tests
126
super(TestMakeAndApplyDelta, self).setUp()
127
self.make_delta = self._gc_module.make_delta
128
self.apply_delta = self._gc_module.apply_delta
129
self.apply_delta_to_source = self._gc_module.apply_delta_to_source
131
def test_make_delta_is_typesafe(self):
    """make_delta takes two plain strings; object()/unicode raise TypeError."""
    # A pair of plain str arguments must be accepted without error.
    self.make_delta('a string', 'another string')

    # In each pair exactly one argument is not a plain str.
    rejected_pairs = (
        ('a string', object()),
        ('a string', u'not a string'),
        (object(), 'a string'),
        (u'not a string', 'a string'),
    )
    for first, second in rejected_pairs:
        self.assertRaises(TypeError, self.make_delta, first, second)
142
def test_make_noop_delta(self):
    """Check the exact delta bytes when a text is diffed against itself."""
    expected_by_text = (
        (_text1, 'M\x90M'),
        (_text2, 'N\x90N'),
        (_text3, '\x87\x01\x90\x87'),
    )
    for text, expected_delta in expected_by_text:
        self.assertEqual(expected_delta, self.make_delta(text, text))
150
def assertDeltaIn(self, delta1, delta2, delta):
151
"""Make sure that the delta bytes match one of the expectations."""
152
# In general, the python delta matcher gives different results than the
153
# pyrex delta matcher. Both should be valid deltas, though.
154
if delta not in (delta1, delta2):
155
self.fail("Delta bytes:\n"
159
% (delta, delta1, delta2))
161
def test_make_delta(self):
162
delta = self.make_delta(_text1, _text2)
164
'N\x90/\x1fdiffer from\nagainst other text\n',
165
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
167
delta = self.make_delta(_text2, _text1)
169
'M\x90/\x1ebe matched\nagainst other text\n',
170
'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
172
delta = self.make_delta(_text3, _text1)
173
self.assertEqual('M\x90M', delta)
174
delta = self.make_delta(_text3, _text2)
176
'N\x90/\x1fdiffer from\nagainst other text\n',
177
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
180
def test_make_delta_with_large_copies(self):
181
# We want to have a copy that is larger than 64kB, which forces us to
182
# issue multiple copy instructions.
183
big_text = _text3 * 1220
184
delta = self.make_delta(big_text, big_text)
186
'\xdc\x86\x0a' # Encoding the length of the uncompressed text
187
'\x80' # Copy 64kB, starting at byte 0
188
'\x84\x01' # and another 64kB starting at 64kB
189
'\xb4\x02\x5c\x83', # And the bit of tail.
190
None, # Both implementations should be identical
193
def test_apply_delta_is_typesafe(self):
    """apply_delta takes plain strings; object()/unicode raise TypeError."""
    noop_delta = 'M\x90M'
    # Plain str source and plain str delta must be accepted.
    self.apply_delta(_text1, noop_delta)

    # Each pair has one argument that is either object() or unicode.
    bad_arguments = (
        (object(), noop_delta),
        (unicode(_text1), noop_delta),
        (_text1, u'M\x90M'),
        (_text1, object()),
    )
    for source, delta in bad_arguments:
        self.assertRaises(TypeError, self.apply_delta, source, delta)
201
def test_apply_delta(self):
    """Hand-written deltas turn _text1 into _text2 and _text2 into _text1."""
    one_to_two = 'N\x90/\x1fdiffer from\nagainst other text\n'
    self.assertEqual(_text2, self.apply_delta(_text1, one_to_two))

    two_to_one = 'M\x90/\x1ebe matched\nagainst other text\n'
    self.assertEqual(_text1, self.apply_delta(_text2, two_to_one))
209
def test_apply_delta_to_source_is_safe(self):
    """apply_delta_to_source raises on bad source types and bad offsets."""
    # Columns: expected exception, source, second argument, third argument.
    # NOTE(review): the two integers look like the start and end offsets of
    # the delta within the source (see test_apply_delta_to_source) -- confirm.
    checks = (
        (TypeError, object(), 0, 1),
        (TypeError, u'unicode str', 0, 1),
        # 'foo' is 3 bytes long, so 4 is past its end.
        (ValueError, 'foo', 1, 4),
        # 5 is past the end of 'foo'.
        (ValueError, 'foo', 5, 3),
        # The second argument is greater than the third.
        (ValueError, 'foo', 3, 2),
    )
    for exc_type, source, delta_start, delta_end in checks:
        self.assertRaises(exc_type, self.apply_delta_to_source,
                          source, delta_start, delta_end)
224
def test_apply_delta_to_source(self):
    """A delta appended to its own source can be applied in place."""
    delta = 'N\x90/\x1fdiffer from\nagainst other text\n'
    combined = _text1 + delta
    # The delta occupies combined[len(_text1):len(combined)].
    rebuilt = self.apply_delta_to_source(
        combined, len(_text1), len(combined))
    self.assertEqual(_text2, rebuilt)
231
class TestMakeAndApplyCompatible(tests.TestCase):
    """Round-trip texts through a make_delta/apply_delta pairing.

    The pairing under test comes from two_way_scenarios(), whose scenario
    dicts carry the 'make_delta' and 'apply_delta' entries.
    """

    scenarios = two_way_scenarios()

    make_delta = None # Set by load_tests
    apply_delta = None # Set by load_tests

    def assertMakeAndApply(self, source, target):
        """Assert that applying make_delta's output to source gives target."""
        rebuilt = self.apply_delta(source, self.make_delta(source, target))
        self.assertEqualDiff(target, rebuilt)

    def test_direct(self):
        """Check every ordered pair of the three module-level sample texts."""
        ordered_pairs = (
            (_text1, _text2),
            (_text2, _text1),
            (_text1, _text3),
            (_text3, _text1),
            (_text2, _text3),
            (_text3, _text2),
        )
        for source, target in ordered_pairs:
            self.assertMakeAndApply(source, target)
253
class TestDeltaIndex(tests.TestCase):
256
super(TestDeltaIndex, self).setUp()
257
# This test isn't multiplied, because we only have DeltaIndex for the
259
# We call this here, because _test_needs_features happens after setUp
260
self.requireFeature(compiled_groupcompress_feature)
261
self._gc_module = compiled_groupcompress_feature.module
264
di = self._gc_module.DeltaIndex('test text\n')
265
self.assertEqual('DeltaIndex(1, 10)', repr(di))
267
def test__dump_no_index(self):
    """A DeltaIndex created without arguments dumps None."""
    empty_index = self._gc_module.DeltaIndex()
    dumped = empty_index._dump_index()
    self.assertEqual(None, dumped)
271
def test__dump_index_simple(self):
272
di = self._gc_module.DeltaIndex()
273
di.add_source(_text1, 0)
274
self.assertFalse(di._has_index())
275
self.assertEqual(None, di._dump_index())
276
_ = di.make_delta(_text1)
277
self.assertTrue(di._has_index())
278
hash_list, entry_list = di._dump_index()
279
self.assertEqual(16, len(hash_list))
280
self.assertEqual(68, len(entry_list))
281
just_entries = [(idx, text_offset, hash_val)
282
for idx, (text_offset, hash_val)
283
in enumerate(entry_list)
284
if text_offset != 0 or hash_val != 0]
285
rabin_hash = self._gc_module._rabin_hash
286
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
287
(25, 48, rabin_hash(_text1[33:49])),
288
(34, 32, rabin_hash(_text1[17:33])),
289
(47, 64, rabin_hash(_text1[49:65])),
291
# This ensures that the hash map points to the location we expect it to
292
for entry_idx, text_offset, hash_val in just_entries:
293
self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
295
def test__dump_index_two_sources(self):
296
di = self._gc_module.DeltaIndex()
297
di.add_source(_text1, 0)
298
di.add_source(_text2, 2)
299
start2 = len(_text1) + 2
300
self.assertTrue(di._has_index())
301
hash_list, entry_list = di._dump_index()
302
self.assertEqual(16, len(hash_list))
303
self.assertEqual(68, len(entry_list))
304
just_entries = [(idx, text_offset, hash_val)
305
for idx, (text_offset, hash_val)
306
in enumerate(entry_list)
307
if text_offset != 0 or hash_val != 0]
308
rabin_hash = self._gc_module._rabin_hash
309
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
310
(9, start2+16, rabin_hash(_text2[1:17])),
311
(25, 48, rabin_hash(_text1[33:49])),
312
(30, start2+64, rabin_hash(_text2[49:65])),
313
(34, 32, rabin_hash(_text1[17:33])),
314
(35, start2+32, rabin_hash(_text2[17:33])),
315
(43, start2+48, rabin_hash(_text2[33:49])),
316
(47, 64, rabin_hash(_text1[49:65])),
318
# Each entry should be in the appropriate hash bucket.
319
for entry_idx, text_offset, hash_val in just_entries:
320
hash_idx = hash_val & 0xf
322
hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
324
def test_first_add_source_doesnt_index_until_make_delta(self):
    """_has_index() stays False after one add_source until make_delta runs."""
    expected_delta = 'N\x90/\x1fdiffer from\nagainst other text\n'
    index = self._gc_module.DeltaIndex()

    # Neither construction nor the first add_source reports an index.
    self.assertFalse(index._has_index())
    index.add_source(_text1, 0)
    self.assertFalse(index._has_index())

    # Requesting a delta is what makes _has_index() turn True, and the
    # delta returned must still be the expected one.
    produced = index.make_delta(_text2)
    self.assertTrue(index._has_index())
    self.assertEqual(expected_delta, produced)
335
def test_add_source_max_bytes_to_index(self):
336
di = self._gc_module.DeltaIndex()
337
di._max_bytes_to_index = 3*16
338
di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
339
di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
340
start2 = len(_text1) + 3
341
hash_list, entry_list = di._dump_index()
342
self.assertEqual(16, len(hash_list))
343
self.assertEqual(67, len(entry_list))
344
just_entries = sorted([(text_offset, hash_val)
345
for text_offset, hash_val in entry_list
346
if text_offset != 0 or hash_val != 0])
347
rabin_hash = self._gc_module._rabin_hash
348
self.assertEqual([(25, rabin_hash(_text1[10:26])),
349
(50, rabin_hash(_text1[35:51])),
350
(75, rabin_hash(_text1[60:76])),
351
(start2+44, rabin_hash(_text3[29:45])),
352
(start2+88, rabin_hash(_text3[73:89])),
353
(start2+132, rabin_hash(_text3[117:133])),
356
def test_second_add_source_triggers_make_index(self):
    """_has_index() becomes True only once a second source is added."""
    index = self._gc_module.DeltaIndex()
    has_index = index._has_index

    self.assertFalse(has_index())
    index.add_source(_text1, 0)
    # Still False after only one source.
    self.assertFalse(has_index())
    index.add_source(_text2, 0)
    self.assertTrue(has_index())
364
def test_make_delta(self):
    """A DeltaIndex seeded with _text1 yields the known delta for _text2."""
    expected_delta = 'N\x90/\x1fdiffer from\nagainst other text\n'
    index = self._gc_module.DeltaIndex(_text1)
    self.assertEqual(expected_delta, index.make_delta(_text2))
369
def test_delta_against_multiple_sources(self):
370
di = self._gc_module.DeltaIndex()
371
di.add_source(_first_text, 0)
372
self.assertEqual(len(_first_text), di._source_offset)
373
di.add_source(_second_text, 0)
374
self.assertEqual(len(_first_text) + len(_second_text),
376
delta = di.make_delta(_third_text)
377
result = self._gc_module.apply_delta(_first_text + _second_text, delta)
378
self.assertEqualDiff(_third_text, result)
379
self.assertEqual('\x85\x01\x90\x14\x0chas some in '
380
'\x91v6\x03and\x91d"\x91:\n', delta)
382
def test_delta_with_offsets(self):
383
di = self._gc_module.DeltaIndex()
384
di.add_source(_first_text, 5)
385
self.assertEqual(len(_first_text) + 5, di._source_offset)
386
di.add_source(_second_text, 10)
387
self.assertEqual(len(_first_text) + len(_second_text) + 15,
389
delta = di.make_delta(_third_text)
390
self.assertIsNot(None, delta)
391
result = self._gc_module.apply_delta(
392
'12345' + _first_text + '1234567890' + _second_text, delta)
393
self.assertIsNot(None, result)
394
self.assertEqualDiff(_third_text, result)
395
self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
396
'\x91\x856\x03and\x91s"\x91?\n', delta)
398
def test_delta_with_delta_bytes(self):
399
di = self._gc_module.DeltaIndex()
401
di.add_source(_first_text, 0)
402
self.assertEqual(len(_first_text), di._source_offset)
403
delta = di.make_delta(_second_text)
404
self.assertEqual('h\tsome more\x91\x019'
405
'&previous text\nand has some extra text\n', delta)
406
di.add_delta_source(delta, 0)
408
self.assertEqual(len(_first_text) + len(delta), di._source_offset)
409
second_delta = di.make_delta(_third_text)
410
result = self._gc_module.apply_delta(source, second_delta)
411
self.assertEqualDiff(_third_text, result)
412
# We should be able to match against the
413
# 'previous text\nand has some...' that was part of the delta bytes
414
# Note that we don't match the 'common with the', because it isn't long
415
# enough to match in the original text, and those bytes are not present
416
# in the delta for the second text.
417
self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
418
'\x91S&\x03and\x91\x18,', second_delta)
419
# Add this delta, and create a new delta for the same text. We should
420
# find the remaining text, and only insert the short 'and' text.
421
di.add_delta_source(second_delta, 0)
422
source += second_delta
423
third_delta = di.make_delta(_third_text)
424
result = self._gc_module.apply_delta(source, third_delta)
425
self.assertEqualDiff(_third_text, result)
426
self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
427
'\x91S&\x03and\x91\x18,', third_delta)
428
# Now create a delta, which we know won't be able to be 'fit' into the
430
fourth_delta = di.make_delta(_fourth_text)
431
self.assertEqual(_fourth_text,
432
self._gc_module.apply_delta(source, fourth_delta))
433
self.assertEqual('\x80\x01'
434
'\x7f123456789012345\nsame rabin hash\n'
435
'123456789012345\nsame rabin hash\n'
436
'123456789012345\nsame rabin hash\n'
437
'123456789012345\nsame rabin hash'
438
'\x01\n', fourth_delta)
439
di.add_delta_source(fourth_delta, 0)
440
source += fourth_delta
441
# With the next delta, everything should be found
442
fifth_delta = di.make_delta(_fourth_text)
443
self.assertEqual(_fourth_text,
444
self._gc_module.apply_delta(source, fifth_delta))
445
self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
448
class TestCopyInstruction(tests.TestCase):
450
def assertEncode(self, expected, offset, length):
    """Assert encode_copy_instruction(offset, length) returns expected.

    On mismatch the two strings are compared as lists of per-character hex
    values, so the failure message shows them in hex.
    """
    encoded = _groupcompress_py.encode_copy_instruction(offset, length)
    if expected == encoded:
        return
    expected_hex = [hex(ord(char)) for char in expected]
    encoded_hex = [hex(ord(char)) for char in encoded]
    self.assertEqual(expected_hex, encoded_hex)
456
def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
457
cmd = ord(bytes[pos])
459
out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
460
self.assertEqual((exp_offset, exp_length, exp_newpos), out)
462
def test_encode_no_length(self):
    """Encode copies of exactly 64KiB from a range of offsets."""
    copy_length = 64*1024
    # (expected encoding, offset)
    cases = (
        ('\x80', 0),
        ('\x81\x01', 1),
        ('\x81\x0a', 10),
        ('\x81\xff', 255),
        ('\x82\x01', 256),
        ('\x83\x01\x01', 257),
        ('\x8F\xff\xff\xff\xff', 0xFFFFFFFF),
        ('\x8E\xff\xff\xff', 0xFFFFFF00),
        ('\x8D\xff\xff\xff', 0xFFFF00FF),
        ('\x8B\xff\xff\xff', 0xFF00FFFF),
        ('\x87\xff\xff\xff', 0x00FFFFFF),
        ('\x8F\x04\x03\x02\x01', 0x01020304),
    )
    for expected, offset in cases:
        self.assertEncode(expected, offset, copy_length)
476
def test_encode_no_offset(self):
    """Encode copies of various lengths that all start at offset 0."""
    # (expected encoding, length)
    cases = (
        ('\x90\x01', 1),
        ('\x90\x0a', 10),
        ('\x90\xff', 255),
        ('\xA0\x01', 256),
        ('\xB0\x01\x01', 257),
        ('\xB0\xff\xff', 0xFFFF),
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't
        # care about that, as we would never actually copy 0 bytes
        ('\x80', 64*1024),
    )
    for expected, length in cases:
        self.assertEncode(expected, 0, length)
488
def test_encode(self):
    """Encode copy instructions for assorted offset/length combinations."""
    # (expected encoding, offset, length)
    cases = (
        ('\x91\x01\x01', 1, 1),
        ('\x91\x09\x0a', 9, 10),
        ('\x91\xfe\xff', 254, 255),
        ('\xA2\x02\x01', 512, 256),
        ('\xB3\x02\x01\x01\x01', 258, 257),
        ('\xB0\x01\x01', 0, 257),
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't
        # care about that, as we would never actually copy 0 bytes
        ('\x81\x0a', 10, 64*1024),
    )
    for expected, offset, length in cases:
        self.assertEncode(expected, offset, length)
500
def test_decode_no_length(self):
    """Decode instructions that are expected to yield a 64KiB length."""
    # If length is 0, it is interpreted as 64KiB
    # The shortest possible instruction is a copy of 64KiB from offset 0
    # (expected offset, expected new position, instruction bytes)
    cases = (
        (0, 1, '\x80'),
        (1, 2, '\x81\x01'),
        (10, 2, '\x81\x0a'),
        (255, 2, '\x81\xff'),
        (256, 2, '\x82\x01'),
        (257, 3, '\x83\x01\x01'),
        (0xFFFFFFFF, 5, '\x8F\xff\xff\xff\xff'),
        (0xFFFFFF00, 4, '\x8E\xff\xff\xff'),
        (0xFFFF00FF, 4, '\x8D\xff\xff\xff'),
        (0xFF00FFFF, 4, '\x8B\xff\xff\xff'),
        (0x00FFFFFF, 4, '\x87\xff\xff\xff'),
        (0x01020304, 5, '\x8F\x04\x03\x02\x01'),
    )
    for exp_offset, exp_newpos, instruction in cases:
        self.assertDecode(exp_offset, 65536, exp_newpos, instruction, 0)
516
def test_decode_no_offset(self):
    """Decode instructions that are expected to yield offset 0."""
    # (expected length, expected new position, instruction bytes)
    cases = (
        (1, 2, '\x90\x01'),
        (10, 2, '\x90\x0a'),
        (255, 2, '\x90\xff'),
        (256, 2, '\xA0\x01'),
        (257, 3, '\xB0\x01\x01'),
        (65535, 3, '\xB0\xff\xff'),
        # Special case, if copy == 64KiB, then we store exactly 0
        # Note that this puns with a copy of exactly 0 bytes, but we don't
        # care about that, as we would never actually copy 0 bytes
        (65536, 1, '\x80'),
    )
    for exp_length, exp_newpos, instruction in cases:
        self.assertDecode(0, exp_length, exp_newpos, instruction, 0)
528
def test_decode(self):
    """Decode instructions carrying both an offset and a length."""
    # (expected offset, expected length, expected new position, bytes)
    cases = (
        (1, 1, 3, '\x91\x01\x01'),
        (9, 10, 3, '\x91\x09\x0a'),
        (254, 255, 3, '\x91\xfe\xff'),
        (512, 256, 3, '\xA2\x02\x01'),
        (258, 257, 5, '\xB3\x02\x01\x01\x01'),
        (0, 257, 3, '\xB0\x01\x01'),
    )
    for exp_offset, exp_length, exp_newpos, instruction in cases:
        self.assertDecode(exp_offset, exp_length, exp_newpos, instruction, 0)
536
def test_decode_not_start(self):
    """Decode an instruction that sits in the middle of a longer string."""
    # (expected offset, expected length, expected new position, bytes,
    #  position of the instruction within bytes)
    cases = (
        (1, 1, 6, 'abc\x91\x01\x01def', 3),
        (9, 10, 5, 'ab\x91\x09\x0ade', 2),
        (254, 255, 6, 'not\x91\xfe\xffcopy', 3),
    )
    for case in cases:
        self.assertDecode(*case)
542
class TestBase128Int(tests.TestCase):
    """Exercise encode_base128_int / decode_base128_int of self._gc_module.

    The module under test is selected through module_scenarios().
    """

    scenarios = module_scenarios()

    _gc_module = None # Set by load_tests

    def assertEqualEncode(self, bytes, val):
        """Assert that encoding val yields exactly the string bytes."""
        encoded = self._gc_module.encode_base128_int(val)
        self.assertEqual(bytes, encoded)

    def assertEqualDecode(self, val, num_decode, bytes):
        """Assert that decoding bytes returns the pair (val, num_decode)."""
        decoded = self._gc_module.decode_base128_int(bytes)
        self.assertEqual((val, num_decode), decoded)

    def test_encode(self):
        """Encode a selection of integers from 1 up to 0xFFFFFFFF."""
        # (expected encoding, value)
        cases = (
            ('\x01', 1),
            ('\x02', 2),
            ('\x7f', 127),
            ('\x80\x01', 128),
            ('\xff\x01', 255),
            ('\x80\x02', 256),
            ('\xff\xff\xff\xff\x0f', 0xFFFFFFFF),
        )
        for expected, value in cases:
            self.assertEqualEncode(expected, value)

    def test_decode(self):
        """Decode strings that hold one encoded integer and nothing else."""
        # (expected value, expected second element of the result, input)
        cases = (
            (1, 1, '\x01'),
            (2, 1, '\x02'),
            (127, 1, '\x7f'),
            (128, 2, '\x80\x01'),
            (255, 2, '\xff\x01'),
            (256, 2, '\x80\x02'),
            (0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f'),
        )
        for value, num_decode, encoded in cases:
            self.assertEqualDecode(value, num_decode, encoded)

    def test_decode_with_trailing_bytes(self):
        """Decode inputs that carry extra bytes after the encoded integer."""
        # (expected value, expected second element of the result, input)
        cases = (
            (1, 1, '\x01abcdef'),
            (127, 1, '\x7f\x01'),
            (128, 2, '\x80\x01abcdef'),
            (255, 2, '\xff\x01\xff'),
        )
        for value, num_decode, encoded in cases:
            self.assertEqualDecode(value, num_decode, encoded)