~bzr-pqm/bzr/bzr.dev


Viewing changes to bzrlib/tests/test_groupcompress.py

terminal_width can now return None.

* bzrlib/win32utils.py:
(get_console_size): Fix typo in comment.

* bzrlib/ui/text.py:
(TextProgressView._show_line): Handle the no terminal present case.

* bzrlib/tests/test_osutils.py:
(TestTerminalWidth): Update tests.

* bzrlib/tests/blackbox/test_too_much.py:
Fix some imports.
(OldTests.test_bzr): Handle the no terminal present case.

* bzrlib/tests/__init__.py:
(VerboseTestResult.report_test_start): Handle the no terminal
present case.

* bzrlib/status.py:
(show_pending_merges): Handle the no terminal present case.
(show_pending_merges.show_log_message): Factor out some
code. Handle the no terminal present case.

* bzrlib/osutils.py:
(terminal_width): Return None if no precise value can be found.

* bzrlib/log.py:
(LineLogFormatter.__init__): Handle the no terminal present case.
(LineLogFormatter.truncate): Accept None as max_len meaning no
truncation.
(LineLogFormatter.log_string): 

* bzrlib/help.py:
(_help_commands_to_text): Handle the no terminal present case.
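
Taken together, the entries above change the contract of osutils.terminal_width(): it may now return None rather than guessing a width when no terminal is present, and consumers such as LineLogFormatter.truncate() treat None as "no truncation". A minimal sketch of the resulting calling pattern (illustrative only; the truncate() helper here is hypothetical, not the bzrlib method):

from bzrlib import osutils

def truncate(line, max_len):
    # max_len is None when there is no terminal: skip truncation entirely.
    if max_len is None:
        return line
    return line[:max_len]

width = osutils.terminal_width()   # may now return None (e.g. piped output)
print truncate('a rather long one-line log message', width)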

@@ -29,7 +29,7 @@
     versionedfile,
     )
 from bzrlib.osutils import sha_string
-from bzrlib.tests.test__groupcompress import compiled_groupcompress_feature
+from bzrlib.tests.test__groupcompress import CompiledGroupCompressFeature
 
 
 def load_tests(standard_tests, module, loader):
@@ -39,7 +39,7 @@
     scenarios = [
         ('python', {'compressor': groupcompress.PythonGroupCompressor}),
         ]
-    if compiled_groupcompress_feature.available():
+    if CompiledGroupCompressFeature.available():
         scenarios.append(('C',
             {'compressor': groupcompress.PyrexGroupCompressor}))
     return tests.multiply_tests(to_adapt, scenarios, result)
@@ -135,7 +135,7 @@
 
 class TestPyrexGroupCompressor(TestGroupCompressor):
 
-    _test_needs_features = [compiled_groupcompress_feature]
+    _test_needs_features = [CompiledGroupCompressFeature]
     compressor = groupcompress.PyrexGroupCompressor
 
     def test_stats(self):
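
For reference, CompiledGroupCompressFeature as used in the hunks above is a bzrlib test feature: listing it in _test_needs_features (or checking available() in load_tests) skips the Pyrex-specific tests when the compiled extension cannot be imported. A rough sketch of the usual shape of such a feature (assumed, not copied from test__groupcompress.py):

from bzrlib import tests

class _CompiledGroupCompress(tests.Feature):
    """Feature that is present when the compiled extension imports."""

    def _probe(self):
        # available() calls _probe() once and caches the answer.
        try:
            import bzrlib._groupcompress_pyx
        except ImportError:
            return False
        return True

    def feature_name(self):
        return 'bzrlib._groupcompress_pyx'

CompiledGroupCompressFeature = _CompiledGroupCompress()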
@@ -418,12 +418,8 @@
         # And the decompressor is finalized
         self.assertIs(None, block._z_content_decompressor)
 
-    def test__ensure_all_content(self):
+    def test_partial_decomp_no_known_length(self):
         content_chunks = []
-        # We need a sufficient amount of data so that zlib.decompress has
-        # partial decompression to work with. Most auto-generated data
-        # compresses a bit too well, we want a combination, so we combine a sha
-        # hash with compressible data.
         for i in xrange(2048):
             next_content = '%d\nThis is a bit of duplicate text\n' % (i,)
             content_chunks.append(next_content)
@@ -437,13 +433,30 @@
         block._z_content = z_content
         block._z_content_length = len(z_content)
         block._compressor_name = 'zlib'
-        block._content_length = 158634
+        block._content_length = None # Don't tell the decompressed length
         self.assertIs(None, block._content)
-        # The first _ensure_content got all of the required data
-        block._ensure_content(158634)
+        block._ensure_content(100)
+        self.assertIsNot(None, block._content)
+        # We have decompressed at least 100 bytes
+        self.assertTrue(len(block._content) >= 100)
+        # We have not decompressed the whole content
+        self.assertTrue(len(block._content) < 158634)
+        self.assertEqualDiff(content[:len(block._content)], block._content)
+        # ensuring content that we already have shouldn't cause any more data
+        # to be extracted
+        cur_len = len(block._content)
+        block._ensure_content(cur_len - 10)
+        self.assertEqual(cur_len, len(block._content))
+        # Now we want a bit more content
+        cur_len += 10
+        block._ensure_content(cur_len)
+        self.assertTrue(len(block._content) >= cur_len)
+        self.assertTrue(len(block._content) < 158634)
+        self.assertEqualDiff(content[:len(block._content)], block._content)
+        # And now lets finish
+        block._ensure_content()
         self.assertEqualDiff(content, block._content)
-        # And we should have released the _z_content_decompressor since it was
-        # fully consumed
+        # And the decompressor is finalized
         self.assertIs(None, block._z_content_decompressor)
 
     def test__dump(self):
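
The test_partial_decomp_no_known_length case added above drives _ensure_content() with _content_length set to None, so the block has to decompress incrementally and only release its zlib decompressor once the stream is exhausted. The incremental behaviour it leans on is plain zlib; a self-contained sketch (illustrative only, not bzrlib code):

import zlib

raw = ''.join('%d\nThis is a bit of duplicate text\n' % (i,)
              for i in xrange(2048))
z_content = zlib.compress(raw)

d = zlib.decompressobj()
partial = d.decompress(z_content, 100)   # at most 100 bytes of output
assert len(partial) <= 100
# The unread compressed bytes stay on the object, so decompression can be
# resumed later from where it stopped.
rest = d.decompress(d.unconsumed_tail) + d.flush()
assert partial + rest == raw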
@@ -459,8 +472,7 @@
                           ], block._dump())
 
 
-class TestCaseWithGroupCompressVersionedFiles(
-        tests.TestCaseWithMemoryTransport):
+class TestCaseWithGroupCompressVersionedFiles(tests.TestCaseWithTransport):
 
     def make_test_vf(self, create_graph, keylength=1, do_cleanup=True,
                      dir='.', inconsistency_fatal=True):
@@ -733,17 +745,6 @@
                               " \('b',\) \('42 32 0 8', \(\(\),\)\) \('74 32"
                               " 0 8', \(\(\('a',\),\),\)\)")
 
-    def test_clear_cache(self):
-        vf = self.make_source_with_b(True, 'source')
-        vf.writer.end()
-        for record in vf.get_record_stream([('a',), ('b',)], 'unordered',
-                                           True):
-            pass
-        self.assertTrue(len(vf._group_cache) > 0)
-        vf.clear_cache()
-        self.assertEqual(0, len(vf._group_cache))
-
-
 
 class StubGCVF(object):
     def __init__(self, canned_get_blocks=None):