1
# Copyright (C) 2008 Canonical Limited.
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License version 2 as published
5
# by the Free Software Foundation.
7
# This program is distributed in the hope that it will be useful,
8
# but WITHOUT ANY WARRANTY; without even the implied warranty of
9
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10
# GNU General Public License for more details.
12
# You should have received a copy of the GNU General Public License
13
# along with this program; if not, write to the Free Software
14
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the pyrex extension of groupcompress"""
19
from bzrlib import tests
21
from bzrlib.plugins.groupcompress import groupcompress
24
class _CompiledGroupCompress(tests.Feature):
28
import bzrlib.plugins.groupcompress._groupcompress_c
34
def feature_name(self):
35
return 'bzrlib.plugins.groupcompress._groupcompress_c'
37
CompiledGroupCompress = _CompiledGroupCompress()
40
class TestCompiledEquivalenceTable(tests.TestCase):
41
"""Direct tests for the compiled Equivalence Table."""
43
_tests_need_features = [CompiledGroupCompress]
45
# These tests assume that hash(int) == int
46
# If that ever changes, we can simply change this code to use a custom
47
# class that has precomputed values returned from __hash__.
50
# TODO: We need a test that forces a rebuild of the hash array, and makes
51
# sure that the lines that should not be indexed *stay* not indexed
54
super(TestCompiledEquivalenceTable, self).setUp()
55
from bzrlib.plugins.groupcompress import _groupcompress_c
56
self._gc_module = _groupcompress_c
58
def test_minimum_hash_size(self):
59
eq = self._gc_module.EquivalenceTable([])
60
# We request at least 33% free space in the hash (to make collisions
62
self.assertEqual(1024, eq._py_compute_minimum_hash_size(683))
63
self.assertEqual(2048, eq._py_compute_minimum_hash_size(684))
64
self.assertEqual(2048, eq._py_compute_minimum_hash_size(1000))
65
self.assertEqual(2048, eq._py_compute_minimum_hash_size(1024))
67
def test_recommended_hash_size(self):
68
eq = self._gc_module.EquivalenceTable([])
69
# We always recommend a minimum of 8k
70
self.assertEqual(8192, eq._py_compute_recommended_hash_size(10))
71
self.assertEqual(8192, eq._py_compute_recommended_hash_size(1000))
72
self.assertEqual(8192, eq._py_compute_recommended_hash_size(2000))
73
self.assertEqual(8192, eq._py_compute_recommended_hash_size(4000))
75
# And we recommend at least 50% free slots
76
self.assertEqual(8192, eq._py_compute_recommended_hash_size(4096))
77
self.assertEqual(16384, eq._py_compute_recommended_hash_size(4097))
79
def test__raw_lines(self):
80
eq = self._gc_module.EquivalenceTable([1, 2, 3])
81
self.assertEqual([(1, 1, 1, -1), (2, 2, 2, -1), (3, 3, 3, -1)],
82
eq._inspect_left_lines())
84
def test_build_hash(self):
85
eq = self._gc_module.EquivalenceTable([1, 2, 3])
86
# (size, [(offset, head_offset_in_lines, count)])
87
self.assertEqual((8192, [(1, 0, 1), (2, 1, 1), (3, 2, 1)]),
88
eq._inspect_hash_table())
90
def test_build_hash_with_duplicates(self):
91
eq = self._gc_module.EquivalenceTable([1, 2, 4, 0, 1, 4, 2, 4])
101
], eq._inspect_left_lines())
102
# (hash_offset, head_offset_in_lines, count)
103
self.assertEqual((8192, [
108
]), eq._inspect_hash_table())
110
def test_build_hash_table_with_wraparound(self):
111
eq = self._gc_module.EquivalenceTable([1, 2+8192])
115
], eq._inspect_left_lines())
116
self.assertEqual((8192, [
119
]), eq._inspect_hash_table())
121
def test_build_hash_table_with_collisions(self):
122
# We build up backwards, so # 2+8192 will wrap around to 2, and take
123
# its spot because the 2 offset is taken, then the real '2' will get
124
# bumped to 3, which will bump 3 into 4. then when we have 5, it will
125
# be fine, but the 8192+5 will get bumped to 6
126
eq = self._gc_module.EquivalenceTable([1, 5+8192, 5, 3, 2, 2+8192])
134
], eq._inspect_left_lines())
135
self.assertEqual((8192, [
142
]), eq._inspect_hash_table())
144
def test_build_hash_table_with_wrapped_collisions(self):
145
eq = self._gc_module.EquivalenceTable([0, 8191+8192, 8191])
148
(16383, 16383, 0, -1),
149
(8191, 8191, 8191, -1),
150
], eq._inspect_left_lines())
151
self.assertEqual((8192, [
155
]), eq._inspect_hash_table())
157
def test_build_hash_table_with_strings(self):
158
eq = self._gc_module.EquivalenceTable(['a', 'b', 'c', 'b'])
160
('a', hash('a'), hash('a') & 8191, -1),
161
('b', hash('b'), hash('b') & 8191, 3),
162
('c', hash('c'), hash('c') & 8191, -1),
163
('b', hash('b'), hash('b') & 8191, -1),
164
], eq._inspect_left_lines())
165
self.assertEqual((8192, sorted([
166
(hash('a') & 8191, 0, 1),
167
(hash('b') & 8191, 1, 2),
168
(hash('c') & 8191, 2, 1),
169
])), eq._inspect_hash_table())
171
def test_with_unhashable(self):
172
self.assertRaises(TypeError, self._gc_module.EquivalenceTable,
173
[['unhashable', 'list'], 'foo'])
174
self.assertRaises(TypeError, self._gc_module.EquivalenceTable,
175
('foo', ['unhashable', 'list']))
177
def test_extend_lines(self):
178
eq = self._gc_module.EquivalenceTable([10, 20, 30, 20])
179
eq.extend_lines([30, 20, 40], [True, True, True])
188
], eq._inspect_left_lines())
189
# (hash_offset, head_offset_in_lines, count)
190
self.assertEqual((8192, [
195
]), eq._inspect_hash_table())
197
def test_extend_lines_ignored(self):
198
eq = self._gc_module.EquivalenceTable([10, 20, 30, 20])
199
eq.extend_lines([30, 20, 40], [True, False, False])
208
], eq._inspect_left_lines())
209
# (hash_offset, head_offset_in_lines, count)
210
self.assertEqual((8192, [
214
]), eq._inspect_hash_table())
217
class TestGetLongestMatch(tests.TestCase):
220
super(TestGetLongestMatch, self).setUp()
221
self._longest_match = groupcompress._get_longest_match
222
self._eq_class = groupcompress.GroupCompressor._equivalence_table_class
224
def assertLongestMatch(self, block, end_pos, matching_locs,
225
eq, start_pos, max_len, known_locs):
226
"""Check that _get_longest_match gives correct results."""
227
val = self._longest_match(eq, start_pos, max_len, known_locs)
228
self.assertEqual((block, end_pos, matching_locs), val)
230
def test_all_match(self):
231
eq = self._eq_class(['a', 'b', 'c'])
232
eq.set_right_lines(['a', 'b', 'c'])
233
self.assertLongestMatch((0, 0, 3), 3, None,
235
self.assertLongestMatch((0, 0, 3), 3, None,
237
self.assertLongestMatch((1, 1, 2), 3, None,
239
self.assertLongestMatch((1, 1, 2), 3, None,
242
def test_no_match(self):
243
eq = self._eq_class(['a', 'b', 'c'])
244
eq.set_right_lines(['d', 'e', 'f'])
245
self.assertLongestMatch(None, 1, None,
247
self.assertLongestMatch(None, 2, None,
249
self.assertLongestMatch(None, 3, None,
252
def test_next_match(self):
253
eq = self._eq_class(['a', 'b', 'c'])
254
eq.set_right_lines(['a', 'c'])
255
self.assertLongestMatch((0, 0, 1), 1, [2],
257
self.assertLongestMatch((2, 1, 1), 2, None,
260
def test_lots_of_matches(self):
261
eq = self._eq_class(['a']*1000)
262
eq.set_right_lines(['a']*1000)
263
self.assertLongestMatch((0, 0, 1000), 1000, None,
265
eq = self._eq_class(['a']*1000)
266
eq.set_right_lines(['a']*2000)
267
self.assertLongestMatch((0, 0, 1000), 1000, range(1000),
269
eq = self._eq_class(['a']*2000)
270
eq.set_right_lines(['a']*1000)
271
self.assertLongestMatch((0, 0, 1000), 1000, None,
275
class TestCompiledGetLongestMatch(TestGetLongestMatch):
277
_tests_need_features = [CompiledGroupCompress]
280
super(TestGetLongestMatch, self).setUp()
281
from bzrlib.plugins.groupcompress import _groupcompress_c
282
self._longest_match = _groupcompress_c._get_longest_match
283
self._eq_class = _groupcompress_c.EquivalenceTable