~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_groupcompress.py

  • Committer: Vincent Ladeuil
  • Date: 2010-06-17 07:11:55 UTC
  • mto: (5299.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5302.
  • Revision ID: v.ladeuil+lp@free.fr-20100617071155-by34vqnqqlvzuwt1
Fix typo.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
30
30
    )
31
31
from bzrlib.osutils import sha_string
32
32
from bzrlib.tests.test__groupcompress import compiled_groupcompress_feature
33
 
from bzrlib.tests.scenarios import load_tests_apply_scenarios
34
 
 
35
 
 
36
 
def group_compress_implementation_scenarios():
 
33
 
 
34
 
 
35
def load_tests(standard_tests, module, loader):
 
36
    """Parameterize tests for all versions of groupcompress."""
 
37
    to_adapt, result = tests.split_suite_by_condition(
 
38
        standard_tests, tests.condition_isinstance(TestAllGroupCompressors))
37
39
    scenarios = [
38
40
        ('python', {'compressor': groupcompress.PythonGroupCompressor}),
39
41
        ]
40
42
    if compiled_groupcompress_feature.available():
41
43
        scenarios.append(('C',
42
44
            {'compressor': groupcompress.PyrexGroupCompressor}))
43
 
    return scenarios
44
 
 
45
 
 
46
 
load_tests = load_tests_apply_scenarios
 
45
    return tests.multiply_tests(to_adapt, scenarios, result)
47
46
 
48
47
 
49
48
class TestGroupCompressor(tests.TestCase):
67
66
class TestAllGroupCompressors(TestGroupCompressor):
68
67
    """Tests for GroupCompressor"""
69
68
 
70
 
    scenarios = group_compress_implementation_scenarios()
71
 
    compressor = None # Set by scenario
 
69
    compressor = None # Set by multiply_tests
72
70
 
73
71
    def test_empty_delta(self):
74
72
        compressor = self.compressor()
349
347
        self.assertEqual(z_content, block._z_content)
350
348
        self.assertEqual(content, block._content)
351
349
 
352
 
    def test_to_chunks(self):
353
 
        content_chunks = ['this is some content\n',
354
 
                          'this content will be compressed\n']
355
 
        content_len = sum(map(len, content_chunks))
356
 
        content = ''.join(content_chunks)
357
 
        gcb = groupcompress.GroupCompressBlock()
358
 
        gcb.set_chunked_content(content_chunks, content_len)
359
 
        total_len, block_chunks = gcb.to_chunks()
360
 
        block_bytes = ''.join(block_chunks)
361
 
        self.assertEqual(gcb._z_content_length, len(gcb._z_content))
362
 
        self.assertEqual(total_len, len(block_bytes))
363
 
        self.assertEqual(gcb._content_length, content_len)
364
 
        expected_header =('gcb1z\n' # group compress block v1 zlib
365
 
                          '%d\n' # Length of compressed content
366
 
                          '%d\n' # Length of uncompressed content
367
 
                         ) % (gcb._z_content_length, gcb._content_length)
368
 
        # The first chunk should be the header chunk. It is small, fixed size,
369
 
        # and there is no compelling reason to split it up
370
 
        self.assertEqual(expected_header, block_chunks[0])
371
 
        self.assertStartsWith(block_bytes, expected_header)
372
 
        remaining_bytes = block_bytes[len(expected_header):]
373
 
        raw_bytes = zlib.decompress(remaining_bytes)
374
 
        self.assertEqual(content, raw_bytes)
375
 
 
376
350
    def test_to_bytes(self):
377
351
        content = ('this is some content\n'
378
352
                   'this content will be compressed\n')
415
389
        z_content = zlib.compress(content)
416
390
        self.assertEqual(57182, len(z_content))
417
391
        block = groupcompress.GroupCompressBlock()
418
 
        block._z_content_chunks = (z_content,)
 
392
        block._z_content = z_content
419
393
        block._z_content_length = len(z_content)
420
394
        block._compressor_name = 'zlib'
421
395
        block._content_length = 158634
460
434
        z_content = zlib.compress(content)
461
435
        self.assertEqual(57182, len(z_content))
462
436
        block = groupcompress.GroupCompressBlock()
463
 
        block._z_content_chunks = (z_content,)
 
437
        block._z_content = z_content
464
438
        block._z_content_length = len(z_content)
465
439
        block._compressor_name = 'zlib'
466
440
        block._content_length = 158634
1092
1066
        # consumption
1093
1067
        self.add_key_to_manager(('key4',), locations, block, manager)
1094
1068
        self.assertTrue(manager.check_is_well_utilized())
1095
 
 
1096
 
 
1097
 
class Test_GCBuildDetails(tests.TestCase):
1098
 
 
1099
 
    def test_acts_like_tuple(self):
1100
 
        # _GCBuildDetails inlines some of the data that used to be spread out
1101
 
        # across a bunch of tuples
1102
 
        bd = groupcompress._GCBuildDetails((('parent1',), ('parent2',)),
1103
 
            ('INDEX', 10, 20, 0, 5))
1104
 
        self.assertEqual(4, len(bd))
1105
 
        self.assertEqual(('INDEX', 10, 20, 0, 5), bd[0])
1106
 
        self.assertEqual(None, bd[1]) # Compression Parent is always None
1107
 
        self.assertEqual((('parent1',), ('parent2',)), bd[2])
1108
 
        self.assertEqual(('group', None), bd[3]) # Record details
1109
 
 
1110
 
    def test__repr__(self):
1111
 
        bd = groupcompress._GCBuildDetails((('parent1',), ('parent2',)),
1112
 
            ('INDEX', 10, 20, 0, 5))
1113
 
        self.assertEqual("_GCBuildDetails(('INDEX', 10, 20, 0, 5),"
1114
 
                         " (('parent1',), ('parent2',)))",
1115
 
                         repr(bd))
1116