31
31
from bzrlib.osutils import sha_string
32
from bzrlib.tests.test__groupcompress import compiled_groupcompress_feature
32
from bzrlib.tests.test__groupcompress import CompiledGroupCompressFeature
35
35
def load_tests(standard_tests, module, loader):
40
40
('python', {'compressor': groupcompress.PythonGroupCompressor}),
42
if compiled_groupcompress_feature.available():
42
if CompiledGroupCompressFeature.available():
43
43
scenarios.append(('C',
44
44
{'compressor': groupcompress.PyrexGroupCompressor}))
45
45
return tests.multiply_tests(to_adapt, scenarios, result)
136
136
class TestPyrexGroupCompressor(TestGroupCompressor):
138
_test_needs_features = [compiled_groupcompress_feature]
138
_test_needs_features = [CompiledGroupCompressFeature]
139
139
compressor = groupcompress.PyrexGroupCompressor
141
141
def test_stats(self):
418
418
# And the decompressor is finalized
419
419
self.assertIs(None, block._z_content_decompressor)
421
def test__ensure_all_content(self):
421
def test_partial_decomp_no_known_length(self):
422
422
content_chunks = []
423
# We need a sufficient amount of data so that zlib.decompress has
424
# partial decompression to work with. Most auto-generated data
425
# compresses a bit too well, we want a combination, so we combine a sha
426
# hash with compressible data.
427
423
for i in xrange(2048):
428
424
next_content = '%d\nThis is a bit of duplicate text\n' % (i,)
429
425
content_chunks.append(next_content)
437
433
block._z_content = z_content
438
434
block._z_content_length = len(z_content)
439
435
block._compressor_name = 'zlib'
440
block._content_length = 158634
436
block._content_length = None # Don't tell the decompressed length
441
437
self.assertIs(None, block._content)
442
# The first _ensure_content got all of the required data
443
block._ensure_content(158634)
438
block._ensure_content(100)
439
self.assertIsNot(None, block._content)
440
# We have decompressed at least 100 bytes
441
self.assertTrue(len(block._content) >= 100)
442
# We have not decompressed the whole content
443
self.assertTrue(len(block._content) < 158634)
444
self.assertEqualDiff(content[:len(block._content)], block._content)
445
# ensuring content that we already have shouldn't cause any more data
447
cur_len = len(block._content)
448
block._ensure_content(cur_len - 10)
449
self.assertEqual(cur_len, len(block._content))
450
# Now we want a bit more content
452
block._ensure_content(cur_len)
453
self.assertTrue(len(block._content) >= cur_len)
454
self.assertTrue(len(block._content) < 158634)
455
self.assertEqualDiff(content[:len(block._content)], block._content)
456
# And now lets finish
457
block._ensure_content()
444
458
self.assertEqualDiff(content, block._content)
445
# And we should have released the _z_content_decompressor since it was
459
# And the decompressor is finalized
447
460
self.assertIs(None, block._z_content_decompressor)
449
462
def test__dump(self):
459
472
], block._dump())
462
class TestCaseWithGroupCompressVersionedFiles(
463
tests.TestCaseWithMemoryTransport):
475
class TestCaseWithGroupCompressVersionedFiles(tests.TestCaseWithTransport):
465
477
def make_test_vf(self, create_graph, keylength=1, do_cleanup=True,
466
478
dir='.', inconsistency_fatal=True):
733
745
" \('b',\) \('42 32 0 8', \(\(\),\)\) \('74 32"
734
746
" 0 8', \(\(\('a',\),\),\)\)")
736
def test_clear_cache(self):
737
vf = self.make_source_with_b(True, 'source')
739
for record in vf.get_record_stream([('a',), ('b',)], 'unordered',
742
self.assertTrue(len(vf._group_cache) > 0)
744
self.assertEqual(0, len(vf._group_cache))
748
749
class StubGCVF(object):
749
750
def __init__(self, canned_get_blocks=None):