~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_groupcompress.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-08-29 08:40:34 UTC
  • mfrom: (6107.1.1 trivial)
  • Revision ID: pqm@pqm.ubuntu.com-20110829084034-vzt7yztscr89cxb6
(mbp) top-level doc page should now point to whatsnew-2.5

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2008-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
 
21
21
from bzrlib import (
22
22
    btree_index,
 
23
    config,
23
24
    groupcompress,
24
25
    errors,
25
26
    index as _mod_index,
30
31
    )
31
32
from bzrlib.osutils import sha_string
32
33
from bzrlib.tests.test__groupcompress import compiled_groupcompress_feature
33
 
 
34
 
 
35
 
def load_tests(standard_tests, module, loader):
36
 
    """Parameterize tests for all versions of groupcompress."""
37
 
    to_adapt, result = tests.split_suite_by_condition(
38
 
        standard_tests, tests.condition_isinstance(TestAllGroupCompressors))
 
34
from bzrlib.tests.scenarios import load_tests_apply_scenarios
 
35
 
 
36
 
 
37
def group_compress_implementation_scenarios():
39
38
    scenarios = [
40
39
        ('python', {'compressor': groupcompress.PythonGroupCompressor}),
41
40
        ]
42
41
    if compiled_groupcompress_feature.available():
43
42
        scenarios.append(('C',
44
43
            {'compressor': groupcompress.PyrexGroupCompressor}))
45
 
    return tests.multiply_tests(to_adapt, scenarios, result)
 
44
    return scenarios
 
45
 
 
46
 
 
47
load_tests = load_tests_apply_scenarios
46
48
 
47
49
 
48
50
class TestGroupCompressor(tests.TestCase):
66
68
class TestAllGroupCompressors(TestGroupCompressor):
67
69
    """Tests for GroupCompressor"""
68
70
 
69
 
    compressor = None # Set by multiply_tests
 
71
    scenarios = group_compress_implementation_scenarios()
 
72
    compressor = None # Set by scenario
70
73
 
71
74
    def test_empty_delta(self):
72
75
        compressor = self.compressor()
550
553
                    'as-requested', False)]
551
554
        self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)
552
555
 
 
556
    def test_get_record_stream_max_bytes_to_index_default(self):
 
557
        vf = self.make_test_vf(True, dir='source')
 
558
        vf.add_lines(('a',), (), ['lines\n'])
 
559
        vf.writer.end()
 
560
        record = vf.get_record_stream([('a',)], 'unordered', True).next()
 
561
        self.assertEqual(vf._DEFAULT_COMPRESSOR_SETTINGS,
 
562
                         record._manager._get_compressor_settings())
 
563
 
 
564
    def test_get_record_stream_accesses_compressor_settings(self):
 
565
        vf = self.make_test_vf(True, dir='source')
 
566
        vf.add_lines(('a',), (), ['lines\n'])
 
567
        vf.writer.end()
 
568
        vf._max_bytes_to_index = 1234
 
569
        record = vf.get_record_stream([('a',)], 'unordered', True).next()
 
570
        self.assertEqual(dict(max_bytes_to_index=1234),
 
571
                         record._manager._get_compressor_settings())
 
572
 
553
573
    def test_insert_record_stream_reuses_blocks(self):
554
574
        vf = self.make_test_vf(True, dir='source')
555
575
        def grouped_stream(revision_ids, first_parents=()):
768
788
        self.assertEqual(0, len(vf._group_cache))
769
789
 
770
790
 
 
791
class TestGroupCompressConfig(tests.TestCaseWithTransport):
 
792
 
 
793
    def make_test_vf(self):
 
794
        t = self.get_transport('.')
 
795
        t.ensure_base()
 
796
        factory = groupcompress.make_pack_factory(graph=True,
 
797
            delta=False, keylength=1, inconsistency_fatal=True)
 
798
        vf = factory(t)
 
799
        self.addCleanup(groupcompress.cleanup_pack_group, vf)
 
800
        return vf
 
801
 
 
802
    def test_max_bytes_to_index_default(self):
 
803
        vf = self.make_test_vf()
 
804
        gc = vf._make_group_compressor()
 
805
        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
 
806
                         vf._max_bytes_to_index)
 
807
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
 
808
            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
 
809
                             gc._delta_index._max_bytes_to_index)
 
810
 
 
811
    def test_max_bytes_to_index_in_config(self):
 
812
        c = config.GlobalConfig()
 
813
        c.set_user_option('bzr.groupcompress.max_bytes_to_index', '10000')
 
814
        vf = self.make_test_vf()
 
815
        gc = vf._make_group_compressor()
 
816
        self.assertEqual(10000, vf._max_bytes_to_index)
 
817
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
 
818
            self.assertEqual(10000, gc._delta_index._max_bytes_to_index)
 
819
 
 
820
    def test_max_bytes_to_index_bad_config(self):
 
821
        c = config.GlobalConfig()
 
822
        c.set_user_option('bzr.groupcompress.max_bytes_to_index', 'boogah')
 
823
        vf = self.make_test_vf()
 
824
        # TODO: This is triggering a warning, we might want to trap and make
 
825
        #       sure it is readable.
 
826
        gc = vf._make_group_compressor()
 
827
        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
 
828
                         vf._max_bytes_to_index)
 
829
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
 
830
            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
 
831
                             gc._delta_index._max_bytes_to_index)
 
832
 
771
833
 
772
834
class StubGCVF(object):
773
835
    def __init__(self, canned_get_blocks=None):
1044
1106
            self.assertEqual(self._texts[record.key],
1045
1107
                             record.get_bytes_as('fulltext'))
1046
1108
 
 
1109
    def test_manager_default_compressor_settings(self):
 
1110
        locations, old_block = self.make_block(self._texts)
 
1111
        manager = groupcompress._LazyGroupContentManager(old_block)
 
1112
        gcvf = groupcompress.GroupCompressVersionedFiles
 
1113
        # It doesn't greedily evaluate _max_bytes_to_index
 
1114
        self.assertIs(None, manager._compressor_settings)
 
1115
        self.assertEqual(gcvf._DEFAULT_COMPRESSOR_SETTINGS,
 
1116
                         manager._get_compressor_settings())
 
1117
 
 
1118
    def test_manager_custom_compressor_settings(self):
 
1119
        locations, old_block = self.make_block(self._texts)
 
1120
        called = []
 
1121
        def compressor_settings():
 
1122
            called.append('called')
 
1123
            return (10,)
 
1124
        manager = groupcompress._LazyGroupContentManager(old_block,
 
1125
            get_compressor_settings=compressor_settings)
 
1126
        gcvf = groupcompress.GroupCompressVersionedFiles
 
1127
        # It doesn't greedily evaluate compressor_settings
 
1128
        self.assertIs(None, manager._compressor_settings)
 
1129
        self.assertEqual((10,), manager._get_compressor_settings())
 
1130
        self.assertEqual((10,), manager._get_compressor_settings())
 
1131
        self.assertEqual((10,), manager._compressor_settings)
 
1132
        # Only called 1 time
 
1133
        self.assertEqual(['called'], called)
 
1134
 
 
1135
    def test__rebuild_handles_compressor_settings(self):
 
1136
        if not isinstance(groupcompress.GroupCompressor,
 
1137
                          groupcompress.PyrexGroupCompressor):
 
1138
            raise tests.TestNotApplicable('pure-python compressor'
 
1139
                ' does not handle compressor_settings')
 
1140
        locations, old_block = self.make_block(self._texts)
 
1141
        manager = groupcompress._LazyGroupContentManager(old_block,
 
1142
            get_compressor_settings=lambda: dict(max_bytes_to_index=32))
 
1143
        gc = manager._make_group_compressor()
 
1144
        self.assertEqual(32, gc._delta_index._max_bytes_to_index)
 
1145
        self.add_key_to_manager(('key3',), locations, old_block, manager)
 
1146
        self.add_key_to_manager(('key4',), locations, old_block, manager)
 
1147
        action, last_byte, total_bytes = manager._check_rebuild_action()
 
1148
        self.assertEqual('rebuild', action)
 
1149
        manager._rebuild_block()
 
1150
        new_block = manager._block
 
1151
        self.assertIsNot(old_block, new_block)
 
1152
        # Because of the new max_bytes_to_index, we do a poor job of
 
1153
        # rebuilding. This is a side-effect of the change, but at least it does
 
1154
        # show the setting had an effect.
 
1155
        self.assertTrue(old_block._content_length < new_block._content_length)
 
1156
 
1047
1157
    def test_check_is_well_utilized_all_keys(self):
1048
1158
        block, manager = self.make_block_and_full_manager(self._texts)
1049
1159
        self.assertFalse(manager.check_is_well_utilized())