~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_groupcompress.py

Major code cleanup.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for group compression."""
 
18
 
 
19
import zlib
 
20
 
 
21
from bzrlib import (
 
22
    btree_index,
 
23
    config,
 
24
    groupcompress,
 
25
    errors,
 
26
    index as _mod_index,
 
27
    osutils,
 
28
    tests,
 
29
    trace,
 
30
    versionedfile,
 
31
    )
 
32
from bzrlib.osutils import sha_string
 
33
from bzrlib.tests.test__groupcompress import compiled_groupcompress_feature
 
34
from bzrlib.tests.scenarios import load_tests_apply_scenarios
 
35
 
 
36
 
 
37
def group_compress_implementation_scenarios():
 
38
    scenarios = [
 
39
        ('python', {'compressor': groupcompress.PythonGroupCompressor}),
 
40
        ]
 
41
    if compiled_groupcompress_feature.available():
 
42
        scenarios.append(('C',
 
43
            {'compressor': groupcompress.PyrexGroupCompressor}))
 
44
    return scenarios
 
45
 
 
46
 
 
47
load_tests = load_tests_apply_scenarios
 
48
 
 
49
 
 
50
class TestGroupCompressor(tests.TestCase):
 
51
 
 
52
    def _chunks_to_repr_lines(self, chunks):
 
53
        return '\n'.join(map(repr, ''.join(chunks).split('\n')))
 
54
 
 
55
    def assertEqualDiffEncoded(self, expected, actual):
 
56
        """Compare the actual content to the expected content.
 
57
 
 
58
        :param expected: A group of chunks that we expect to see
 
59
        :param actual: The measured 'chunks'
 
60
 
 
61
        We will transform the chunks back into lines, and then run 'repr()'
 
62
        over them to handle non-ascii characters.
 
63
        """
 
64
        self.assertEqualDiff(self._chunks_to_repr_lines(expected),
 
65
                             self._chunks_to_repr_lines(actual))
 
66
 
 
67
 
 
68
class TestAllGroupCompressors(TestGroupCompressor):
 
69
    """Tests for GroupCompressor"""
 
70
 
 
71
    scenarios = group_compress_implementation_scenarios()
 
72
    compressor = None # Set by scenario
 
73
 
 
74
    def test_empty_delta(self):
 
75
        compressor = self.compressor()
 
76
        self.assertEqual([], compressor.chunks)
 
77
 
 
78
    def test_one_nosha_delta(self):
 
79
        # diff against NUKK
 
80
        compressor = self.compressor()
 
81
        sha1, start_point, end_point, _ = compressor.compress(('label',),
 
82
            'strange\ncommon\n', None)
 
83
        self.assertEqual(sha_string('strange\ncommon\n'), sha1)
 
84
        expected_lines = 'f' '\x0f' 'strange\ncommon\n'
 
85
        self.assertEqual(expected_lines, ''.join(compressor.chunks))
 
86
        self.assertEqual(0, start_point)
 
87
        self.assertEqual(sum(map(len, expected_lines)), end_point)
 
88
 
 
89
    def test_empty_content(self):
 
90
        compressor = self.compressor()
 
91
        # Adding empty bytes should return the 'null' record
 
92
        sha1, start_point, end_point, kind = compressor.compress(('empty',),
 
93
                                                                 '', None)
 
94
        self.assertEqual(0, start_point)
 
95
        self.assertEqual(0, end_point)
 
96
        self.assertEqual('fulltext', kind)
 
97
        self.assertEqual(groupcompress._null_sha1, sha1)
 
98
        self.assertEqual(0, compressor.endpoint)
 
99
        self.assertEqual([], compressor.chunks)
 
100
        # Even after adding some content
 
101
        compressor.compress(('content',), 'some\nbytes\n', None)
 
102
        self.assertTrue(compressor.endpoint > 0)
 
103
        sha1, start_point, end_point, kind = compressor.compress(('empty2',),
 
104
                                                                 '', None)
 
105
        self.assertEqual(0, start_point)
 
106
        self.assertEqual(0, end_point)
 
107
        self.assertEqual('fulltext', kind)
 
108
        self.assertEqual(groupcompress._null_sha1, sha1)
 
109
 
 
110
    def test_extract_from_compressor(self):
 
111
        # Knit fetching will try to reconstruct texts locally which results in
 
112
        # reading something that is in the compressor stream already.
 
113
        compressor = self.compressor()
 
114
        sha1_1, _, _, _ = compressor.compress(('label',),
 
115
            'strange\ncommon long line\nthat needs a 16 byte match\n', None)
 
116
        expected_lines = list(compressor.chunks)
 
117
        sha1_2, _, end_point, _ = compressor.compress(('newlabel',),
 
118
            'common long line\nthat needs a 16 byte match\ndifferent\n', None)
 
119
        # get the first out
 
120
        self.assertEqual(('strange\ncommon long line\n'
 
121
                          'that needs a 16 byte match\n', sha1_1),
 
122
                         compressor.extract(('label',)))
 
123
        # and the second
 
124
        self.assertEqual(('common long line\nthat needs a 16 byte match\n'
 
125
                          'different\n', sha1_2),
 
126
                         compressor.extract(('newlabel',)))
 
127
 
 
128
    def test_pop_last(self):
 
129
        compressor = self.compressor()
 
130
        _, _, _, _ = compressor.compress(('key1',),
 
131
            'some text\nfor the first entry\n', None)
 
132
        expected_lines = list(compressor.chunks)
 
133
        _, _, _, _ = compressor.compress(('key2',),
 
134
            'some text\nfor the second entry\n', None)
 
135
        compressor.pop_last()
 
136
        self.assertEqual(expected_lines, compressor.chunks)
 
137
 
 
138
 
 
139
class TestPyrexGroupCompressor(TestGroupCompressor):
 
140
 
 
141
    _test_needs_features = [compiled_groupcompress_feature]
 
142
    compressor = groupcompress.PyrexGroupCompressor
 
143
 
 
144
    def test_stats(self):
 
145
        compressor = self.compressor()
 
146
        compressor.compress(('label',),
 
147
                            'strange\n'
 
148
                            'common very very long line\n'
 
149
                            'plus more text\n', None)
 
150
        compressor.compress(('newlabel',),
 
151
                            'common very very long line\n'
 
152
                            'plus more text\n'
 
153
                            'different\n'
 
154
                            'moredifferent\n', None)
 
155
        compressor.compress(('label3',),
 
156
                            'new\n'
 
157
                            'common very very long line\n'
 
158
                            'plus more text\n'
 
159
                            'different\n'
 
160
                            'moredifferent\n', None)
 
161
        self.assertAlmostEqual(1.9, compressor.ratio(), 1)
 
162
 
 
163
    def test_two_nosha_delta(self):
 
164
        compressor = self.compressor()
 
165
        sha1_1, _, _, _ = compressor.compress(('label',),
 
166
            'strange\ncommon long line\nthat needs a 16 byte match\n', None)
 
167
        expected_lines = list(compressor.chunks)
 
168
        sha1_2, start_point, end_point, _ = compressor.compress(('newlabel',),
 
169
            'common long line\nthat needs a 16 byte match\ndifferent\n', None)
 
170
        self.assertEqual(sha_string('common long line\n'
 
171
                                    'that needs a 16 byte match\n'
 
172
                                    'different\n'), sha1_2)
 
173
        expected_lines.extend([
 
174
            # 'delta', delta length
 
175
            'd\x0f',
 
176
            # source and target length
 
177
            '\x36',
 
178
            # copy the line common
 
179
            '\x91\x0a\x2c', #copy, offset 0x0a, len 0x2c
 
180
            # add the line different, and the trailing newline
 
181
            '\x0adifferent\n', # insert 10 bytes
 
182
            ])
 
183
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
 
184
        self.assertEqual(sum(map(len, expected_lines)), end_point)
 
185
 
 
186
    def test_three_nosha_delta(self):
 
187
        # The first interesting test: make a change that should use lines from
 
188
        # both parents.
 
189
        compressor = self.compressor()
 
190
        sha1_1, _, _, _ = compressor.compress(('label',),
 
191
            'strange\ncommon very very long line\nwith some extra text\n', None)
 
192
        sha1_2, _, _, _ = compressor.compress(('newlabel',),
 
193
            'different\nmoredifferent\nand then some more\n', None)
 
194
        expected_lines = list(compressor.chunks)
 
195
        sha1_3, start_point, end_point, _ = compressor.compress(('label3',),
 
196
            'new\ncommon very very long line\nwith some extra text\n'
 
197
            'different\nmoredifferent\nand then some more\n',
 
198
            None)
 
199
        self.assertEqual(
 
200
            sha_string('new\ncommon very very long line\nwith some extra text\n'
 
201
                       'different\nmoredifferent\nand then some more\n'),
 
202
            sha1_3)
 
203
        expected_lines.extend([
 
204
            # 'delta', delta length
 
205
            'd\x0b',
 
206
            # source and target length
 
207
            '\x5f'
 
208
            # insert new
 
209
            '\x03new',
 
210
            # Copy of first parent 'common' range
 
211
            '\x91\x09\x31' # copy, offset 0x09, 0x31 bytes
 
212
            # Copy of second parent 'different' range
 
213
            '\x91\x3c\x2b' # copy, offset 0x3c, 0x2b bytes
 
214
            ])
 
215
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
 
216
        self.assertEqual(sum(map(len, expected_lines)), end_point)
 
217
 
 
218
 
 
219
class TestPythonGroupCompressor(TestGroupCompressor):
 
220
 
 
221
    compressor = groupcompress.PythonGroupCompressor
 
222
 
 
223
    def test_stats(self):
 
224
        compressor = self.compressor()
 
225
        compressor.compress(('label',),
 
226
                            'strange\n'
 
227
                            'common very very long line\n'
 
228
                            'plus more text\n', None)
 
229
        compressor.compress(('newlabel',),
 
230
                            'common very very long line\n'
 
231
                            'plus more text\n'
 
232
                            'different\n'
 
233
                            'moredifferent\n', None)
 
234
        compressor.compress(('label3',),
 
235
                            'new\n'
 
236
                            'common very very long line\n'
 
237
                            'plus more text\n'
 
238
                            'different\n'
 
239
                            'moredifferent\n', None)
 
240
        self.assertAlmostEqual(1.9, compressor.ratio(), 1)
 
241
 
 
242
    def test_two_nosha_delta(self):
 
243
        compressor = self.compressor()
 
244
        sha1_1, _, _, _ = compressor.compress(('label',),
 
245
            'strange\ncommon long line\nthat needs a 16 byte match\n', None)
 
246
        expected_lines = list(compressor.chunks)
 
247
        sha1_2, start_point, end_point, _ = compressor.compress(('newlabel',),
 
248
            'common long line\nthat needs a 16 byte match\ndifferent\n', None)
 
249
        self.assertEqual(sha_string('common long line\n'
 
250
                                    'that needs a 16 byte match\n'
 
251
                                    'different\n'), sha1_2)
 
252
        expected_lines.extend([
 
253
            # 'delta', delta length
 
254
            'd\x0f',
 
255
            # target length
 
256
            '\x36',
 
257
            # copy the line common
 
258
            '\x91\x0a\x2c', #copy, offset 0x0a, len 0x2c
 
259
            # add the line different, and the trailing newline
 
260
            '\x0adifferent\n', # insert 10 bytes
 
261
            ])
 
262
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
 
263
        self.assertEqual(sum(map(len, expected_lines)), end_point)
 
264
 
 
265
    def test_three_nosha_delta(self):
 
266
        # The first interesting test: make a change that should use lines from
 
267
        # both parents.
 
268
        compressor = self.compressor()
 
269
        sha1_1, _, _, _ = compressor.compress(('label',),
 
270
            'strange\ncommon very very long line\nwith some extra text\n', None)
 
271
        sha1_2, _, _, _ = compressor.compress(('newlabel',),
 
272
            'different\nmoredifferent\nand then some more\n', None)
 
273
        expected_lines = list(compressor.chunks)
 
274
        sha1_3, start_point, end_point, _ = compressor.compress(('label3',),
 
275
            'new\ncommon very very long line\nwith some extra text\n'
 
276
            'different\nmoredifferent\nand then some more\n',
 
277
            None)
 
278
        self.assertEqual(
 
279
            sha_string('new\ncommon very very long line\nwith some extra text\n'
 
280
                       'different\nmoredifferent\nand then some more\n'),
 
281
            sha1_3)
 
282
        expected_lines.extend([
 
283
            # 'delta', delta length
 
284
            'd\x0c',
 
285
            # target length
 
286
            '\x5f'
 
287
            # insert new
 
288
            '\x04new\n',
 
289
            # Copy of first parent 'common' range
 
290
            '\x91\x0a\x30' # copy, offset 0x0a, 0x30 bytes
 
291
            # Copy of second parent 'different' range
 
292
            '\x91\x3c\x2b' # copy, offset 0x3c, 0x2b bytes
 
293
            ])
 
294
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
 
295
        self.assertEqual(sum(map(len, expected_lines)), end_point)
 
296
 
 
297
 
 
298
class TestGroupCompressBlock(tests.TestCase):
 
299
 
 
300
    def make_block(self, key_to_text):
 
301
        """Create a GroupCompressBlock, filling it with the given texts."""
 
302
        compressor = groupcompress.GroupCompressor()
 
303
        start = 0
 
304
        for key in sorted(key_to_text):
 
305
            compressor.compress(key, key_to_text[key], None)
 
306
        locs = dict((key, (start, end)) for key, (start, _, end, _)
 
307
                    in compressor.labels_deltas.iteritems())
 
308
        block = compressor.flush()
 
309
        raw_bytes = block.to_bytes()
 
310
        # Go through from_bytes(to_bytes()) so that we start with a compressed
 
311
        # content object
 
312
        return locs, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)
 
313
 
 
314
    def test_from_empty_bytes(self):
 
315
        self.assertRaises(ValueError,
 
316
                          groupcompress.GroupCompressBlock.from_bytes, '')
 
317
 
 
318
    def test_from_minimal_bytes(self):
 
319
        block = groupcompress.GroupCompressBlock.from_bytes(
 
320
            'gcb1z\n0\n0\n')
 
321
        self.assertIsInstance(block, groupcompress.GroupCompressBlock)
 
322
        self.assertIs(None, block._content)
 
323
        self.assertEqual('', block._z_content)
 
324
        block._ensure_content()
 
325
        self.assertEqual('', block._content)
 
326
        self.assertEqual('', block._z_content)
 
327
        block._ensure_content() # Ensure content is safe to call 2x
 
328
 
 
329
    def test_from_invalid(self):
 
330
        self.assertRaises(ValueError,
 
331
                          groupcompress.GroupCompressBlock.from_bytes,
 
332
                          'this is not a valid header')
 
333
 
 
334
    def test_from_bytes(self):
 
335
        content = ('a tiny bit of content\n')
 
336
        z_content = zlib.compress(content)
 
337
        z_bytes = (
 
338
            'gcb1z\n' # group compress block v1 plain
 
339
            '%d\n' # Length of compressed content
 
340
            '%d\n' # Length of uncompressed content
 
341
            '%s'   # Compressed content
 
342
            ) % (len(z_content), len(content), z_content)
 
343
        block = groupcompress.GroupCompressBlock.from_bytes(
 
344
            z_bytes)
 
345
        self.assertEqual(z_content, block._z_content)
 
346
        self.assertIs(None, block._content)
 
347
        self.assertEqual(len(z_content), block._z_content_length)
 
348
        self.assertEqual(len(content), block._content_length)
 
349
        block._ensure_content()
 
350
        self.assertEqual(z_content, block._z_content)
 
351
        self.assertEqual(content, block._content)
 
352
 
 
353
    def test_to_chunks(self):
 
354
        content_chunks = ['this is some content\n',
 
355
                          'this content will be compressed\n']
 
356
        content_len = sum(map(len, content_chunks))
 
357
        content = ''.join(content_chunks)
 
358
        gcb = groupcompress.GroupCompressBlock()
 
359
        gcb.set_chunked_content(content_chunks, content_len)
 
360
        total_len, block_chunks = gcb.to_chunks()
 
361
        block_bytes = ''.join(block_chunks)
 
362
        self.assertEqual(gcb._z_content_length, len(gcb._z_content))
 
363
        self.assertEqual(total_len, len(block_bytes))
 
364
        self.assertEqual(gcb._content_length, content_len)
 
365
        expected_header =('gcb1z\n' # group compress block v1 zlib
 
366
                          '%d\n' # Length of compressed content
 
367
                          '%d\n' # Length of uncompressed content
 
368
                         ) % (gcb._z_content_length, gcb._content_length)
 
369
        # The first chunk should be the header chunk. It is small, fixed size,
 
370
        # and there is no compelling reason to split it up
 
371
        self.assertEqual(expected_header, block_chunks[0])
 
372
        self.assertStartsWith(block_bytes, expected_header)
 
373
        remaining_bytes = block_bytes[len(expected_header):]
 
374
        raw_bytes = zlib.decompress(remaining_bytes)
 
375
        self.assertEqual(content, raw_bytes)
 
376
 
 
377
    def test_to_bytes(self):
 
378
        content = ('this is some content\n'
 
379
                   'this content will be compressed\n')
 
380
        gcb = groupcompress.GroupCompressBlock()
 
381
        gcb.set_content(content)
 
382
        bytes = gcb.to_bytes()
 
383
        self.assertEqual(gcb._z_content_length, len(gcb._z_content))
 
384
        self.assertEqual(gcb._content_length, len(content))
 
385
        expected_header =('gcb1z\n' # group compress block v1 zlib
 
386
                          '%d\n' # Length of compressed content
 
387
                          '%d\n' # Length of uncompressed content
 
388
                         ) % (gcb._z_content_length, gcb._content_length)
 
389
        self.assertStartsWith(bytes, expected_header)
 
390
        remaining_bytes = bytes[len(expected_header):]
 
391
        raw_bytes = zlib.decompress(remaining_bytes)
 
392
        self.assertEqual(content, raw_bytes)
 
393
 
 
394
        # we should get the same results if using the chunked version
 
395
        gcb = groupcompress.GroupCompressBlock()
 
396
        gcb.set_chunked_content(['this is some content\n'
 
397
                                 'this content will be compressed\n'],
 
398
                                 len(content))
 
399
        old_bytes = bytes
 
400
        bytes = gcb.to_bytes()
 
401
        self.assertEqual(old_bytes, bytes)
 
402
 
 
403
    def test_partial_decomp(self):
 
404
        content_chunks = []
 
405
        # We need a sufficient amount of data so that zlib.decompress has
 
406
        # partial decompression to work with. Most auto-generated data
 
407
        # compresses a bit too well, we want a combination, so we combine a sha
 
408
        # hash with compressible data.
 
409
        for i in xrange(2048):
 
410
            next_content = '%d\nThis is a bit of duplicate text\n' % (i,)
 
411
            content_chunks.append(next_content)
 
412
            next_sha1 = osutils.sha_string(next_content)
 
413
            content_chunks.append(next_sha1 + '\n')
 
414
        content = ''.join(content_chunks)
 
415
        self.assertEqual(158634, len(content))
 
416
        z_content = zlib.compress(content)
 
417
        self.assertEqual(57182, len(z_content))
 
418
        block = groupcompress.GroupCompressBlock()
 
419
        block._z_content_chunks = (z_content,)
 
420
        block._z_content_length = len(z_content)
 
421
        block._compressor_name = 'zlib'
 
422
        block._content_length = 158634
 
423
        self.assertIs(None, block._content)
 
424
        block._ensure_content(100)
 
425
        self.assertIsNot(None, block._content)
 
426
        # We have decompressed at least 100 bytes
 
427
        self.assertTrue(len(block._content) >= 100)
 
428
        # We have not decompressed the whole content
 
429
        self.assertTrue(len(block._content) < 158634)
 
430
        self.assertEqualDiff(content[:len(block._content)], block._content)
 
431
        # ensuring content that we already have shouldn't cause any more data
 
432
        # to be extracted
 
433
        cur_len = len(block._content)
 
434
        block._ensure_content(cur_len - 10)
 
435
        self.assertEqual(cur_len, len(block._content))
 
436
        # Now we want a bit more content
 
437
        cur_len += 10
 
438
        block._ensure_content(cur_len)
 
439
        self.assertTrue(len(block._content) >= cur_len)
 
440
        self.assertTrue(len(block._content) < 158634)
 
441
        self.assertEqualDiff(content[:len(block._content)], block._content)
 
442
        # And now lets finish
 
443
        block._ensure_content(158634)
 
444
        self.assertEqualDiff(content, block._content)
 
445
        # And the decompressor is finalized
 
446
        self.assertIs(None, block._z_content_decompressor)
 
447
 
 
448
    def test__ensure_all_content(self):
 
449
        content_chunks = []
 
450
        # We need a sufficient amount of data so that zlib.decompress has
 
451
        # partial decompression to work with. Most auto-generated data
 
452
        # compresses a bit too well, we want a combination, so we combine a sha
 
453
        # hash with compressible data.
 
454
        for i in xrange(2048):
 
455
            next_content = '%d\nThis is a bit of duplicate text\n' % (i,)
 
456
            content_chunks.append(next_content)
 
457
            next_sha1 = osutils.sha_string(next_content)
 
458
            content_chunks.append(next_sha1 + '\n')
 
459
        content = ''.join(content_chunks)
 
460
        self.assertEqual(158634, len(content))
 
461
        z_content = zlib.compress(content)
 
462
        self.assertEqual(57182, len(z_content))
 
463
        block = groupcompress.GroupCompressBlock()
 
464
        block._z_content_chunks = (z_content,)
 
465
        block._z_content_length = len(z_content)
 
466
        block._compressor_name = 'zlib'
 
467
        block._content_length = 158634
 
468
        self.assertIs(None, block._content)
 
469
        # The first _ensure_content got all of the required data
 
470
        block._ensure_content(158634)
 
471
        self.assertEqualDiff(content, block._content)
 
472
        # And we should have released the _z_content_decompressor since it was
 
473
        # fully consumed
 
474
        self.assertIs(None, block._z_content_decompressor)
 
475
 
 
476
    def test__dump(self):
 
477
        dup_content = 'some duplicate content\nwhich is sufficiently long\n'
 
478
        key_to_text = {('1',): dup_content + '1 unique\n',
 
479
                       ('2',): dup_content + '2 extra special\n'}
 
480
        locs, block = self.make_block(key_to_text)
 
481
        self.assertEqual([('f', len(key_to_text[('1',)])),
 
482
                          ('d', 21, len(key_to_text[('2',)]),
 
483
                           [('c', 2, len(dup_content)),
 
484
                            ('i', len('2 extra special\n'), '')
 
485
                           ]),
 
486
                         ], block._dump())
 
487
 
 
488
 
 
489
class TestCaseWithGroupCompressVersionedFiles(
 
490
        tests.TestCaseWithMemoryTransport):
 
491
 
 
492
    def make_test_vf(self, create_graph, keylength=1, do_cleanup=True,
 
493
                     dir='.', inconsistency_fatal=True):
 
494
        t = self.get_transport(dir)
 
495
        t.ensure_base()
 
496
        vf = groupcompress.make_pack_factory(graph=create_graph,
 
497
            delta=False, keylength=keylength,
 
498
            inconsistency_fatal=inconsistency_fatal)(t)
 
499
        if do_cleanup:
 
500
            self.addCleanup(groupcompress.cleanup_pack_group, vf)
 
501
        return vf
 
502
 
 
503
 
 
504
class TestGroupCompressVersionedFiles(TestCaseWithGroupCompressVersionedFiles):
 
505
 
 
506
    def make_g_index(self, name, ref_lists=0, nodes=[]):
 
507
        builder = btree_index.BTreeBuilder(ref_lists)
 
508
        for node, references, value in nodes:
 
509
            builder.add_node(node, references, value)
 
510
        stream = builder.finish()
 
511
        trans = self.get_transport()
 
512
        size = trans.put_file(name, stream)
 
513
        return btree_index.BTreeGraphIndex(trans, name, size)
 
514
 
 
515
    def make_g_index_missing_parent(self):
 
516
        graph_index = self.make_g_index('missing_parent', 1,
 
517
            [(('parent', ), '2 78 2 10', ([],)),
 
518
             (('tip', ), '2 78 2 10',
 
519
              ([('parent', ), ('missing-parent', )],)),
 
520
              ])
 
521
        return graph_index
 
522
 
 
523
    def test_get_record_stream_as_requested(self):
 
524
        # Consider promoting 'as-requested' to general availability, and
 
525
        # make this a VF interface test
 
526
        vf = self.make_test_vf(False, dir='source')
 
527
        vf.add_lines(('a',), (), ['lines\n'])
 
528
        vf.add_lines(('b',), (), ['lines\n'])
 
529
        vf.add_lines(('c',), (), ['lines\n'])
 
530
        vf.add_lines(('d',), (), ['lines\n'])
 
531
        vf.writer.end()
 
532
        keys = [record.key for record in vf.get_record_stream(
 
533
                    [('a',), ('b',), ('c',), ('d',)],
 
534
                    'as-requested', False)]
 
535
        self.assertEqual([('a',), ('b',), ('c',), ('d',)], keys)
 
536
        keys = [record.key for record in vf.get_record_stream(
 
537
                    [('b',), ('a',), ('d',), ('c',)],
 
538
                    'as-requested', False)]
 
539
        self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)
 
540
 
 
541
        # It should work even after being repacked into another VF
 
542
        vf2 = self.make_test_vf(False, dir='target')
 
543
        vf2.insert_record_stream(vf.get_record_stream(
 
544
                    [('b',), ('a',), ('d',), ('c',)], 'as-requested', False))
 
545
        vf2.writer.end()
 
546
 
 
547
        keys = [record.key for record in vf2.get_record_stream(
 
548
                    [('a',), ('b',), ('c',), ('d',)],
 
549
                    'as-requested', False)]
 
550
        self.assertEqual([('a',), ('b',), ('c',), ('d',)], keys)
 
551
        keys = [record.key for record in vf2.get_record_stream(
 
552
                    [('b',), ('a',), ('d',), ('c',)],
 
553
                    'as-requested', False)]
 
554
        self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)
 
555
 
 
556
    def test_get_record_stream_max_bytes_to_index_default(self):
 
557
        vf = self.make_test_vf(True, dir='source')
 
558
        vf.add_lines(('a',), (), ['lines\n'])
 
559
        vf.writer.end()
 
560
        record = vf.get_record_stream([('a',)], 'unordered', True).next()
 
561
        self.assertEqual(vf._DEFAULT_COMPRESSOR_SETTINGS,
 
562
                         record._manager._get_compressor_settings())
 
563
 
 
564
    def test_get_record_stream_accesses_compressor_settings(self):
 
565
        vf = self.make_test_vf(True, dir='source')
 
566
        vf.add_lines(('a',), (), ['lines\n'])
 
567
        vf.writer.end()
 
568
        vf._max_bytes_to_index = 1234
 
569
        record = vf.get_record_stream([('a',)], 'unordered', True).next()
 
570
        self.assertEqual(dict(max_bytes_to_index=1234),
 
571
                         record._manager._get_compressor_settings())
 
572
 
 
573
    def test_insert_record_stream_reuses_blocks(self):
 
574
        vf = self.make_test_vf(True, dir='source')
 
575
        def grouped_stream(revision_ids, first_parents=()):
 
576
            parents = first_parents
 
577
            for revision_id in revision_ids:
 
578
                key = (revision_id,)
 
579
                record = versionedfile.FulltextContentFactory(
 
580
                    key, parents, None,
 
581
                    'some content that is\n'
 
582
                    'identical except for\n'
 
583
                    'revision_id:%s\n' % (revision_id,))
 
584
                yield record
 
585
                parents = (key,)
 
586
        # One group, a-d
 
587
        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
 
588
        # Second group, e-h
 
589
        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
 
590
                                               first_parents=(('d',),)))
 
591
        block_bytes = {}
 
592
        stream = vf.get_record_stream([(r,) for r in 'abcdefgh'],
 
593
                                      'unordered', False)
 
594
        num_records = 0
 
595
        for record in stream:
 
596
            if record.key in [('a',), ('e',)]:
 
597
                self.assertEqual('groupcompress-block', record.storage_kind)
 
598
            else:
 
599
                self.assertEqual('groupcompress-block-ref',
 
600
                                 record.storage_kind)
 
601
            block_bytes[record.key] = record._manager._block._z_content
 
602
            num_records += 1
 
603
        self.assertEqual(8, num_records)
 
604
        for r in 'abcd':
 
605
            key = (r,)
 
606
            self.assertIs(block_bytes[key], block_bytes[('a',)])
 
607
            self.assertNotEqual(block_bytes[key], block_bytes[('e',)])
 
608
        for r in 'efgh':
 
609
            key = (r,)
 
610
            self.assertIs(block_bytes[key], block_bytes[('e',)])
 
611
            self.assertNotEqual(block_bytes[key], block_bytes[('a',)])
 
612
        # Now copy the blocks into another vf, and ensure that the blocks are
 
613
        # preserved without creating new entries
 
614
        vf2 = self.make_test_vf(True, dir='target')
 
615
        # ordering in 'groupcompress' order, should actually swap the groups in
 
616
        # the target vf, but the groups themselves should not be disturbed.
 
617
        def small_size_stream():
 
618
            for record in vf.get_record_stream([(r,) for r in 'abcdefgh'],
 
619
                                               'groupcompress', False):
 
620
                record._manager._full_enough_block_size = \
 
621
                    record._manager._block._content_length
 
622
                yield record
 
623
                        
 
624
        vf2.insert_record_stream(small_size_stream())
 
625
        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
 
626
                                       'groupcompress', False)
 
627
        vf2.writer.end()
 
628
        num_records = 0
 
629
        for record in stream:
 
630
            num_records += 1
 
631
            self.assertEqual(block_bytes[record.key],
 
632
                             record._manager._block._z_content)
 
633
        self.assertEqual(8, num_records)
 
634
 
 
635
    def test_insert_record_stream_packs_on_the_fly(self):
 
636
        vf = self.make_test_vf(True, dir='source')
 
637
        def grouped_stream(revision_ids, first_parents=()):
 
638
            parents = first_parents
 
639
            for revision_id in revision_ids:
 
640
                key = (revision_id,)
 
641
                record = versionedfile.FulltextContentFactory(
 
642
                    key, parents, None,
 
643
                    'some content that is\n'
 
644
                    'identical except for\n'
 
645
                    'revision_id:%s\n' % (revision_id,))
 
646
                yield record
 
647
                parents = (key,)
 
648
        # One group, a-d
 
649
        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
 
650
        # Second group, e-h
 
651
        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
 
652
                                               first_parents=(('d',),)))
 
653
        # Now copy the blocks into another vf, and see that the
 
654
        # insert_record_stream rebuilt a new block on-the-fly because of
 
655
        # under-utilization
 
656
        vf2 = self.make_test_vf(True, dir='target')
 
657
        vf2.insert_record_stream(vf.get_record_stream(
 
658
            [(r,) for r in 'abcdefgh'], 'groupcompress', False))
 
659
        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
 
660
                                       'groupcompress', False)
 
661
        vf2.writer.end()
 
662
        num_records = 0
 
663
        # All of the records should be recombined into a single block
 
664
        block = None
 
665
        for record in stream:
 
666
            num_records += 1
 
667
            if block is None:
 
668
                block = record._manager._block
 
669
            else:
 
670
                self.assertIs(block, record._manager._block)
 
671
        self.assertEqual(8, num_records)
 
672
 
 
673
    def test__insert_record_stream_no_reuse_block(self):
 
674
        vf = self.make_test_vf(True, dir='source')
 
675
        def grouped_stream(revision_ids, first_parents=()):
 
676
            parents = first_parents
 
677
            for revision_id in revision_ids:
 
678
                key = (revision_id,)
 
679
                record = versionedfile.FulltextContentFactory(
 
680
                    key, parents, None,
 
681
                    'some content that is\n'
 
682
                    'identical except for\n'
 
683
                    'revision_id:%s\n' % (revision_id,))
 
684
                yield record
 
685
                parents = (key,)
 
686
        # One group, a-d
 
687
        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
 
688
        # Second group, e-h
 
689
        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
 
690
                                               first_parents=(('d',),)))
 
691
        vf.writer.end()
 
692
        self.assertEqual(8, len(list(vf.get_record_stream(
 
693
                                        [(r,) for r in 'abcdefgh'],
 
694
                                        'unordered', False))))
 
695
        # Now copy the blocks into another vf, and ensure that the blocks are
 
696
        # preserved without creating new entries
 
697
        vf2 = self.make_test_vf(True, dir='target')
 
698
        # ordering in 'groupcompress' order, should actually swap the groups in
 
699
        # the target vf, but the groups themselves should not be disturbed.
 
700
        list(vf2._insert_record_stream(vf.get_record_stream(
 
701
            [(r,) for r in 'abcdefgh'], 'groupcompress', False),
 
702
            reuse_blocks=False))
 
703
        vf2.writer.end()
 
704
        # After inserting with reuse_blocks=False, we should have everything in
 
705
        # a single new block.
 
706
        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
 
707
                                       'groupcompress', False)
 
708
        block = None
 
709
        for record in stream:
 
710
            if block is None:
 
711
                block = record._manager._block
 
712
            else:
 
713
                self.assertIs(block, record._manager._block)
 
714
 
 
715
    def test_add_missing_noncompression_parent_unvalidated_index(self):
 
716
        unvalidated = self.make_g_index_missing_parent()
 
717
        combined = _mod_index.CombinedGraphIndex([unvalidated])
 
718
        index = groupcompress._GCGraphIndex(combined,
 
719
            is_locked=lambda: True, parents=True,
 
720
            track_external_parent_refs=True)
 
721
        index.scan_unvalidated_index(unvalidated)
 
722
        self.assertEqual(
 
723
            frozenset([('missing-parent',)]), index.get_missing_parents())
 
724
 
 
725
    def test_track_external_parent_refs(self):
 
726
        g_index = self.make_g_index('empty', 1, [])
 
727
        mod_index = btree_index.BTreeBuilder(1, 1)
 
728
        combined = _mod_index.CombinedGraphIndex([g_index, mod_index])
 
729
        index = groupcompress._GCGraphIndex(combined,
 
730
            is_locked=lambda: True, parents=True,
 
731
            add_callback=mod_index.add_nodes,
 
732
            track_external_parent_refs=True)
 
733
        index.add_records([
 
734
            (('new-key',), '2 10 2 10', [(('parent-1',), ('parent-2',))])])
 
735
        self.assertEqual(
 
736
            frozenset([('parent-1',), ('parent-2',)]),
 
737
            index.get_missing_parents())
 
738
 
 
739
    def make_source_with_b(self, a_parent, path):
 
740
        source = self.make_test_vf(True, dir=path)
 
741
        source.add_lines(('a',), (), ['lines\n'])
 
742
        if a_parent:
 
743
            b_parents = (('a',),)
 
744
        else:
 
745
            b_parents = ()
 
746
        source.add_lines(('b',), b_parents, ['lines\n'])
 
747
        return source
 
748
 
 
749
    def do_inconsistent_inserts(self, inconsistency_fatal):
 
750
        target = self.make_test_vf(True, dir='target',
 
751
                                   inconsistency_fatal=inconsistency_fatal)
 
752
        for x in range(2):
 
753
            source = self.make_source_with_b(x==1, 'source%s' % x)
 
754
            target.insert_record_stream(source.get_record_stream(
 
755
                [('b',)], 'unordered', False))
 
756
 
 
757
    def test_inconsistent_redundant_inserts_warn(self):
 
758
        """Should not insert a record that is already present."""
 
759
        warnings = []
 
760
        def warning(template, args):
 
761
            warnings.append(template % args)
 
762
        _trace_warning = trace.warning
 
763
        trace.warning = warning
 
764
        try:
 
765
            self.do_inconsistent_inserts(inconsistency_fatal=False)
 
766
        finally:
 
767
            trace.warning = _trace_warning
 
768
        self.assertEqual(["inconsistent details in skipped record: ('b',)"
 
769
                          " ('42 32 0 8', ((),)) ('74 32 0 8', ((('a',),),))"],
 
770
                         warnings)
 
771
 
 
772
    def test_inconsistent_redundant_inserts_raises(self):
 
773
        e = self.assertRaises(errors.KnitCorrupt, self.do_inconsistent_inserts,
 
774
                              inconsistency_fatal=True)
 
775
        self.assertContainsRe(str(e), "Knit.* corrupt: inconsistent details"
 
776
                              " in add_records:"
 
777
                              " \('b',\) \('42 32 0 8', \(\(\),\)\) \('74 32"
 
778
                              " 0 8', \(\(\('a',\),\),\)\)")
 
779
 
 
780
    def test_clear_cache(self):
 
781
        vf = self.make_source_with_b(True, 'source')
 
782
        vf.writer.end()
 
783
        for record in vf.get_record_stream([('a',), ('b',)], 'unordered',
 
784
                                           True):
 
785
            pass
 
786
        self.assertTrue(len(vf._group_cache) > 0)
 
787
        vf.clear_cache()
 
788
        self.assertEqual(0, len(vf._group_cache))
 
789
 
 
790
 
 
791
class TestGroupCompressConfig(tests.TestCaseWithTransport):
 
792
 
 
793
    def make_test_vf(self):
 
794
        t = self.get_transport('.')
 
795
        t.ensure_base()
 
796
        factory = groupcompress.make_pack_factory(graph=True,
 
797
            delta=False, keylength=1, inconsistency_fatal=True)
 
798
        vf = factory(t)
 
799
        self.addCleanup(groupcompress.cleanup_pack_group, vf)
 
800
        return vf
 
801
 
 
802
    def test_max_bytes_to_index_default(self):
 
803
        vf = self.make_test_vf()
 
804
        gc = vf._make_group_compressor()
 
805
        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
 
806
                         vf._max_bytes_to_index)
 
807
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
 
808
            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
 
809
                             gc._delta_index._max_bytes_to_index)
 
810
 
 
811
    def test_max_bytes_to_index_in_config(self):
 
812
        c = config.GlobalConfig()
 
813
        c.set_user_option('bzr.groupcompress.max_bytes_to_index', '10000')
 
814
        vf = self.make_test_vf()
 
815
        gc = vf._make_group_compressor()
 
816
        self.assertEqual(10000, vf._max_bytes_to_index)
 
817
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
 
818
            self.assertEqual(10000, gc._delta_index._max_bytes_to_index)
 
819
 
 
820
    def test_max_bytes_to_index_bad_config(self):
 
821
        c = config.GlobalConfig()
 
822
        c.set_user_option('bzr.groupcompress.max_bytes_to_index', 'boogah')
 
823
        vf = self.make_test_vf()
 
824
        # TODO: This is triggering a warning, we might want to trap and make
 
825
        #       sure it is readable.
 
826
        gc = vf._make_group_compressor()
 
827
        self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
 
828
                         vf._max_bytes_to_index)
 
829
        if isinstance(gc, groupcompress.PyrexGroupCompressor):
 
830
            self.assertEqual(vf._DEFAULT_MAX_BYTES_TO_INDEX,
 
831
                             gc._delta_index._max_bytes_to_index)
 
832
 
 
833
 
 
834
class StubGCVF(object):
 
835
    def __init__(self, canned_get_blocks=None):
 
836
        self._group_cache = {}
 
837
        self._canned_get_blocks = canned_get_blocks or []
 
838
    def _get_blocks(self, read_memos):
 
839
        return iter(self._canned_get_blocks)
 
840
    
 
841
 
 
842
class Test_BatchingBlockFetcher(TestCaseWithGroupCompressVersionedFiles):
 
843
    """Simple whitebox unit tests for _BatchingBlockFetcher."""
 
844
    
 
845
    def test_add_key_new_read_memo(self):
 
846
        """Adding a key with an uncached read_memo new to this batch adds that
 
847
        read_memo to the list of memos to fetch.
 
848
        """
 
849
        # locations are: index_memo, ignored, parents, ignored
 
850
        # where index_memo is: (idx, offset, len, factory_start, factory_end)
 
851
        # and (idx, offset, size) is known as the 'read_memo', identifying the
 
852
        # raw bytes needed.
 
853
        read_memo = ('fake index', 100, 50)
 
854
        locations = {
 
855
            ('key',): (read_memo + (None, None), None, None, None)}
 
856
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), locations)
 
857
        total_size = batcher.add_key(('key',))
 
858
        self.assertEqual(50, total_size)
 
859
        self.assertEqual([('key',)], batcher.keys)
 
860
        self.assertEqual([read_memo], batcher.memos_to_get)
 
861
 
 
862
    def test_add_key_duplicate_read_memo(self):
 
863
        """read_memos that occur multiple times in a batch will only be fetched
 
864
        once.
 
865
        """
 
866
        read_memo = ('fake index', 100, 50)
 
867
        # Two keys, both sharing the same read memo (but different overall
 
868
        # index_memos).
 
869
        locations = {
 
870
            ('key1',): (read_memo + (0, 1), None, None, None),
 
871
            ('key2',): (read_memo + (1, 2), None, None, None)}
 
872
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), locations)
 
873
        total_size = batcher.add_key(('key1',))
 
874
        total_size = batcher.add_key(('key2',))
 
875
        self.assertEqual(50, total_size)
 
876
        self.assertEqual([('key1',), ('key2',)], batcher.keys)
 
877
        self.assertEqual([read_memo], batcher.memos_to_get)
 
878
 
 
879
    def test_add_key_cached_read_memo(self):
 
880
        """Adding a key with a cached read_memo will not cause that read_memo
 
881
        to be added to the list to fetch.
 
882
        """
 
883
        read_memo = ('fake index', 100, 50)
 
884
        gcvf = StubGCVF()
 
885
        gcvf._group_cache[read_memo] = 'fake block'
 
886
        locations = {
 
887
            ('key',): (read_memo + (None, None), None, None, None)}
 
888
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
 
889
        total_size = batcher.add_key(('key',))
 
890
        self.assertEqual(0, total_size)
 
891
        self.assertEqual([('key',)], batcher.keys)
 
892
        self.assertEqual([], batcher.memos_to_get)
 
893
 
 
894
    def test_yield_factories_empty(self):
 
895
        """An empty batch yields no factories."""
 
896
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), {})
 
897
        self.assertEqual([], list(batcher.yield_factories()))
 
898
 
 
899
    def test_yield_factories_calls_get_blocks(self):
 
900
        """Uncached memos are retrieved via get_blocks."""
 
901
        read_memo1 = ('fake index', 100, 50)
 
902
        read_memo2 = ('fake index', 150, 40)
 
903
        gcvf = StubGCVF(
 
904
            canned_get_blocks=[
 
905
                (read_memo1, groupcompress.GroupCompressBlock()),
 
906
                (read_memo2, groupcompress.GroupCompressBlock())])
 
907
        locations = {
 
908
            ('key1',): (read_memo1 + (None, None), None, None, None),
 
909
            ('key2',): (read_memo2 + (None, None), None, None, None)}
 
910
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
 
911
        batcher.add_key(('key1',))
 
912
        batcher.add_key(('key2',))
 
913
        factories = list(batcher.yield_factories(full_flush=True))
 
914
        self.assertLength(2, factories)
 
915
        keys = [f.key for f in factories]
 
916
        kinds = [f.storage_kind for f in factories]
 
917
        self.assertEqual([('key1',), ('key2',)], keys)
 
918
        self.assertEqual(['groupcompress-block', 'groupcompress-block'], kinds)
 
919
 
 
920
    def test_yield_factories_flushing(self):
 
921
        """yield_factories holds back on yielding results from the final block
 
922
        unless passed full_flush=True.
 
923
        """
 
924
        fake_block = groupcompress.GroupCompressBlock()
 
925
        read_memo = ('fake index', 100, 50)
 
926
        gcvf = StubGCVF()
 
927
        gcvf._group_cache[read_memo] = fake_block
 
928
        locations = {
 
929
            ('key',): (read_memo + (None, None), None, None, None)}
 
930
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
 
931
        batcher.add_key(('key',))
 
932
        self.assertEqual([], list(batcher.yield_factories()))
 
933
        factories = list(batcher.yield_factories(full_flush=True))
 
934
        self.assertLength(1, factories)
 
935
        self.assertEqual(('key',), factories[0].key)
 
936
        self.assertEqual('groupcompress-block', factories[0].storage_kind)
 
937
 
 
938
 
 
939
class TestLazyGroupCompress(tests.TestCaseWithTransport):
 
940
 
 
941
    _texts = {
 
942
        ('key1',): "this is a text\n"
 
943
                   "with a reasonable amount of compressible bytes\n"
 
944
                   "which can be shared between various other texts\n",
 
945
        ('key2',): "another text\n"
 
946
                   "with a reasonable amount of compressible bytes\n"
 
947
                   "which can be shared between various other texts\n",
 
948
        ('key3',): "yet another text which won't be extracted\n"
 
949
                   "with a reasonable amount of compressible bytes\n"
 
950
                   "which can be shared between various other texts\n",
 
951
        ('key4',): "this will be extracted\n"
 
952
                   "but references most of its bytes from\n"
 
953
                   "yet another text which won't be extracted\n"
 
954
                   "with a reasonable amount of compressible bytes\n"
 
955
                   "which can be shared between various other texts\n",
 
956
    }
 
957
    def make_block(self, key_to_text):
 
958
        """Create a GroupCompressBlock, filling it with the given texts."""
 
959
        compressor = groupcompress.GroupCompressor()
 
960
        start = 0
 
961
        for key in sorted(key_to_text):
 
962
            compressor.compress(key, key_to_text[key], None)
 
963
        locs = dict((key, (start, end)) for key, (start, _, end, _)
 
964
                    in compressor.labels_deltas.iteritems())
 
965
        block = compressor.flush()
 
966
        raw_bytes = block.to_bytes()
 
967
        return locs, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)
 
968
 
 
969
    def add_key_to_manager(self, key, locations, block, manager):
 
970
        start, end = locations[key]
 
971
        manager.add_factory(key, (), start, end)
 
972
 
 
973
    def make_block_and_full_manager(self, texts):
 
974
        locations, block = self.make_block(texts)
 
975
        manager = groupcompress._LazyGroupContentManager(block)
 
976
        for key in sorted(texts):
 
977
            self.add_key_to_manager(key, locations, block, manager)
 
978
        return block, manager
 
979
 
 
980
    def test_get_fulltexts(self):
 
981
        locations, block = self.make_block(self._texts)
 
982
        manager = groupcompress._LazyGroupContentManager(block)
 
983
        self.add_key_to_manager(('key1',), locations, block, manager)
 
984
        self.add_key_to_manager(('key2',), locations, block, manager)
 
985
        result_order = []
 
986
        for record in manager.get_record_stream():
 
987
            result_order.append(record.key)
 
988
            text = self._texts[record.key]
 
989
            self.assertEqual(text, record.get_bytes_as('fulltext'))
 
990
        self.assertEqual([('key1',), ('key2',)], result_order)
 
991
 
 
992
        # If we build the manager in the opposite order, we should get them
 
993
        # back in the opposite order
 
994
        manager = groupcompress._LazyGroupContentManager(block)
 
995
        self.add_key_to_manager(('key2',), locations, block, manager)
 
996
        self.add_key_to_manager(('key1',), locations, block, manager)
 
997
        result_order = []
 
998
        for record in manager.get_record_stream():
 
999
            result_order.append(record.key)
 
1000
            text = self._texts[record.key]
 
1001
            self.assertEqual(text, record.get_bytes_as('fulltext'))
 
1002
        self.assertEqual([('key2',), ('key1',)], result_order)
 
1003
 
 
1004
    def test__wire_bytes_no_keys(self):
 
1005
        locations, block = self.make_block(self._texts)
 
1006
        manager = groupcompress._LazyGroupContentManager(block)
 
1007
        wire_bytes = manager._wire_bytes()
 
1008
        block_length = len(block.to_bytes())
 
1009
        # We should have triggered a strip, since we aren't using any content
 
1010
        stripped_block = manager._block.to_bytes()
 
1011
        self.assertTrue(block_length > len(stripped_block))
 
1012
        empty_z_header = zlib.compress('')
 
1013
        self.assertEqual('groupcompress-block\n'
 
1014
                         '8\n' # len(compress(''))
 
1015
                         '0\n' # len('')
 
1016
                         '%d\n'# compressed block len
 
1017
                         '%s'  # zheader
 
1018
                         '%s'  # block
 
1019
                         % (len(stripped_block), empty_z_header,
 
1020
                            stripped_block),
 
1021
                         wire_bytes)
 
1022
 
 
1023
    def test__wire_bytes(self):
 
1024
        locations, block = self.make_block(self._texts)
 
1025
        manager = groupcompress._LazyGroupContentManager(block)
 
1026
        self.add_key_to_manager(('key1',), locations, block, manager)
 
1027
        self.add_key_to_manager(('key4',), locations, block, manager)
 
1028
        block_bytes = block.to_bytes()
 
1029
        wire_bytes = manager._wire_bytes()
 
1030
        (storage_kind, z_header_len, header_len,
 
1031
         block_len, rest) = wire_bytes.split('\n', 4)
 
1032
        z_header_len = int(z_header_len)
 
1033
        header_len = int(header_len)
 
1034
        block_len = int(block_len)
 
1035
        self.assertEqual('groupcompress-block', storage_kind)
 
1036
        self.assertEqual(34, z_header_len)
 
1037
        self.assertEqual(26, header_len)
 
1038
        self.assertEqual(len(block_bytes), block_len)
 
1039
        z_header = rest[:z_header_len]
 
1040
        header = zlib.decompress(z_header)
 
1041
        self.assertEqual(header_len, len(header))
 
1042
        entry1 = locations[('key1',)]
 
1043
        entry4 = locations[('key4',)]
 
1044
        self.assertEqualDiff('key1\n'
 
1045
                             '\n'  # no parents
 
1046
                             '%d\n' # start offset
 
1047
                             '%d\n' # end offset
 
1048
                             'key4\n'
 
1049
                             '\n'
 
1050
                             '%d\n'
 
1051
                             '%d\n'
 
1052
                             % (entry1[0], entry1[1],
 
1053
                                entry4[0], entry4[1]),
 
1054
                            header)
 
1055
        z_block = rest[z_header_len:]
 
1056
        self.assertEqual(block_bytes, z_block)
 
1057
 
 
1058
    def test_from_bytes(self):
 
1059
        locations, block = self.make_block(self._texts)
 
1060
        manager = groupcompress._LazyGroupContentManager(block)
 
1061
        self.add_key_to_manager(('key1',), locations, block, manager)
 
1062
        self.add_key_to_manager(('key4',), locations, block, manager)
 
1063
        wire_bytes = manager._wire_bytes()
 
1064
        self.assertStartsWith(wire_bytes, 'groupcompress-block\n')
 
1065
        manager = groupcompress._LazyGroupContentManager.from_bytes(wire_bytes)
 
1066
        self.assertIsInstance(manager, groupcompress._LazyGroupContentManager)
 
1067
        self.assertEqual(2, len(manager._factories))
 
1068
        self.assertEqual(block._z_content, manager._block._z_content)
 
1069
        result_order = []
 
1070
        for record in manager.get_record_stream():
 
1071
            result_order.append(record.key)
 
1072
            text = self._texts[record.key]
 
1073
            self.assertEqual(text, record.get_bytes_as('fulltext'))
 
1074
        self.assertEqual([('key1',), ('key4',)], result_order)
 
1075
 
 
1076
    def test__check_rebuild_no_changes(self):
 
1077
        block, manager = self.make_block_and_full_manager(self._texts)
 
1078
        manager._check_rebuild_block()
 
1079
        self.assertIs(block, manager._block)
 
1080
 
 
1081
    def test__check_rebuild_only_one(self):
 
1082
        locations, block = self.make_block(self._texts)
 
1083
        manager = groupcompress._LazyGroupContentManager(block)
 
1084
        # Request just the first key, which should trigger a 'strip' action
 
1085
        self.add_key_to_manager(('key1',), locations, block, manager)
 
1086
        manager._check_rebuild_block()
 
1087
        self.assertIsNot(block, manager._block)
 
1088
        self.assertTrue(block._content_length > manager._block._content_length)
 
1089
        # We should be able to still get the content out of this block, though
 
1090
        # it should only have 1 entry
 
1091
        for record in manager.get_record_stream():
 
1092
            self.assertEqual(('key1',), record.key)
 
1093
            self.assertEqual(self._texts[record.key],
 
1094
                             record.get_bytes_as('fulltext'))
 
1095
 
 
1096
    def test__check_rebuild_middle(self):
 
1097
        locations, block = self.make_block(self._texts)
 
1098
        manager = groupcompress._LazyGroupContentManager(block)
 
1099
        # Request a small key in the middle should trigger a 'rebuild'
 
1100
        self.add_key_to_manager(('key4',), locations, block, manager)
 
1101
        manager._check_rebuild_block()
 
1102
        self.assertIsNot(block, manager._block)
 
1103
        self.assertTrue(block._content_length > manager._block._content_length)
 
1104
        for record in manager.get_record_stream():
 
1105
            self.assertEqual(('key4',), record.key)
 
1106
            self.assertEqual(self._texts[record.key],
 
1107
                             record.get_bytes_as('fulltext'))
 
1108
 
 
1109
    def test_manager_default_compressor_settings(self):
 
1110
        locations, old_block = self.make_block(self._texts)
 
1111
        manager = groupcompress._LazyGroupContentManager(old_block)
 
1112
        gcvf = groupcompress.GroupCompressVersionedFiles
 
1113
        # It doesn't greedily evaluate _max_bytes_to_index
 
1114
        self.assertIs(None, manager._compressor_settings)
 
1115
        self.assertEqual(gcvf._DEFAULT_COMPRESSOR_SETTINGS,
 
1116
                         manager._get_compressor_settings())
 
1117
 
 
1118
    def test_manager_custom_compressor_settings(self):
 
1119
        locations, old_block = self.make_block(self._texts)
 
1120
        called = []
 
1121
        def compressor_settings():
 
1122
            called.append('called')
 
1123
            return (10,)
 
1124
        manager = groupcompress._LazyGroupContentManager(old_block,
 
1125
            get_compressor_settings=compressor_settings)
 
1126
        gcvf = groupcompress.GroupCompressVersionedFiles
 
1127
        # It doesn't greedily evaluate compressor_settings
 
1128
        self.assertIs(None, manager._compressor_settings)
 
1129
        self.assertEqual((10,), manager._get_compressor_settings())
 
1130
        self.assertEqual((10,), manager._get_compressor_settings())
 
1131
        self.assertEqual((10,), manager._compressor_settings)
 
1132
        # Only called 1 time
 
1133
        self.assertEqual(['called'], called)
 
1134
 
 
1135
    def test__rebuild_handles_compressor_settings(self):
 
1136
        if not isinstance(groupcompress.GroupCompressor,
 
1137
                          groupcompress.PyrexGroupCompressor):
 
1138
            raise tests.TestNotApplicable('pure-python compressor'
 
1139
                ' does not handle compressor_settings')
 
1140
        locations, old_block = self.make_block(self._texts)
 
1141
        manager = groupcompress._LazyGroupContentManager(old_block,
 
1142
            get_compressor_settings=lambda: dict(max_bytes_to_index=32))
 
1143
        gc = manager._make_group_compressor()
 
1144
        self.assertEqual(32, gc._delta_index._max_bytes_to_index)
 
1145
        self.add_key_to_manager(('key3',), locations, old_block, manager)
 
1146
        self.add_key_to_manager(('key4',), locations, old_block, manager)
 
1147
        action, last_byte, total_bytes = manager._check_rebuild_action()
 
1148
        self.assertEqual('rebuild', action)
 
1149
        manager._rebuild_block()
 
1150
        new_block = manager._block
 
1151
        self.assertIsNot(old_block, new_block)
 
1152
        # Because of the new max_bytes_to_index, we do a poor job of
 
1153
        # rebuilding. This is a side-effect of the change, but at least it does
 
1154
        # show the setting had an effect.
 
1155
        self.assertTrue(old_block._content_length < new_block._content_length)
 
1156
 
 
1157
    def test_check_is_well_utilized_all_keys(self):
 
1158
        block, manager = self.make_block_and_full_manager(self._texts)
 
1159
        self.assertFalse(manager.check_is_well_utilized())
 
1160
        # Though we can fake it by changing the recommended minimum size
 
1161
        manager._full_enough_block_size = block._content_length
 
1162
        self.assertTrue(manager.check_is_well_utilized())
 
1163
        # Setting it just above causes it to fail
 
1164
        manager._full_enough_block_size = block._content_length + 1
 
1165
        self.assertFalse(manager.check_is_well_utilized())
 
1166
        # Setting the mixed-block size doesn't do anything, because the content
 
1167
        # is considered to not be 'mixed'
 
1168
        manager._full_enough_mixed_block_size = block._content_length
 
1169
        self.assertFalse(manager.check_is_well_utilized())
 
1170
 
 
1171
    def test_check_is_well_utilized_mixed_keys(self):
 
1172
        texts = {}
 
1173
        f1k1 = ('f1', 'k1')
 
1174
        f1k2 = ('f1', 'k2')
 
1175
        f2k1 = ('f2', 'k1')
 
1176
        f2k2 = ('f2', 'k2')
 
1177
        texts[f1k1] = self._texts[('key1',)]
 
1178
        texts[f1k2] = self._texts[('key2',)]
 
1179
        texts[f2k1] = self._texts[('key3',)]
 
1180
        texts[f2k2] = self._texts[('key4',)]
 
1181
        block, manager = self.make_block_and_full_manager(texts)
 
1182
        self.assertFalse(manager.check_is_well_utilized())
 
1183
        manager._full_enough_block_size = block._content_length
 
1184
        self.assertTrue(manager.check_is_well_utilized())
 
1185
        manager._full_enough_block_size = block._content_length + 1
 
1186
        self.assertFalse(manager.check_is_well_utilized())
 
1187
        manager._full_enough_mixed_block_size = block._content_length
 
1188
        self.assertTrue(manager.check_is_well_utilized())
 
1189
 
 
1190
    def test_check_is_well_utilized_partial_use(self):
 
1191
        locations, block = self.make_block(self._texts)
 
1192
        manager = groupcompress._LazyGroupContentManager(block)
 
1193
        manager._full_enough_block_size = block._content_length
 
1194
        self.add_key_to_manager(('key1',), locations, block, manager)
 
1195
        self.add_key_to_manager(('key2',), locations, block, manager)
 
1196
        # Just using the content from key1 and 2 is not enough to be considered
 
1197
        # 'complete'
 
1198
        self.assertFalse(manager.check_is_well_utilized())
 
1199
        # However if we add key3, then we have enough, as we only require 75%
 
1200
        # consumption
 
1201
        self.add_key_to_manager(('key4',), locations, block, manager)
 
1202
        self.assertTrue(manager.check_is_well_utilized())
 
1203
 
 
1204
 
 
1205
class Test_GCBuildDetails(tests.TestCase):
 
1206
 
 
1207
    def test_acts_like_tuple(self):
 
1208
        # _GCBuildDetails inlines some of the data that used to be spread out
 
1209
        # across a bunch of tuples
 
1210
        bd = groupcompress._GCBuildDetails((('parent1',), ('parent2',)),
 
1211
            ('INDEX', 10, 20, 0, 5))
 
1212
        self.assertEqual(4, len(bd))
 
1213
        self.assertEqual(('INDEX', 10, 20, 0, 5), bd[0])
 
1214
        self.assertEqual(None, bd[1]) # Compression Parent is always None
 
1215
        self.assertEqual((('parent1',), ('parent2',)), bd[2])
 
1216
        self.assertEqual(('group', None), bd[3]) # Record details
 
1217
 
 
1218
    def test__repr__(self):
 
1219
        bd = groupcompress._GCBuildDetails((('parent1',), ('parent2',)),
 
1220
            ('INDEX', 10, 20, 0, 5))
 
1221
        self.assertEqual("_GCBuildDetails(('INDEX', 10, 20, 0, 5),"
 
1222
                         " (('parent1',), ('parent2',)))",
 
1223
                         repr(bd))
 
1224