~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test__groupcompress.py

  • Committer: Patch Queue Manager
  • Date: 2012-07-28 15:55:41 UTC
  • mfrom: (5912.5.9 Base64CredentialStore)
  • Revision ID: pqm@pqm.ubuntu.com-20120728155541-d860rcyc2q82nhnj
(gz) Add Base64CredentialStore for authentication.conf password obfuscation
 (Martin Packman)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2008 Canonical Limited.
2
 
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License version 2 as published
5
 
# by the Free Software Foundation.
6
 
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
7
8
# This program is distributed in the hope that it will be useful,
8
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
9
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
11
# GNU General Public License for more details.
11
 
 
12
#
12
13
# You should have received a copy of the GNU General Public License
13
14
# along with this program; if not, write to the Free Software
14
 
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
15
 
16
 
 
17
 
"""Tests for the pyrex extension of groupcompress"""
18
 
 
19
 
from bzrlib import tests
20
 
 
21
 
from bzrlib.plugins.groupcompress import groupcompress
22
 
 
23
 
 
24
 
class _CompiledGroupCompress(tests.Feature):
25
 
 
26
 
    def _probe(self):
27
 
        try:
28
 
            import bzrlib.plugins.groupcompress._groupcompress_c
29
 
        except ImportError:
30
 
            return False
31
 
        else:
32
 
            return True
33
 
 
34
 
    def feature_name(self):
35
 
        return 'bzrlib.plugins.groupcompress._groupcompress_c'
36
 
 
37
 
CompiledGroupCompress = _CompiledGroupCompress()
38
 
 
39
 
 
40
 
class TestCompiledEquivalenceTable(tests.TestCase):
41
 
    """Direct tests for the compiled Equivalence Table."""
42
 
 
43
 
    _tests_need_features = [CompiledGroupCompress]
44
 
 
45
 
    # These tests assume that hash(int) == int
46
 
    # If that ever changes, we can simply change this code to use a custom
47
 
    # class that has precomputed values returned from __hash__.
48
 
 
49
 
 
50
 
    # TODO: We need a test that forces a rebuild of the hash array, and makes
51
 
    #       sure that the lines that should not be indexed *stay* not indexed
52
 
 
53
 
    def setUp(self):
54
 
        super(TestCompiledEquivalenceTable, self).setUp()
55
 
        from bzrlib.plugins.groupcompress import _groupcompress_c
56
 
        self._gc_module = _groupcompress_c
57
 
 
58
 
    def test_minimum_hash_size(self):
59
 
        eq = self._gc_module.EquivalenceTable([])
60
 
        # We request at least 50% free space in the hash (to make collisions
61
 
        # more bearable)
62
 
        self.assertEqual(1024, eq._py_compute_minimum_hash_size(512))
63
 
        self.assertEqual(2048, eq._py_compute_minimum_hash_size(513))
64
 
        self.assertEqual(2048, eq._py_compute_minimum_hash_size(1000))
65
 
        self.assertEqual(2048, eq._py_compute_minimum_hash_size(1024))
66
 
 
67
 
    def test_recommended_hash_size(self):
68
 
        eq = self._gc_module.EquivalenceTable([])
69
 
        # We always recommend a minimum of 8k
70
 
        self.assertEqual(8192, eq._py_compute_recommended_hash_size(10))
71
 
        self.assertEqual(8192, eq._py_compute_recommended_hash_size(1000))
72
 
        self.assertEqual(8192, eq._py_compute_recommended_hash_size(2000))
73
 
        self.assertEqual(16384, eq._py_compute_recommended_hash_size(4000))
74
 
 
75
 
        # And we recommend at least 75% free slots
76
 
        self.assertEqual(16384, eq._py_compute_recommended_hash_size(4096))
77
 
        self.assertEqual(32768, eq._py_compute_recommended_hash_size(4097))
78
 
 
79
 
    def test__raw_lines(self):
80
 
        eq = self._gc_module.EquivalenceTable([1, 2, 3])
81
 
        self.assertEqual([(1, 1, 1, -1), (2, 2, 2, -1), (3, 3, 3, -1)],
82
 
                         eq._inspect_left_lines())
83
 
 
84
 
    def test_build_hash(self):
85
 
        eq = self._gc_module.EquivalenceTable([1, 2, 3])
86
 
        # (size, [(offset, head_offset_in_lines, count)])
87
 
        self.assertEqual((8192, [(1, 0, 1), (2, 1, 1), (3, 2, 1)]),
88
 
                         eq._inspect_hash_table())
89
 
 
90
 
    def test_build_hash_with_duplicates(self):
91
 
        eq = self._gc_module.EquivalenceTable([1, 2, 4, 0, 1, 4, 2, 4])
92
 
        self.assertEqual([
93
 
            (1, 1, 1, 4),
94
 
            (2, 2, 2, 6),
95
 
            (4, 4, 4, 5),
96
 
            (0, 0, 0, -1),
97
 
            (1, 1, 1, -1),
98
 
            (4, 4, 4, 7),
99
 
            (2, 2, 2, -1),
100
 
            (4, 4, 4, -1),
101
 
            ], eq._inspect_left_lines())
102
 
        # (hash_offset, head_offset_in_lines, count)
103
 
        self.assertEqual((8192, [
104
 
            (0, 3, 1),
105
 
            (1, 0, 2),
106
 
            (2, 1, 2),
107
 
            (4, 2, 3),
108
 
            ]), eq._inspect_hash_table())
109
 
 
110
 
    def test_build_hash_table_with_wraparound(self):
111
 
        eq = self._gc_module.EquivalenceTable([1, 2+8192])
112
 
        self.assertEqual([
113
 
            (1, 1, 1, -1),
114
 
            (8194, 8194, 2, -1),
115
 
            ], eq._inspect_left_lines())
116
 
        self.assertEqual((8192, [
117
 
            (1, 0, 1),
118
 
            (2, 1, 1),
119
 
            ]), eq._inspect_hash_table())
120
 
 
121
 
    def test_build_hash_table_with_collisions(self):
122
 
        # We build up backwards, so # 2+8192 will wrap around to 2, and take
123
 
        # its spot because the 2 offset is taken, then the real '2' will get
124
 
        # bumped to 3, which will bump 3 into 4.  then when we have 5, it will
125
 
        # be fine, but the 8192+5 will get bumped to 6
126
 
        eq = self._gc_module.EquivalenceTable([1, 5+8192, 5, 3, 2, 2+8192])
127
 
        self.assertEqual([
128
 
            (1, 1, 1, -1),
129
 
            (8197, 8197, 6, -1),
130
 
            (5, 5, 5, -1),
131
 
            (3, 3, 4, -1),
132
 
            (2, 2, 3, -1),
133
 
            (8194, 8194, 2, -1),
134
 
            ], eq._inspect_left_lines())
135
 
        self.assertEqual((8192, [
136
 
            (1, 0, 1),
137
 
            (2, 5, 1),
138
 
            (3, 4, 1),
139
 
            (4, 3, 1),
140
 
            (5, 2, 1),
141
 
            (6, 1, 1),
142
 
            ]), eq._inspect_hash_table())
143
 
 
144
 
    def test_build_hash_table_with_wrapped_collisions(self):
145
 
        eq = self._gc_module.EquivalenceTable([0, 8191+8192, 8191])
146
 
        self.assertEqual([
147
 
            (0, 0, 1, -1),
148
 
            (16383, 16383, 0, -1),
149
 
            (8191, 8191, 8191, -1),
150
 
            ], eq._inspect_left_lines())
151
 
        self.assertEqual((8192, [
152
 
            (0, 1, 1),
153
 
            (1, 0, 1),
154
 
            (8191, 2, 1),
155
 
            ]), eq._inspect_hash_table())
156
 
 
157
 
    def test_build_hash_table_with_strings(self):
158
 
        eq = self._gc_module.EquivalenceTable(['a', 'b', 'c', 'b'])
159
 
        self.assertEqual([
160
 
            ('a', hash('a'), hash('a') & 8191, -1),
161
 
            ('b', hash('b'), hash('b') & 8191, 3),
162
 
            ('c', hash('c'), hash('c') & 8191, -1),
163
 
            ('b', hash('b'), hash('b') & 8191, -1),
164
 
            ], eq._inspect_left_lines())
165
 
        self.assertEqual((8192, sorted([
166
 
            (hash('a') & 8191, 0, 1),
167
 
            (hash('b') & 8191, 1, 2),
168
 
            (hash('c') & 8191, 2, 1),
169
 
            ])), eq._inspect_hash_table())
170
 
 
171
 
    def test_with_unhashable(self):
172
 
        self.assertRaises(TypeError, self._gc_module.EquivalenceTable,
173
 
                [['unhashable', 'list'], 'foo'])
174
 
        self.assertRaises(TypeError, self._gc_module.EquivalenceTable,
175
 
                ('foo', ['unhashable', 'list']))
176
 
 
177
 
    def test_extend_lines(self):
178
 
        eq = self._gc_module.EquivalenceTable([10, 20, 30, 20])
179
 
        eq.extend_lines([30, 20, 40], [True, True, True])
180
 
        self.assertEqual([
181
 
            (10, 10, 10, -1),
182
 
            (20, 20, 20, 3),
183
 
            (30, 30, 30, 4),
184
 
            (20, 20, 20, 5),
185
 
            (30, 30, 30, -1),
186
 
            (20, 20, 20, -1),
187
 
            (40, 40, 40, -1),
188
 
            ], eq._inspect_left_lines())
189
 
        # (hash_offset, head_offset_in_lines, count)
190
 
        self.assertEqual((8192, [
191
 
            (10, 0, 1),
192
 
            (20, 1, 3),
193
 
            (30, 2, 2),
194
 
            (40, 6, 1),
195
 
            ]), eq._inspect_hash_table())
196
 
 
197
 
    def test_extend_lines_ignored(self):
198
 
        eq = self._gc_module.EquivalenceTable([10, 20, 30, 20])
199
 
        eq.extend_lines([30, 20, 40], [True, False, False])
200
 
        self.assertEqual([
201
 
            (10, 10, 10, -1),
202
 
            (20, 20, 20, 3),
203
 
            (30, 30, 30, 4),
204
 
            (20, 20, 20, -1),
205
 
            (30, 30, 30, -1),
206
 
            (20, 20, -1, -1),
207
 
            (40, 40, -1, -1),
208
 
            ], eq._inspect_left_lines())
209
 
        # (hash_offset, head_offset_in_lines, count)
210
 
        self.assertEqual((8192, [
211
 
            (10, 0, 1),
212
 
            (20, 1, 2),
213
 
            (30, 2, 2),
214
 
            ]), eq._inspect_hash_table())
215
 
 
216
 
 
217
 
class TestGetLongestMatch(tests.TestCase):
218
 
 
219
 
    def setUp(self):
220
 
        super(TestGetLongestMatch, self).setUp()
221
 
        self._longest_match = groupcompress._get_longest_match
222
 
        self._eq_class = groupcompress.GroupCompressor._equivalence_table_class
223
 
 
224
 
    def assertLongestMatch(self, block, end_pos, matching_locs,
225
 
                           eq, start_pos, max_len, known_locs):
226
 
        """Check that _get_longest_match gives correct results."""
227
 
        the_block, the_end_pos, next_locs = self._longest_match(eq,
228
 
                                                start_pos, max_len, known_locs)
229
 
        self.assertEqual(block, the_block)
230
 
        self.assertEqual(end_pos, the_end_pos)
231
 
        if next_locs is not None:
232
 
            self.assertEqual(matching_locs, next_locs)
233
 
 
234
 
    def test_all_match(self):
235
 
        eq = self._eq_class(['a', 'b', 'c'])
236
 
        eq.set_right_lines(['a', 'b', 'c'])
237
 
        self.assertLongestMatch((0, 0, 3), 3, None,
238
 
                                eq, 0, 3, None)
239
 
        self.assertLongestMatch((0, 0, 3), 3, None,
240
 
                                 eq, 0, 3, [0])
241
 
        self.assertLongestMatch((1, 1, 2), 3, None,
242
 
                                eq, 1, 3, None)
243
 
        self.assertLongestMatch((1, 1, 2), 3, None,
244
 
                                eq, 1, 3, [1])
245
 
 
246
 
    def test_no_match(self):
247
 
        eq = self._eq_class(['a', 'b', 'c'])
248
 
        eq.set_right_lines(['d', 'e', 'f'])
249
 
        self.assertLongestMatch(None, 1, None,
250
 
                                eq, 0, 3, None)
251
 
        self.assertLongestMatch(None, 2, None,
252
 
                                eq, 1, 3, None)
253
 
        self.assertLongestMatch(None, 3, None,
254
 
                                eq, 2, 3, None)
255
 
 
256
 
    def test_next_match(self):
257
 
        eq = self._eq_class(['a', 'b', 'c'])
258
 
        eq.set_right_lines(['a', 'c'])
259
 
        self.assertLongestMatch((0, 0, 1), 1, [2],
260
 
                                eq, 0, 2, None)
261
 
        self.assertLongestMatch((2, 1, 1), 2, None,
262
 
                                eq, 1, 2, None)
263
 
 
264
 
    def test_lots_of_matches(self):
265
 
        eq = self._eq_class(['a']*1000)
266
 
        eq.set_right_lines(['a']*1000)
267
 
        self.assertLongestMatch((0, 0, 1000), 1000, None,
268
 
                                eq, 0, 1000, None)
269
 
        eq = self._eq_class(['a']*1000)
270
 
        eq.set_right_lines(['a']*2000)
271
 
        self.assertLongestMatch((0, 0, 1000), 1000, range(1000),
272
 
                                eq, 0, 2000, None)
273
 
        eq = self._eq_class(['a']*2000)
274
 
        eq.set_right_lines(['a']*1000)
275
 
        self.assertLongestMatch((0, 0, 1000), 1000, None,
276
 
                                eq, 0, 1000, None)
277
 
 
278
 
 
279
 
class TestCompiledGetLongestMatch(TestGetLongestMatch):
280
 
 
281
 
    _tests_need_features = [CompiledGroupCompress]
282
 
 
283
 
    def setUp(self):
284
 
        super(TestGetLongestMatch, self).setUp()
285
 
        from bzrlib.plugins.groupcompress import _groupcompress_c
286
 
        self._longest_match = _groupcompress_c._get_longest_match
287
 
        self._eq_class = _groupcompress_c.EquivalenceTable
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the python and pyrex extensions of groupcompress"""
 
18
 
 
19
from bzrlib import (
 
20
    _groupcompress_py,
 
21
    tests,
 
22
    )
 
23
from bzrlib.tests.scenarios import (
 
24
    load_tests_apply_scenarios,
 
25
    )
 
26
from bzrlib.tests import (
 
27
    features,
 
28
    )
 
29
 
 
30
 
 
31
def module_scenarios():
 
32
    scenarios = [
 
33
        ('python', {'_gc_module': _groupcompress_py}),
 
34
        ]
 
35
    if compiled_groupcompress_feature.available():
 
36
        gc_module = compiled_groupcompress_feature.module
 
37
        scenarios.append(('C',
 
38
            {'_gc_module': gc_module}))
 
39
    return scenarios
 
40
 
 
41
 
 
42
def two_way_scenarios():
 
43
    scenarios = [
 
44
        ('PP', {'make_delta': _groupcompress_py.make_delta,
 
45
                'apply_delta': _groupcompress_py.apply_delta})
 
46
        ]
 
47
    if compiled_groupcompress_feature.available():
 
48
        gc_module = compiled_groupcompress_feature.module
 
49
        scenarios.extend([
 
50
            ('CC', {'make_delta': gc_module.make_delta,
 
51
                    'apply_delta': gc_module.apply_delta}),
 
52
            ('PC', {'make_delta': _groupcompress_py.make_delta,
 
53
                    'apply_delta': gc_module.apply_delta}),
 
54
            ('CP', {'make_delta': gc_module.make_delta,
 
55
                    'apply_delta': _groupcompress_py.apply_delta}),
 
56
            ])
 
57
    return scenarios
 
58
 
 
59
 
 
60
load_tests = load_tests_apply_scenarios
 
61
 
 
62
 
 
63
compiled_groupcompress_feature = features.ModuleAvailableFeature(
 
64
    'bzrlib._groupcompress_pyx')
 
65
 
 
66
_text1 = """\
 
67
This is a bit
 
68
of source text
 
69
which is meant to be matched
 
70
against other text
 
71
"""
 
72
 
 
73
_text2 = """\
 
74
This is a bit
 
75
of source text
 
76
which is meant to differ from
 
77
against other text
 
78
"""
 
79
 
 
80
_text3 = """\
 
81
This is a bit
 
82
of source text
 
83
which is meant to be matched
 
84
against other text
 
85
except it also
 
86
has a lot more data
 
87
at the end of the file
 
88
"""
 
89
 
 
90
_first_text = """\
 
91
a bit of text, that
 
92
does not have much in
 
93
common with the next text
 
94
"""
 
95
 
 
96
_second_text = """\
 
97
some more bit of text, that
 
98
does not have much in
 
99
common with the previous text
 
100
and has some extra text
 
101
"""
 
102
 
 
103
 
 
104
_third_text = """\
 
105
a bit of text, that
 
106
has some in common with the previous text
 
107
and has some extra text
 
108
and not have much in
 
109
common with the next text
 
110
"""
 
111
 
 
112
_fourth_text = """\
 
113
123456789012345
 
114
same rabin hash
 
115
123456789012345
 
116
same rabin hash
 
117
123456789012345
 
118
same rabin hash
 
119
123456789012345
 
120
same rabin hash
 
121
"""
 
122
 
 
123
class TestMakeAndApplyDelta(tests.TestCase):
 
124
 
 
125
    scenarios = module_scenarios()
 
126
    _gc_module = None # Set by load_tests
 
127
 
 
128
    def setUp(self):
 
129
        super(TestMakeAndApplyDelta, self).setUp()
 
130
        self.make_delta = self._gc_module.make_delta
 
131
        self.apply_delta = self._gc_module.apply_delta
 
132
        self.apply_delta_to_source = self._gc_module.apply_delta_to_source
 
133
 
 
134
    def test_make_delta_is_typesafe(self):
 
135
        self.make_delta('a string', 'another string')
 
136
 
 
137
        def _check_make_delta(string1, string2):
 
138
            self.assertRaises(TypeError, self.make_delta, string1, string2)
 
139
 
 
140
        _check_make_delta('a string', object())
 
141
        _check_make_delta('a string', u'not a string')
 
142
        _check_make_delta(object(), 'a string')
 
143
        _check_make_delta(u'not a string', 'a string')
 
144
 
 
145
    def test_make_noop_delta(self):
 
146
        ident_delta = self.make_delta(_text1, _text1)
 
147
        self.assertEqual('M\x90M', ident_delta)
 
148
        ident_delta = self.make_delta(_text2, _text2)
 
149
        self.assertEqual('N\x90N', ident_delta)
 
150
        ident_delta = self.make_delta(_text3, _text3)
 
151
        self.assertEqual('\x87\x01\x90\x87', ident_delta)
 
152
 
 
153
    def assertDeltaIn(self, delta1, delta2, delta):
 
154
        """Make sure that the delta bytes match one of the expectations."""
 
155
        # In general, the python delta matcher gives different results than the
 
156
        # pyrex delta matcher. Both should be valid deltas, though.
 
157
        if delta not in (delta1, delta2):
 
158
            self.fail("Delta bytes:\n"
 
159
                      "       %r\n"
 
160
                      "not in %r\n"
 
161
                      "    or %r"
 
162
                      % (delta, delta1, delta2))
 
163
 
 
164
    def test_make_delta(self):
 
165
        delta = self.make_delta(_text1, _text2)
 
166
        self.assertDeltaIn(
 
167
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
168
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
169
            delta)
 
170
        delta = self.make_delta(_text2, _text1)
 
171
        self.assertDeltaIn(
 
172
            'M\x90/\x1ebe matched\nagainst other text\n',
 
173
            'M\x90\x1d\x1dwhich is meant to be matched\n\x91;\x13',
 
174
            delta)
 
175
        delta = self.make_delta(_text3, _text1)
 
176
        self.assertEqual('M\x90M', delta)
 
177
        delta = self.make_delta(_text3, _text2)
 
178
        self.assertDeltaIn(
 
179
            'N\x90/\x1fdiffer from\nagainst other text\n',
 
180
            'N\x90\x1d\x1ewhich is meant to differ from\n\x91:\x13',
 
181
            delta)
 
182
 
 
183
    def test_make_delta_with_large_copies(self):
 
184
        # We want to have a copy that is larger than 64kB, which forces us to
 
185
        # issue multiple copy instructions.
 
186
        big_text = _text3 * 1220
 
187
        delta = self.make_delta(big_text, big_text)
 
188
        self.assertDeltaIn(
 
189
            '\xdc\x86\x0a'      # Encoding the length of the uncompressed text
 
190
            '\x80'              # Copy 64kB, starting at byte 0
 
191
            '\x84\x01'          # and another 64kB starting at 64kB
 
192
            '\xb4\x02\x5c\x83', # And the bit of tail.
 
193
            None,   # Both implementations should be identical
 
194
            delta)
 
195
 
 
196
    def test_apply_delta_is_typesafe(self):
 
197
        self.apply_delta(_text1, 'M\x90M')
 
198
        self.assertRaises(TypeError, self.apply_delta, object(), 'M\x90M')
 
199
        self.assertRaises(TypeError, self.apply_delta,
 
200
                          unicode(_text1), 'M\x90M')
 
201
        self.assertRaises(TypeError, self.apply_delta, _text1, u'M\x90M')
 
202
        self.assertRaises(TypeError, self.apply_delta, _text1, object())
 
203
 
 
204
    def test_apply_delta(self):
 
205
        target = self.apply_delta(_text1,
 
206
                    'N\x90/\x1fdiffer from\nagainst other text\n')
 
207
        self.assertEqual(_text2, target)
 
208
        target = self.apply_delta(_text2,
 
209
                    'M\x90/\x1ebe matched\nagainst other text\n')
 
210
        self.assertEqual(_text1, target)
 
211
 
 
212
    def test_apply_delta_to_source_is_safe(self):
 
213
        self.assertRaises(TypeError,
 
214
            self.apply_delta_to_source, object(), 0, 1)
 
215
        self.assertRaises(TypeError,
 
216
            self.apply_delta_to_source, u'unicode str', 0, 1)
 
217
        # end > length
 
218
        self.assertRaises(ValueError,
 
219
            self.apply_delta_to_source, 'foo', 1, 4)
 
220
        # start > length
 
221
        self.assertRaises(ValueError,
 
222
            self.apply_delta_to_source, 'foo', 5, 3)
 
223
        # start > end
 
224
        self.assertRaises(ValueError,
 
225
            self.apply_delta_to_source, 'foo', 3, 2)
 
226
 
 
227
    def test_apply_delta_to_source(self):
 
228
        source_and_delta = (_text1
 
229
                            + 'N\x90/\x1fdiffer from\nagainst other text\n')
 
230
        self.assertEqual(_text2, self.apply_delta_to_source(source_and_delta,
 
231
                                    len(_text1), len(source_and_delta)))
 
232
 
 
233
 
 
234
class TestMakeAndApplyCompatible(tests.TestCase):
 
235
 
 
236
    scenarios = two_way_scenarios()
 
237
 
 
238
    make_delta = None # Set by load_tests
 
239
    apply_delta = None # Set by load_tests
 
240
 
 
241
    def assertMakeAndApply(self, source, target):
 
242
        """Assert that generating a delta and applying gives success."""
 
243
        delta = self.make_delta(source, target)
 
244
        bytes = self.apply_delta(source, delta)
 
245
        self.assertEqualDiff(target, bytes)
 
246
 
 
247
    def test_direct(self):
 
248
        self.assertMakeAndApply(_text1, _text2)
 
249
        self.assertMakeAndApply(_text2, _text1)
 
250
        self.assertMakeAndApply(_text1, _text3)
 
251
        self.assertMakeAndApply(_text3, _text1)
 
252
        self.assertMakeAndApply(_text2, _text3)
 
253
        self.assertMakeAndApply(_text3, _text2)
 
254
 
 
255
 
 
256
class TestDeltaIndex(tests.TestCase):
 
257
 
 
258
    def setUp(self):
 
259
        super(TestDeltaIndex, self).setUp()
 
260
        # This test isn't multiplied, because we only have DeltaIndex for the
 
261
        # compiled form
 
262
        # We call this here, because _test_needs_features happens after setUp
 
263
        self.requireFeature(compiled_groupcompress_feature)
 
264
        self._gc_module = compiled_groupcompress_feature.module
 
265
 
 
266
    def test_repr(self):
 
267
        di = self._gc_module.DeltaIndex('test text\n')
 
268
        self.assertEqual('DeltaIndex(1, 10)', repr(di))
 
269
 
 
270
    def test__dump_no_index(self):
 
271
        di = self._gc_module.DeltaIndex()
 
272
        self.assertEqual(None, di._dump_index())
 
273
 
 
274
    def test__dump_index_simple(self):
 
275
        di = self._gc_module.DeltaIndex()
 
276
        di.add_source(_text1, 0)
 
277
        self.assertFalse(di._has_index())
 
278
        self.assertEqual(None, di._dump_index())
 
279
        _ = di.make_delta(_text1)
 
280
        self.assertTrue(di._has_index())
 
281
        hash_list, entry_list = di._dump_index()
 
282
        self.assertEqual(16, len(hash_list))
 
283
        self.assertEqual(68, len(entry_list))
 
284
        just_entries = [(idx, text_offset, hash_val)
 
285
                        for idx, (text_offset, hash_val)
 
286
                         in enumerate(entry_list)
 
287
                         if text_offset != 0 or hash_val != 0]
 
288
        rabin_hash = self._gc_module._rabin_hash
 
289
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
290
                          (25, 48, rabin_hash(_text1[33:49])),
 
291
                          (34, 32, rabin_hash(_text1[17:33])),
 
292
                          (47, 64, rabin_hash(_text1[49:65])),
 
293
                         ], just_entries)
 
294
        # This ensures that the hash map points to the location we expect it to
 
295
        for entry_idx, text_offset, hash_val in just_entries:
 
296
            self.assertEqual(entry_idx, hash_list[hash_val & 0xf])
 
297
 
 
298
    def test__dump_index_two_sources(self):
 
299
        di = self._gc_module.DeltaIndex()
 
300
        di.add_source(_text1, 0)
 
301
        di.add_source(_text2, 2)
 
302
        start2 = len(_text1) + 2
 
303
        self.assertTrue(di._has_index())
 
304
        hash_list, entry_list = di._dump_index()
 
305
        self.assertEqual(16, len(hash_list))
 
306
        self.assertEqual(68, len(entry_list))
 
307
        just_entries = [(idx, text_offset, hash_val)
 
308
                        for idx, (text_offset, hash_val)
 
309
                         in enumerate(entry_list)
 
310
                         if text_offset != 0 or hash_val != 0]
 
311
        rabin_hash = self._gc_module._rabin_hash
 
312
        self.assertEqual([(8, 16, rabin_hash(_text1[1:17])),
 
313
                          (9, start2+16, rabin_hash(_text2[1:17])),
 
314
                          (25, 48, rabin_hash(_text1[33:49])),
 
315
                          (30, start2+64, rabin_hash(_text2[49:65])),
 
316
                          (34, 32, rabin_hash(_text1[17:33])),
 
317
                          (35, start2+32, rabin_hash(_text2[17:33])),
 
318
                          (43, start2+48, rabin_hash(_text2[33:49])),
 
319
                          (47, 64, rabin_hash(_text1[49:65])),
 
320
                         ], just_entries)
 
321
        # Each entry should be in the appropriate hash bucket.
 
322
        for entry_idx, text_offset, hash_val in just_entries:
 
323
            hash_idx = hash_val & 0xf
 
324
            self.assertTrue(
 
325
                hash_list[hash_idx] <= entry_idx < hash_list[hash_idx+1])
 
326
 
 
327
    def test_first_add_source_doesnt_index_until_make_delta(self):
 
328
        di = self._gc_module.DeltaIndex()
 
329
        self.assertFalse(di._has_index())
 
330
        di.add_source(_text1, 0)
 
331
        self.assertFalse(di._has_index())
 
332
        # However, asking to make a delta will trigger the index to be
 
333
        # generated, and will generate a proper delta
 
334
        delta = di.make_delta(_text2)
 
335
        self.assertTrue(di._has_index())
 
336
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
337
 
 
338
    def test_add_source_max_bytes_to_index(self):
 
339
        di = self._gc_module.DeltaIndex()
 
340
        di._max_bytes_to_index = 3*16
 
341
        di.add_source(_text1, 0) # (77 bytes -1) // 3 = 25 byte stride
 
342
        di.add_source(_text3, 3) # (135 bytes -1) // 3 = 44 byte stride
 
343
        start2 = len(_text1) + 3
 
344
        hash_list, entry_list = di._dump_index()
 
345
        self.assertEqual(16, len(hash_list))
 
346
        self.assertEqual(67, len(entry_list))
 
347
        just_entries = sorted([(text_offset, hash_val)
 
348
                               for text_offset, hash_val in entry_list
 
349
                                if text_offset != 0 or hash_val != 0])
 
350
        rabin_hash = self._gc_module._rabin_hash
 
351
        self.assertEqual([(25, rabin_hash(_text1[10:26])),
 
352
                          (50, rabin_hash(_text1[35:51])),
 
353
                          (75, rabin_hash(_text1[60:76])),
 
354
                          (start2+44, rabin_hash(_text3[29:45])),
 
355
                          (start2+88, rabin_hash(_text3[73:89])),
 
356
                          (start2+132, rabin_hash(_text3[117:133])),
 
357
                         ], just_entries)
 
358
 
 
359
    def test_second_add_source_triggers_make_index(self):
 
360
        di = self._gc_module.DeltaIndex()
 
361
        self.assertFalse(di._has_index())
 
362
        di.add_source(_text1, 0)
 
363
        self.assertFalse(di._has_index())
 
364
        di.add_source(_text2, 0)
 
365
        self.assertTrue(di._has_index())
 
366
 
 
367
    def test_make_delta(self):
 
368
        di = self._gc_module.DeltaIndex(_text1)
 
369
        delta = di.make_delta(_text2)
 
370
        self.assertEqual('N\x90/\x1fdiffer from\nagainst other text\n', delta)
 
371
 
 
372
    def test_delta_against_multiple_sources(self):
 
373
        di = self._gc_module.DeltaIndex()
 
374
        di.add_source(_first_text, 0)
 
375
        self.assertEqual(len(_first_text), di._source_offset)
 
376
        di.add_source(_second_text, 0)
 
377
        self.assertEqual(len(_first_text) + len(_second_text),
 
378
                         di._source_offset)
 
379
        delta = di.make_delta(_third_text)
 
380
        result = self._gc_module.apply_delta(_first_text + _second_text, delta)
 
381
        self.assertEqualDiff(_third_text, result)
 
382
        self.assertEqual('\x85\x01\x90\x14\x0chas some in '
 
383
                         '\x91v6\x03and\x91d"\x91:\n', delta)
 
384
 
 
385
    def test_delta_with_offsets(self):
 
386
        di = self._gc_module.DeltaIndex()
 
387
        di.add_source(_first_text, 5)
 
388
        self.assertEqual(len(_first_text) + 5, di._source_offset)
 
389
        di.add_source(_second_text, 10)
 
390
        self.assertEqual(len(_first_text) + len(_second_text) + 15,
 
391
                         di._source_offset)
 
392
        delta = di.make_delta(_third_text)
 
393
        self.assertIsNot(None, delta)
 
394
        result = self._gc_module.apply_delta(
 
395
            '12345' + _first_text + '1234567890' + _second_text, delta)
 
396
        self.assertIsNot(None, result)
 
397
        self.assertEqualDiff(_third_text, result)
 
398
        self.assertEqual('\x85\x01\x91\x05\x14\x0chas some in '
 
399
                         '\x91\x856\x03and\x91s"\x91?\n', delta)
 
400
 
 
401
    def test_delta_with_delta_bytes(self):
 
402
        di = self._gc_module.DeltaIndex()
 
403
        source = _first_text
 
404
        di.add_source(_first_text, 0)
 
405
        self.assertEqual(len(_first_text), di._source_offset)
 
406
        delta = di.make_delta(_second_text)
 
407
        self.assertEqual('h\tsome more\x91\x019'
 
408
                         '&previous text\nand has some extra text\n', delta)
 
409
        di.add_delta_source(delta, 0)
 
410
        source += delta
 
411
        self.assertEqual(len(_first_text) + len(delta), di._source_offset)
 
412
        second_delta = di.make_delta(_third_text)
 
413
        result = self._gc_module.apply_delta(source, second_delta)
 
414
        self.assertEqualDiff(_third_text, result)
 
415
        # We should be able to match against the
 
416
        # 'previous text\nand has some...'  that was part of the delta bytes
 
417
        # Note that we don't match the 'common with the', because it isn't long
 
418
        # enough to match in the original text, and those bytes are not present
 
419
        # in the delta for the second text.
 
420
        self.assertEqual('\x85\x01\x90\x14\x1chas some in common with the '
 
421
                         '\x91S&\x03and\x91\x18,', second_delta)
 
422
        # Add this delta, and create a new delta for the same text. We should
 
423
        # find the remaining text, and only insert the short 'and' text.
 
424
        di.add_delta_source(second_delta, 0)
 
425
        source += second_delta
 
426
        third_delta = di.make_delta(_third_text)
 
427
        result = self._gc_module.apply_delta(source, third_delta)
 
428
        self.assertEqualDiff(_third_text, result)
 
429
        self.assertEqual('\x85\x01\x90\x14\x91\x7e\x1c'
 
430
                         '\x91S&\x03and\x91\x18,', third_delta)
 
431
        # Now create a delta, which we know won't be able to be 'fit' into the
 
432
        # existing index
 
433
        fourth_delta = di.make_delta(_fourth_text)
 
434
        self.assertEqual(_fourth_text,
 
435
                         self._gc_module.apply_delta(source, fourth_delta))
 
436
        self.assertEqual('\x80\x01'
 
437
                         '\x7f123456789012345\nsame rabin hash\n'
 
438
                         '123456789012345\nsame rabin hash\n'
 
439
                         '123456789012345\nsame rabin hash\n'
 
440
                         '123456789012345\nsame rabin hash'
 
441
                         '\x01\n', fourth_delta)
 
442
        di.add_delta_source(fourth_delta, 0)
 
443
        source += fourth_delta
 
444
        # With the next delta, everything should be found
 
445
        fifth_delta = di.make_delta(_fourth_text)
 
446
        self.assertEqual(_fourth_text,
 
447
                         self._gc_module.apply_delta(source, fifth_delta))
 
448
        self.assertEqual('\x80\x01\x91\xa7\x7f\x01\n', fifth_delta)
 
449
 
 
450
 
 
451
class TestCopyInstruction(tests.TestCase):
 
452
 
 
453
    def assertEncode(self, expected, offset, length):
 
454
        bytes = _groupcompress_py.encode_copy_instruction(offset, length)
 
455
        if expected != bytes:
 
456
            self.assertEqual([hex(ord(e)) for e in expected],
 
457
                             [hex(ord(b)) for b in bytes])
 
458
 
 
459
    def assertDecode(self, exp_offset, exp_length, exp_newpos, bytes, pos):
 
460
        cmd = ord(bytes[pos])
 
461
        pos += 1
 
462
        out = _groupcompress_py.decode_copy_instruction(bytes, cmd, pos)
 
463
        self.assertEqual((exp_offset, exp_length, exp_newpos), out)
 
464
 
 
465
    def test_encode_no_length(self):
 
466
        self.assertEncode('\x80', 0, 64*1024)
 
467
        self.assertEncode('\x81\x01', 1, 64*1024)
 
468
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
469
        self.assertEncode('\x81\xff', 255, 64*1024)
 
470
        self.assertEncode('\x82\x01', 256, 64*1024)
 
471
        self.assertEncode('\x83\x01\x01', 257, 64*1024)
 
472
        self.assertEncode('\x8F\xff\xff\xff\xff', 0xFFFFFFFF, 64*1024)
 
473
        self.assertEncode('\x8E\xff\xff\xff', 0xFFFFFF00, 64*1024)
 
474
        self.assertEncode('\x8D\xff\xff\xff', 0xFFFF00FF, 64*1024)
 
475
        self.assertEncode('\x8B\xff\xff\xff', 0xFF00FFFF, 64*1024)
 
476
        self.assertEncode('\x87\xff\xff\xff', 0x00FFFFFF, 64*1024)
 
477
        self.assertEncode('\x8F\x04\x03\x02\x01', 0x01020304, 64*1024)
 
478
 
 
479
    def test_encode_no_offset(self):
 
480
        self.assertEncode('\x90\x01', 0, 1)
 
481
        self.assertEncode('\x90\x0a', 0, 10)
 
482
        self.assertEncode('\x90\xff', 0, 255)
 
483
        self.assertEncode('\xA0\x01', 0, 256)
 
484
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
485
        self.assertEncode('\xB0\xff\xff', 0, 0xFFFF)
 
486
        # Special case, if copy == 64KiB, then we store exactly 0
 
487
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
488
        # about that, as we would never actually copy 0 bytes
 
489
        self.assertEncode('\x80', 0, 64*1024)
 
490
 
 
491
    def test_encode(self):
 
492
        self.assertEncode('\x91\x01\x01', 1, 1)
 
493
        self.assertEncode('\x91\x09\x0a', 9, 10)
 
494
        self.assertEncode('\x91\xfe\xff', 254, 255)
 
495
        self.assertEncode('\xA2\x02\x01', 512, 256)
 
496
        self.assertEncode('\xB3\x02\x01\x01\x01', 258, 257)
 
497
        self.assertEncode('\xB0\x01\x01', 0, 257)
 
498
        # Special case, if copy == 64KiB, then we store exactly 0
 
499
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
500
        # about that, as we would never actually copy 0 bytes
 
501
        self.assertEncode('\x81\x0a', 10, 64*1024)
 
502
 
 
503
    def test_decode_no_length(self):
 
504
        # If length is 0, it is interpreted as 64KiB
 
505
        # The shortest possible instruction is a copy of 64KiB from offset 0
 
506
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
507
        self.assertDecode(1, 65536, 2, '\x81\x01', 0)
 
508
        self.assertDecode(10, 65536, 2, '\x81\x0a', 0)
 
509
        self.assertDecode(255, 65536, 2, '\x81\xff', 0)
 
510
        self.assertDecode(256, 65536, 2, '\x82\x01', 0)
 
511
        self.assertDecode(257, 65536, 3, '\x83\x01\x01', 0)
 
512
        self.assertDecode(0xFFFFFFFF, 65536, 5, '\x8F\xff\xff\xff\xff', 0)
 
513
        self.assertDecode(0xFFFFFF00, 65536, 4, '\x8E\xff\xff\xff', 0)
 
514
        self.assertDecode(0xFFFF00FF, 65536, 4, '\x8D\xff\xff\xff', 0)
 
515
        self.assertDecode(0xFF00FFFF, 65536, 4, '\x8B\xff\xff\xff', 0)
 
516
        self.assertDecode(0x00FFFFFF, 65536, 4, '\x87\xff\xff\xff', 0)
 
517
        self.assertDecode(0x01020304, 65536, 5, '\x8F\x04\x03\x02\x01', 0)
 
518
 
 
519
    def test_decode_no_offset(self):
 
520
        self.assertDecode(0, 1, 2, '\x90\x01', 0)
 
521
        self.assertDecode(0, 10, 2, '\x90\x0a', 0)
 
522
        self.assertDecode(0, 255, 2, '\x90\xff', 0)
 
523
        self.assertDecode(0, 256, 2, '\xA0\x01', 0)
 
524
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
525
        self.assertDecode(0, 65535, 3, '\xB0\xff\xff', 0)
 
526
        # Special case, if copy == 64KiB, then we store exactly 0
 
527
        # Note that this puns with a copy of exactly 0 bytes, but we don't care
 
528
        # about that, as we would never actually copy 0 bytes
 
529
        self.assertDecode(0, 65536, 1, '\x80', 0)
 
530
 
 
531
    def test_decode(self):
 
532
        self.assertDecode(1, 1, 3, '\x91\x01\x01', 0)
 
533
        self.assertDecode(9, 10, 3, '\x91\x09\x0a', 0)
 
534
        self.assertDecode(254, 255, 3, '\x91\xfe\xff', 0)
 
535
        self.assertDecode(512, 256, 3, '\xA2\x02\x01', 0)
 
536
        self.assertDecode(258, 257, 5, '\xB3\x02\x01\x01\x01', 0)
 
537
        self.assertDecode(0, 257, 3, '\xB0\x01\x01', 0)
 
538
 
 
539
    def test_decode_not_start(self):
 
540
        self.assertDecode(1, 1, 6, 'abc\x91\x01\x01def', 3)
 
541
        self.assertDecode(9, 10, 5, 'ab\x91\x09\x0ade', 2)
 
542
        self.assertDecode(254, 255, 6, 'not\x91\xfe\xffcopy', 3)
 
543
 
 
544
 
 
545
class TestBase128Int(tests.TestCase):
 
546
 
 
547
    scenarios = module_scenarios()
 
548
 
 
549
    _gc_module = None # Set by load_tests
 
550
 
 
551
    def assertEqualEncode(self, bytes, val):
 
552
        self.assertEqual(bytes, self._gc_module.encode_base128_int(val))
 
553
 
 
554
    def assertEqualDecode(self, val, num_decode, bytes):
 
555
        self.assertEqual((val, num_decode),
 
556
                         self._gc_module.decode_base128_int(bytes))
 
557
 
 
558
    def test_encode(self):
 
559
        self.assertEqualEncode('\x01', 1)
 
560
        self.assertEqualEncode('\x02', 2)
 
561
        self.assertEqualEncode('\x7f', 127)
 
562
        self.assertEqualEncode('\x80\x01', 128)
 
563
        self.assertEqualEncode('\xff\x01', 255)
 
564
        self.assertEqualEncode('\x80\x02', 256)
 
565
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)
 
566
 
 
567
    def test_decode(self):
 
568
        self.assertEqualDecode(1, 1, '\x01')
 
569
        self.assertEqualDecode(2, 1, '\x02')
 
570
        self.assertEqualDecode(127, 1, '\x7f')
 
571
        self.assertEqualDecode(128, 2, '\x80\x01')
 
572
        self.assertEqualDecode(255, 2, '\xff\x01')
 
573
        self.assertEqualDecode(256, 2, '\x80\x02')
 
574
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')
 
575
 
 
576
    def test_decode_with_trailing_bytes(self):
 
577
        self.assertEqualDecode(1, 1, '\x01abcdef')
 
578
        self.assertEqualDecode(127, 1, '\x7f\x01')
 
579
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
 
580
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
 
581
 
288
582