~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

  • Committer: Patch Queue Manager
  • Date: 2015-10-05 13:45:00 UTC
  • mfrom: (6603.3.1 bts794146)
  • Revision ID: pqm@pqm.ubuntu.com-20151005134500-v244rho557tv0ukd
(vila) Resolve Bug #1480015: Test failure: hexify removed from paramiko
 (Andrew Starr-Bochicchio) (Andrew Starr-Bochicchio)

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
"""Tests for the python and pyrex extensions of groupcompress"""
18
18
 
19
19
from bzrlib import (
20
 
    groupcompress,
21
20
    _groupcompress_py,
22
21
    tests,
23
22
    )
24
 
 
25
 
 
26
 
def load_tests(standard_tests, module, loader):
27
 
    """Parameterize tests for all versions of groupcompress."""
28
 
    two_way_scenarios = [
 
23
from bzrlib.tests.scenarios import (
 
24
    load_tests_apply_scenarios,
 
25
    )
 
26
from bzrlib.tests import (
 
27
    features,
 
28
    )
 
29
 
 
30
 
 
31
def module_scenarios():
 
32
    scenarios = [
 
33
        ('python', {'_gc_module': _groupcompress_py}),
 
34
        ]
 
35
    if compiled_groupcompress_feature.available():
 
36
        gc_module = compiled_groupcompress_feature.module
 
37
        scenarios.append(('C',
 
38
            {'_gc_module': gc_module}))
 
39
    return scenarios
 
40
 
 
41
 
 
42
def two_way_scenarios():
 
43
    scenarios = [
29
44
        ('PP', {'make_delta': _groupcompress_py.make_delta,
30
45
                'apply_delta': _groupcompress_py.apply_delta})
31
46
        ]
32
 
    scenarios = [
33
 
        ('python', {'_gc_module': _groupcompress_py}),
34
 
        ]
35
 
    if CompiledGroupCompressFeature.available():
36
 
        from bzrlib import _groupcompress_pyx
37
 
        scenarios.append(('C',
38
 
            {'_gc_module': _groupcompress_pyx}))
39
 
        two_way_scenarios.extend([
40
 
            ('CC', {'make_delta': _groupcompress_pyx.make_delta,
41
 
                    'apply_delta': _groupcompress_pyx.apply_delta}),
 
47
    if compiled_groupcompress_feature.available():
 
48
        gc_module = compiled_groupcompress_feature.module
 
49
        scenarios.extend([
 
50
            ('CC', {'make_delta': gc_module.make_delta,
 
51
                    'apply_delta': gc_module.apply_delta}),
42
52
            ('PC', {'make_delta': _groupcompress_py.make_delta,
43
 
                    'apply_delta': _groupcompress_pyx.apply_delta}),
44
 
            ('CP', {'make_delta': _groupcompress_pyx.make_delta,
 
53
                    'apply_delta': gc_module.apply_delta}),
 
54
            ('CP', {'make_delta': gc_module.make_delta,
45
55
                    'apply_delta': _groupcompress_py.apply_delta}),
46
56
            ])
47
 
    to_adapt, result = tests.split_suite_by_condition(
48
 
        standard_tests, tests.condition_isinstance((TestMakeAndApplyDelta,
49
 
                                                    TestBase128Int)))
50
 
    result = tests.multiply_tests(to_adapt, scenarios, result)
51
 
    to_adapt, result = tests.split_suite_by_condition(result,
52
 
        tests.condition_isinstance(TestMakeAndApplyCompatible))
53
 
    result = tests.multiply_tests(to_adapt, two_way_scenarios, result)
54
 
    return result
55
 
 
56
 
 
57
 
class _CompiledGroupCompressFeature(tests.Feature):
58
 
 
59
 
    def _probe(self):
60
 
        try:
61
 
            import bzrlib._groupcompress_pyx
62
 
        except ImportError:
63
 
            return False
64
 
        else:
65
 
            return True
66
 
 
67
 
    def feature_name(self):
68
 
        return 'bzrlib._groupcompress_pyx'
69
 
 
70
 
 
71
 
CompiledGroupCompressFeature = _CompiledGroupCompressFeature()
 
57
    return scenarios
 
58
 
 
59
 
 
60
load_tests = load_tests_apply_scenarios
 
61
 
 
62
 
 
63
compiled_groupcompress_feature = features.ModuleAvailableFeature(
 
64
    'bzrlib._groupcompress_pyx')
72
65
 
73
66
_text1 = """\
74
67
This is a bit
129
122
 
130
123
class TestMakeAndApplyDelta(tests.TestCase):
131
124
 
 
125
    scenarios = module_scenarios()
132
126
    _gc_module = None # Set by load_tests
133
127
 
134
128
    def setUp(self):
186
180
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
187
181
            delta)
188
182
 
 
183
    def test_make_delta_with_large_copies(self):
 
184
        # We want to have a copy that is larger than 64kB, which forces us to
 
185
        # issue multiple copy instructions.
 
186
        big_text = _text3 * 1220
 
187
        delta = self.make_delta(big_text, big_text)
 
188
        self.assertDeltaIn(
 
189
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
190
            '\x80'              # Copy 64kB, starting at byte 0
 
191
            '\x84\x01'          # and another 64kB starting at 64kB
 
192
            '\xb4\x02\x5c\x83', # And the bit of tail.
 
193
            None,   # Both implementations should be identical
 
194
            delta)
 
195
 
189
196
    def test_apply_delta_is_typesafe(self):
190
197
        self.apply_delta(_text1, 'M\x90M')
191
198
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
226
233
 
227
234
class TestMakeAndApplyCompatible(tests.TestCase):
228
235
 
 
236
    scenarios = two_way_scenarios()
 
237
 
229
238
    make_delta = None # Set by load_tests
230
239
    apply_delta = None # Set by load_tests
231
240
 
251
260
        # This test isn't multiplied, because we only have DeltaIndex for the
252
261
        # compiled form
253
262
        # We call this here, because _test_needs_features happens after setUp
254
 
        self.requireFeature(CompiledGroupCompressFeature)
255
 
        from bzrlib import _groupcompress_pyx
256
 
        self._gc_module = _groupcompress_pyx
 
263
        self.requireFeature(compiled_groupcompress_feature)
 
264
        self._gc_module = compiled_groupcompress_feature.module
257
265
 
258
266
    def test_repr(self):
259
267
        di = self._gc_module.DeltaIndex('test text\n')
260
268
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
261
269
 
 
270
    def test__dump_no_index(self):
 
271
        di = self._gc_module.DeltaIndex()
 
272
        self.assertEqual(None, di._dump_index())
 
273
 
 
274
    def test__dump_index_simple(self):
 
275
        di = self._gc_module.DeltaIndex()
 
276
        di.add_source(_text1, 0)
 
277
        self.assertFalse(di._has_index())
 
278
        self.assertEqual(None, di._dump_index())
 
279
        _ = di.make_delta(_text1)
 
280
        self.assertTrue(di._has_index())
 
281
        hash_list, entry_list = di._dump_index()
 
282
        self.assertEqual(16, len(hash_list))
 
283
        self.assertEqual(68, len(entry_list))
 
284
        just_entries = [(idx, text_offset, hash_val)
 
285
                        for idx, (text_offset, hash_val)
 
286
                         in enumerate(entry_list)
 
287
                         if text_offset != 0 or hash_val != 0]
 
288
        rabin_hash = self._gc_module._rabin_hash
 
289
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
290
                          (25, 48, rabin_hash(_text1[33:49])),
 
291
                          (34, 32, rabin_hash(_text1[17:33])),
 
292
                          (47, 64, rabin_hash(_text1[49:65])),
 
293
                         ], just_entries)
 
294
        # This ensures that the hash map points to the location we expect it to
 
295
        for entry_idx, text_offset, hash_val in just_entries:
 
296
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
 
297
 
 
298
    def test__dump_index_two_sources(self):
 
299
        di = self._gc_module.DeltaIndex()
 
300
        di.add_source(_text1, 0)
 
301
        di.add_source(_text2, 2)
 
302
        start2 = len(_text1) + 2
 
303
        self.assertTrue(di._has_index())
 
304
        hash_list, entry_list = di._dump_index()
 
305
        self.assertEqual(16, len(hash_list))
 
306
        self.assertEqual(68, len(entry_list))
 
307
        just_entries = [(idx, text_offset, hash_val)
 
308
                        for idx, (text_offset, hash_val)
 
309
                         in enumerate(entry_list)
 
310
                         if text_offset != 0 or hash_val != 0]
 
311
        rabin_hash = self._gc_module._rabin_hash
 
312
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
313
                          (9, start2+16, rabin_hash(_text2[1:17])),
 
314
                          (25, 48, rabin_hash(_text1[33:49])),
 
315
                          (30, start2+64, rabin_hash(_text2[49:65])),
 
316
                          (34, 32, rabin_hash(_text1[17:33])),
 
317
                          (35, start2+32, rabin_hash(_text2[17:33])),
 
318
                          (43, start2+48, rabin_hash(_text2[33:49])),
 
319
                          (47, 64, rabin_hash(_text1[49:65])),
 
320
                         ], just_entries)
 
321
        # Each entry should be in the appropriate hash bucket.
 
322
        for entry_idx, text_offset, hash_val in just_entries:
 
323
            hash_idx = hash_val & 0xf
 
324
            self.assertTrue(
 
325
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
 
326
 
 
327
    def test_first_add_source_doesnt_index_until_make_delta(self):
 
328
        di = self._gc_module.DeltaIndex()
 
329
        self.assertFalse(di._has_index())
 
330
        di.add_source(_text1, 0)
 
331
        self.assertFalse(di._has_index())
 
332
        # However, asking to make a delta will trigger the index to be
 
333
        # generated, and will generate a proper delta
 
334
        delta = di.make_delta(_text2)
 
335
        self.assertTrue(di._has_index())
 
336
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
337
 
 
338
    def test_add_source_max_bytes_to_index(self):
 
339
        di = self._gc_module.DeltaIndex()
 
340
        di._max_bytes_to_index = 3*16
 
341
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
 
342
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
 
343
        start2 = len(_text1) + 3
 
344
        hash_list, entry_list = di._dump_index()
 
345
        self.assertEqual(16, len(hash_list))
 
346
        self.assertEqual(67, len(entry_list))
 
347
        just_entries = sorted([(text_offset, hash_val)
 
348
                               for text_offset, hash_val in entry_list
 
349
                                if text_offset != 0 or hash_val != 0])
 
350
        rabin_hash = self._gc_module._rabin_hash
 
351
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
 
352
                          (50, rabin_hash(_text1[35:51])),
 
353
                          (75, rabin_hash(_text1[60:76])),
 
354
                          (start2+44, rabin_hash(_text3[29:45])),
 
355
                          (start2+88, rabin_hash(_text3[73:89])),
 
356
                          (start2+132, rabin_hash(_text3[117:133])),
 
357
                         ], just_entries)
 
358
 
 
359
    def test_second_add_source_triggers_make_index(self):
 
360
        di = self._gc_module.DeltaIndex()
 
361
        self.assertFalse(di._has_index())
 
362
        di.add_source(_text1, 0)
 
363
        self.assertFalse(di._has_index())
 
364
        di.add_source(_text2, 0)
 
365
        self.assertTrue(di._has_index())
 
366
 
262
367
    def test_make_delta(self):
263
368
        di = self._gc_module.DeltaIndex(_text1)
264
369
        delta = di.make_delta(_text2)
358
463
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
359
464
 
360
465
    def test_encode_no_length(self):
361
 
        self.assertEncode('\x80', 0, None)
362
 
        self.assertEncode('\x81\x01', 1, None)
363
 
        self.assertEncode('\x81\x0a', 10, None)
364
 
        self.assertEncode('\x81\xff', 255, None)
365
 
        self.assertEncode('\x82\x01', 256, None)
366
 
        self.assertEncode('\x83\x01\x01', 257, None)
367
 
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, None)
368
 
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, None)
369
 
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, None)
370
 
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, None)
371
 
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, None)
372
 
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, None)
 
466
        self.assertEncode('\x80', 0, 64*1024)
 
467
        self.assertEncode('\x81\x01', 1, 64*1024)
 
468
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
469
        self.assertEncode('\x81\xff', 255, 64*1024)
 
470
        self.assertEncode('\x82\x01', 256, 64*1024)
 
471
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
 
472
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
 
473
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
 
474
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
 
475
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
 
476
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
 
477
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
373
478
 
374
479
    def test_encode_no_offset(self):
375
480
        self.assertEncode('\x90\x01', 0, 1)
439
544
 
440
545
class TestBase128Int(tests.TestCase):
441
546
 
 
547
    scenarios = module_scenarios()
 
548
 
442
549
    _gc_module = None # Set by load_tests
443
550
 
444
551
    def assertEqualEncode(self, bytes, val):