1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
1
# Copyright (C) 2008-2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
31
31
from bzrlib.osutils import sha_string
32
32
from bzrlib.tests.test__groupcompress import compiled_groupcompress_feature
35
def load_tests(standard_tests, module, loader):
36
"""Parameterize tests for all versions of groupcompress."""
37
to_adapt, result = tests.split_suite_by_condition(
38
standard_tests, tests.condition_isinstance(TestAllGroupCompressors))
33
from bzrlib.tests.scenarios import load_tests_apply_scenarios
36
def group_compress_implementation_scenarios():
40
38
('python', {'compressor': groupcompress.PythonGroupCompressor}),
42
40
if compiled_groupcompress_feature.available():
43
41
scenarios.append(('C',
44
42
{'compressor': groupcompress.PyrexGroupCompressor}))
45
return tests.multiply_tests(to_adapt, scenarios, result)
46
load_tests = load_tests_apply_scenarios
48
49
class TestGroupCompressor(tests.TestCase):
66
67
class TestAllGroupCompressors(TestGroupCompressor):
67
68
"""Tests for GroupCompressor"""
69
compressor = None # Set by multiply_tests
70
scenarios = group_compress_implementation_scenarios()
71
compressor = None # Set by scenario
71
73
def test_empty_delta(self):
72
74
compressor = self.compressor()
347
349
self.assertEqual(z_content, block._z_content)
348
350
self.assertEqual(content, block._content)
352
def test_to_chunks(self):
    # Build a block from two text chunks, serialise it with to_chunks(),
    # and check both the header chunk and the zlib payload that follows.
    pieces = ['this is some content\n',
              'this content will be compressed\n']
    pieces_len = sum(len(piece) for piece in pieces)
    joined_content = ''.join(pieces)

    gcb = groupcompress.GroupCompressBlock()
    gcb.set_chunked_content(pieces, pieces_len)
    total_len, block_chunks = gcb.to_chunks()
    serialised = ''.join(block_chunks)

    # The lengths recorded on the block must agree with the actual data.
    self.assertEqual(gcb._z_content_length, len(gcb._z_content))
    self.assertEqual(total_len, len(serialised))
    self.assertEqual(gcb._content_length, pieces_len)

    # Header layout:
    #   'gcb1z\n'  group compress block v1 zlib
    #   '%d\n'     length of compressed content
    #   '%d\n'     length of uncompressed content
    header_template = 'gcb1z\n' '%d\n' '%d\n'
    expected_header = header_template % (gcb._z_content_length,
                                         gcb._content_length)

    # The first chunk should be the header chunk. It is small, fixed size,
    # and there is no compelling reason to split it up.
    self.assertEqual(expected_header, block_chunks[0])
    self.assertStartsWith(serialised, expected_header)

    # Everything after the header is the zlib stream of the original text.
    payload = serialised[len(expected_header):]
    self.assertEqual(joined_content, zlib.decompress(payload))
350
376
def test_to_bytes(self):
351
377
content = ('this is some content\n'
352
378
'this content will be compressed\n')
389
415
z_content = zlib.compress(content)
390
416
self.assertEqual(57182, len(z_content))
391
417
block = groupcompress.GroupCompressBlock()
392
block._z_content = z_content
418
block._z_content_chunks = (z_content,)
393
419
block._z_content_length = len(z_content)
394
420
block._compressor_name = 'zlib'
395
421
block._content_length = 158634
434
460
z_content = zlib.compress(content)
435
461
self.assertEqual(57182, len(z_content))
436
462
block = groupcompress.GroupCompressBlock()
437
block._z_content = z_content
463
block._z_content_chunks = (z_content,)
438
464
block._z_content_length = len(z_content)
439
465
block._compressor_name = 'zlib'
440
466
block._content_length = 158634
1067
1093
self.add_key_to_manager(('key4',), locations, block, manager)
1068
1094
self.assertTrue(manager.check_is_well_utilized())
1097
class Test_GCBuildDetails(tests.TestCase):
1099
def test_acts_like_tuple(self):
    # _GCBuildDetails inlines some of the data that used to be spread out
    # across a bunch of tuples, so it must still look like a 4-tuple when
    # measured with len() and indexed.
    parent_keys = (('parent1',), ('parent2',))
    index_memo = ('INDEX', 10, 20, 0, 5)
    details = groupcompress._GCBuildDetails(parent_keys, index_memo)

    self.assertEqual(4, len(details))
    # Slot 0: the index memo passed as the second constructor argument.
    self.assertEqual(('INDEX', 10, 20, 0, 5), details[0])
    # Slot 1: compression parent is always None.
    self.assertEqual(None, details[1])
    # Slot 2: the parent keys passed as the first constructor argument.
    self.assertEqual((('parent1',), ('parent2',)), details[2])
    # Slot 3: record details.
    self.assertEqual(('group', None), details[3])
1110
def test__repr__(self):
1111
bd = groupcompress._GCBuildDetails((('parent1',), ('parent2',)),
1112
('INDEX', 10, 20, 0, 5))
1113
self.assertEqual("_GCBuildDetails(('INDEX', 10, 20, 0, 5),"
1114
" (('parent1',), ('parent2',)))",