1
# Copyright (C) 2008-2011 Canonical Ltd
1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
31
31
from bzrlib.osutils import sha_string
32
32
from bzrlib.tests.test__groupcompress import compiled_groupcompress_feature
33
from bzrlib.tests.scenarios import load_tests_apply_scenarios
36
def group_compress_implementation_scenarios():
35
def load_tests(standard_tests, module, loader):
36
"""Parameterize tests for all versions of groupcompress."""
37
to_adapt, result = tests.split_suite_by_condition(
38
standard_tests, tests.condition_isinstance(TestAllGroupCompressors))
38
40
('python', {'compressor': groupcompress.PythonGroupCompressor}),
40
42
if compiled_groupcompress_feature.available():
41
43
scenarios.append(('C',
42
44
{'compressor': groupcompress.PyrexGroupCompressor}))
46
load_tests = load_tests_apply_scenarios
45
return tests.multiply_tests(to_adapt, scenarios, result)
49
48
class TestGroupCompressor(tests.TestCase):
67
66
class TestAllGroupCompressors(TestGroupCompressor):
68
67
"""Tests for GroupCompressor"""
70
scenarios = group_compress_implementation_scenarios()
71
compressor = None # Set by scenario
69
compressor = None # Set by multiply_tests
73
71
def test_empty_delta(self):
74
72
compressor = self.compressor()
349
347
self.assertEqual(z_content, block._z_content)
350
348
self.assertEqual(content, block._content)
352
def test_to_chunks(self):
    """Check the chunked serialisation produced by to_chunks().

    A block is filled via set_chunked_content() and serialised with
    to_chunks().  The test verifies that the recorded lengths match the
    data, that the first chunk is exactly the expected header, and that
    the bytes following the header zlib-decompress back to the original
    content.
    """
    pieces = ['this is some content\n',
              'this content will be compressed\n']
    uncompressed_len = sum(map(len, pieces))
    expected_text = ''.join(pieces)
    block = groupcompress.GroupCompressBlock()
    block.set_chunked_content(pieces, uncompressed_len)
    reported_len, chunks = block.to_chunks()
    serialised = ''.join(chunks)
    # The recorded lengths have to agree with the data actually produced.
    self.assertEqual(block._z_content_length, len(block._z_content))
    self.assertEqual(reported_len, len(serialised))
    self.assertEqual(block._content_length, uncompressed_len)
    header_template = ('gcb1z\n'  # group compress block v1 zlib
                       '%d\n'     # Length of compressed content
                       '%d\n')    # Length of uncompressed content
    header = header_template % (block._z_content_length,
                                block._content_length)
    # The header is small and of fixed layout, so it is expected to arrive
    # as a single leading chunk rather than being split up.
    self.assertEqual(header, chunks[0])
    self.assertStartsWith(serialised, header)
    # Everything after the header is the compressed payload.
    payload = serialised[len(header):]
    self.assertEqual(expected_text, zlib.decompress(payload))
376
350
def test_to_bytes(self):
377
351
content = ('this is some content\n'
378
352
'this content will be compressed\n')
415
389
z_content = zlib.compress(content)
416
390
self.assertEqual(57182, len(z_content))
417
391
block = groupcompress.GroupCompressBlock()
418
block._z_content_chunks = (z_content,)
392
block._z_content = z_content
419
393
block._z_content_length = len(z_content)
420
394
block._compressor_name = 'zlib'
421
395
block._content_length = 158634
460
434
z_content = zlib.compress(content)
461
435
self.assertEqual(57182, len(z_content))
462
436
block = groupcompress.GroupCompressBlock()
463
block._z_content_chunks = (z_content,)
437
block._z_content = z_content
464
438
block._z_content_length = len(z_content)
465
439
block._compressor_name = 'zlib'
466
440
block._content_length = 158634