33
36
gc_module = compiled_groupcompress_feature.module
34
37
scenarios.append(('C',
35
38
{'_gc_module': gc_module}))
39
def two_way_scenarios():
41
('PP', {'make_delta': _groupcompress_py.make_delta,
42
'apply_delta': _groupcompress_py.apply_delta})
44
if compiled_groupcompress_feature.available():
45
gc_module = compiled_groupcompress_feature.module
39
two_way_scenarios.extend([
47
40
('CC', {'make_delta': gc_module.make_delta,
48
41
'apply_delta': gc_module.apply_delta}),
49
42
('PC', {'make_delta': _groupcompress_py.make_delta,
51
44
('CP', {'make_delta': gc_module.make_delta,
52
45
'apply_delta': _groupcompress_py.apply_delta}),
57
load_tests = load_tests_apply_scenarios
47
to_adapt, result = tests.split_suite_by_condition(
48
standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
50
result = tests.multiply_tests(to_adapt, scenarios, result)
51
to_adapt, result = tests.split_suite_by_condition(result,
52
tests.condition_isinstance(TestMakeAndApplyCompatible))
53
result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
60
57
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
264
258
di = self._gc_module.DeltaIndex('test text\n')
265
259
self.assertEqual('DeltaIndex(1, 10)', repr(di))
267
def test__dump_no_index(self):
    """A DeltaIndex created without any source has no index to dump."""
    di = self._gc_module.DeltaIndex()
    self.assertEqual(None, di._dump_index())
271
def test__dump_index_simple(self):
    """Check the dumped hash and entry tables after indexing one source.

    Adding the first source is expected not to build an index; the index
    is expected to exist only after make_delta() has been called.
    """
    di = self._gc_module.DeltaIndex()
    di.add_source(_text1, 0)
    self.assertFalse(di._has_index())
    self.assertEqual(None, di._dump_index())
    # The delta itself is irrelevant here; the call is made only so that
    # the index gets built.
    _ = di.make_delta(_text1)
    self.assertTrue(di._has_index())
    hash_list, entry_list = di._dump_index()
    self.assertEqual(16, len(hash_list))
    self.assertEqual(68, len(entry_list))
    # Keep only the entries that are not (0, 0), remembering the position
    # each one occupies in entry_list.
    just_entries = [(idx, text_offset, hash_val)
                    for idx, (text_offset, hash_val)
                    in enumerate(entry_list)
                    if text_offset != 0 or hash_val != 0]
    rabin_hash = self._gc_module._rabin_hash
    # NOTE(review): the closing of this assertion was absent from the
    # reviewed text; comparing against just_entries is the evident intent
    # -- confirm against the upstream file.
    self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
                      (25, 48, rabin_hash(_text1[33:49])),
                      (34, 32, rabin_hash(_text1[17:33])),
                      (47, 64, rabin_hash(_text1[49:65])),
                      ], just_entries)
    # This ensures that the hash map points to the location we expect it to
    for entry_idx, text_offset, hash_val in just_entries:
        self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
295
def test__dump_index_two_sources(self):
    """Check the dumped hash and entry tables after indexing two sources.

    Unlike the single-source case, the index is expected to exist right
    after the second add_source() call, without calling make_delta().
    """
    di = self._gc_module.DeltaIndex()
    di.add_source(_text1, 0)
    di.add_source(_text2, 2)
    # Offsets reported for the second text are expected to be shifted by
    # the length of the first text plus the 2 passed to add_source().
    start2 = len(_text1) + 2
    self.assertTrue(di._has_index())
    hash_list, entry_list = di._dump_index()
    self.assertEqual(16, len(hash_list))
    self.assertEqual(68, len(entry_list))
    # Keep only the entries that are not (0, 0), remembering the position
    # each one occupies in entry_list.
    just_entries = [(idx, text_offset, hash_val)
                    for idx, (text_offset, hash_val)
                    in enumerate(entry_list)
                    if text_offset != 0 or hash_val != 0]
    rabin_hash = self._gc_module._rabin_hash
    # NOTE(review): the closing of this assertion was absent from the
    # reviewed text; comparing against just_entries is the evident intent
    # -- confirm against the upstream file.
    self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
                      (9, start2+16, rabin_hash(_text2[1:17])),
                      (25, 48, rabin_hash(_text1[33:49])),
                      (30, start2+64, rabin_hash(_text2[49:65])),
                      (34, 32, rabin_hash(_text1[17:33])),
                      (35, start2+32, rabin_hash(_text2[17:33])),
                      (43, start2+48, rabin_hash(_text2[33:49])),
                      (47, 64, rabin_hash(_text1[49:65])),
                      ], just_entries)
    # Each entry should be in the appropriate hash bucket.
    for entry_idx, text_offset, hash_val in just_entries:
        hash_idx = hash_val & 0xf
        # NOTE(review): the call wrapping this range check was absent from
        # the reviewed text; assertTrue is assumed -- confirm upstream.
        self.assertTrue(
            hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
324
261
def test_first_add_source_doesnt_index_until_make_delta(self):
325
262
di = self._gc_module.DeltaIndex()
326
263
self.assertFalse(di._has_index())
332
269
self.assertTrue(di._has_index())
333
270
self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
335
def test_add_source_max_bytes_to_index(self):
    """Check which offsets get indexed when _max_bytes_to_index is set.

    With the limit set to 3*16, three entries per source are expected,
    spaced by the strides worked out in the comments below.
    """
    di = self._gc_module.DeltaIndex()
    di._max_bytes_to_index = 3*16
    di.add_source(_text1, 0)  # (77 bytes -1) // 3 = 25 byte stride
    di.add_source(_text3, 3)  # (135 bytes -1) // 3 = 44 byte stride
    # Offsets reported for the second text are expected to be shifted by
    # the length of the first text plus the 3 passed to add_source().
    start2 = len(_text1) + 3
    hash_list, entry_list = di._dump_index()
    self.assertEqual(16, len(hash_list))
    self.assertEqual(67, len(entry_list))
    # Keep only the entries that are not (0, 0); sorting orders them by
    # text offset so they can be compared against the expected list.
    just_entries = sorted([(text_offset, hash_val)
                           for text_offset, hash_val in entry_list
                           if text_offset != 0 or hash_val != 0])
    rabin_hash = self._gc_module._rabin_hash
    # NOTE(review): the closing of this assertion was absent from the
    # reviewed text; comparing against just_entries is the evident intent
    # -- confirm against the upstream file.
    self.assertEqual([(25, rabin_hash(_text1[10:26])),
                      (50, rabin_hash(_text1[35:51])),
                      (75, rabin_hash(_text1[60:76])),
                      (start2+44, rabin_hash(_text3[29:45])),
                      (start2+88, rabin_hash(_text3[73:89])),
                      (start2+132, rabin_hash(_text3[117:133])),
                      ], just_entries)
356
272
def test_second_add_source_triggers_make_index(self):
357
273
di = self._gc_module.DeltaIndex()
358
274
self.assertFalse(di._has_index())