17
17
"""Tests for the python and pyrex extensions of groupcompress"""
19
19
from bzrlib import (
23
from bzrlib.tests.scenarios import (
24
load_tests_apply_scenarios,
26
from bzrlib.tests import (
31
def module_scenarios():
26
def load_tests(standard_tests, module, loader):
27
"""Parameterize tests for all versions of groupcompress."""
29
('PP', {'make_delta': _groupcompress_py.make_delta,
30
'apply_delta': _groupcompress_py.apply_delta})
33
33
('python', {'_gc_module': _groupcompress_py}),
36
36
gc_module = compiled_groupcompress_feature.module
37
37
scenarios.append(('C',
38
38
{'_gc_module': gc_module}))
42
def two_way_scenarios():
44
('PP', {'make_delta': _groupcompress_py.make_delta,
45
'apply_delta': _groupcompress_py.apply_delta})
47
if compiled_groupcompress_feature.available():
48
gc_module = compiled_groupcompress_feature.module
39
two_way_scenarios.extend([
50
40
('CC', {'make_delta': gc_module.make_delta,
51
41
'apply_delta': gc_module.apply_delta}),
52
42
('PC', {'make_delta': _groupcompress_py.make_delta,
54
44
('CP', {'make_delta': gc_module.make_delta,
55
45
'apply_delta': _groupcompress_py.apply_delta}),
60
load_tests = load_tests_apply_scenarios
63
compiled_groupcompress_feature = features.ModuleAvailableFeature(
64
'bzrlib._groupcompress_pyx')
47
to_adapt, result = tests.split_suite_by_condition(
48
standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
50
result = tests.multiply_tests(to_adapt, scenarios, result)
51
to_adapt, result = tests.split_suite_by_condition(result,
52
tests.condition_isinstance(TestMakeAndApplyCompatible))
53
result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
57
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
58
'bzrlib._groupcompress_pyx')
267
258
di = self._gc_module.DeltaIndex('test text\n')
268
259
self.assertEqual('DeltaIndex(1, 10)', repr(di))
270
def test__dump_no_index(self):
271
di = self._gc_module.DeltaIndex()
272
self.assertEqual(None, di._dump_index())
274
def test__dump_index_simple(self):
275
di = self._gc_module.DeltaIndex()
276
di.add_source(_text1, 0)
277
self.assertFalse(di._has_index())
278
self.assertEqual(None, di._dump_index())
279
_ = di.make_delta(_text1)
280
self.assertTrue(di._has_index())
281
hash_list, entry_list = di._dump_index()
282
self.assertEqual(16, len(hash_list))
283
self.assertEqual(68, len(entry_list))
284
just_entries = [(idx, text_offset, hash_val)
285
for idx, (text_offset, hash_val)
286
in enumerate(entry_list)
287
if text_offset != 0 or hash_val != 0]
288
rabin_hash = self._gc_module._rabin_hash
289
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
290
(25, 48, rabin_hash(_text1[33:49])),
291
(34, 32, rabin_hash(_text1[17:33])),
292
(47, 64, rabin_hash(_text1[49:65])),
294
# This ensures that the hash map points to the location we expect it to
295
for entry_idx, text_offset, hash_val in just_entries:
296
self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
298
def test__dump_index_two_sources(self):
299
di = self._gc_module.DeltaIndex()
300
di.add_source(_text1, 0)
301
di.add_source(_text2, 2)
302
start2 = len(_text1) + 2
303
self.assertTrue(di._has_index())
304
hash_list, entry_list = di._dump_index()
305
self.assertEqual(16, len(hash_list))
306
self.assertEqual(68, len(entry_list))
307
just_entries = [(idx, text_offset, hash_val)
308
for idx, (text_offset, hash_val)
309
in enumerate(entry_list)
310
if text_offset != 0 or hash_val != 0]
311
rabin_hash = self._gc_module._rabin_hash
312
self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
313
(9, start2+16, rabin_hash(_text2[1:17])),
314
(25, 48, rabin_hash(_text1[33:49])),
315
(30, start2+64, rabin_hash(_text2[49:65])),
316
(34, 32, rabin_hash(_text1[17:33])),
317
(35, start2+32, rabin_hash(_text2[17:33])),
318
(43, start2+48, rabin_hash(_text2[33:49])),
319
(47, 64, rabin_hash(_text1[49:65])),
321
# Each entry should be in the appropriate hash bucket.
322
for entry_idx, text_offset, hash_val in just_entries:
323
hash_idx = hash_val & 0xf
325
hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
327
261
def test_first_add_source_doesnt_index_until_make_delta(self):
328
262
di = self._gc_module.DeltaIndex()
329
263
self.assertFalse(di._has_index())
335
269
self.assertTrue(di._has_index())
336
270
self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
338
def test_add_source_max_bytes_to_index(self):
339
di = self._gc_module.DeltaIndex()
340
di._max_bytes_to_index = 3*16
341
di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
342
di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
343
start2 = len(_text1) + 3
344
hash_list, entry_list = di._dump_index()
345
self.assertEqual(16, len(hash_list))
346
self.assertEqual(67, len(entry_list))
347
just_entries = sorted([(text_offset, hash_val)
348
for text_offset, hash_val in entry_list
349
if text_offset != 0 or hash_val != 0])
350
rabin_hash = self._gc_module._rabin_hash
351
self.assertEqual([(25, rabin_hash(_text1[10:26])),
352
(50, rabin_hash(_text1[35:51])),
353
(75, rabin_hash(_text1[60:76])),
354
(start2+44, rabin_hash(_text3[29:45])),
355
(start2+88, rabin_hash(_text3[73:89])),
356
(start2+132, rabin_hash(_text3[117:133])),
359
272
def test_second_add_source_triggers_make_index(self):
360
273
di = self._gc_module.DeltaIndex()
361
274
self.assertFalse(di._has_index())