17
17
"""Tests for the python and pyrex extensions of groupcompress"""
19
19
from bzrlib import (
23
from bzrlib.tests.scenarios import (
24
load_tests_apply_scenarios,
28
def module_scenarios():
26
def load_tests(standard_tests, module, loader):
27
"""Parameterize tests for all versions of groupcompress."""
29
('PP', {'make_delta': _groupcompress_py.make_delta,
30
'apply_delta': _groupcompress_py.apply_delta})
30
33
('python', {'_gc_module': _groupcompress_py}),
32
if compiled_groupcompress_feature.available():
33
gc_module = compiled_groupcompress_feature.module
35
if CompiledGroupCompressFeature.available():
36
from bzrlib import _groupcompress_pyx
34
37
scenarios.append(('C',
35
{'_gc_module': gc_module}))
39
def two_way_scenarios():
41
('PP', {'make_delta': _groupcompress_py.make_delta,
42
'apply_delta': _groupcompress_py.apply_delta})
44
if compiled_groupcompress_feature.available():
45
gc_module = compiled_groupcompress_feature.module
47
('CC', {'make_delta': gc_module.make_delta,
48
'apply_delta': gc_module.apply_delta}),
38
{'_gc_module': _groupcompress_pyx}))
39
two_way_scenarios.extend([
40
('CC', {'make_delta': _groupcompress_pyx.make_delta,
41
'apply_delta': _groupcompress_pyx.apply_delta}),
49
42
('PC', {'make_delta': _groupcompress_py.make_delta,
50
'apply_delta': gc_module.apply_delta}),
51
('CP', {'make_delta': gc_module.make_delta,
43
'apply_delta': _groupcompress_pyx.apply_delta}),
44
('CP', {'make_delta': _groupcompress_pyx.make_delta,
52
45
'apply_delta': _groupcompress_py.apply_delta}),
57
load_tests = load_tests_apply_scenarios
60
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
61
'bzrlib._groupcompress_pyx')
47
to_adapt, result = tests.split_suite_by_condition(
48
standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
50
result = tests.multiply_tests(to_adapt, scenarios, result)
51
to_adapt, result = tests.split_suite_by_condition(result,
52
tests.condition_isinstance(TestMakeAndApplyCompatible))
53
result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
57
class _CompiledGroupCompressFeature(tests.Feature):
61
import bzrlib._groupcompress_pyx
67
def feature_name(self):
68
return 'bzrlib._groupcompress_pyx'
71
CompiledGroupCompressFeature = _CompiledGroupCompressFeature()
257
264
# This test isn't multiplied, because we only have DeltaIndex for the
259
266
# We call this here, because _test_needs_features happens after setUp
260
self.requireFeature(compiled_groupcompress_feature)
261
self._gc_module = compiled_groupcompress_feature.module
267
self.requireFeature(CompiledGroupCompressFeature)
268
from bzrlib import _groupcompress_pyx
269
self._gc_module = _groupcompress_pyx
263
271
def test_repr(self):
264
272
di = self._gc_module.DeltaIndex('test text\n')
265
273
self.assertEqual('DeltaIndex(1, 10)', repr(di))
267
def test__dump_no_index(self):
268
di = self._gc_module.DeltaIndex()
269
self.assertEqual(None, di._dump_index())
271
def test__dump_index_simple(self):
272
di = self._gc_module.DeltaIndex()
273
di.add_source(_text1, 0)
274
self.assertFalse(di._has_index())
275
self.assertEqual(None, di._dump_index())
276
_ = di.make_delta(_text1)
277
self.assertTrue(di._has_index())
278
hash_list, entry_list = di._dump_index()
279
self.assertEqual(16, len(hash_list))
280
self.assertEqual(68, len(entry_list))
281
just_entries = [(idx, text_offset, hash_val)
282
for idx, (text_offset, hash_val)
283
in enumerate(entry_list)
284
if text_offset != 0 or hash_val != 0]
285
rabin_hash = self._gc_module._rabin_hash
286
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
287
(25, 48, rabin_hash(_text1[33:49])),
288
(34, 32, rabin_hash(_text1[17:33])),
289
(47, 64, rabin_hash(_text1[49:65])),
291
# This ensures that the hash map points to the location we expect it to
292
for entry_idx, text_offset, hash_val in just_entries:
293
self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
295
def test__dump_index_two_sources(self):
296
di = self._gc_module.DeltaIndex()
297
di.add_source(_text1, 0)
298
di.add_source(_text2, 2)
299
start2 = len(_text1) + 2
300
self.assertTrue(di._has_index())
301
hash_list, entry_list = di._dump_index()
302
self.assertEqual(16, len(hash_list))
303
self.assertEqual(68, len(entry_list))
304
just_entries = [(idx, text_offset, hash_val)
305
for idx, (text_offset, hash_val)
306
in enumerate(entry_list)
307
if text_offset != 0 or hash_val != 0]
308
rabin_hash = self._gc_module._rabin_hash
309
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
310
(9, start2+16, rabin_hash(_text2[1:17])),
311
(25, 48, rabin_hash(_text1[33:49])),
312
(30, start2+64, rabin_hash(_text2[49:65])),
313
(34, 32, rabin_hash(_text1[17:33])),
314
(35, start2+32, rabin_hash(_text2[17:33])),
315
(43, start2+48, rabin_hash(_text2[33:49])),
316
(47, 64, rabin_hash(_text1[49:65])),
318
# Each entry should be in the appropriate hash bucket.
319
for entry_idx, text_offset, hash_val in just_entries:
320
hash_idx = hash_val & 0xf
322
hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
324
275
def test_first_add_source_doesnt_index_until_make_delta(self):
325
276
di = self._gc_module.DeltaIndex()
326
277
self.assertFalse(di._has_index())
332
283
self.assertTrue(di._has_index())
333
284
self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
335
def test_add_source_max_bytes_to_index(self):
336
di = self._gc_module.DeltaIndex()
337
di._max_bytes_to_index = 3*16
338
di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
339
di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
340
start2 = len(_text1) + 3
341
hash_list, entry_list = di._dump_index()
342
self.assertEqual(16, len(hash_list))
343
self.assertEqual(67, len(entry_list))
344
just_entries = sorted([(text_offset, hash_val)
345
for text_offset, hash_val in entry_list
346
if text_offset != 0 or hash_val != 0])
347
rabin_hash = self._gc_module._rabin_hash
348
self.assertEqual([(25, rabin_hash(_text1[10:26])),
349
(50, rabin_hash(_text1[35:51])),
350
(75, rabin_hash(_text1[60:76])),
351
(start2+44, rabin_hash(_text3[29:45])),
352
(start2+88, rabin_hash(_text3[73:89])),
353
(start2+132, rabin_hash(_text3[117:133])),
356
286
def test_second_add_source_triggers_make_index(self):
357
287
di = self._gc_module.DeltaIndex()
358
288
self.assertFalse(di._has_index())