1
# Copyright (C) 2008-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the python and pyrex extensions of groupcompress"""
23
from bzrlib.tests.scenarios import (
24
load_tests_apply_scenarios,
26
from bzrlib.tests import (
31
def module_scenarios():
33
('python', {'_gc_module': _groupcompress_py}),
35
if compiled_groupcompress_feature.available():
36
gc_module = compiled_groupcompress_feature.module
37
scenarios.append(('C',
38
{'_gc_module': gc_module}))
42
def two_way_scenarios():
44
('PP', {'make_delta': _groupcompress_py.make_delta,
45
'apply_delta': _groupcompress_py.apply_delta})
47
if compiled_groupcompress_feature.available():
48
gc_module = compiled_groupcompress_feature.module
50
('CC', {'make_delta': gc_module.make_delta,
51
'apply_delta': gc_module.apply_delta}),
52
('PC', {'make_delta': _groupcompress_py.make_delta,
53
'apply_delta': gc_module.apply_delta}),
54
('CP', {'make_delta': gc_module.make_delta,
55
'apply_delta': _groupcompress_py.apply_delta}),
60
load_tests = load_tests_apply_scenarios
63
compiled_groupcompress_feature = features.ModuleAvailableFeature(
64
'bzrlib._groupcompress_pyx')
69
which is meant to be matched
76
which is meant to differ from
83
which is meant to be matched
87
at the end of the file
93
common with the next text
97
some more bit of text, that
99
common with the previous text
100
and has some extra text
106
has some in common with the previous text
107
and has some extra text
109
common with the next text
123
class TestMakeAndApplyDelta(tests.TestCase):
125
scenarios = module_scenarios()
126
_gc_module = None # Set by load_tests
129
super(TestMakeAndApplyDelta, self).setUp()
130
self.make_delta = self._gc_module.make_delta
131
self.apply_delta = self._gc_module.apply_delta
132
self.apply_delta_to_source = self._gc_module.apply_delta_to_source
134
def test_make_delta_is_typesafe(self):
135
self.make_delta('a string', 'another string')
137
def _check_make_delta(string1, string2):
138
self.assertRaises(TypeError, self.make_delta, string1, string2)
140
_check_make_delta('a string', object())
141
_check_make_delta('a string', u'not a string')
142
_check_make_delta(object(), 'a string')
143
_check_make_delta(u'not a string', 'a string')
145
def test_make_noop_delta(self):
146
ident_delta = self.make_delta(_text1, _text1)
147
self.assertEqual('M\x90M', ident_delta)
148
ident_delta = self.make_delta(_text2, _text2)
149
self.assertEqual('N\x90N', ident_delta)
150
ident_delta = self.make_delta(_text3, _text3)
151
self.assertEqual('\x87\x01\x90\x87', ident_delta)
153
def assertDeltaIn(self, delta1, delta2, delta):
154
"""Make sure that the delta bytes match one of the expectations."""
155
# In general, the python delta matcher gives different results than the
156
# pyrex delta matcher. Both should be valid deltas, though.
157
if delta not in (delta1, delta2):
158
self.fail("Delta bytes:\n"
162
% (delta, delta1, delta2))
164
def test_make_delta(self):
165
delta = self.make_delta(_text1, _text2)
167
'N\x90/\x1fdiffer from\nagainst other text\n',
168
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
170
delta = self.make_delta(_text2, _text1)
172
'M\x90/\x1ebe matched\nagainst other text\n',
173
'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
175
delta = self.make_delta(_text3, _text1)
176
self.assertEqual('M\x90M', delta)
177
delta = self.make_delta(_text3, _text2)
179
'N\x90/\x1fdiffer from\nagainst other text\n',
180
'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
183
def test_make_delta_with_large_copies(self):
184
# We want to have a copy that is larger than 64kB, which forces us to
185
# issue multiple copy instructions.
186
big_text = _text3 * 1220
187
delta = self.make_delta(big_text, big_text)
189
'\xdc\x86\x0a' # Encoding the length of the uncompressed text
190
'\x80' # Copy 64kB, starting at byte 0
191
'\x84\x01' # and another 64kB starting at 64kB
192
'\xb4\x02\x5c\x83', # And the bit of tail.
193
None, # Both implementations should be identical
196
def test_apply_delta_is_typesafe(self):
197
self.apply_delta(_text1, 'M\x90M')
198
self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
199
self.assertRaises(TypeError, self.apply_delta,
200
unicode(_text1), 'M\x90M')
201
self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
202
self.assertRaises(TypeError, self.apply_delta, _text1, object())
204
def test_apply_delta(self):
205
target = self.apply_delta(_text1,
206
'N\x90/\x1fdiffer from\nagainst other text\n')
207
self.assertEqual(_text2, target)
208
target = self.apply_delta(_text2,
209
'M\x90/\x1ebe matched\nagainst other text\n')
210
self.assertEqual(_text1, target)
212
def test_apply_delta_to_source_is_safe(self):
213
self.assertRaises(TypeError,
214
self.apply_delta_to_source, object(), 0, 1)
215
self.assertRaises(TypeError,
216
self.apply_delta_to_source, u'unicode str', 0, 1)
218
self.assertRaises(ValueError,
219
self.apply_delta_to_source, 'foo', 1, 4)
221
self.assertRaises(ValueError,
222
self.apply_delta_to_source, 'foo', 5, 3)
224
self.assertRaises(ValueError,
225
self.apply_delta_to_source, 'foo', 3, 2)
227
def test_apply_delta_to_source(self):
228
source_and_delta = (_text1
229
+ 'N\x90/\x1fdiffer from\nagainst other text\n')
230
self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
231
len(_text1), len(source_and_delta)))
234
class TestMakeAndApplyCompatible(tests.TestCase):
236
scenarios = two_way_scenarios()
238
make_delta = None # Set by load_tests
239
apply_delta = None # Set by load_tests
241
def assertMakeAndApply(self, source, target):
242
"""Assert that generating a delta and applying gives success."""
243
delta = self.make_delta(source, target)
244
bytes = self.apply_delta(source, delta)
245
self.assertEqualDiff(target, bytes)
247
def test_direct(self):
248
self.assertMakeAndApply(_text1, _text2)
249
self.assertMakeAndApply(_text2, _text1)
250
self.assertMakeAndApply(_text1, _text3)
251
self.assertMakeAndApply(_text3, _text1)
252
self.assertMakeAndApply(_text2, _text3)
253
self.assertMakeAndApply(_text3, _text2)
256
class TestDeltaIndex(tests.TestCase):
259
super(TestDeltaIndex, self).setUp()
260
# This test isn't multiplied, because we only have DeltaIndex for the
262
# We call this here, because _test_needs_features happens after setUp
263
self.requireFeature(compiled_groupcompress_feature)
264
self._gc_module = compiled_groupcompress_feature.module
267
di = self._gc_module.DeltaIndex('test text\n')
268
self.assertEqual('DeltaIndex(1, 10)', repr(di))
270
def test__dump_no_index(self):
271
di = self._gc_module.DeltaIndex()
272
self.assertEqual(None, di._dump_index())
274
def test__dump_index_simple(self):
275
di = self._gc_module.DeltaIndex()
276
di.add_source(_text1, 0)
277
self.assertFalse(di._has_index())
278
self.assertEqual(None, di._dump_index())
279
_ = di.make_delta(_text1)
280
self.assertTrue(di._has_index())
281
hash_list, entry_list = di._dump_index()
282
self.assertEqual(16, len(hash_list))
283
self.assertEqual(68, len(entry_list))
284
just_entries = [(idx, text_offset, hash_val)
285
for idx, (text_offset, hash_val)
286
in enumerate(entry_list)
287
if text_offset != 0 or hash_val != 0]
288
rabin_hash = self._gc_module._rabin_hash
289
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
290
(25, 48, rabin_hash(_text1[33:49])),
291
(34, 32, rabin_hash(_text1[17:33])),
292
(47, 64, rabin_hash(_text1[49:65])),
294
# This ensures that the hash map points to the location we expect it to
295
for entry_idx, text_offset, hash_val in just_entries:
296
self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
298
def test__dump_index_two_sources(self):
299
di = self._gc_module.DeltaIndex()
300
di.add_source(_text1, 0)
301
di.add_source(_text2, 2)
302
start2 = len(_text1) + 2
303
self.assertTrue(di._has_index())
304
hash_list, entry_list = di._dump_index()
305
self.assertEqual(16, len(hash_list))
306
self.assertEqual(68, len(entry_list))
307
just_entries = [(idx, text_offset, hash_val)
308
for idx, (text_offset, hash_val)
309
in enumerate(entry_list)
310
if text_offset != 0 or hash_val != 0]
311
rabin_hash = self._gc_module._rabin_hash
312
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
313
(9, start2+16, rabin_hash(_text2[1:17])),
314
(25, 48, rabin_hash(_text1[33:49])),
315
(30, start2+64, rabin_hash(_text2[49:65])),
316
(34, 32, rabin_hash(_text1[17:33])),
317
(35, start2+32, rabin_hash(_text2[17:33])),
318
(43, start2+48, rabin_hash(_text2[33:49])),
319
(47, 64, rabin_hash(_text1[49:65])),
321
# Each entry should be in the appropriate hash bucket.
322
for entry_idx, text_offset, hash_val in just_entries:
323
hash_idx = hash_val & 0xf
325
hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
327
def test_first_add_source_doesnt_index_until_make_delta(self):
328
di = self._gc_module.DeltaIndex()
329
self.assertFalse(di._has_index())
330
di.add_source(_text1, 0)
331
self.assertFalse(di._has_index())
332
# However, asking to make a delta will trigger the index to be
333
# generated, and will generate a proper delta
334
delta = di.make_delta(_text2)
335
self.assertTrue(di._has_index())
336
self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
338
def test_add_source_max_bytes_to_index(self):
339
di = self._gc_module.DeltaIndex()
340
di._max_bytes_to_index = 3*16
341
di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
342
di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
343
start2 = len(_text1) + 3
344
hash_list, entry_list = di._dump_index()
345
self.assertEqual(16, len(hash_list))
346
self.assertEqual(67, len(entry_list))
347
just_entries = sorted([(text_offset, hash_val)
348
for text_offset, hash_val in entry_list
349
if text_offset != 0 or hash_val != 0])
350
rabin_hash = self._gc_module._rabin_hash
351
self.assertEqual([(25, rabin_hash(_text1[10:26])),
352
(50, rabin_hash(_text1[35:51])),
353
(75, rabin_hash(_text1[60:76])),
354
(start2+44, rabin_hash(_text3[29:45])),
355
(start2+88, rabin_hash(_text3[73:89])),
356
(start2+132, rabin_hash(_text3[117:133])),
359
def test_second_add_source_triggers_make_index(self):
360
di = self._gc_module.DeltaIndex()
361
self.assertFalse(di._has_index())
362
di.add_source(_text1, 0)
363
self.assertFalse(di._has_index())
364
di.add_source(_text2, 0)
365
self.assertTrue(di._has_index())
367
def test_make_delta(self):
368
di = self._gc_module.DeltaIndex(_text1)
369
delta = di.make_delta(_text2)
370
self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
372
def test_delta_against_multiple_sources(self):
373
di = self._gc_module.DeltaIndex()
374
di.add_source(_first_text, 0)
375
self.assertEqual(len(_first_text), di._source_offset)
376
di.add_source(_second_text, 0)
377
self.assertEqual(len(_first_text) + len(_second_text),
379
delta = di.make_delta(_third_text)
380
result = self._gc_module.apply_delta(_first_text + _second_text, delta)
381
self.assertEqualDiff(_third_text, result)
382
self.assertEqual('\x85\x01\x90\x14\x0chas some in '
383
'\x91v6\x03and\x91d"\x91:\n', delta)
385
def test_delta_with_offsets(self):
386
di = self._gc_module.DeltaIndex()
387
di.add_source(_first_text, 5)
388
self.assertEqual(len(_first_text) + 5, di._source_offset)
389
di.add_source(_second_text, 10)
390
self.assertEqual(len(_first_text) + len(_second_text) + 15,
392
delta = di.make_delta(_third_text)
393
self.assertIsNot(None, delta)
394
result = self._gc_module.apply_delta(
395
'12345' + _first_text + '1234567890' + _second_text, delta)
396
self.assertIsNot(None, result)
397
self.assertEqualDiff(_third_text, result)
398
self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
399
'\x91\x856\x03and\x91s"\x91?\n', delta)
401
def test_delta_with_delta_bytes(self):
402
di = self._gc_module.DeltaIndex()
404
di.add_source(_first_text, 0)
405
self.assertEqual(len(_first_text), di._source_offset)
406
delta = di.make_delta(_second_text)
407
self.assertEqual('h\tsome more\x91\x019'
408
'&previous text\nand has some extra text\n', delta)
409
di.add_delta_source(delta, 0)
411
self.assertEqual(len(_first_text) + len(delta), di._source_offset)
412
second_delta = di.make_delta(_third_text)
413
result = self._gc_module.apply_delta(source, second_delta)
414
self.assertEqualDiff(_third_text, result)
415
# We should be able to match against the
416
# 'previous text\nand has some...' that was part of the delta bytes
417
# Note that we don't match the 'common with the', because it isn't long
418
# enough to match in the original text, and those bytes are not present
419
# in the delta for the second text.
420
self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
421
'\x91S&\x03and\x91\x18,', second_delta)
422
# Add this delta, and create a new delta for the same text. We should
423
# find the remaining text, and only insert the short 'and' text.
424
di.add_delta_source(second_delta, 0)
425
source += second_delta
426
third_delta = di.make_delta(_third_text)
427
result = self._gc_module.apply_delta(source, third_delta)
428
self.assertEqualDiff(_third_text, result)
429
self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
430
'\x91S&\x03and\x91\x18,', third_delta)
431
# Now create a delta, which we know won't be able to be 'fit' into the
433
fourth_delta = di.make_delta(_fourth_text)
434
self.assertEqual(_fourth_text,
435
self._gc_module.apply_delta(source, fourth_delta))
436
self.assertEqual('\x80\x01'
437
'\x7f123456789012345\nsame rabin hash\n'
438
'123456789012345\nsame rabin hash\n'
439
'123456789012345\nsame rabin hash\n'
440
'123456789012345\nsame rabin hash'
441
'\x01\n', fourth_delta)
442
di.add_delta_source(fourth_delta, 0)
443
source += fourth_delta
444
# With the next delta, everything should be found
445
fifth_delta = di.make_delta(_fourth_text)
446
self.assertEqual(_fourth_text,
447
self._gc_module.apply_delta(source, fifth_delta))
448
self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
451
class TestCopyInstruction(tests.TestCase):
453
def assertEncode(self, expected, offset, length):
454
bytes = _groupcompress_py.encode_copy_instruction(offset, length)
455
if expected != bytes:
456
self.assertEqual([hex(ord(e)) for e in expected],
457
[hex(ord(b)) for b in bytes])
459
def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
460
cmd = ord(bytes[pos])
462
out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
463
self.assertEqual((exp_offset, exp_length, exp_newpos), out)
465
def test_encode_no_length(self):
466
self.assertEncode('\x80', 0, 64*1024)
467
self.assertEncode('\x81\x01', 1, 64*1024)
468
self.assertEncode('\x81\x0a', 10, 64*1024)
469
self.assertEncode('\x81\xff', 255, 64*1024)
470
self.assertEncode('\x82\x01', 256, 64*1024)
471
self.assertEncode('\x83\x01\x01', 257, 64*1024)
472
self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
473
self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
474
self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
475
self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
476
self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
477
self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
479
def test_encode_no_offset(self):
480
self.assertEncode('\x90\x01', 0, 1)
481
self.assertEncode('\x90\x0a', 0, 10)
482
self.assertEncode('\x90\xff', 0, 255)
483
self.assertEncode('\xA0\x01', 0, 256)
484
self.assertEncode('\xB0\x01\x01', 0, 257)
485
self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
486
# Special case, if copy == 64KiB, then we store exactly 0
487
# Note that this puns with a copy of exactly 0 bytes, but we don't care
488
# about that, as we would never actually copy 0 bytes
489
self.assertEncode('\x80', 0, 64*1024)
491
def test_encode(self):
492
self.assertEncode('\x91\x01\x01', 1, 1)
493
self.assertEncode('\x91\x09\x0a', 9, 10)
494
self.assertEncode('\x91\xfe\xff', 254, 255)
495
self.assertEncode('\xA2\x02\x01', 512, 256)
496
self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
497
self.assertEncode('\xB0\x01\x01', 0, 257)
498
# Special case, if copy == 64KiB, then we store exactly 0
499
# Note that this puns with a copy of exactly 0 bytes, but we don't care
500
# about that, as we would never actually copy 0 bytes
501
self.assertEncode('\x81\x0a', 10, 64*1024)
503
def test_decode_no_length(self):
504
# If length is 0, it is interpreted as 64KiB
505
# The shortest possible instruction is a copy of 64KiB from offset 0
506
self.assertDecode(0, 65536, 1, '\x80', 0)
507
self.assertDecode(1, 65536, 2, '\x81\x01', 0)
508
self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
509
self.assertDecode(255, 65536, 2, '\x81\xff', 0)
510
self.assertDecode(256, 65536, 2, '\x82\x01', 0)
511
self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
512
self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
513
self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
514
self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
515
self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
516
self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
517
self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
519
def test_decode_no_offset(self):
520
self.assertDecode(0, 1, 2, '\x90\x01', 0)
521
self.assertDecode(0, 10, 2, '\x90\x0a', 0)
522
self.assertDecode(0, 255, 2, '\x90\xff', 0)
523
self.assertDecode(0, 256, 2, '\xA0\x01', 0)
524
self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
525
self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
526
# Special case, if copy == 64KiB, then we store exactly 0
527
# Note that this puns with a copy of exactly 0 bytes, but we don't care
528
# about that, as we would never actually copy 0 bytes
529
self.assertDecode(0, 65536, 1, '\x80', 0)
531
def test_decode(self):
532
self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
533
self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
534
self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
535
self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
536
self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
537
self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
539
def test_decode_not_start(self):
540
self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
541
self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
542
self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
545
class TestBase128Int(tests.TestCase):
547
scenarios = module_scenarios()
549
_gc_module = None # Set by load_tests
551
def assertEqualEncode(self, bytes, val):
552
self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
554
def assertEqualDecode(self, val, num_decode, bytes):
555
self.assertEqual((val, num_decode),
556
self._gc_module.decode_base128_int(bytes))
558
def test_encode(self):
559
self.assertEqualEncode('\x01', 1)
560
self.assertEqualEncode('\x02', 2)
561
self.assertEqualEncode('\x7f', 127)
562
self.assertEqualEncode('\x80\x01', 128)
563
self.assertEqualEncode('\xff\x01', 255)
564
self.assertEqualEncode('\x80\x02', 256)
565
self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
567
def test_decode(self):
568
self.assertEqualDecode(1, 1, '\x01')
569
self.assertEqualDecode(2, 1, '\x02')
570
self.assertEqualDecode(127, 1, '\x7f')
571
self.assertEqualDecode(128, 2, '\x80\x01')
572
self.assertEqualDecode(255, 2, '\xff\x01')
573
self.assertEqualDecode(256, 2, '\x80\x02')
574
self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
576
def test_decode_with_trailing_bytes(self):
577
self.assertEqualDecode(1, 1, '\x01abcdef')
578
self.assertEqualDecode(127, 1, '\x7f\x01')
579
self.assertEqualDecode(128, 2, '\x80\x01abcdef')
580
self.assertEqualDecode(255, 2, '\xff\x01\xff')