1
# Copyright (C) 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for group compression."""
25
from bzrlib.osutils import sha_string
26
from bzrlib.tests import (
27
TestCaseWithTransport,
34
class TestGroupCompressor(tests.TestCase):
35
"""Tests for GroupCompressor"""
37
def test_empty_delta(self):
    """A freshly constructed compressor has an empty ``lines`` list."""
    compressor = groupcompress.GroupCompressor(True)
    self.assertEqual([], compressor.lines)
41
def test_one_nosha_delta(self):
# Compresses a single text and checks the returned sha1, the compressor's
# ``lines`` and the returned end point against ``expected_lines``.
# NOTE(review): this block is extraction-damaged. The bare integers are the
# original file's line numbers, indentation has been stripped, and the
# numbering skips 42, 47 and 49 -- presumably the ``expected_lines = [``
# opener and its closing bracket are among the lost lines. Restore from the
# upstream file rather than guessing.
43
compressor = groupcompress.GroupCompressor(True)
44
sha1, end_point, _, _ = compressor.compress(('label',),
45
'strange\ncommon\n', None)
46
self.assertEqual(sha_string('strange\ncommon\n'), sha1)
48
'f', '\x0f', 'strange\ncommon\n',
50
self.assertEqual(expected_lines, compressor.lines)
51
self.assertEqual(sum(map(len, expected_lines)), end_point)
53
def _chunks_to_repr_lines(self, chunks):
    """Return the repr() of every line in the joined chunks.

    :param chunks: An iterable of strings. They are concatenated first, so
        chunk boundaries do not need to coincide with line boundaries.
    :return: One string holding the repr() of each newline-separated piece,
        joined with newlines. Because the pieces are repr()'d, non-printable
        bytes show up as escapes instead of raw characters.
    """
    joined = ''.join(chunks)
    return '\n'.join(repr(piece) for piece in joined.split('\n'))
56
def assertEqualDiffEncoded(self, expected, actual):
    """Compare the actual content to the expected content.

    :param expected: A group of chunks that we expect to see
    :param actual: The measured 'chunks'

    We will transform the chunks back into lines, and then run 'repr()'
    over them to handle non-ascii characters.
    """
    expected_repr = self._chunks_to_repr_lines(expected)
    actual_repr = self._chunks_to_repr_lines(actual)
    self.assertEqualDiff(expected_repr, actual_repr)
68
def test_two_nosha_delta(self):
# Compresses two texts that share content, then checks the sha1 of the
# second text, the lines appended for it and the returned end point.
# NOTE(review): this block is extraction-damaged. The bare integers are the
# original file's line numbers, indentation has been stripped, and the
# numbering skips 80, 82 and 87 -- the literal entries that belonged under
# the "'delta', delta length" and "source and target length" comments, and
# presumably the closing of the extend() call, are not visible here.
# Restore from the upstream file rather than guessing the delta bytes.
69
compressor = groupcompress.GroupCompressor(True)
70
sha1_1, _, _, _ = compressor.compress(('label',),
71
'strange\ncommon long line\nthat needs a 16 byte match\n', None)
72
expected_lines = list(compressor.lines)
73
sha1_2, end_point, _, _ = compressor.compress(('newlabel',),
74
'common long line\nthat needs a 16 byte match\ndifferent\n', None)
75
self.assertEqual(sha_string('common long line\n'
76
'that needs a 16 byte match\n'
77
'different\n'), sha1_2)
78
expected_lines.extend([
79
# 'delta', delta length
81
# source and target length
83
# copy the line common
84
'\x91\x0a\x2c', #copy, offset 0x0a, len 0x2c
85
# add the line different, and the trailing newline
86
'\x0adifferent\n', # insert 10 bytes
88
self.assertEqualDiffEncoded(expected_lines, compressor.lines)
89
self.assertEqual(sum(map(len, expected_lines)), end_point)
91
def test_three_nosha_delta(self):
# Compresses three texts, the third of which repeats content from both
# earlier ones, then checks the lines appended for it and the end point.
# NOTE(review): this block is extraction-damaged. The bare integers are the
# original file's line numbers, indentation has been stripped, and the
# numbering skips 93, 103-104, 107, 110, 112-114 and 119. The end of the
# third compress() call, the start of the sha1 assertion and several
# expected delta entries are therefore not visible; restore them from the
# upstream file rather than guessing.
92
# The first interesting test: make a change that should use lines from
# NOTE(review): the sentence above is cut off -- its continuation line (93)
# is missing from this view.
94
compressor = groupcompress.GroupCompressor(True)
95
sha1_1, end_point, _, _ = compressor.compress(('label',),
96
'strange\ncommon very very long line\nwith some extra text\n', None)
97
sha1_2, _, _, _ = compressor.compress(('newlabel',),
98
'different\nmoredifferent\nand then some more\n', None)
99
expected_lines = list(compressor.lines)
100
sha1_3, end_point, _, _ = compressor.compress(('label3',),
101
'new\ncommon very very long line\nwith some extra text\n'
102
'different\nmoredifferent\nand then some more\n',
105
sha_string('new\ncommon very very long line\nwith some extra text\n'
106
'different\nmoredifferent\nand then some more\n'),
108
expected_lines.extend([
109
# 'delta', delta length
111
# source and target length
115
# Copy of first parent 'common' range
116
'\x91\x09\x31' # copy, offset 0x09, 0x31 bytes
117
# Copy of second parent 'different' range
118
'\x91\x3c\x2b' # copy, offset 0x3c, 0x2b bytes
120
self.assertEqualDiffEncoded(expected_lines, compressor.lines)
121
self.assertEqual(sum(map(len, expected_lines)), end_point)
123
def test_stats(self):
    """Check ratio() after compressing three texts with shared content.

    The expected ratio is 1.4, compared to one decimal place.
    """
    compressor = groupcompress.GroupCompressor(True)
    compressor.compress(
        ('label',),
        'strange\ncommon long line\n'
        'plus more text\n',
        None)
    compressor.compress(
        ('newlabel',),
        'common long line\nplus more text\n'
        'different\nmoredifferent\n',
        None)
    compressor.compress(
        ('label3',),
        'new\ncommon long line\nplus more text\n'
        '\ndifferent\nmoredifferent\n',
        None)
    self.assertAlmostEqual(1.4, compressor.ratio(), 1)
135
def test_extract_from_compressor(self):
    """Texts added to a compressor can be extracted again by key."""
    # Knit fetching will try to reconstruct texts locally which results in
    # reading something that is in the compressor stream already.
    compressor = groupcompress.GroupCompressor(True)
    first_text = 'strange\ncommon long line\nthat needs a 16 byte match\n'
    second_text = ('common long line\nthat needs a 16 byte match\n'
                   'different\n')
    sha1_1, _, _, _ = compressor.compress(('label',), first_text, None)
    sha1_2, _, _, _ = compressor.compress(('newlabel',), second_text, None)
    # extract() is expected to return a (text, sha1) pair for each key.
    self.assertEqual((first_text, sha1_1),
                     compressor.extract(('label',)))
    self.assertEqual((second_text, sha1_2),
                     compressor.extract(('newlabel',)))
154
class TestBase128Int(tests.TestCase):
    """Tests for groupcompress.encode_base128_int / decode_base128_int."""

    def assertEqualEncode(self, bytes, val):
        """Assert that encoding the integer ``val`` produces ``bytes``."""
        self.assertEqual(bytes, groupcompress.encode_base128_int(val))

    def assertEqualDecode(self, val, num_decode, bytes):
        """Assert that decoding ``bytes`` gives ``(val, num_decode)``.

        :param val: The integer expected back from the decoder.
        :param num_decode: The second element expected in the decoder's
            result; in every case below it equals the number of leading
            bytes that make up the encoded integer.
        :param bytes: The string handed to the decoder.
        """
        self.assertEqual((val, num_decode),
                         groupcompress.decode_base128_int(bytes))

    def test_encode(self):
        """Values up to 127 take one byte; larger values take more."""
        self.assertEqualEncode('\x01', 1)
        self.assertEqualEncode('\x02', 2)
        self.assertEqualEncode('\x7f', 127)
        self.assertEqualEncode('\x80\x01', 128)
        self.assertEqualEncode('\xff\x01', 255)
        self.assertEqualEncode('\x80\x02', 256)
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)

    def test_decode(self):
        """Decoding mirrors the cases covered by test_encode."""
        self.assertEqualDecode(1, 1, '\x01')
        self.assertEqualDecode(2, 1, '\x02')
        self.assertEqualDecode(127, 1, '\x7f')
        self.assertEqualDecode(128, 2, '\x80\x01')
        self.assertEqualDecode(255, 2, '\xff\x01')
        self.assertEqualDecode(256, 2, '\x80\x02')
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')

    def test_decode_with_trailing_bytes(self):
        """Bytes after the encoded integer do not affect the result."""
        self.assertEqualDecode(1, 1, '\x01abcdef')
        self.assertEqualDecode(127, 1, '\x7f\x01')
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
188
class TestGroupCompressBlock(tests.TestCase):
190
def test_from_empty_bytes(self):
    """from_bytes() raises ValueError when given an empty string."""
    self.assertRaises(
        ValueError, groupcompress.GroupCompressBlock.from_bytes, '')
194
def test_from_minimal_bytes(self):
    """A header followed by two zero lengths parses to an empty block."""
    block = groupcompress.GroupCompressBlock.from_bytes('gcb1z\n0\n0\n')
    self.assertIsInstance(block, groupcompress.GroupCompressBlock)
    self.assertEqual({}, block._entries)
199
def test_from_bytes(self):
# Parses a serialised block and checks the key, type, sha1, start and length
# of the two entries it is expected to contain.
# NOTE(review): this block is extraction-damaged. The bare integers are the
# original file's line numbers, indentation has been stripped, and most of
# the payload literal (original lines 200, 204-205, 207-211, 213-216) and
# the argument of the from_bytes() call (218) are missing from this view.
# Restore from the upstream file rather than guessing.
201
'gcb1z\n' # group compress block v1 zlib
202
'76\n' # Length of zlib bytes
203
'183\n' # Length of all meta-info
206
'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
212
'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
217
block = groupcompress.GroupCompressBlock.from_bytes(
219
self.assertIs(None, block._content)
220
self.assertIsInstance(block, groupcompress.GroupCompressBlock)
221
self.assertEqual([('bing',), ('foo', 'bar')], sorted(block._entries))
222
bing = block._entries[('bing',)]
223
self.assertEqual(('bing',), bing.key)
224
self.assertEqual('fulltext', bing.type)
225
self.assertEqual('abcd'*10, bing.sha1)
226
self.assertEqual(100, bing.start)
227
self.assertEqual(100, bing.length)
228
foobar = block._entries[('foo', 'bar')]
229
self.assertEqual(('foo', 'bar'), foobar.key)
230
self.assertEqual('fulltext', foobar.type)
231
self.assertEqual('abcd'*10, foobar.sha1)
232
self.assertEqual(0, foobar.start)
233
self.assertEqual(100, foobar.length)
235
def test_add_entry(self):
    """add_entry() returns an entry exposing the values it was given."""
    gcb = groupcompress.GroupCompressBlock()
    entry = gcb.add_entry(('foo', 'bar'), 'fulltext', 'abcd'*10, 0, 100)
    self.assertIsInstance(entry, groupcompress.GroupCompressBlockEntry)
    self.assertEqual(('foo', 'bar'), entry.key)
    self.assertEqual('fulltext', entry.type)
    self.assertEqual('abcd'*10, entry.sha1)
    self.assertEqual(0, entry.start)
    self.assertEqual(100, entry.length)
245
def test_to_bytes(self):
246
gcb = groupcompress.GroupCompressBlock()
247
gcb.add_entry(('foo', 'bar'), 'fulltext', 'abcd'*10, 0, 100)
248
gcb.add_entry(('bing',), 'fulltext', 'abcd'*10, 100, 100)
249
bytes = gcb.to_bytes()
250
self.assertStartsWith(bytes,
251
'gcb1z\n' # group compress block v1 zlib
252
'76\n' # Length of compressed bytes
253
'183\n' # Length of all meta-info
255
remaining_bytes = bytes[13:]
256
raw_bytes = zlib.decompress(remaining_bytes)
257
self.assertEqualDiff('key:bing\n'
258
'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
264
'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'