1
# Copyright (C) 2008-2011 Canonical Ltd
1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
31
31
from bzrlib.osutils import sha_string
32
32
from bzrlib.tests.test__groupcompress import compiled_groupcompress_feature
33
from bzrlib.tests.scenarios import load_tests_apply_scenarios
36
def group_compress_implementation_scenarios():
35
def load_tests(standard_tests, module, loader):
36
"""Parameterize tests for all versions of groupcompress."""
37
to_adapt, result = tests.split_suite_by_condition(
38
standard_tests, tests.condition_isinstance(TestAllGroupCompressors))
38
40
('python', {'compressor': groupcompress.PythonGroupCompressor}),
40
42
if compiled_groupcompress_feature.available():
41
43
scenarios.append(('C',
42
44
{'compressor': groupcompress.PyrexGroupCompressor}))
46
load_tests = load_tests_apply_scenarios
45
return tests.multiply_tests(to_adapt, scenarios, result)
49
48
class TestGroupCompressor(tests.TestCase):
67
66
class TestAllGroupCompressors(TestGroupCompressor):
68
67
"""Tests for GroupCompressor"""
70
scenarios = group_compress_implementation_scenarios()
71
compressor = None # Set by scenario
69
compressor = None # Set by multiply_tests
73
71
def test_empty_delta(self):
74
72
compressor = self.compressor()
349
347
self.assertEqual(z_content, block._z_content)
350
348
self.assertEqual(content, block._content)
352
def test_to_chunks(self):
    """Check the chunked serialisation produced by to_chunks().

    The block is populated via set_chunked_content(); the joined output of
    to_chunks() must start with the expected header and the bytes after
    the header must zlib-decompress back to the original content.
    """
    source_chunks = ['this is some content\n',
                     'this content will be compressed\n']
    source_len = sum(len(chunk) for chunk in source_chunks)
    source_text = ''.join(source_chunks)

    block = groupcompress.GroupCompressBlock()
    block.set_chunked_content(source_chunks, source_len)
    reported_len, emitted_chunks = block.to_chunks()
    serialised = ''.join(emitted_chunks)

    # The recorded lengths must agree with the data actually held/emitted.
    self.assertEqual(block._z_content_length, len(block._z_content))
    self.assertEqual(reported_len, len(serialised))
    self.assertEqual(block._content_length, source_len)

    header = ('gcb1z\n'  # group compress block v1 zlib
              '%d\n'     # Length of compressed content
              '%d\n'     # Length of uncompressed content
              ) % (block._z_content_length, block._content_length)
    # The first chunk should be the header chunk. It is small, fixed size,
    # and there is no compelling reason to split it up
    self.assertEqual(header, emitted_chunks[0])
    self.assertStartsWith(serialised, header)

    # Everything after the header is the zlib-compressed payload.
    payload = serialised[len(header):]
    self.assertEqual(source_text, zlib.decompress(payload))
376
350
def test_to_bytes(self):
377
351
content = ('this is some content\n'
378
352
'this content will be compressed\n')
415
389
z_content = zlib.compress(content)
416
390
self.assertEqual(57182, len(z_content))
417
391
block = groupcompress.GroupCompressBlock()
418
block._z_content_chunks = (z_content,)
392
block._z_content = z_content
419
393
block._z_content_length = len(z_content)
420
394
block._compressor_name = 'zlib'
421
395
block._content_length = 158634
460
434
z_content = zlib.compress(content)
461
435
self.assertEqual(57182, len(z_content))
462
436
block = groupcompress.GroupCompressBlock()
463
block._z_content_chunks = (z_content,)
437
block._z_content = z_content
464
438
block._z_content_length = len(z_content)
465
439
block._compressor_name = 'zlib'
466
440
block._content_length = 158634
1093
1067
self.add_key_to_manager(('key4',), locations, block, manager)
1094
1068
self.assertTrue(manager.check_is_well_utilized())
1097
class Test_GCBuildDetails(tests.TestCase):
1099
def test_acts_like_tuple(self):
    """_GCBuildDetails supports len() and indexing like a 4-tuple."""
    # _GCBuildDetails inlines some of the data that used to be spread out
    # across a bunch of tuples
    parent_keys = (('parent1',), ('parent2',))
    index_tuple = ('INDEX', 10, 20, 0, 5)
    details = groupcompress._GCBuildDetails(parent_keys, index_tuple)

    self.assertEqual(4, len(details))
    # Slot 0 holds the tuple passed as the second constructor argument.
    self.assertEqual(index_tuple, details[0])
    # Compression Parent is always None
    self.assertEqual(None, details[1])
    # Slot 2 holds the parents passed as the first constructor argument.
    self.assertEqual(parent_keys, details[2])
    # Record details
    self.assertEqual(('group', None), details[3])
1110
def test__repr__(self):
1111
bd = groupcompress._GCBuildDetails((('parent1',), ('parent2',)),
1112
('INDEX', 10, 20, 0, 5))
1113
self.assertEqual("_GCBuildDetails(('INDEX', 10, 20, 0, 5),"
1114
" (('parent1',), ('parent2',)))",