~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

  • Committer: Aaron Bentley
  • Date: 2009-06-19 21:16:31 UTC
  • mto: This revision was merged to the branch mainline in revision 4481.
  • Revision ID: aaron@aaronbentley.com-20090619211631-4fnkv2uui98xj7ux
Provide control over switch and shelver messaging.

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the python and pyrex extensions of groupcompress"""
18
18
 
19
19
from bzrlib import (
 
20
    groupcompress,
20
21
    _groupcompress_py,
21
22
    tests,
22
23
    )
23
 
from bzrlib.tests.scenarios import (
24
 
    load_tests_apply_scenarios,
25
 
    )
26
 
from bzrlib.tests import (
27
 
    features,
28
 
    )
29
 
 
30
 
 
31
 
def module_scenarios():
 
24
 
 
25
 
 
26
def load_tests(standard_tests, module, loader):
 
27
    """Parameterize tests for all versions of groupcompress."""
 
28
    two_way_scenarios = [
 
29
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
30
                'apply_delta': _groupcompress_py.apply_delta})
 
31
        ]
32
32
    scenarios = [
33
33
        ('python', {'_gc_module': _groupcompress_py}),
34
34
        ]
35
 
    if compiled_groupcompress_feature.available():
36
 
        gc_module = compiled_groupcompress_feature.module
 
35
    if CompiledGroupCompressFeature.available():
 
36
        from bzrlib import _groupcompress_pyx
37
37
        scenarios.append(('C',
38
 
            {'_gc_module': gc_module}))
39
 
    return scenarios
40
 
 
41
 
 
42
 
def two_way_scenarios():
43
 
    scenarios = [
44
 
        ('PP', {'make_delta': _groupcompress_py.make_delta,
45
 
                'apply_delta': _groupcompress_py.apply_delta})
46
 
        ]
47
 
    if compiled_groupcompress_feature.available():
48
 
        gc_module = compiled_groupcompress_feature.module
49
 
        scenarios.extend([
50
 
            ('CC', {'make_delta': gc_module.make_delta,
51
 
                    'apply_delta': gc_module.apply_delta}),
 
38
            {'_gc_module': _groupcompress_pyx}))
 
39
        two_way_scenarios.extend([
 
40
            ('CC', {'make_delta': _groupcompress_pyx.make_delta,
 
41
                    'apply_delta': _groupcompress_pyx.apply_delta}),
52
42
            ('PC', {'make_delta': _groupcompress_py.make_delta,
53
 
                    'apply_delta': gc_module.apply_delta}),
54
 
            ('CP', {'make_delta': gc_module.make_delta,
 
43
                    'apply_delta': _groupcompress_pyx.apply_delta}),
 
44
            ('CP', {'make_delta': _groupcompress_pyx.make_delta,
55
45
                    'apply_delta': _groupcompress_py.apply_delta}),
56
46
            ])
57
 
    return scenarios
58
 
 
59
 
 
60
 
load_tests = load_tests_apply_scenarios
61
 
 
62
 
 
63
 
compiled_groupcompress_feature = features.ModuleAvailableFeature(
64
 
    'bzrlib._groupcompress_pyx')
 
47
    to_adapt, result = tests.split_suite_by_condition(
 
48
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
 
49
                                                    TestBase128Int)))
 
50
    result = tests.multiply_tests(to_adapt, scenarios, result)
 
51
    to_adapt, result = tests.split_suite_by_condition(result,
 
52
        tests.condition_isinstance(TestMakeAndApplyCompatible))
 
53
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
 
54
    return result
 
55
 
 
56
 
 
57
class _CompiledGroupCompressFeature(tests.Feature):
 
58
 
 
59
    def _probe(self):
 
60
        try:
 
61
            import bzrlib._groupcompress_pyx
 
62
        except ImportError:
 
63
            return False
 
64
        else:
 
65
            return True
 
66
 
 
67
    def feature_name(self):
 
68
        return 'bzrlib._groupcompress_pyx'
 
69
 
 
70
 
 
71
CompiledGroupCompressFeature = _CompiledGroupCompressFeature()
65
72
 
66
73
_text1 = """\
67
74
This is a bit
122
129
 
123
130
class TestMakeAndApplyDelta(tests.TestCase):
124
131
 
125
 
    scenarios = module_scenarios()
126
132
    _gc_module = None # Set by load_tests
127
133
 
128
134
    def setUp(self):
233
239
 
234
240
class TestMakeAndApplyCompatible(tests.TestCase):
235
241
 
236
 
    scenarios = two_way_scenarios()
237
 
 
238
242
    make_delta = None # Set by load_tests
239
243
    apply_delta = None # Set by load_tests
240
244
 
260
264
        # This test isn't multiplied, because we only have DeltaIndex for the
261
265
        # compiled form
262
266
        # We call this here, because _test_needs_features happens after setUp
263
 
        self.requireFeature(compiled_groupcompress_feature)
264
 
        self._gc_module = compiled_groupcompress_feature.module
 
267
        self.requireFeature(CompiledGroupCompressFeature)
 
268
        from bzrlib import _groupcompress_pyx
 
269
        self._gc_module = _groupcompress_pyx
265
270
 
266
271
    def test_repr(self):
267
272
        di = self._gc_module.DeltaIndex('test text\n')
268
273
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
269
274
 
270
 
    def test__dump_no_index(self):
271
 
        di = self._gc_module.DeltaIndex()
272
 
        self.assertEqual(None, di._dump_index())
273
 
 
274
 
    def test__dump_index_simple(self):
275
 
        di = self._gc_module.DeltaIndex()
276
 
        di.add_source(_text1, 0)
277
 
        self.assertFalse(di._has_index())
278
 
        self.assertEqual(None, di._dump_index())
279
 
        _ = di.make_delta(_text1)
280
 
        self.assertTrue(di._has_index())
281
 
        hash_list, entry_list = di._dump_index()
282
 
        self.assertEqual(16, len(hash_list))
283
 
        self.assertEqual(68, len(entry_list))
284
 
        just_entries = [(idx, text_offset, hash_val)
285
 
                        for idx, (text_offset, hash_val)
286
 
                         in enumerate(entry_list)
287
 
                         if text_offset != 0 or hash_val != 0]
288
 
        rabin_hash = self._gc_module._rabin_hash
289
 
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
290
 
                          (25, 48, rabin_hash(_text1[33:49])),
291
 
                          (34, 32, rabin_hash(_text1[17:33])),
292
 
                          (47, 64, rabin_hash(_text1[49:65])),
293
 
                         ], just_entries)
294
 
        # This ensures that the hash map points to the location we expect it to
295
 
        for entry_idx, text_offset, hash_val in just_entries:
296
 
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
297
 
 
298
 
    def test__dump_index_two_sources(self):
299
 
        di = self._gc_module.DeltaIndex()
300
 
        di.add_source(_text1, 0)
301
 
        di.add_source(_text2, 2)
302
 
        start2 = len(_text1) + 2
303
 
        self.assertTrue(di._has_index())
304
 
        hash_list, entry_list = di._dump_index()
305
 
        self.assertEqual(16, len(hash_list))
306
 
        self.assertEqual(68, len(entry_list))
307
 
        just_entries = [(idx, text_offset, hash_val)
308
 
                        for idx, (text_offset, hash_val)
309
 
                         in enumerate(entry_list)
310
 
                         if text_offset != 0 or hash_val != 0]
311
 
        rabin_hash = self._gc_module._rabin_hash
312
 
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
313
 
                          (9, start2+16, rabin_hash(_text2[1:17])),
314
 
                          (25, 48, rabin_hash(_text1[33:49])),
315
 
                          (30, start2+64, rabin_hash(_text2[49:65])),
316
 
                          (34, 32, rabin_hash(_text1[17:33])),
317
 
                          (35, start2+32, rabin_hash(_text2[17:33])),
318
 
                          (43, start2+48, rabin_hash(_text2[33:49])),
319
 
                          (47, 64, rabin_hash(_text1[49:65])),
320
 
                         ], just_entries)
321
 
        # Each entry should be in the appropriate hash bucket.
322
 
        for entry_idx, text_offset, hash_val in just_entries:
323
 
            hash_idx = hash_val & 0xf
324
 
            self.assertTrue(
325
 
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
326
 
 
327
275
    def test_first_add_source_doesnt_index_until_make_delta(self):
328
276
        di = self._gc_module.DeltaIndex()
329
277
        self.assertFalse(di._has_index())
335
283
        self.assertTrue(di._has_index())
336
284
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
337
285
 
338
 
    def test_add_source_max_bytes_to_index(self):
339
 
        di = self._gc_module.DeltaIndex()
340
 
        di._max_bytes_to_index = 3*16
341
 
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
342
 
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
343
 
        start2 = len(_text1) + 3
344
 
        hash_list, entry_list = di._dump_index()
345
 
        self.assertEqual(16, len(hash_list))
346
 
        self.assertEqual(67, len(entry_list))
347
 
        just_entries = sorted([(text_offset, hash_val)
348
 
                               for text_offset, hash_val in entry_list
349
 
                                if text_offset != 0 or hash_val != 0])
350
 
        rabin_hash = self._gc_module._rabin_hash
351
 
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
352
 
                          (50, rabin_hash(_text1[35:51])),
353
 
                          (75, rabin_hash(_text1[60:76])),
354
 
                          (start2+44, rabin_hash(_text3[29:45])),
355
 
                          (start2+88, rabin_hash(_text3[73:89])),
356
 
                          (start2+132, rabin_hash(_text3[117:133])),
357
 
                         ], just_entries)
358
 
 
359
286
    def test_second_add_source_triggers_make_index(self):
360
287
        di = self._gc_module.DeltaIndex()
361
288
        self.assertFalse(di._has_index())
544
471
 
545
472
class TestBase128Int(tests.TestCase):
546
473
 
547
 
    scenarios = module_scenarios()
548
 
 
549
474
    _gc_module = None # Set by load_tests
550
475
 
551
476
    def assertEqualEncode(self, bytes, val):