~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

  • Committer: Martin Pool
  • Date: 2010-08-18 07:25:22 UTC
  • mto: This revision was merged to the branch mainline in revision 5383.
  • Revision ID: mbp@sourcefrog.net-20100818072522-uk3gsazoia3l3s0a
Start adding 'what's new in 2.3'

Show diffs side-by-side

Legend: added lines

Legend: removed lines

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the python and pyrex extensions of groupcompress"""
18
18
 
19
19
from bzrlib import (
 
20
    groupcompress,
20
21
    _groupcompress_py,
21
22
    tests,
22
23
    )
23
 
from bzrlib.tests.scenarios import (
24
 
    load_tests_apply_scenarios,
25
 
    )
26
 
 
27
 
 
28
 
def module_scenarios():
 
24
 
 
25
 
 
26
def load_tests(standard_tests, module, loader):
 
27
    """Parameterize tests for all versions of groupcompress."""
 
28
    two_way_scenarios = [
 
29
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
30
                'apply_delta': _groupcompress_py.apply_delta})
 
31
        ]
29
32
    scenarios = [
30
33
        ('python', {'_gc_module': _groupcompress_py}),
31
34
        ]
33
36
        gc_module = compiled_groupcompress_feature.module
34
37
        scenarios.append(('C',
35
38
            {'_gc_module': gc_module}))
36
 
    return scenarios
37
 
 
38
 
 
39
 
def two_way_scenarios():
40
 
    scenarios = [
41
 
        ('PP', {'make_delta': _groupcompress_py.make_delta,
42
 
                'apply_delta': _groupcompress_py.apply_delta})
43
 
        ]
44
 
    if compiled_groupcompress_feature.available():
45
 
        gc_module = compiled_groupcompress_feature.module
46
 
        scenarios.extend([
 
39
        two_way_scenarios.extend([
47
40
            ('CC', {'make_delta': gc_module.make_delta,
48
41
                    'apply_delta': gc_module.apply_delta}),
49
42
            ('PC', {'make_delta': _groupcompress_py.make_delta,
51
44
            ('CP', {'make_delta': gc_module.make_delta,
52
45
                    'apply_delta': _groupcompress_py.apply_delta}),
53
46
            ])
54
 
    return scenarios
55
 
 
56
 
 
57
 
load_tests = load_tests_apply_scenarios
 
47
    to_adapt, result = tests.split_suite_by_condition(
 
48
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
 
49
                                                    TestBase128Int)))
 
50
    result = tests.multiply_tests(to_adapt, scenarios, result)
 
51
    to_adapt, result = tests.split_suite_by_condition(result,
 
52
        tests.condition_isinstance(TestMakeAndApplyCompatible))
 
53
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
 
54
    return result
58
55
 
59
56
 
60
57
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
119
116
 
120
117
class TestMakeAndApplyDelta(tests.TestCase):
121
118
 
122
 
    scenarios = module_scenarios()
123
119
    _gc_module = None # Set by load_tests
124
120
 
125
121
    def setUp(self):
230
226
 
231
227
class TestMakeAndApplyCompatible(tests.TestCase):
232
228
 
233
 
    scenarios = two_way_scenarios()
234
 
 
235
229
    make_delta = None # Set by load_tests
236
230
    apply_delta = None # Set by load_tests
237
231
 
264
258
        di = self._gc_module.DeltaIndex('test text\n')
265
259
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
266
260
 
267
 
    def test__dump_no_index(self):
268
 
        di = self._gc_module.DeltaIndex()
269
 
        self.assertEqual(None, di._dump_index())
270
 
 
271
 
    def test__dump_index_simple(self):
272
 
        di = self._gc_module.DeltaIndex()
273
 
        di.add_source(_text1, 0)
274
 
        self.assertFalse(di._has_index())
275
 
        self.assertEqual(None, di._dump_index())
276
 
        _ = di.make_delta(_text1)
277
 
        self.assertTrue(di._has_index())
278
 
        hash_list, entry_list = di._dump_index()
279
 
        self.assertEqual(16, len(hash_list))
280
 
        self.assertEqual(68, len(entry_list))
281
 
        just_entries = [(idx, text_offset, hash_val)
282
 
                        for idx, (text_offset, hash_val)
283
 
                         in enumerate(entry_list)
284
 
                         if text_offset != 0 or hash_val != 0]
285
 
        rabin_hash = self._gc_module._rabin_hash
286
 
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
287
 
                          (25, 48, rabin_hash(_text1[33:49])),
288
 
                          (34, 32, rabin_hash(_text1[17:33])),
289
 
                          (47, 64, rabin_hash(_text1[49:65])),
290
 
                         ], just_entries)
291
 
        # This ensures that the hash map points to the location we expect it to
292
 
        for entry_idx, text_offset, hash_val in just_entries:
293
 
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
294
 
 
295
 
    def test__dump_index_two_sources(self):
296
 
        di = self._gc_module.DeltaIndex()
297
 
        di.add_source(_text1, 0)
298
 
        di.add_source(_text2, 2)
299
 
        start2 = len(_text1) + 2
300
 
        self.assertTrue(di._has_index())
301
 
        hash_list, entry_list = di._dump_index()
302
 
        self.assertEqual(16, len(hash_list))
303
 
        self.assertEqual(68, len(entry_list))
304
 
        just_entries = [(idx, text_offset, hash_val)
305
 
                        for idx, (text_offset, hash_val)
306
 
                         in enumerate(entry_list)
307
 
                         if text_offset != 0 or hash_val != 0]
308
 
        rabin_hash = self._gc_module._rabin_hash
309
 
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
310
 
                          (9, start2+16, rabin_hash(_text2[1:17])),
311
 
                          (25, 48, rabin_hash(_text1[33:49])),
312
 
                          (30, start2+64, rabin_hash(_text2[49:65])),
313
 
                          (34, 32, rabin_hash(_text1[17:33])),
314
 
                          (35, start2+32, rabin_hash(_text2[17:33])),
315
 
                          (43, start2+48, rabin_hash(_text2[33:49])),
316
 
                          (47, 64, rabin_hash(_text1[49:65])),
317
 
                         ], just_entries)
318
 
        # Each entry should be in the appropriate hash bucket.
319
 
        for entry_idx, text_offset, hash_val in just_entries:
320
 
            hash_idx = hash_val & 0xf
321
 
            self.assertTrue(
322
 
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
323
 
 
324
261
    def test_first_add_source_doesnt_index_until_make_delta(self):
325
262
        di = self._gc_module.DeltaIndex()
326
263
        self.assertFalse(di._has_index())
332
269
        self.assertTrue(di._has_index())
333
270
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
334
271
 
335
 
    def test_add_source_max_bytes_to_index(self):
336
 
        di = self._gc_module.DeltaIndex()
337
 
        di._max_bytes_to_index = 3*16
338
 
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
339
 
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
340
 
        start2 = len(_text1) + 3
341
 
        hash_list, entry_list = di._dump_index()
342
 
        self.assertEqual(16, len(hash_list))
343
 
        self.assertEqual(67, len(entry_list))
344
 
        just_entries = sorted([(text_offset, hash_val)
345
 
                               for text_offset, hash_val in entry_list
346
 
                                if text_offset != 0 or hash_val != 0])
347
 
        rabin_hash = self._gc_module._rabin_hash
348
 
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
349
 
                          (50, rabin_hash(_text1[35:51])),
350
 
                          (75, rabin_hash(_text1[60:76])),
351
 
                          (start2+44, rabin_hash(_text3[29:45])),
352
 
                          (start2+88, rabin_hash(_text3[73:89])),
353
 
                          (start2+132, rabin_hash(_text3[117:133])),
354
 
                         ], just_entries)
355
 
 
356
272
    def test_second_add_source_triggers_make_index(self):
357
273
        di = self._gc_module.DeltaIndex()
358
274
        self.assertFalse(di._has_index())
541
457
 
542
458
class TestBase128Int(tests.TestCase):
543
459
 
544
 
    scenarios = module_scenarios()
545
 
 
546
460
    _gc_module = None # Set by load_tests
547
461
 
548
462
    def assertEqualEncode(self, bytes, val):