~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

Merge the _LazyGroupContentManager, et al.

This allows us to stream GroupCompressBlocks in their compressed form, and unpack them
during insert, rather than during get().

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
from itertools import izip
20
20
from cStringIO import StringIO
21
21
import struct
 
22
import time
22
23
import zlib
23
24
try:
24
25
    import pylzma
34
35
    osutils,
35
36
    pack,
36
37
    patiencediff,
 
38
    trace,
37
39
    )
38
40
from bzrlib.graph import Graph
39
41
from bzrlib.knit import _DirectPackAccess
54
56
    )
55
57
 
56
58
_USE_LZMA = False and (pylzma is not None)
57
 
_NO_LABELS = False
 
59
_NO_LABELS = True
58
60
_FAST = False
59
61
 
60
62
def encode_base128_int(val):
131
133
            self.key, self.type, self.sha1, self.start, self.length
132
134
            )
133
135
 
 
136
    @property
 
137
    def end(self):
 
138
        return self.start + self.length
 
139
 
 
140
# The max zlib window size is 32kB, so if we set 'max_size' output of the
 
141
# decompressor to the requested bytes + 32kB, then we should guarantee
 
142
# num_bytes coming out.
 
143
_ZLIB_DECOMP_WINDOW = 32*1024
134
144
 
135
145
class GroupCompressBlock(object):
136
146
    """An object which maintains the internal structure of the compressed data.
145
155
    def __init__(self):
146
156
        # map by key? or just order in file?
147
157
        self._entries = {}
 
158
        self._compressor_name = None
 
159
        self._z_header_length = None
 
160
        self._header_length = None
 
161
        self._z_header = None
 
162
        self._z_content = None
 
163
        self._z_content_decompressor = None
 
164
        self._z_content_length = None
 
165
        self._content_length = None
148
166
        self._content = None
149
 
        self._size = 0
 
167
 
 
168
    def __len__(self):
 
169
        return self._content_length + self._header_length
150
170
 
151
171
    def _parse_header(self):
152
 
        """Parse the meta-info from the stream."""
153
 
 
154
 
    def __len__(self):
155
 
        return self._size
156
 
 
157
 
    def _parse_header_bytes(self, header_bytes):
158
172
        """Parse the header part of the block."""
159
 
        if _NO_LABELS:
160
 
            # Don't parse the label structure if we aren't going to use it
 
173
        assert self._z_header is not None
 
174
        if self._z_header == '':
 
175
            # Nothing to process
 
176
            self._z_header = None
161
177
            return
162
 
        lines = header_bytes.split('\n')
 
178
        if self._compressor_name == 'lzma':
 
179
            header = pylzma.decompress(self._z_header)
 
180
        else:
 
181
            assert self._compressor_name == 'zlib'
 
182
            header = zlib.decompress(self._z_header)
 
183
        self._z_header = None # We have consumed the header
 
184
        lines = header.split('\n')
 
185
        del header
163
186
        info_dict = {}
164
187
        for line in lines:
165
188
            if not line: #End of record
177
200
                value = intern(value)
178
201
            info_dict[key] = value
179
202
 
 
203
    def _ensure_content(self, num_bytes=None):
 
204
        """Make sure that content has been expanded enough.
 
205
 
 
206
        :param num_bytes: Ensure that we have extracted at least num_bytes of
 
207
            content. If None, consume everything
 
208
        """
 
209
        # TODO: If we re-use the same content block at different times during
 
210
        #       get_record_stream(), it is possible that the first pass will
 
211
        #       get inserted, triggering an extract/_ensure_content() which
 
212
        #       will get rid of _z_content. And then the next use of the block
 
213
        #       will try to access _z_content (to send it over the wire), and
 
214
        #       fail because it is already extracted. Consider never releasing
 
215
        #       _z_content because of this.
 
216
        if num_bytes is None:
 
217
            num_bytes = self._content_length
 
218
        if self._content_length is not None:
 
219
            assert num_bytes <= self._content_length
 
220
        if self._content is None:
 
221
            assert self._z_content is not None
 
222
            if self._z_content == '':
 
223
                self._content = ''
 
224
            elif self._compressor_name == 'lzma':
 
225
                # We don't do partial lzma decomp yet
 
226
                self._content = pylma.decompress(self._z_content)
 
227
            else:
 
228
                # Start a zlib decompressor
 
229
                assert self._compressor_name == 'zlib'
 
230
                if num_bytes is None:
 
231
                    self._content = zlib.decompress(self._z_content)
 
232
                else:
 
233
                    self._z_content_decompressor = zlib.decompressobj()
 
234
                    # Seed the decompressor with the uncompressed bytes, so
 
235
                    # that the rest of the code is simplified
 
236
                    self._content = self._z_content_decompressor.decompress(
 
237
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
 
238
                # Any bytes remaining to be decompressed will be in the
 
239
                # decompressors 'unconsumed_tail'
 
240
        # Do we have enough bytes already?
 
241
        if num_bytes is not None and len(self._content) >= num_bytes:
 
242
            return
 
243
        if num_bytes is None and self._z_content_decompressor is None:
 
244
            # We must have already decompressed everything
 
245
            return
 
246
        # If we got this far, and don't have a decompressor, something is wrong
 
247
        assert self._z_content_decompressor is not None
 
248
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
 
249
        if num_bytes is None:
 
250
            if remaining_decomp:
 
251
                # We don't know how much is left, but we'll decompress it all
 
252
                self._content += self._z_content_decompressor.decompress(
 
253
                    remaining_decomp)
 
254
                # Note: There what I consider a bug in zlib.decompressobj
 
255
                #       If you pass back in the entire unconsumed_tail, only
 
256
                #       this time you don't pass a max-size, it doesn't
 
257
                #       change the unconsumed_tail back to None/''.
 
258
                #       However, we know we are done with the whole stream
 
259
                self._z_content_decompressor = None
 
260
            self._content_length = len(self._content)
 
261
        else:
 
262
            # If we have nothing left to decomp, we ran out of decomp bytes
 
263
            assert remaining_decomp
 
264
            needed_bytes = num_bytes - len(self._content)
 
265
            # We always set max_size to 32kB over the minimum needed, so that
 
266
            # zlib will give us as much as we really want.
 
267
            # TODO: If this isn't good enough, we could make a loop here,
 
268
            #       that keeps expanding the request until we get enough
 
269
            self._content += self._z_content_decompressor.decompress(
 
270
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
 
271
            assert len(self._content) >= num_bytes
 
272
            if not self._z_content_decompressor.unconsumed_tail:
 
273
                # The stream is finished
 
274
                self._z_content_decompressor = None
 
275
 
 
276
    def _parse_bytes(self, bytes):
 
277
        """Read the various lengths from the header.
 
278
 
 
279
        This also populates the various 'compressed' buffers.
 
280
 
 
281
        :return: The position in bytes just after the last newline
 
282
        """
 
283
        # At present, there are 4 lengths to be read, we have 2 integers for
 
284
        # the length of the compressed and uncompressed header, and 2 integers
 
285
        # for the compressed and uncompressed content
 
286
        # 14 bytes can represent > 1TB, so to avoid checking too far, cap the
 
287
        # search to 14 bytes.
 
288
        pos = bytes.index('\n', 6, 20)
 
289
        self._z_header_length = int(bytes[6:pos])
 
290
        pos += 1
 
291
        pos2 = bytes.index('\n', pos, pos + 14)
 
292
        self._header_length = int(bytes[pos:pos2])
 
293
        end_of_z_lengths = pos2
 
294
        pos2 += 1
 
295
        # Older versions don't have the content lengths, if we want to preserve
 
296
        # backwards compatibility, we could try/except over these, and allow
 
297
        # them to be skipped
 
298
        try:
 
299
            pos = bytes.index('\n', pos2, pos2 + 14)
 
300
            self._z_content_length = int(bytes[pos2:pos])
 
301
            pos += 1
 
302
            pos2 = bytes.index('\n', pos, pos + 14)
 
303
            self._content_length = int(bytes[pos:pos2])
 
304
            pos = pos2 + 1
 
305
            assert len(bytes) == (pos + self._z_header_length +
 
306
                                  self._z_content_length)
 
307
            pos2 = pos + self._z_header_length
 
308
            self._z_header = bytes[pos:pos2]
 
309
            self._z_content = bytes[pos2:]
 
310
            assert len(self._z_content) == self._z_content_length
 
311
        except ValueError:
 
312
            # This is the older form, which did not encode its content length
 
313
            pos = end_of_z_lengths + 1
 
314
            pos2 = pos + self._z_header_length
 
315
            self._z_header = bytes[pos:pos2]
 
316
            self._z_content = bytes[pos2:]
 
317
            self._z_content_length = len(self._z_content)
 
318
 
180
319
    @classmethod
181
320
    def from_bytes(cls, bytes):
182
321
        out = cls()
183
322
        if bytes[:6] not in (cls.GCB_HEADER, cls.GCB_LZ_HEADER):
184
323
            raise ValueError('bytes did not start with %r' % (cls.GCB_HEADER,))
185
324
        if bytes[4] == 'z':
186
 
            decomp = zlib.decompress
 
325
            out._compressor_name = 'zlib'
187
326
        elif bytes[4] == 'l':
188
 
            decomp = pylzma.decompress
 
327
            out._compressor_name = 'lzma'
189
328
        else:
190
329
            raise ValueError('unknown compressor: %r' % (bytes,))
191
 
        pos = bytes.index('\n', 6)
192
 
        z_header_length = int(bytes[6:pos])
193
 
        pos += 1
194
 
        pos2 = bytes.index('\n', pos)
195
 
        header_length = int(bytes[pos:pos2])
196
 
        if z_header_length == 0:
197
 
            if header_length != 0:
198
 
                raise ValueError('z_header_length 0, but header length != 0')
199
 
            zcontent = bytes[pos2+1:]
200
 
            if zcontent:
201
 
                out._content = decomp(zcontent)
202
 
                out._size = len(out._content)
203
 
            return out
204
 
        pos = pos2 + 1
205
 
        pos2 = pos + z_header_length
206
 
        z_header_bytes = bytes[pos:pos2]
207
 
        if len(z_header_bytes) != z_header_length:
208
 
            raise ValueError('Wrong length of compressed header. %s != %s'
209
 
                             % (len(z_header_bytes), z_header_length))
210
 
        header_bytes = decomp(z_header_bytes)
211
 
        if len(header_bytes) != header_length:
212
 
            raise ValueError('Wrong length of header. %s != %s'
213
 
                             % (len(header_bytes), header_length))
214
 
        del z_header_bytes
215
 
        out._parse_header_bytes(header_bytes)
216
 
        del header_bytes
217
 
        zcontent = bytes[pos2:]
218
 
        if zcontent:
219
 
            out._content = decomp(zcontent)
220
 
            out._size = header_length + len(out._content)
 
330
        out._parse_bytes(bytes)
 
331
        if not _NO_LABELS:
 
332
            out._parse_header()
221
333
        return out
222
334
 
223
 
    def extract(self, key, index_memo, sha1=None):
 
335
    def extract(self, key, start, end, sha1=None):
224
336
        """Extract the text for a specific key.
225
337
 
226
338
        :param key: The label used for this content
227
339
        :param sha1: TODO (should we validate only when sha1 is supplied?)
228
340
        :return: The bytes for the content
229
341
        """
230
 
        if _NO_LABELS or not self._entries:
231
 
            start, end = index_memo[3:5]
232
 
            # The bytes are 'f' or 'd' for the type, then a variable-length
233
 
            # base128 integer for the content size, then the actual content
234
 
            # We know that the variable-length integer won't be longer than 10
235
 
            # bytes (it only takes 5 bytes to encode 2^32)
236
 
            c = self._content[start]
237
 
            if c == 'f':
238
 
                type = 'fulltext'
239
 
            else:
240
 
                if c != 'd':
241
 
                    raise ValueError('Unknown content control code: %s'
242
 
                                     % (c,))
243
 
                type = 'delta'
244
 
            entry = GroupCompressBlockEntry(key, type, sha1=None,
245
 
                                            start=start, length=end-start)
246
 
        else:
247
 
            entry = self._entries[key]
248
 
            c = self._content[entry.start]
249
 
            if entry.type == 'fulltext':
250
 
                if c != 'f':
251
 
                    raise ValueError('Label claimed fulltext, byte claims: %s'
252
 
                                     % (c,))
253
 
            elif entry.type == 'delta':
254
 
                if c != 'd':
255
 
                    raise ValueError('Label claimed delta, byte claims: %s'
256
 
                                     % (c,))
257
 
            start = entry.start
 
342
        # Make sure we have enough bytes for this record
 
343
        # TODO: if we didn't want to track the end of this entry, we could
 
344
        #       _ensure_content(start+enough_bytes_for_type_and_length), and
 
345
        #       then decode the entry length, and
 
346
        #       _ensure_content(start+1+length)
 
347
        #       It is 2 calls to _ensure_content(), but we always buffer a bit
 
348
        #       extra anyway, and it means 1 less offset stored in the index,
 
349
        #       and transmitted over the wire
 
350
        if end is None:
 
351
            # it takes 5 bytes to encode 2^32, so we need 1 byte to hold the
 
352
            # 'f' or 'd' declaration, and then 5 more for the record length.
 
353
            self._ensure_content(start + 6)
 
354
        else:
 
355
            self._ensure_content(end)
 
356
        # The bytes are 'f' or 'd' for the type, then a variable-length
 
357
        # base128 integer for the content size, then the actual content
 
358
        # We know that the variable-length integer won't be longer than 5
 
359
        # bytes (it takes 5 bytes to encode 2^32)
 
360
        c = self._content[start]
 
361
        if c == 'f':
 
362
            type = 'fulltext'
 
363
        else:
 
364
            if c != 'd':
 
365
                raise ValueError('Unknown content control code: %s'
 
366
                                 % (c,))
 
367
            type = 'delta'
258
368
        content_len, len_len = decode_base128_int(
259
 
                            self._content[entry.start + 1:entry.start + 11])
260
 
        content_start = entry.start + 1 + len_len
261
 
        end = entry.start + entry.length
 
369
                            self._content[start + 1:start + 6])
 
370
        content_start = start + 1 + len_len
 
371
        if end is None:
 
372
            end = content_start + content_len
 
373
            self._ensure_content(end)
 
374
        else:
 
375
            if end != content_start + content_len:
 
376
                raise ValueError('end != len according to field header'
 
377
                    ' %s != %s' % (end, content_start + content_len))
 
378
        entry = GroupCompressBlockEntry(key, type, sha1=None,
 
379
                                        start=start, length=end-start)
262
380
        content = self._content[content_start:end]
263
381
        if c == 'f':
264
382
            bytes = content
284
402
        self._entries[key] = entry
285
403
        return entry
286
404
 
287
 
    def to_bytes(self, content=''):
 
405
    def set_content(self, content):
 
406
        """Set the content of this block."""
 
407
        self._content_length = len(content)
 
408
        self._content = content
 
409
        self._z_content = None
 
410
        self._z_header_length = None
 
411
 
 
412
    def to_bytes(self):
288
413
        """Encode the information into a byte stream."""
289
414
        compress = zlib.compress
290
415
        if _USE_LZMA:
307
432
            chunks.append(chunk)
308
433
        bytes = ''.join(chunks)
309
434
        info_len = len(bytes)
310
 
        z_bytes = []
311
 
        z_bytes.append(compress(bytes))
312
 
        del bytes
 
435
        z_header_bytes = compress(bytes)
 
436
        del bytes, chunks
 
437
        z_header_len = len(z_header_bytes)
313
438
        # TODO: we may want to have the header compressed in the same chain
314
439
        #       as the data, or we may not, evaulate it
315
440
        #       having them compressed together is probably a win for
317
442
        #       label in the header is duplicated in the text.
318
443
        #       For chk pages and real bytes, I would guess this is not
319
444
        #       true.
320
 
        z_len = sum(map(len, z_bytes))
321
 
        c_len = len(content)
322
445
        if _NO_LABELS:
323
 
            z_bytes = []
324
 
            z_len = 0
 
446
            z_header_bytes = ''
 
447
            z_header_len = 0
325
448
            info_len = 0
326
 
        z_bytes.append(compress(content))
 
449
        if self._z_content is not None:
 
450
            content_len = self._content_length
 
451
            z_content_len = self._z_content_length
 
452
            z_content_bytes = self._z_content
 
453
        else:
 
454
            assert self._content is not None
 
455
            content_len = self._content_length
 
456
            z_content_bytes = compress(self._content)
 
457
            self._z_content = z_content_bytes
 
458
            z_content_len = len(z_content_bytes)
 
459
            self._z_content_length = z_content_len
327
460
        if _USE_LZMA:
328
461
            header = self.GCB_LZ_HEADER
329
462
        else:
330
463
            header = self.GCB_HEADER
331
464
        chunks = [header,
332
 
                  '%d\n' % (z_len,),
333
 
                  '%d\n' % (info_len,),
334
 
                  #'%d\n' % (c_len,),
 
465
                  '%d\n%d\n%d\n%d\n' % (z_header_len, info_len,
 
466
                                        z_content_len, content_len)
335
467
                 ]
336
 
        chunks.extend(z_bytes)
 
468
        chunks.append(z_header_bytes)
 
469
        chunks.append(z_content_bytes)
337
470
        return ''.join(chunks)
338
471
 
339
472
 
 
473
class _LazyGroupCompressFactory(object):
 
474
    """Yield content from a GroupCompressBlock on demand."""
 
475
 
 
476
    def __init__(self, key, parents, manager, start, end, first):
 
477
        """Create a _LazyGroupCompressFactory
 
478
 
 
479
        :param key: The key of just this record
 
480
        :param parents: The parents of this key (possibly None)
 
481
        :param gc_block: A GroupCompressBlock object
 
482
        :param start: Offset of the first byte for this record in the
 
483
            uncompressd content
 
484
        :param end: Offset of the byte just after the end of this record
 
485
            (ie, bytes = content[start:end])
 
486
        :param first: Is this the first Factory for the given block?
 
487
        """
 
488
        self.key = key
 
489
        self.parents = parents
 
490
        self.sha1 = None
 
491
        # Note: This attribute coupled with Manager._factories creates a
 
492
        #       reference cycle. Perhaps we would rather use a weakref(), or
 
493
        #       find an appropriate time to release the ref. After the first
 
494
        #       get_bytes_as call? After Manager.get_record_stream() returns
 
495
        #       the object?
 
496
        self._manager = manager
 
497
        self.storage_kind = 'groupcompress-block'
 
498
        if not first:
 
499
            self.storage_kind = 'groupcompress-block-ref'
 
500
        self._first = first
 
501
        self._start = start
 
502
        self._end = end
 
503
 
 
504
    def __repr__(self):
 
505
        return '%s(%s, first=%s)' % (self.__class__.__name__,
 
506
            self.key, self._first)
 
507
 
 
508
    def get_bytes_as(self, storage_kind):
 
509
        if storage_kind == self.storage_kind:
 
510
            if self._first:
 
511
                # wire bytes, something...
 
512
                return self._manager._wire_bytes()
 
513
            else:
 
514
                return ''
 
515
        if storage_kind in ('fulltext', 'chunked'):
 
516
            self._manager._prepare_for_extract()
 
517
            block = self._manager._block
 
518
            _, bytes = block.extract(self.key, self._start, self._end)
 
519
            if storage_kind == 'fulltext':
 
520
                return bytes
 
521
            else:
 
522
                return [bytes]
 
523
        raise errors.UnavailableRepresentation(self.key, storage_kind,
 
524
            self.storage_kind)
 
525
 
 
526
 
 
527
class _LazyGroupContentManager(object):
 
528
    """This manages a group of _LazyGroupCompressFactory objects."""
 
529
 
 
530
    def __init__(self, block):
 
531
        self._block = block
 
532
        # We need to preserve the ordering
 
533
        self._factories = []
 
534
        self._last_byte = 0
 
535
 
 
536
    def add_factory(self, key, parents, start, end):
 
537
        if not self._factories:
 
538
            first = True
 
539
        else:
 
540
            first = False
 
541
        # Note that this creates a reference cycle....
 
542
        factory = _LazyGroupCompressFactory(key, parents, self,
 
543
            start, end, first=first)
 
544
        self._last_byte = max(end, self._last_byte)
 
545
        self._factories.append(factory)
 
546
 
 
547
    def get_record_stream(self):
 
548
        """Get a record for all keys added so far."""
 
549
        for factory in self._factories:
 
550
            yield factory
 
551
        # TODO: Consider setting self._factories = None after the above loop,
 
552
        #       as it will break the reference cycle
 
553
 
 
554
    def _trim_block(self, last_byte):
 
555
        """Create a new GroupCompressBlock, with just some of the content."""
 
556
        # None of the factories need to be adjusted, because the content is
 
557
        # located in an identical place. Just that some of the unreferenced
 
558
        # trailing bytes are stripped
 
559
        trace.mutter('stripping trailing bytes from groupcompress block'
 
560
                     ' %d => %d', self._block._content_length, last_byte)
 
561
        new_block = GroupCompressBlock()
 
562
        self._block._ensure_content(last_byte)
 
563
        new_block.set_content(self._block._content[:last_byte])
 
564
        self._block = new_block
 
565
 
 
566
    def _rebuild_block(self):
 
567
        """Create a new GroupCompressBlock with only the referenced texts."""
 
568
        compressor = GroupCompressor()
 
569
        tstart = time.time()
 
570
        old_length = self._block._content_length
 
571
        cur_endpoint = 0
 
572
        for factory in self._factories:
 
573
            bytes = factory.get_bytes_as('fulltext')
 
574
            (found_sha1, end_point, type,
 
575
             length) = compressor.compress(factory.key, bytes, factory.sha1)
 
576
            # Now update this factory with the new offsets, etc
 
577
            factory.sha1 = found_sha1
 
578
            factory._start = cur_endpoint
 
579
            factory._end = end_point
 
580
            cur_endpoint = end_point
 
581
        self._last_byte = cur_endpoint
 
582
        new_block = compressor.flush()
 
583
        # TODO: Should we check that new_block really *is* smaller than the old
 
584
        #       block? It seems hard to come up with a method that it would
 
585
        #       expand, since we do full compression again. Perhaps based on a
 
586
        #       request that ends up poorly ordered?
 
587
        delta = time.time() - tstart
 
588
        self._block = new_block
 
589
        trace.mutter('creating new compressed block on-the-fly in %.3fs'
 
590
                     ' %d bytes => %d bytes', delta, old_length,
 
591
                     self._block._content_length)
 
592
 
 
593
    def _prepare_for_extract(self):
 
594
        """A _LazyGroupCompressFactory is about to extract to fulltext."""
 
595
        # We expect that if one child is going to fulltext, all will be. This
 
596
        # helps prevent all of them from extracting a small amount at a time.
 
597
        # Which in itself isn't terribly expensive, but resizing 2MB 32kB at a
 
598
        # time (self._block._content) is a little expensive.
 
599
        self._block._ensure_content(self._last_byte)
 
600
 
 
601
    def _check_rebuild_block(self):
 
602
        """Check to see if our block should be repacked."""
 
603
        total_bytes_used = 0
 
604
        last_byte_used = 0
 
605
        for factory in self._factories:
 
606
            total_bytes_used += factory._end - factory._start
 
607
            last_byte_used = max(last_byte_used, factory._end)
 
608
        # If we are using most of the bytes from the block, we have nothing
 
609
        # else to check (currently more that 1/2)
 
610
        if total_bytes_used * 2 >= self._block._content_length:
 
611
            return
 
612
        # Can we just strip off the trailing bytes? If we are going to be
 
613
        # transmitting more than 50% of the front of the content, go ahead
 
614
        if total_bytes_used * 2 > last_byte_used:
 
615
            self._trim_block(last_byte_used)
 
616
            return
 
617
 
 
618
        # We are using a small amount of the data, and it isn't just packed
 
619
        # nicely at the front, so rebuild the content.
 
620
        # Note: This would be *nicer* as a strip-data-from-group, rather than
 
621
        #       building it up again from scratch
 
622
        #       It might be reasonable to consider the fulltext sizes for
 
623
        #       different bits when deciding this, too. As you may have a small
 
624
        #       fulltext, and a trivial delta, and you are just trading around
 
625
        #       for another fulltext. If we do a simple 'prune' you may end up
 
626
        #       expanding many deltas into fulltexts, as well.
 
627
        #       If we build a cheap enough 'strip', then we could try a strip,
 
628
        #       if that expands the content, we then rebuild.
 
629
        self._rebuild_block()
 
630
 
 
631
    def _wire_bytes(self):
 
632
        """Return a byte stream suitable for transmitting over the wire."""
 
633
        self._check_rebuild_block()
 
634
        # The outer block starts with:
 
635
        #   'groupcompress-block\n'
 
636
        #   <length of compressed key info>\n
 
637
        #   <length of uncompressed info>\n
 
638
        #   <length of gc block>\n
 
639
        #   <header bytes>
 
640
        #   <gc-block>
 
641
        lines = ['groupcompress-block\n']
 
642
        # The minimal info we need is the key, the start offset, and the
 
643
        # parents. The length and type are encoded in the record itself.
 
644
        # However, passing in the other bits makes it easier.  The list of
 
645
        # keys, and the start offset, the length
 
646
        # 1 line key
 
647
        # 1 line with parents, '' for ()
 
648
        # 1 line for start offset
 
649
        # 1 line for end byte
 
650
        header_lines = []
 
651
        for factory in self._factories:
 
652
            key_bytes = '\x00'.join(factory.key)
 
653
            parents = factory.parents
 
654
            if parents is None:
 
655
                parent_bytes = 'None:'
 
656
            else:
 
657
                parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
 
658
            record_header = '%s\n%s\n%d\n%d\n' % (
 
659
                key_bytes, parent_bytes, factory._start, factory._end)
 
660
            header_lines.append(record_header)
 
661
        header_bytes = ''.join(header_lines)
 
662
        del header_lines
 
663
        header_bytes_len = len(header_bytes)
 
664
        z_header_bytes = zlib.compress(header_bytes)
 
665
        del header_bytes
 
666
        z_header_bytes_len = len(z_header_bytes)
 
667
        block_bytes = self._block.to_bytes()
 
668
        lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
 
669
                                       len(block_bytes)))
 
670
        lines.append(z_header_bytes)
 
671
        lines.append(block_bytes)
 
672
        del z_header_bytes, block_bytes
 
673
        return ''.join(lines)
 
674
 
 
675
    @classmethod
 
676
    def from_bytes(cls, bytes):
 
677
        # TODO: This does extra string copying, probably better to do it a
 
678
        #       different way
 
679
        (storage_kind, z_header_len, header_len,
 
680
         block_len, rest) = bytes.split('\n', 4)
 
681
        del bytes
 
682
        if storage_kind != 'groupcompress-block':
 
683
            raise ValueError('Unknown storage kind: %s' % (storage_kind,))
 
684
        z_header_len = int(z_header_len)
 
685
        if len(rest) < z_header_len:
 
686
            raise ValueError('Compressed header len shorter than all bytes')
 
687
        z_header = rest[:z_header_len]
 
688
        header_len = int(header_len)
 
689
        header = zlib.decompress(z_header)
 
690
        if len(header) != header_len:
 
691
            raise ValueError('invalid length for decompressed bytes')
 
692
        del z_header
 
693
        block_len = int(block_len)
 
694
        if len(rest) != z_header_len + block_len:
 
695
            raise ValueError('Invalid length for block')
 
696
        block_bytes = rest[z_header_len:]
 
697
        del rest
 
698
        # So now we have a valid GCB, we just need to parse the factories that
 
699
        # were sent to us
 
700
        header_lines = header.split('\n')
 
701
        del header
 
702
        last = header_lines.pop()
 
703
        if last != '':
 
704
            raise ValueError('header lines did not end with a trailing'
 
705
                             ' newline')
 
706
        if len(header_lines) % 4 != 0:
 
707
            raise ValueError('The header was not an even multiple of 4 lines')
 
708
        block = GroupCompressBlock.from_bytes(block_bytes)
 
709
        del block_bytes
 
710
        result = cls(block)
 
711
        for start in xrange(0, len(header_lines), 4):
 
712
            # intern()?
 
713
            key = tuple(header_lines[start].split('\x00'))
 
714
            parents_line = header_lines[start+1]
 
715
            if parents_line == 'None:':
 
716
                parents = None
 
717
            else:
 
718
                parents = tuple([tuple(segment.split('\x00'))
 
719
                                 for segment in parents_line.split('\t')
 
720
                                  if segment])
 
721
            start_offset = int(header_lines[start+2])
 
722
            end_offset = int(header_lines[start+3])
 
723
            result.add_factory(key, parents, start_offset, end_offset)
 
724
        return result
 
725
 
 
726
 
 
727
def network_block_to_records(storage_kind, bytes, line_end):
 
728
    if storage_kind != 'groupcompress-block':
 
729
        raise ValueError('Unknown storage kind: %s' % (storage_kind,))
 
730
    manager = _LazyGroupContentManager.from_bytes(bytes)
 
731
    return manager.get_record_stream()
 
732
 
 
733
 
340
734
class GroupCompressor(object):
341
735
    """Produce a serialised group of compressed texts.
342
736
 
353
747
       left side.
354
748
    """
355
749
 
356
 
    def __init__(self, delta=True):
357
 
        """Create a GroupCompressor.
358
 
 
359
 
        :param delta: If False, do not compress records.
360
 
        """
 
750
    def __init__(self):
 
751
        """Create a GroupCompressor."""
361
752
        # Consider seeding the lines with some sort of GC Start flag, or
362
753
        # putting it as part of the output stream, rather than in the
363
754
        # compressed bytes.
488
879
                             % (entry.sha1, bytes_sha1))
489
880
        return bytes, entry.sha1
490
881
 
 
882
    def flush(self):
 
883
        """Finish this group, creating a formatted stream."""
 
884
        content = ''.join(self.lines)
 
885
        self.lines = None
 
886
        self._block.set_content(content)
 
887
        return self._block
 
888
 
491
889
    def output_chunks(self, new_chunks):
492
890
        """Output some chunks.
493
891
 
899
1297
                unadded_keys, source_result)
900
1298
        for key in missing:
901
1299
            yield AbsentContentFactory(key)
 
1300
        manager = None
 
1301
        # TODO: This works fairly well at batching up existing groups into a
 
1302
        #       streamable format, and possibly allowing for taking one big
 
1303
        #       group and splitting it when it isn't fully utilized.
 
1304
        #       However, it doesn't allow us to find under-utilized groups and
 
1305
        #       combine them into a bigger group on the fly.
 
1306
        #       (Consider the issue with how chk_map inserts texts
 
1307
        #       one-at-a-time.) This could be done at insert_record_stream()
 
1308
        #       time, but it probably would decrease the number of
 
1309
        #       bytes-on-the-wire for fetch.
902
1310
        for source, keys in source_keys:
903
1311
            if source is self:
904
1312
                for key in keys:
905
1313
                    if key in self._unadded_refs:
 
1314
                        if manager is not None:
 
1315
                            # Yield everything buffered so far
 
1316
                            for factory in manager.get_record_stream():
 
1317
                                yield factory
 
1318
                            manager = None
906
1319
                        bytes, sha1 = self._compressor.extract(key)
907
1320
                        parents = self._unadded_refs[key]
 
1321
                        yield FulltextContentFactory(key, parents, sha1, bytes)
908
1322
                    else:
909
1323
                        index_memo, _, parents, (method, _) = locations[key]
910
1324
                        block = self._get_block(index_memo)
911
 
                        entry, bytes = block.extract(key, index_memo)
912
 
                        sha1 = entry.sha1
913
 
                        # TODO: If we don't have labels, then the sha1 here is computed
914
 
                        #       from the data, so we don't want to re-sha the string.
915
 
                        if not _FAST and sha_string(bytes) != sha1:
916
 
                            raise AssertionError('sha1 sum did not match')
917
 
                    yield FulltextContentFactory(key, parents, sha1, bytes)
 
1325
                        start, end = index_memo[3:5]
 
1326
                        if manager is None:
 
1327
                            manager = _LazyGroupContentManager(block)
 
1328
                        elif manager._block is not block:
 
1329
                            # Flush and create a new manager
 
1330
                            for factory in manager.get_record_stream():
 
1331
                                yield factory
 
1332
                            manager = _LazyGroupContentManager(block)
 
1333
                        manager.add_factory(key, parents, start, end)
918
1334
            else:
 
1335
                if manager is not None:
 
1336
                    # Yield everything buffered so far
 
1337
                    for factory in manager.get_record_stream():
 
1338
                        yield factory
 
1339
                    manager = None
919
1340
                for record in source.get_record_stream(keys, ordering,
920
1341
                                                       include_delta_closure):
921
1342
                    yield record
 
1343
        if manager is not None:
 
1344
            # Yield everything buffered so far
 
1345
            for factory in manager.get_record_stream():
 
1346
                yield factory
 
1347
            manager = None
922
1348
 
923
1349
    def get_sha1s(self, keys):
924
1350
        """See VersionedFiles.get_sha1s()."""
928
1354
                result[record.key] = record.sha1
929
1355
            else:
930
1356
                if record.storage_kind != 'absent':
931
 
                    result[record.key] == sha_string(record.get_bytes_as(
 
1357
                    result[record.key] = sha_string(record.get_bytes_as(
932
1358
                        'fulltext'))
933
1359
        return result
934
1360
 
942
1368
        for _ in self._insert_record_stream(stream):
943
1369
            pass
944
1370
 
945
 
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None):
 
1371
    def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
 
1372
                              reuse_blocks=True):
946
1373
        """Internal core to insert a record stream into this container.
947
1374
 
948
1375
        This helper function has a different interface than insert_record_stream
951
1378
        :param stream: A stream of records to insert.
952
1379
        :param nostore_sha: If the sha1 of a given text matches nostore_sha,
953
1380
            raise ExistingContent, rather than committing the new text.
 
1381
        :param reuse_blocks: If the source is streaming from
 
1382
            groupcompress-blocks, just insert the blocks as-is, rather than
 
1383
            expanding the texts and inserting again.
954
1384
        :return: An iterator over the sha1 of the inserted records.
955
1385
        :seealso insert_record_stream:
956
1386
        :seealso add_lines:
966
1396
                return adapter
967
1397
        # This will go up to fulltexts for gc to gc fetching, which isn't
968
1398
        # ideal.
969
 
        self._compressor = GroupCompressor(self._delta)
 
1399
        self._compressor = GroupCompressor()
970
1400
        self._unadded_refs = {}
971
1401
        keys_to_add = []
972
1402
        basis_end = 0
973
1403
        def flush():
974
 
            bytes = self._compressor._block.to_bytes(
975
 
                ''.join(self._compressor.lines))
 
1404
            bytes = self._compressor.flush().to_bytes()
976
1405
            index, start, length = self._access.add_raw_records(
977
1406
                [(None, len(bytes))], bytes)[0]
978
1407
            nodes = []
981
1410
            self._index.add_records(nodes, random_id=random_id)
982
1411
            self._unadded_refs = {}
983
1412
            del keys_to_add[:]
984
 
            self._compressor = GroupCompressor(self._delta)
 
1413
            self._compressor = GroupCompressor()
985
1414
 
986
1415
        last_prefix = None
987
1416
        last_fulltext_len = None
988
1417
        max_fulltext_len = 0
989
1418
        max_fulltext_prefix = None
 
1419
        insert_manager = None
 
1420
        block_start = None
 
1421
        block_length = None
990
1422
        for record in stream:
991
1423
            # Raise an error when a record is missing.
992
1424
            if record.storage_kind == 'absent':
993
1425
                raise errors.RevisionNotPresent(record.key, self)
 
1426
            if reuse_blocks:
 
1427
                # If the reuse_blocks flag is set, check to see if we can just
 
1428
                # copy a groupcompress block as-is.
 
1429
                if record.storage_kind == 'groupcompress-block':
 
1430
                    # Insert the raw block into the target repo
 
1431
                    insert_manager = record._manager
 
1432
                    record._manager._check_rebuild_block()
 
1433
                    bytes = record._manager._block.to_bytes()
 
1434
                    _, start, length = self._access.add_raw_records(
 
1435
                        [(None, len(bytes))], bytes)[0]
 
1436
                    del bytes
 
1437
                    block_start = start
 
1438
                    block_length = length
 
1439
                if record.storage_kind in ('groupcompress-block',
 
1440
                                           'groupcompress-block-ref'):
 
1441
                    assert insert_manager is not None
 
1442
                    assert record._manager is insert_manager
 
1443
                    value = "%d %d %d %d" % (block_start, block_length,
 
1444
                                             record._start, record._end)
 
1445
                    nodes = [(record.key, value, (record.parents,))]
 
1446
                    self._index.add_records(nodes, random_id=random_id)
 
1447
                    continue
994
1448
            try:
995
1449
                bytes = record.get_bytes_as('fulltext')
996
1450
            except errors.UnavailableRepresentation: