~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

  • Committer: Jelmer Vernooij
  • Date: 2011-12-16 16:40:10 UTC
  • mto: This revision was merged to the branch mainline in revision 6391.
  • Revision ID: jelmer@samba.org-20111216164010-z3hy00xrnclnkf7a
Update tests.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the python and pyrex extensions of groupcompress"""
18
18
 
19
19
from bzrlib import (
20
 
    groupcompress,
21
20
    _groupcompress_py,
22
21
    tests,
23
22
    )
24
 
 
25
 
 
26
 
def load_tests(standard_tests, module, loader):
27
 
    """Parameterize tests for all versions of groupcompress."""
28
 
    two_way_scenarios = [
29
 
        ('PP', {'make_delta': _groupcompress_py.make_delta,
30
 
                'apply_delta': _groupcompress_py.apply_delta})
31
 
        ]
 
23
from bzrlib.tests.scenarios import (
 
24
    load_tests_apply_scenarios,
 
25
    )
 
26
from bzrlib.tests import (
 
27
    features,
 
28
    )
 
29
 
 
30
 
 
31
def module_scenarios():
32
32
    scenarios = [
33
33
        ('python', {'_gc_module': _groupcompress_py}),
34
34
        ]
36
36
        gc_module = compiled_groupcompress_feature.module
37
37
        scenarios.append(('C',
38
38
            {'_gc_module': gc_module}))
39
 
        two_way_scenarios.extend([
 
39
    return scenarios
 
40
 
 
41
 
 
42
def two_way_scenarios():
 
43
    scenarios = [
 
44
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
45
                'apply_delta': _groupcompress_py.apply_delta})
 
46
        ]
 
47
    if compiled_groupcompress_feature.available():
 
48
        gc_module = compiled_groupcompress_feature.module
 
49
        scenarios.extend([
40
50
            ('CC', {'make_delta': gc_module.make_delta,
41
51
                    'apply_delta': gc_module.apply_delta}),
42
52
            ('PC', {'make_delta': _groupcompress_py.make_delta,
44
54
            ('CP', {'make_delta': gc_module.make_delta,
45
55
                    'apply_delta': _groupcompress_py.apply_delta}),
46
56
            ])
47
 
    to_adapt, result = tests.split_suite_by_condition(
48
 
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
49
 
                                                    TestBase128Int)))
50
 
    result = tests.multiply_tests(to_adapt, scenarios, result)
51
 
    to_adapt, result = tests.split_suite_by_condition(result,
52
 
        tests.condition_isinstance(TestMakeAndApplyCompatible))
53
 
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
54
 
    return result
55
 
 
56
 
 
57
 
compiled_groupcompress_feature = tests.ModuleAvailableFeature(
58
 
                                    'bzrlib._groupcompress_pyx')
 
57
    return scenarios
 
58
 
 
59
 
 
60
load_tests = load_tests_apply_scenarios
 
61
 
 
62
 
 
63
compiled_groupcompress_feature = features.ModuleAvailableFeature(
 
64
    'bzrlib._groupcompress_pyx')
59
65
 
60
66
_text1 = """\
61
67
This is a bit
116
122
 
117
123
class TestMakeAndApplyDelta(tests.TestCase):
118
124
 
 
125
    scenarios = module_scenarios()
119
126
    _gc_module = None # Set by load_tests
120
127
 
121
128
    def setUp(self):
226
233
 
227
234
class TestMakeAndApplyCompatible(tests.TestCase):
228
235
 
 
236
    scenarios = two_way_scenarios()
 
237
 
229
238
    make_delta = None # Set by load_tests
230
239
    apply_delta = None # Set by load_tests
231
240
 
258
267
        di = self._gc_module.DeltaIndex('test text\n')
259
268
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
260
269
 
 
270
    def test__dump_no_index(self):
 
271
        di = self._gc_module.DeltaIndex()
 
272
        self.assertEqual(None, di._dump_index())
 
273
 
 
274
    def test__dump_index_simple(self):
 
275
        di = self._gc_module.DeltaIndex()
 
276
        di.add_source(_text1, 0)
 
277
        self.assertFalse(di._has_index())
 
278
        self.assertEqual(None, di._dump_index())
 
279
        _ = di.make_delta(_text1)
 
280
        self.assertTrue(di._has_index())
 
281
        hash_list, entry_list = di._dump_index()
 
282
        self.assertEqual(16, len(hash_list))
 
283
        self.assertEqual(68, len(entry_list))
 
284
        just_entries = [(idx, text_offset, hash_val)
 
285
                        for idx, (text_offset, hash_val)
 
286
                         in enumerate(entry_list)
 
287
                         if text_offset != 0 or hash_val != 0]
 
288
        rabin_hash = self._gc_module._rabin_hash
 
289
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
290
                          (25, 48, rabin_hash(_text1[33:49])),
 
291
                          (34, 32, rabin_hash(_text1[17:33])),
 
292
                          (47, 64, rabin_hash(_text1[49:65])),
 
293
                         ], just_entries)
 
294
        # This ensures that the hash map points to the location we expect it to
 
295
        for entry_idx, text_offset, hash_val in just_entries:
 
296
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
 
297
 
 
298
    def test__dump_index_two_sources(self):
 
299
        di = self._gc_module.DeltaIndex()
 
300
        di.add_source(_text1, 0)
 
301
        di.add_source(_text2, 2)
 
302
        start2 = len(_text1) + 2
 
303
        self.assertTrue(di._has_index())
 
304
        hash_list, entry_list = di._dump_index()
 
305
        self.assertEqual(16, len(hash_list))
 
306
        self.assertEqual(68, len(entry_list))
 
307
        just_entries = [(idx, text_offset, hash_val)
 
308
                        for idx, (text_offset, hash_val)
 
309
                         in enumerate(entry_list)
 
310
                         if text_offset != 0 or hash_val != 0]
 
311
        rabin_hash = self._gc_module._rabin_hash
 
312
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
313
                          (9, start2+16, rabin_hash(_text2[1:17])),
 
314
                          (25, 48, rabin_hash(_text1[33:49])),
 
315
                          (30, start2+64, rabin_hash(_text2[49:65])),
 
316
                          (34, 32, rabin_hash(_text1[17:33])),
 
317
                          (35, start2+32, rabin_hash(_text2[17:33])),
 
318
                          (43, start2+48, rabin_hash(_text2[33:49])),
 
319
                          (47, 64, rabin_hash(_text1[49:65])),
 
320
                         ], just_entries)
 
321
        # Each entry should be in the appropriate hash bucket.
 
322
        for entry_idx, text_offset, hash_val in just_entries:
 
323
            hash_idx = hash_val & 0xf
 
324
            self.assertTrue(
 
325
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
 
326
 
261
327
    def test_first_add_source_doesnt_index_until_make_delta(self):
262
328
        di = self._gc_module.DeltaIndex()
263
329
        self.assertFalse(di._has_index())
269
335
        self.assertTrue(di._has_index())
270
336
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
271
337
 
 
338
    def test_add_source_max_bytes_to_index(self):
 
339
        di = self._gc_module.DeltaIndex()
 
340
        di._max_bytes_to_index = 3*16
 
341
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
 
342
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
 
343
        start2 = len(_text1) + 3
 
344
        hash_list, entry_list = di._dump_index()
 
345
        self.assertEqual(16, len(hash_list))
 
346
        self.assertEqual(67, len(entry_list))
 
347
        just_entries = sorted([(text_offset, hash_val)
 
348
                               for text_offset, hash_val in entry_list
 
349
                                if text_offset != 0 or hash_val != 0])
 
350
        rabin_hash = self._gc_module._rabin_hash
 
351
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
 
352
                          (50, rabin_hash(_text1[35:51])),
 
353
                          (75, rabin_hash(_text1[60:76])),
 
354
                          (start2+44, rabin_hash(_text3[29:45])),
 
355
                          (start2+88, rabin_hash(_text3[73:89])),
 
356
                          (start2+132, rabin_hash(_text3[117:133])),
 
357
                         ], just_entries)
 
358
 
272
359
    def test_second_add_source_triggers_make_index(self):
273
360
        di = self._gc_module.DeltaIndex()
274
361
        self.assertFalse(di._has_index())
457
544
 
458
545
class TestBase128Int(tests.TestCase):
459
546
 
 
547
    scenarios = module_scenarios()
 
548
 
460
549
    _gc_module = None # Set by load_tests
461
550
 
462
551
    def assertEqualEncode(self, bytes, val):