1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
1
# Copyright (C) 2008-2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
31
31
from bzrlib.osutils import sha_string
32
32
from bzrlib.tests.test__groupcompress import compiled_groupcompress_feature
35
def load_tests(standard_tests, module, loader):
36
"""Parameterize tests for all versions of groupcompress."""
37
to_adapt, result = tests.split_suite_by_condition(
38
standard_tests, tests.condition_isinstance(TestAllGroupCompressors))
33
from bzrlib.tests.scenarios import load_tests_apply_scenarios
36
def group_compress_implementation_scenarios():
40
38
('python', {'compressor': groupcompress.PythonGroupCompressor}),
42
40
if compiled_groupcompress_feature.available():
43
41
scenarios.append(('C',
44
42
{'compressor': groupcompress.PyrexGroupCompressor}))
45
return tests.multiply_tests(to_adapt, scenarios, result)
46
load_tests = load_tests_apply_scenarios
48
49
class TestGroupCompressor(tests.TestCase):
66
67
class TestAllGroupCompressors(TestGroupCompressor):
67
68
"""Tests for GroupCompressor"""
69
compressor = None # Set by multiply_tests
70
scenarios = group_compress_implementation_scenarios()
71
compressor = None # Set by scenario
71
73
def test_empty_delta(self):
72
74
compressor = self.compressor()
347
349
self.assertEqual(z_content, block._z_content)
348
350
self.assertEqual(content, block._content)
352
def test_to_chunks(self):
    """Check the chunked serialisation produced by to_chunks().

    A block is filled via set_chunked_content() and then serialised.
    The test verifies the reported lengths, that the first chunk is
    exactly the header, and that everything after the header
    zlib-decompresses back to the original content.
    """
    source_chunks = ['this is some content\n',
                     'this content will be compressed\n']
    source_len = sum(len(chunk) for chunk in source_chunks)
    source_text = ''.join(source_chunks)

    gcb = groupcompress.GroupCompressBlock()
    gcb.set_chunked_content(source_chunks, source_len)
    total_len, block_chunks = gcb.to_chunks()
    serialised = ''.join(block_chunks)

    # The bookkeeping on the block must agree with what was produced.
    self.assertEqual(gcb._z_content_length, len(gcb._z_content))
    self.assertEqual(total_len, len(serialised))
    self.assertEqual(gcb._content_length, source_len)

    header_template = ('gcb1z\n'  # group compress block v1 zlib
                       '%d\n'     # Length of compressed content
                       '%d\n')    # Length of uncompressed content
    expected_header = header_template % (gcb._z_content_length,
                                         gcb._content_length)

    # The header is small and fixed size, so it should arrive as the
    # first chunk on its own; there is no compelling reason to split it.
    self.assertEqual(expected_header, block_chunks[0])
    self.assertStartsWith(serialised, expected_header)

    # Whatever follows the header is the zlib-compressed payload.
    compressed_payload = serialised[len(expected_header):]
    self.assertEqual(source_text, zlib.decompress(compressed_payload))
350
376
def test_to_bytes(self):
351
377
content = ('this is some content\n'
352
378
'this content will be compressed\n')
389
415
z_content = zlib.compress(content)
390
416
self.assertEqual(57182, len(z_content))
391
417
block = groupcompress.GroupCompressBlock()
392
block._z_content = z_content
418
block._z_content_chunks = (z_content,)
393
419
block._z_content_length = len(z_content)
394
420
block._compressor_name = 'zlib'
395
421
block._content_length = 158634
434
460
z_content = zlib.compress(content)
435
461
self.assertEqual(57182, len(z_content))
436
462
block = groupcompress.GroupCompressBlock()
437
block._z_content = z_content
463
block._z_content_chunks = (z_content,)
438
464
block._z_content_length = len(z_content)
439
465
block._compressor_name = 'zlib'
440
466
block._content_length = 158634