# groupcompress, a bzr plugin providing new compression logic.
# Copyright (C) 2008 Canonical Limited.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for group compression."""
26
from bzrlib.osutils import sha_string
27
from bzrlib.tests import (
28
TestCaseWithTransport,
35


class TestGroupCompressor(tests.TestCase):
    """Tests for GroupCompressor"""

    def test_empty_delta(self):
        compressor = groupcompress.GroupCompressor(True)
        self.assertEqual([], compressor.lines)

    def test_one_nosha_delta(self):
        compressor = groupcompress.GroupCompressor(True)
        sha1, end_point, _, _ = compressor.compress(('label',),
            'strange\ncommon\n', None)
        self.assertEqual(sha_string('strange\ncommon\n'), sha1)
        expected_lines = [
            # 'f' marks a fulltext record, '\x0f' is its length (15 bytes)
            'f', '\x0f', 'strange\ncommon\n',
            ]
        self.assertEqual(expected_lines, compressor.lines)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def _chunks_to_repr_lines(self, chunks):
        return '\n'.join(map(repr, ''.join(chunks).split('\n')))

    def assertEqualDiffEncoded(self, expected, actual):
        """Compare the actual content to the expected content.

        :param expected: A group of chunks that we expect to see
        :param actual: The measured 'chunks'

        We will transform the chunks back into lines, and then run 'repr()'
        over them to handle non-ascii characters.
        """
        self.assertEqualDiff(self._chunks_to_repr_lines(expected),
                             self._chunks_to_repr_lines(actual))

    def test_two_nosha_delta(self):
        compressor = groupcompress.GroupCompressor(True)
        sha1_1, _, _, _ = compressor.compress(('label',),
            'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.lines)
        sha1_2, end_point, _, _ = compressor.compress(('newlabel',),
            'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        self.assertEqual(sha_string('common long line\n'
                                    'that needs a 16 byte match\n'
                                    'different\n'), sha1_2)
        expected_lines.extend([
            # 'delta', delta length
            # source and target length
            # copy the line common
            '\x91\x0a\x2c', # copy, offset 0x0a, len 0x2c
            # add the line different, and the trailing newline
            '\x0adifferent\n', # insert 10 bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.lines)
        self.assertEqual(sum(map(len, expected_lines)), end_point)
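
    # Reading of the expected delta bytes above (an interpretation, not the
    # plugin's specification): a fulltext record is the label 'f', a length
    # and the raw text, while a delta record is labelled 'd' and carries a
    # stream of instructions.  A command byte with the high bit set, such as
    # '\x91', appears to be a copy instruction whose following bytes give the
    # offset (0x0a) and length (0x2c) within the bytes emitted so far; a
    # command byte below 0x80, such as '\x0a', inserts that many literal
    # bytes.  The authoritative encoding lives in the groupcompress module.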

    def test_three_nosha_delta(self):
        # The first interesting test: make a change that should use lines
        # from both of the previous texts.
        compressor = groupcompress.GroupCompressor(True)
        sha1_1, end_point, _, _ = compressor.compress(('label',),
            'strange\ncommon very very long line\nwith some extra text\n', None)
        sha1_2, _, _, _ = compressor.compress(('newlabel',),
            'different\nmoredifferent\nand then some more\n', None)
        expected_lines = list(compressor.lines)
        sha1_3, end_point, _, _ = compressor.compress(('label3',),
            'new\ncommon very very long line\nwith some extra text\n'
            'different\nmoredifferent\nand then some more\n',
            None)
        self.assertEqual(
            sha_string('new\ncommon very very long line\nwith some extra text\n'
                       'different\nmoredifferent\nand then some more\n'),
            sha1_3)
        expected_lines.extend([
            # 'delta', delta length
            # source and target length
            # Copy of first parent 'common' range
            '\x91\x09\x31' # copy, offset 0x09, 0x31 bytes
            # Copy of second parent 'different' range
            '\x91\x3c\x2b' # copy, offset 0x3c, 0x2b bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.lines)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_stats(self):
        compressor = groupcompress.GroupCompressor(True)
        compressor.compress(('label',), 'strange\ncommon long line\n'
                                        'plus more text\n', None)
        compressor.compress(('newlabel',),
                            'common long line\nplus more text\n'
                            'different\nmoredifferent\n', None)
        compressor.compress(('label3',),
                            'new\ncommon long line\nplus more text\n'
                            '\ndifferent\nmoredifferent\n', None)
        self.assertAlmostEqual(1.4, compressor.ratio(), 1)

    def test_extract_from_compressor(self):
        # Knit fetching will try to reconstruct texts locally which results in
        # reading something that is in the compressor stream already.
        compressor = groupcompress.GroupCompressor(True)
        sha1_1, _, _, _ = compressor.compress(('label',),
            'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.lines)
        sha1_2, end_point, _, _ = compressor.compress(('newlabel',),
            'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        self.assertEqual(('strange\ncommon long line\n'
                          'that needs a 16 byte match\n', sha1_1),
                         compressor.extract(('label',)))
        self.assertEqual(('common long line\nthat needs a 16 byte match\n'
                          'different\n', sha1_2),
                         compressor.extract(('newlabel',)))


class TestBase128Int(tests.TestCase):

    def assertEqualEncode(self, bytes, val):
        self.assertEqual(bytes, groupcompress.encode_base128_int(val))

    def assertEqualDecode(self, val, num_decode, bytes):
        self.assertEqual((val, num_decode),
                         groupcompress.decode_base128_int(bytes))

    def test_encode(self):
        self.assertEqualEncode('\x01', 1)
        self.assertEqualEncode('\x02', 2)
        self.assertEqualEncode('\x7f', 127)
        self.assertEqualEncode('\x80\x01', 128)
        self.assertEqualEncode('\xff\x01', 255)
        self.assertEqualEncode('\x80\x02', 256)
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)

    def test_decode(self):
        self.assertEqualDecode(1, 1, '\x01')
        self.assertEqualDecode(2, 1, '\x02')
        self.assertEqualDecode(127, 1, '\x7f')
        self.assertEqualDecode(128, 2, '\x80\x01')
        self.assertEqualDecode(255, 2, '\xff\x01')
        self.assertEqualDecode(256, 2, '\x80\x02')
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')

    def test_decode_with_trailing_bytes(self):
        self.assertEqualDecode(1, 1, '\x01abcdef')
        self.assertEqualDecode(127, 1, '\x7f\x01')
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
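

# A minimal reference sketch of the base128 integer encoding that the tests
# above exercise, assuming the usual LEB128-style scheme (seven value bits
# per byte, least significant group first, high bit set while more bytes
# follow).  These helpers are illustrative only and are not the plugin's
# encode_base128_int / decode_base128_int implementation.
def _base128_encode_sketch(val):
    chunks = []
    while True:
        byte = val & 0x7F
        val >>= 7
        if val:
            # more bytes follow, so set the continuation bit
            chunks.append(chr(byte | 0x80))
        else:
            chunks.append(chr(byte))
            return ''.join(chunks)


def _base128_decode_sketch(data):
    # Returns (value, bytes_consumed); trailing bytes are ignored, matching
    # test_decode_with_trailing_bytes above.
    val = 0
    shift = 0
    offset = 0
    while True:
        byte = ord(data[offset])
        offset += 1
        val |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return val, offset
        shift += 7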


class TestGroupCompressBlock(tests.TestCase):

    def test_from_empty_bytes(self):
        self.assertRaises(ValueError,
                          groupcompress.GroupCompressBlock.from_bytes, '')

    def test_from_minimal_bytes(self):
        block = groupcompress.GroupCompressBlock.from_bytes('gcb1z\n0\n0\n')
        self.assertIsInstance(block, groupcompress.GroupCompressBlock)
        self.assertEqual({}, block._entries)

    def test_from_bytes(self):
        # 76 bytes of zlib data expanding to 183 bytes of meta-info: one
        # stanza per entry with key, sha1, type, start and length lines.
        z_header_bytes = (
            'gcb1z\n' # group compress block v1 zlib
            '76\n' # Length of zlib bytes
            '183\n' # Length of all meta-info
            + zlib.compress(
            'key:bing\n'
            'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
            'type:fulltext\n'
            'start:100\n'
            'length:100\n'
            '\n'
            'key:foo\x00bar\n'
            'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
            'type:fulltext\n'
            'start:0\n'
            'length:100\n'
            '\n'))
        block = groupcompress.GroupCompressBlock.from_bytes(
            z_header_bytes)
        self.assertIs(None, block._content)
        self.assertIsInstance(block, groupcompress.GroupCompressBlock)
        self.assertEqual([('bing',), ('foo', 'bar')], sorted(block._entries))
        bing = block._entries[('bing',)]
        self.assertEqual(('bing',), bing.key)
        self.assertEqual('fulltext', bing.type)
        self.assertEqual('abcd'*10, bing.sha1)
        self.assertEqual(100, bing.start)
        self.assertEqual(100, bing.length)
        foobar = block._entries[('foo', 'bar')]
        self.assertEqual(('foo', 'bar'), foobar.key)
        self.assertEqual('fulltext', foobar.type)
        self.assertEqual('abcd'*10, foobar.sha1)
        self.assertEqual(0, foobar.start)
        self.assertEqual(100, foobar.length)
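
    # Note: the serialised block begins with a 13-byte plain-text header --
    # 'gcb1z\n' (6 bytes), the compressed length '76\n' (3 bytes) and the
    # meta-info length '183\n' (4 bytes) -- followed by the zlib-compressed
    # meta-info, which is why test_to_bytes below decompresses everything
    # from offset 13 onwards.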

    def test_add_entry(self):
        gcb = groupcompress.GroupCompressBlock()
        e = gcb.add_entry(('foo', 'bar'), 'fulltext', 'abcd'*10, 0, 100)
        self.assertIsInstance(e, groupcompress.GroupCompressBlockEntry)
        self.assertEqual(('foo', 'bar'), e.key)
        self.assertEqual('fulltext', e.type)
        self.assertEqual('abcd'*10, e.sha1)
        self.assertEqual(0, e.start)
        self.assertEqual(100, e.length)

    def test_to_bytes(self):
        gcb = groupcompress.GroupCompressBlock()
        gcb.add_entry(('foo', 'bar'), 'fulltext', 'abcd'*10, 0, 100)
        gcb.add_entry(('bing',), 'fulltext', 'abcd'*10, 100, 100)
        bytes = gcb.to_bytes()
        self.assertStartsWith(bytes,
                              'gcb1z\n' # group compress block v1 zlib
                              '76\n' # Length of compressed bytes
                              '183\n' # Length of all meta-info
                              )
        remaining_bytes = bytes[13:]
        raw_bytes = zlib.decompress(remaining_bytes)
        self.assertEqualDiff('key:bing\n'
                             'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
                             'type:fulltext\n'
                             'start:100\n'
                             'length:100\n'
                             '\n'
                             'key:foo\x00bar\n'
                             'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'