~bzr-pqm/bzr/bzr.dev

Viewing changes to bzrlib/tests/test_groupcompress.py

merge bzr.dev@4126 into brisbane-core
 
# Copyright (C) 2008, 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for group compression."""

import zlib

from bzrlib import (
    groupcompress,
    errors,
    osutils,
    tests,
    versionedfile,
    )
from bzrlib.osutils import sha_string
from bzrlib.tests import (
    TestCaseWithTransport,
    multiply_tests,
    )


class TestGroupCompressor(tests.TestCase):
    """Tests for GroupCompressor"""

    def test_empty_delta(self):
        compressor = groupcompress.GroupCompressor()
        self.assertEqual([], compressor.lines)

    def test_one_nosha_delta(self):
        # diff against NULL
        compressor = groupcompress.GroupCompressor()
        sha1, start_point, end_point, _, _ = compressor.compress(('label',),
            'strange\ncommon\n', None)
        self.assertEqual(sha_string('strange\ncommon\n'), sha1)
        expected_lines = [
            'f', '\x0f', 'strange\ncommon\n',
            ]
        self.assertEqual(expected_lines, compressor.lines)
        self.assertEqual(0, start_point)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_empty_content(self):
        compressor = groupcompress.GroupCompressor()
        # Adding empty bytes should return the 'null' record
        sha1, start_point, end_point, kind, _ = compressor.compress(('empty',),
            '', None)
        self.assertEqual(0, start_point)
        self.assertEqual(0, end_point)
        self.assertEqual('fulltext', kind)
        self.assertEqual(groupcompress._null_sha1, sha1)
        self.assertEqual(0, compressor.endpoint)
        self.assertEqual([], compressor.lines)
        # Even after adding some content
        compressor.compress(('content',), 'some\nbytes\n', None)
        self.assertTrue(compressor.endpoint > 0)
        sha1, start_point, end_point, kind, _ = compressor.compress(('empty2',),
            '', None)
        self.assertEqual(0, start_point)
        self.assertEqual(0, end_point)
        self.assertEqual('fulltext', kind)
        self.assertEqual(groupcompress._null_sha1, sha1)

    def _chunks_to_repr_lines(self, chunks):
        return '\n'.join(map(repr, ''.join(chunks).split('\n')))

    def assertEqualDiffEncoded(self, expected, actual):
        """Compare the actual content to the expected content.

        :param expected: A group of chunks that we expect to see
        :param actual: The measured 'chunks'

        We will transform the chunks back into lines, and then run 'repr()'
        over them to handle non-ascii characters.
        """
        self.assertEqualDiff(self._chunks_to_repr_lines(expected),
                             self._chunks_to_repr_lines(actual))

    def test_two_nosha_delta(self):
        compressor = groupcompress.GroupCompressor()
        sha1_1, _, _, _, _ = compressor.compress(('label',),
            'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.lines)
        sha1_2, start_point, end_point, _, _ = compressor.compress(('newlabel',),
            'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        self.assertEqual(sha_string('common long line\n'
                                    'that needs a 16 byte match\n'
                                    'different\n'), sha1_2)
        expected_lines.extend([
            # 'delta', delta length
            'd\x10',
            # source and target length
            '\x36\x36',
            # copy the line common
            '\x91\x0a\x2c', # copy, offset 0x0a, len 0x2c
            # add the line different, and the trailing newline
            '\x0adifferent\n', # insert 10 bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.lines)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_three_nosha_delta(self):
        # The first interesting test: make a change that should use lines from
        # both parents.
        compressor = groupcompress.GroupCompressor()
        sha1_1, _, _, _, _ = compressor.compress(('label',),
            'strange\ncommon very very long line\nwith some extra text\n', None)
        sha1_2, _, _, _, _ = compressor.compress(('newlabel',),
            'different\nmoredifferent\nand then some more\n', None)
        expected_lines = list(compressor.lines)
        sha1_3, start_point, end_point, _, _ = compressor.compress(('label3',),
            'new\ncommon very very long line\nwith some extra text\n'
            'different\nmoredifferent\nand then some more\n',
            None)
        self.assertEqual(
            sha_string('new\ncommon very very long line\nwith some extra text\n'
                       'different\nmoredifferent\nand then some more\n'),
            sha1_3)
        expected_lines.extend([
            # 'delta', delta length
            'd\x0c',
            # source and target length
            '\x67\x5f'
            # insert new
            '\x03new',
            # Copy of first parent 'common' range
            '\x91\x09\x31' # copy, offset 0x09, 0x31 bytes
            # Copy of second parent 'different' range
            '\x91\x3c\x2b' # copy, offset 0x3c, 0x2b bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.lines)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_stats(self):
        compressor = groupcompress.GroupCompressor()
        compressor.compress(('label',), 'strange\ncommon long line\n'
                                        'plus more text\n', None)
        compressor.compress(('newlabel',),
                            'common long line\nplus more text\n'
                            'different\nmoredifferent\n', None)
        compressor.compress(('label3',),
                            'new\ncommon long line\nplus more text\n'
                            '\ndifferent\nmoredifferent\n', None)
        self.assertAlmostEqual(1.4, compressor.ratio(), 1)

    def test_extract_from_compressor(self):
        # Knit fetching will try to reconstruct texts locally which results in
        # reading something that is in the compressor stream already.
        compressor = groupcompress.GroupCompressor()
        sha1_1, _, _, _, _ = compressor.compress(('label',),
            'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.lines)
        sha1_2, _, end_point, _, _ = compressor.compress(('newlabel',),
            'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        # get the first out
        self.assertEqual(('strange\ncommon long line\n'
                          'that needs a 16 byte match\n', sha1_1),
            compressor.extract(('label',)))
        # and the second
        self.assertEqual(('common long line\nthat needs a 16 byte match\n'
                          'different\n', sha1_2),
                         compressor.extract(('newlabel',)))
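

# The expected_lines in test_two_nosha_delta and test_three_nosha_delta
# spell out the groupcompress binary delta encoding: after the 'd' marker
# and a base128 total length come the source and target lengths, then a
# stream of instructions. A byte with the high bit set is a copy (its low
# four bits say which little-endian offset bytes follow, bits 4-6 which
# length bytes); a byte below 0x80 is an insert of that many literal
# bytes. A minimal copy-instruction decoder under that reading
# (illustrative sketch only, not part of bzrlib's API):
def _demo_decode_copy_instruction(data, pos):
    cmd = ord(data[pos])
    pos += 1
    offset = 0
    length = 0
    for shift in range(4):            # up to four offset bytes
        if cmd & (1 << shift):
            offset |= ord(data[pos]) << (shift * 8)
            pos += 1
    for shift in range(3):            # up to three length bytes
        if cmd & (1 << (4 + shift)):
            length |= ord(data[pos]) << (shift * 8)
            pos += 1
    return offset, length, pos

# e.g. _demo_decode_copy_instruction('\x91\x0a\x2c', 0) => (10, 44, 3),
# matching the "copy, offset 0x0a, len 0x2c" comment in
# test_two_nosha_delta above.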


class TestBase128Int(tests.TestCase):

    def assertEqualEncode(self, bytes, val):
        self.assertEqual(bytes, groupcompress.encode_base128_int(val))

    def assertEqualDecode(self, val, num_decode, bytes):
        self.assertEqual((val, num_decode),
                         groupcompress.decode_base128_int(bytes))

    def test_encode(self):
        self.assertEqualEncode('\x01', 1)
        self.assertEqualEncode('\x02', 2)
        self.assertEqualEncode('\x7f', 127)
        self.assertEqualEncode('\x80\x01', 128)
        self.assertEqualEncode('\xff\x01', 255)
        self.assertEqualEncode('\x80\x02', 256)
        self.assertEqualEncode('\xff\xff\xff\xff\x0f', 0xFFFFFFFF)

    def test_decode(self):
        self.assertEqualDecode(1, 1, '\x01')
        self.assertEqualDecode(2, 1, '\x02')
        self.assertEqualDecode(127, 1, '\x7f')
        self.assertEqualDecode(128, 2, '\x80\x01')
        self.assertEqualDecode(255, 2, '\xff\x01')
        self.assertEqualDecode(256, 2, '\x80\x02')
        self.assertEqualDecode(0xFFFFFFFF, 5, '\xff\xff\xff\xff\x0f')

    def test_decode_with_trailing_bytes(self):
        self.assertEqualDecode(1, 1, '\x01abcdef')
        self.assertEqualDecode(127, 1, '\x7f\x01')
        self.assertEqualDecode(128, 2, '\x80\x01abcdef')
        self.assertEqualDecode(255, 2, '\xff\x01\xff')
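

# The vectors above are a standard little-endian base-128 varint: seven
# data bits per byte, low-order group first, high bit meaning "more bytes
# follow". A minimal pure-Python sketch of both directions that
# reproduces those vectors (illustrative; bzrlib ships its own
# encode_base128_int/decode_base128_int):
def _demo_encode_base128_int(val):
    chunks = []
    while val >= 0x80:
        chunks.append(chr((val & 0x7F) | 0x80))
        val >>= 7
    chunks.append(chr(val))
    return ''.join(chunks)

def _demo_decode_base128_int(data):
    val = 0
    shift = 0
    num_read = 0
    byte = 0x80                       # sentinel with the continue bit set
    while byte & 0x80:
        byte = ord(data[num_read])
        num_read += 1
        val |= (byte & 0x7F) << shift
        shift += 7
    return val, num_read

# e.g. _demo_encode_base128_int(128) => '\x80\x01', and
# _demo_decode_base128_int('\xff\x01\xff') => (255, 2), matching
# test_decode_with_trailing_bytes: trailing bytes are simply not read.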


class TestGroupCompressBlock(tests.TestCase):

    def make_block(self, key_to_text):
        """Create a GroupCompressBlock, filling it with the given texts."""
        compressor = groupcompress.GroupCompressor()
        start = 0
        for key in sorted(key_to_text):
            compressor.compress(key, key_to_text[key], None)
        block = compressor.flush()
        entries = block._entries
        # Go through from_bytes(to_bytes()) so that we start with a compressed
        # content object
        return entries, groupcompress.GroupCompressBlock.from_bytes(
            block.to_bytes())

    def test_from_empty_bytes(self):
        self.assertRaises(ValueError,
                          groupcompress.GroupCompressBlock.from_bytes, '')

    def test_from_minimal_bytes(self):
        block = groupcompress.GroupCompressBlock.from_bytes(
            'gcb1z\n0\n0\n0\n0\n')
        self.assertIsInstance(block, groupcompress.GroupCompressBlock)
        self.assertEqual({}, block._entries)
        self.assertIs(None, block._content)
        self.assertEqual('', block._z_content)
        block._ensure_content()
        self.assertEqual('', block._content)
        self.assertEqual('', block._z_content)
        block._ensure_content() # Ensure content is safe to call 2x

    def test_from_bytes_with_labels(self):
        header = ('key:bing\n'
            'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
            'type:fulltext\n'
            'start:100\n'
            'length:100\n'
            '\n'
            'key:foo\x00bar\n'
            'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
            'type:fulltext\n'
            'start:0\n'
            'length:100\n'
            '\n')
        z_header = zlib.compress(header)
        content = ('a tiny bit of content\n')
        z_content = zlib.compress(content)
        z_bytes = (
            'gcb1z\n' # group compress block v1 zlib
            '%d\n' # Length of zlib bytes
            '%d\n' # Length of all meta-info
            '%d\n' # Length of compressed content
            '%d\n' # Length of uncompressed content
            '%s'   # Compressed header
            '%s'   # Compressed content
            ) % (len(z_header), len(header),
                 len(z_content), len(content),
                 z_header, z_content)
        block = groupcompress.GroupCompressBlock.from_bytes(
            z_bytes)
        block._parse_header()
        self.assertIsInstance(block, groupcompress.GroupCompressBlock)
        self.assertEqual([('bing',), ('foo', 'bar')], sorted(block._entries))
        bing = block._entries[('bing',)]
        self.assertEqual(('bing',), bing.key)
        self.assertEqual('fulltext', bing.type)
        self.assertEqual('abcd'*10, bing.sha1)
        self.assertEqual(100, bing.start)
        self.assertEqual(100, bing.length)
        foobar = block._entries[('foo', 'bar')]
        self.assertEqual(('foo', 'bar'), foobar.key)
        self.assertEqual('fulltext', foobar.type)
        self.assertEqual('abcd'*10, foobar.sha1)
        self.assertEqual(0, foobar.start)
        self.assertEqual(100, foobar.length)
        self.assertEqual(z_content, block._z_content)
        self.assertIs(None, block._content)
        block._ensure_content()
        self.assertEqual(z_content, block._z_content)
        self.assertEqual(content, block._content)

    def test_from_old_bytes(self):
        # Backwards compatibility, with groups that didn't define content length
        content = ('a tiny bit of content\n')
        z_content = zlib.compress(content)
        z_bytes = (
            'gcb1z\n' # group compress block v1 zlib
            '0\n' # Length of zlib bytes
            '0\n' # Length of all meta-info
            ''    # Compressed header
            '%s'   # Compressed content
            ) % (z_content)
        block = groupcompress.GroupCompressBlock.from_bytes(
            z_bytes)
        self.assertIsInstance(block, groupcompress.GroupCompressBlock)
        block._ensure_content()
        self.assertEqual(z_content, block._z_content)
        self.assertEqual(content, block._content)

    def test_add_entry(self):
        gcb = groupcompress.GroupCompressBlock()
        e = gcb.add_entry(('foo', 'bar'), 'fulltext', 'abcd'*10, 0, 100)
        self.assertIsInstance(e, groupcompress.GroupCompressBlockEntry)
        self.assertEqual(('foo', 'bar'), e.key)
        self.assertEqual('fulltext', e.type)
        self.assertEqual('abcd'*10, e.sha1)
        self.assertEqual(0, e.start)
        self.assertEqual(100, e.length)

    def test_to_bytes(self):
        no_labels = groupcompress._NO_LABELS
        def reset():
            groupcompress._NO_LABELS = no_labels
        self.addCleanup(reset)
        groupcompress._NO_LABELS = False
        gcb = groupcompress.GroupCompressBlock()
        gcb.add_entry(('foo', 'bar'), 'fulltext', 'abcd'*10, 0, 100)
        gcb.add_entry(('bing',), 'fulltext', 'abcd'*10, 100, 100)
        gcb.set_content('this is some content\n'
                        'this content will be compressed\n')
        bytes = gcb.to_bytes()
        expected_header = ('gcb1z\n' # group compress block v1 zlib
                           '76\n' # Length of compressed bytes
                           '183\n' # Length of uncompressed meta-info
                           '50\n' # Length of compressed content
                           '53\n' # Length of uncompressed content
                          )
        self.assertStartsWith(bytes, expected_header)
        remaining_bytes = bytes[len(expected_header):]
        raw_bytes = zlib.decompress(remaining_bytes)
        self.assertEqualDiff('key:bing\n'
                             'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
                             'type:fulltext\n'
                             'start:100\n'
                             'length:100\n'
                             '\n'
                             'key:foo\x00bar\n'
                             'sha1:abcdabcdabcdabcdabcdabcdabcdabcdabcdabcd\n'
                             'type:fulltext\n'
                             'start:0\n'
                             'length:100\n'
                             '\n', raw_bytes)

    def test_partial_decomp(self):
        content_chunks = []
        # We need a sufficient amount of data so that zlib.decompress has
        # partial decompression to work with. Most auto-generated data
        # compresses a bit too well; we want a mixture, so we combine a sha
        # hash with compressible data.
        for i in xrange(2048):
            next_content = '%d\nThis is a bit of duplicate text\n' % (i,)
            content_chunks.append(next_content)
            next_sha1 = osutils.sha_string(next_content)
            content_chunks.append(next_sha1 + '\n')
        content = ''.join(content_chunks)
        self.assertEqual(158634, len(content))
        z_content = zlib.compress(content)
        self.assertEqual(57182, len(z_content))
        block = groupcompress.GroupCompressBlock()
        block._z_content = z_content
        block._z_content_length = len(z_content)
        block._compressor_name = 'zlib'
        block._content_length = 158634
        self.assertIs(None, block._content)
        block._ensure_content(100)
        self.assertIsNot(None, block._content)
        # We have decompressed at least 100 bytes
        self.assertTrue(len(block._content) >= 100)
        # We have not decompressed the whole content
        self.assertTrue(len(block._content) < 158634)
        self.assertEqualDiff(content[:len(block._content)], block._content)
        # Ensuring content that we already have shouldn't cause any more data
        # to be extracted
        cur_len = len(block._content)
        block._ensure_content(cur_len - 10)
        self.assertEqual(cur_len, len(block._content))
        # Now we want a bit more content
        cur_len += 10
        block._ensure_content(cur_len)
        self.assertTrue(len(block._content) >= cur_len)
        self.assertTrue(len(block._content) < 158634)
        self.assertEqualDiff(content[:len(block._content)], block._content)
        # And now let's finish
        block._ensure_content(158634)
        self.assertEqualDiff(content, block._content)
        # And the decompressor is finalized
        self.assertIs(None, block._z_content_decompressor)

    def test_partial_decomp_no_known_length(self):
        content_chunks = []
        for i in xrange(2048):
            next_content = '%d\nThis is a bit of duplicate text\n' % (i,)
            content_chunks.append(next_content)
            next_sha1 = osutils.sha_string(next_content)
            content_chunks.append(next_sha1 + '\n')
        content = ''.join(content_chunks)
        self.assertEqual(158634, len(content))
        z_content = zlib.compress(content)
        self.assertEqual(57182, len(z_content))
        block = groupcompress.GroupCompressBlock()
        block._z_content = z_content
        block._z_content_length = len(z_content)
        block._compressor_name = 'zlib'
        block._content_length = None # Don't tell the decompressed length
        self.assertIs(None, block._content)
        block._ensure_content(100)
        self.assertIsNot(None, block._content)
        # We have decompressed at least 100 bytes
        self.assertTrue(len(block._content) >= 100)
        # We have not decompressed the whole content
        self.assertTrue(len(block._content) < 158634)
        self.assertEqualDiff(content[:len(block._content)], block._content)
        # Ensuring content that we already have shouldn't cause any more data
        # to be extracted
        cur_len = len(block._content)
        block._ensure_content(cur_len - 10)
        self.assertEqual(cur_len, len(block._content))
        # Now we want a bit more content
        cur_len += 10
        block._ensure_content(cur_len)
        self.assertTrue(len(block._content) >= cur_len)
        self.assertTrue(len(block._content) < 158634)
        self.assertEqualDiff(content[:len(block._content)], block._content)
        # And now let's finish
        block._ensure_content()
        self.assertEqualDiff(content, block._content)
        # And the decompressor is finalized
        self.assertIs(None, block._z_content_decompressor)
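

# The tests above pin down the block serialization: a 'gcb1z\n' magic
# (the 'z' marking zlib), four newline-terminated decimal lengths
# (compressed header, raw header, compressed content, raw content), then
# the compressed header and compressed content back to back. A minimal
# sketch that splits such a block apart, assuming exactly that layout
# (illustrative only; GroupCompressBlock.from_bytes is the real parser,
# and the older two-length layout in test_from_old_bytes is not handled):
def _demo_split_gcb1z(data):
    if not data.startswith('gcb1z\n'):
        raise ValueError('not a gcb1z block')
    pos = len('gcb1z\n')
    lengths = []
    for _ in range(4):
        eol = data.index('\n', pos)
        lengths.append(int(data[pos:eol]))
        pos = eol + 1
    z_header_len, header_len, z_content_len, content_len = lengths
    z_header = data[pos:pos + z_header_len]
    z_content = data[pos + z_header_len:pos + z_header_len + z_content_len]
    # Empty sections stay empty; zlib.decompress('') would raise.
    if z_header:
        header = zlib.decompress(z_header)
    else:
        header = ''
    if z_content:
        content = zlib.decompress(z_content)
    else:
        content = ''
    # header_len and content_len give the decompressed sizes up front,
    # which is what lets _ensure_content() stop decompressing early.
    return header, content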
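

# The incremental behaviour of _ensure_content, exercised above, rests on
# zlib's streaming API: a decompressobj can be fed the compressed bytes
# and asked for a bounded amount of output at a time, keeping the rest as
# unconsumed_tail. A minimal sketch of that pattern (illustrative only;
# the real logic lives in GroupCompressBlock._ensure_content):
def _demo_partial_decompress(z_content, min_bytes):
    decompressor = zlib.decompressobj()
    # Ask for at most min_bytes of plaintext; unread compressed input is
    # buffered on the decompressor as unconsumed_tail for later calls.
    content = decompressor.decompress(z_content, min_bytes)
    while len(content) < min_bytes and decompressor.unconsumed_tail:
        content += decompressor.decompress(
            decompressor.unconsumed_tail, min_bytes - len(content))
    return content, decompressor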


class TestCaseWithGroupCompressVersionedFiles(tests.TestCaseWithTransport):

    def make_test_vf(self, create_graph, keylength=1, do_cleanup=True,
                     dir='.'):
        t = self.get_transport(dir)
        t.ensure_base()
        vf = groupcompress.make_pack_factory(graph=create_graph,
            delta=False, keylength=keylength)(t)
        if do_cleanup:
            self.addCleanup(groupcompress.cleanup_pack_group, vf)
        return vf


class TestGroupCompressVersionedFiles(TestCaseWithGroupCompressVersionedFiles):

    def test_get_record_stream_as_requested(self):
        # Consider promoting 'as-requested' to general availability, and
        # make this a VF interface test
        vf = self.make_test_vf(False, dir='source')
        vf.add_lines(('a',), (), ['lines\n'])
        vf.add_lines(('b',), (), ['lines\n'])
        vf.add_lines(('c',), (), ['lines\n'])
        vf.add_lines(('d',), (), ['lines\n'])
        vf.writer.end()
        keys = [record.key for record in vf.get_record_stream(
                    [('a',), ('b',), ('c',), ('d',)],
                    'as-requested', False)]
        self.assertEqual([('a',), ('b',), ('c',), ('d',)], keys)
        keys = [record.key for record in vf.get_record_stream(
                    [('b',), ('a',), ('d',), ('c',)],
                    'as-requested', False)]
        self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)

        # It should work even after being repacked into another VF
        vf2 = self.make_test_vf(False, dir='target')
        vf2.insert_record_stream(vf.get_record_stream(
                    [('b',), ('a',), ('d',), ('c',)], 'as-requested', False))
        vf2.writer.end()

        keys = [record.key for record in vf2.get_record_stream(
                    [('a',), ('b',), ('c',), ('d',)],
                    'as-requested', False)]
        self.assertEqual([('a',), ('b',), ('c',), ('d',)], keys)
        keys = [record.key for record in vf2.get_record_stream(
                    [('b',), ('a',), ('d',), ('c',)],
                    'as-requested', False)]
        self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)

    def test_insert_record_stream_re_uses_blocks(self):
        vf = self.make_test_vf(True, dir='source')
        def grouped_stream(revision_ids, first_parents=()):
            parents = first_parents
            for revision_id in revision_ids:
                key = (revision_id,)
                record = versionedfile.FulltextContentFactory(
                    key, parents, None,
                    'some content that is\n'
                    'identical except for\n'
                    'revision_id:%s\n' % (revision_id,))
                yield record
                parents = (key,)
        # One group, a-d
        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
        # Second group, e-h
        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
                                               first_parents=(('d',),)))
        block_bytes = {}
        stream = vf.get_record_stream([(r,) for r in 'abcdefgh'],
                                      'unordered', False)
        num_records = 0
        for record in stream:
            if record.key in [('a',), ('e',)]:
                self.assertEqual('groupcompress-block', record.storage_kind)
            else:
                self.assertEqual('groupcompress-block-ref',
                                 record.storage_kind)
            block_bytes[record.key] = record._manager._block._z_content
            num_records += 1
        self.assertEqual(8, num_records)
        for r in 'abcd':
            key = (r,)
            self.assertIs(block_bytes[key], block_bytes[('a',)])
            self.assertNotEqual(block_bytes[key], block_bytes[('e',)])
        for r in 'efgh':
            key = (r,)
            self.assertIs(block_bytes[key], block_bytes[('e',)])
            self.assertNotEqual(block_bytes[key], block_bytes[('a',)])
        # Now copy the blocks into another vf, and ensure that the blocks are
        # preserved without creating new entries
        vf2 = self.make_test_vf(True, dir='target')
        # Ordering in 'groupcompress' order should actually swap the groups in
        # the target vf, but the groups themselves should not be disturbed.
        vf2.insert_record_stream(vf.get_record_stream(
            [(r,) for r in 'abcdefgh'], 'groupcompress', False))
        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
                                       'groupcompress', False)
        vf2.writer.end()
        num_records = 0
        for record in stream:
            num_records += 1
            self.assertEqual(block_bytes[record.key],
                             record._manager._block._z_content)
        self.assertEqual(8, num_records)

    def test__insert_record_stream_no_reuse_block(self):
        vf = self.make_test_vf(True, dir='source')
        def grouped_stream(revision_ids, first_parents=()):
            parents = first_parents
            for revision_id in revision_ids:
                key = (revision_id,)
                record = versionedfile.FulltextContentFactory(
                    key, parents, None,
                    'some content that is\n'
                    'identical except for\n'
                    'revision_id:%s\n' % (revision_id,))
                yield record
                parents = (key,)
        # One group, a-d
        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
        # Second group, e-h
        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
                                               first_parents=(('d',),)))
        vf.writer.end()
        self.assertEqual(8, len(list(vf.get_record_stream(
                                        [(r,) for r in 'abcdefgh'],
                                        'unordered', False))))
        # Now copy the blocks into another vf, and ensure that the blocks are
        # preserved without creating new entries
        vf2 = self.make_test_vf(True, dir='target')
        # Ordering in 'groupcompress' order should actually swap the groups in
        # the target vf, but the groups themselves should not be disturbed.
        list(vf2._insert_record_stream(vf.get_record_stream(
            [(r,) for r in 'abcdefgh'], 'groupcompress', False),
            reuse_blocks=False))
        vf2.writer.end()
        # After inserting with reuse_blocks=False, we should have everything in
        # a single new block.
        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
                                       'groupcompress', False)
        block = None
        for record in stream:
            if block is None:
                block = record._manager._block
            else:
                self.assertIs(block, record._manager._block)
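

# Both insert_record_stream tests above verify block sharing by peeking
# at the private record._manager._block. The same check written as a
# small standalone helper (a sketch that assumes every returned record is
# groupcompress-backed and that those private attribute names are stable):
def _demo_count_distinct_blocks(vf, keys):
    seen = set()
    for record in vf.get_record_stream(keys, 'unordered', False):
        seen.add(id(record._manager._block))
    return len(seen)

# On the vf built in test_insert_record_stream_re_uses_blocks this would
# report 2 (one block for a-d, one for e-h); after the reuse_blocks=False
# copy in the second test it would report 1.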


class TestLazyGroupCompress(tests.TestCaseWithTransport):

    _texts = {
        ('key1',): "this is a text\n"
                   "with a reasonable amount of compressible bytes\n",
        ('key2',): "another text\n"
                   "with a reasonable amount of compressible bytes\n",
        ('key3',): "yet another text which won't be extracted\n"
                   "with a reasonable amount of compressible bytes\n",
        ('key4',): "this will be extracted\n"
                   "but references bytes from\n"
                   "yet another text which won't be extracted\n"
                   "with a reasonable amount of compressible bytes\n",
    }

    def make_block(self, key_to_text):
        """Create a GroupCompressBlock, filling it with the given texts."""
        compressor = groupcompress.GroupCompressor()
        start = 0
        for key in sorted(key_to_text):
            compressor.compress(key, key_to_text[key], None)
        block = compressor.flush()
        entries = block._entries
        raw_bytes = block.to_bytes()
        return entries, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)

    def add_key_to_manager(self, key, entries, block, manager):
        entry = entries[key]
        manager.add_factory(entry.key, (), entry.start, entry.end)

    def test_get_fulltexts(self):
        entries, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager(('key1',), entries, block, manager)
        self.add_key_to_manager(('key2',), entries, block, manager)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([('key1',), ('key2',)], result_order)

        # If we build the manager in the opposite order, we should get them
        # back in the opposite order
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager(('key2',), entries, block, manager)
        self.add_key_to_manager(('key1',), entries, block, manager)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([('key2',), ('key1',)], result_order)

    def test__wire_bytes_no_keys(self):
        entries, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        wire_bytes = manager._wire_bytes()
        block_length = len(block.to_bytes())
        # We should have triggered a strip, since we aren't using any content
        stripped_block = manager._block.to_bytes()
        self.assertTrue(block_length > len(stripped_block))
        empty_z_header = zlib.compress('')
        self.assertEqual('groupcompress-block\n'
                         '8\n' # len(compress(''))
                         '0\n' # len('')
                         '%d\n' # compressed block len
                         '%s'  # zheader
                         '%s'  # block
                         % (len(stripped_block), empty_z_header,
                            stripped_block),
                         wire_bytes)

    def test__wire_bytes(self):
        entries, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager(('key1',), entries, block, manager)
        self.add_key_to_manager(('key4',), entries, block, manager)
        block_bytes = block.to_bytes()
        wire_bytes = manager._wire_bytes()
        (storage_kind, z_header_len, header_len,
         block_len, rest) = wire_bytes.split('\n', 4)
        z_header_len = int(z_header_len)
        header_len = int(header_len)
        block_len = int(block_len)
        self.assertEqual('groupcompress-block', storage_kind)
        self.assertEqual(33, z_header_len)
        self.assertEqual(25, header_len)
        self.assertEqual(len(block_bytes), block_len)
        z_header = rest[:z_header_len]
        header = zlib.decompress(z_header)
        self.assertEqual(header_len, len(header))
        entry1 = entries[('key1',)]
        entry4 = entries[('key4',)]
        self.assertEqualDiff('key1\n'
                             '\n'  # no parents
                             '%d\n' # start offset
                             '%d\n' # end byte
                             'key4\n'
                             '\n'
                             '%d\n'
                             '%d\n'
                             % (entry1.start, entry1.end,
                                entry4.start, entry4.end),
                             header)
        z_block = rest[z_header_len:]
        self.assertEqual(block_bytes, z_block)

    def test_from_bytes(self):
        entries, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager(('key1',), entries, block, manager)
        self.add_key_to_manager(('key4',), entries, block, manager)
        wire_bytes = manager._wire_bytes()
        self.assertStartsWith(wire_bytes, 'groupcompress-block\n')
        manager = groupcompress._LazyGroupContentManager.from_bytes(wire_bytes)
        self.assertIsInstance(manager, groupcompress._LazyGroupContentManager)
        self.assertEqual(2, len(manager._factories))
        self.assertEqual(block._z_content, manager._block._z_content)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([('key1',), ('key4',)], result_order)

    def test__check_rebuild_no_changes(self):
        entries, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        # Request all the keys, which ensures that we won't rebuild
        self.add_key_to_manager(('key1',), entries, block, manager)
        self.add_key_to_manager(('key2',), entries, block, manager)
        self.add_key_to_manager(('key3',), entries, block, manager)
        self.add_key_to_manager(('key4',), entries, block, manager)
        manager._check_rebuild_block()
        self.assertIs(block, manager._block)

    def test__check_rebuild_only_one(self):
        entries, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        # Request just the first key, which should trigger a 'strip' action
        self.add_key_to_manager(('key1',), entries, block, manager)
        manager._check_rebuild_block()
        self.assertIsNot(block, manager._block)
        self.assertTrue(block._content_length > manager._block._content_length)
        # We should still be able to get the content out of this block, though
        # it should only have 1 entry
        for record in manager.get_record_stream():
            self.assertEqual(('key1',), record.key)
            self.assertEqual(self._texts[record.key],
                             record.get_bytes_as('fulltext'))

    def test__check_rebuild_middle(self):
        entries, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        # Requesting a small key in the middle should trigger a 'rebuild'
        self.add_key_to_manager(('key4',), entries, block, manager)
        manager._check_rebuild_block()
        self.assertIsNot(block, manager._block)
        self.assertTrue(block._content_length > manager._block._content_length)
        for record in manager.get_record_stream():
            self.assertEqual(('key4',), record.key)
            self.assertEqual(self._texts[record.key],
                             record.get_bytes_as('fulltext'))
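

# test__wire_bytes above fixes the lazy-manager wire format: a
# 'groupcompress-block\n' kind line, three decimal lengths (compressed
# header, raw header, full block), the zlib-compressed header (one
# key/parents/start/end stanza per requested record), then the block
# bytes verbatim. A minimal sketch of splitting those wire bytes,
# assuming exactly the layout the test asserts (illustrative only;
# _LazyGroupContentManager.from_bytes is the real implementation):
def _demo_split_wire_bytes(wire_bytes):
    (storage_kind, z_header_len, header_len,
     block_len, rest) = wire_bytes.split('\n', 4)
    if storage_kind != 'groupcompress-block':
        raise ValueError('unknown storage kind %r' % (storage_kind,))
    z_header_len = int(z_header_len)
    header = zlib.decompress(rest[:z_header_len])
    block_bytes = rest[z_header_len:z_header_len + int(block_len)]
    return header, block_bytes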