~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_groupcompress.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-12-02 14:58:47 UTC
  • mfrom: (5554.1.3 trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20101202145847-fw822sd3nyhvrwmi
(vila) Merge 2.2 into trunk including fix for bug #583667 and bug #681885 (Vincent Ladeuil)

Show diffs side-by-side

added

removed

Lines of Context:
1
 
# Copyright (C) 2008-2011 Canonical Ltd
 
1
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
 
21
21
from bzrlib import (
22
22
    btree_index,
23
 
    config,
24
23
    groupcompress,
25
24
    errors,
26
25
    index as _mod_index,
31
30
    )
32
31
from bzrlib.osutils import sha_string
33
32
from bzrlib.tests.test__groupcompress import compiled_groupcompress_feature
34
 
from bzrlib.tests.scenarios import load_tests_apply_scenarios
35
 
 
36
 
 
37
 
def group_compress_implementation_scenarios():
 
33
 
 
34
 
 
35
def load_tests(standard_tests, module, loader):
 
36
    """Parameterize tests for all versions of groupcompress."""
 
37
    to_adapt, result = tests.split_suite_by_condition(
 
38
        standard_tests, tests.condition_isinstance(TestAllGroupCompressors))
38
39
    scenarios = [
39
40
        ('python', {'compressor': groupcompress.PythonGroupCompressor}),
40
41
        ]
41
42
    if compiled_groupcompress_feature.available():
42
43
        scenarios.append(('C',
43
44
            {'compressor': groupcompress.PyrexGroupCompressor}))
44
 
    return scenarios
45
 
 
46
 
 
47
 
load_tests = load_tests_apply_scenarios
 
45
    return tests.multiply_tests(to_adapt, scenarios, result)
48
46
 
49
47
 
50
48
class TestGroupCompressor(tests.TestCase):
68
66
class TestAllGroupCompressors(TestGroupCompressor):
69
67
    """Tests for GroupCompressor"""
70
68
 
71
 
    scenarios = group_compress_implementation_scenarios()
72
 
    compressor = None # Set by scenario
 
69
    compressor = None # Set by multiply_tests
73
70
 
74
71
    def test_empty_delta(self):
75
72
        compressor = self.compressor()
553
550
                    'as-requested', False)]
554
551
        self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)
555
552
 
556
 
    def test_get_record_stream_max_bytes_to_index_default(self):
557
 
        vf = self.make_test_vf(True, dir='source')
558
 
        vf.add_lines(('a',), (), ['lines\n'])
559
 
        vf.writer.end()
560
 
        record = vf.get_record_stream([('a',)], 'unordered', True).next()
561
 
        self.assertEqual(vf._DEFAULT_COMPRESSOR_SETTINGS,
562
 
                         record._manager._get_compressor_settings())
563
 
 
564
 
    def test_get_record_stream_accesses_compressor_settings(self):
565
 
        vf = self.make_test_vf(True, dir='source')
566
 
        vf.add_lines(('a',), (), ['lines\n'])
567
 
        vf.writer.end()
568
 
        vf._max_bytes_to_index = 1234
569
 
        record = vf.get_record_stream([('a',)], 'unordered', True).next()
570
 
        self.assertEqual(dict(max_bytes_to_index=1234),
571
 
                         record._manager._get_compressor_settings())
572
 
 
573
553
    def test_insert_record_stream_reuses_blocks(self):
574
554
        vf = self.make_test_vf(True, dir='source')
575
555
        def grouped_stream(revision_ids, first_parents=()):
788
768
        self.assertEqual(0, len(vf._group_cache))
789
769
 
790
770
 
791
 
class TestGroupCompressConfig(tests.TestCaseWithTransport):
792
 
 
793
 
    def make_test_vf(self):
794
 
        t = self.get_transport('.')
795
 
        t.ensure_base()
796
 
        factory = groupcompress.make_pack_factory(graph=True,
797
 
            delta=False, keylength=1, inconsistency_fatal=True)
798
 
        vf = factory(t)
799
 
        self.addCleanup(groupcompress.cleanup_pack_group, vf)
800
 
        return vf
801
 
 
802
 
    def test_max_bytes_to_index_default(self):
803
 
        vf = self.make_test_vf()
804
 
        gc = vf._make_group_compressor()
805
 
        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
806
 
                         vf._max_bytes_to_index)
807
 
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
808
 
            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
809
 
                             gc._delta_index._max_bytes_to_index)
810
 
 
811
 
    def test_max_bytes_to_index_in_config(self):
812
 
        c = config.GlobalConfig()
813
 
        c.set_user_option('bzr.groupcompress.max_bytes_to_index', '10000')
814
 
        vf = self.make_test_vf()
815
 
        gc = vf._make_group_compressor()
816
 
        self.assertEqual(10000, vf._max_bytes_to_index)
817
 
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
818
 
            self.assertEqual(10000, gc._delta_index._max_bytes_to_index)
819
 
 
820
 
    def test_max_bytes_to_index_bad_config(self):
821
 
        c = config.GlobalConfig()
822
 
        c.set_user_option('bzr.groupcompress.max_bytes_to_index', 'boogah')
823
 
        vf = self.make_test_vf()
824
 
        # TODO: This is triggering a warning, we might want to trap and make
825
 
        #       sure it is readable.
826
 
        gc = vf._make_group_compressor()
827
 
        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
828
 
                         vf._max_bytes_to_index)
829
 
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
830
 
            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
831
 
                             gc._delta_index._max_bytes_to_index)
832
 
 
833
771
 
834
772
class StubGCVF(object):
835
773
    def __init__(self, canned_get_blocks=None):
1106
1044
            self.assertEqual(self._texts[record.key],
1107
1045
                             record.get_bytes_as('fulltext'))
1108
1046
 
1109
 
    def test_manager_default_compressor_settings(self):
1110
 
        locations, old_block = self.make_block(self._texts)
1111
 
        manager = groupcompress._LazyGroupContentManager(old_block)
1112
 
        gcvf = groupcompress.GroupCompressVersionedFiles
1113
 
        # It doesn't greedily evaluate _max_bytes_to_index
1114
 
        self.assertIs(None, manager._compressor_settings)
1115
 
        self.assertEqual(gcvf._DEFAULT_COMPRESSOR_SETTINGS,
1116
 
                         manager._get_compressor_settings())
1117
 
 
1118
 
    def test_manager_custom_compressor_settings(self):
1119
 
        locations, old_block = self.make_block(self._texts)
1120
 
        called = []
1121
 
        def compressor_settings():
1122
 
            called.append('called')
1123
 
            return (10,)
1124
 
        manager = groupcompress._LazyGroupContentManager(old_block,
1125
 
            get_compressor_settings=compressor_settings)
1126
 
        gcvf = groupcompress.GroupCompressVersionedFiles
1127
 
        # It doesn't greedily evaluate compressor_settings
1128
 
        self.assertIs(None, manager._compressor_settings)
1129
 
        self.assertEqual((10,), manager._get_compressor_settings())
1130
 
        self.assertEqual((10,), manager._get_compressor_settings())
1131
 
        self.assertEqual((10,), manager._compressor_settings)
1132
 
        # Only called 1 time
1133
 
        self.assertEqual(['called'], called)
1134
 
 
1135
 
    def test__rebuild_handles_compressor_settings(self):
1136
 
        if not isinstance(groupcompress.GroupCompressor,
1137
 
                          groupcompress.PyrexGroupCompressor):
1138
 
            raise tests.TestNotApplicable('pure-python compressor'
1139
 
                ' does not handle compressor_settings')
1140
 
        locations, old_block = self.make_block(self._texts)
1141
 
        manager = groupcompress._LazyGroupContentManager(old_block,
1142
 
            get_compressor_settings=lambda: dict(max_bytes_to_index=32))
1143
 
        gc = manager._make_group_compressor()
1144
 
        self.assertEqual(32, gc._delta_index._max_bytes_to_index)
1145
 
        self.add_key_to_manager(('key3',), locations, old_block, manager)
1146
 
        self.add_key_to_manager(('key4',), locations, old_block, manager)
1147
 
        action, last_byte, total_bytes = manager._check_rebuild_action()
1148
 
        self.assertEqual('rebuild', action)
1149
 
        manager._rebuild_block()
1150
 
        new_block = manager._block
1151
 
        self.assertIsNot(old_block, new_block)
1152
 
        # Because of the new max_bytes_to_index, we do a poor job of
1153
 
        # rebuilding. This is a side-effect of the change, but at least it does
1154
 
        # show the setting had an effect.
1155
 
        self.assertTrue(old_block._content_length < new_block._content_length)
1156
 
 
1157
1047
    def test_check_is_well_utilized_all_keys(self):
1158
1048
        block, manager = self.make_block_and_full_manager(self._texts)
1159
1049
        self.assertFalse(manager.check_is_well_utilized())