~bzr-pqm/bzr/bzr.dev

Viewing changes to bzrlib/groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2009-03-17 19:27:05 UTC
  • mto: (3735.2.156 brisbane-core)
  • mto: This revision was merged to the branch mainline in revision 4280.
  • Revision ID: john@arbash-meinel.com-20090317192705-8r4ny7purwsx3m0l
Add a _LazyGroupContentManager._check_rebuild_block
This can be called to ensure that the content we have generated has been
appropriately shrunk before we put it on the wire, etc.

--- bzrlib/groupcompress.py
+++ bzrlib/groupcompress.py
@@ -19,6 +19,7 @@
 from itertools import izip
 from cStringIO import StringIO
 import struct
+import time
 import zlib
 try:
     import pylzma
@@ -34,6 +35,7 @@
     osutils,
     pack,
     patiencediff,
+    trace,
     )
 from bzrlib.graph import Graph
 from bzrlib.knit import _DirectPackAccess
@@ -463,11 +465,11 @@
         return ''.join(chunks)
 
 
-class LazyGroupCompressFactory(object):
+class _LazyGroupCompressFactory(object):
     """Yield content from a GroupCompressBlock on demand."""
 
     def __init__(self, key, parents, manager, start, end, first):
-        """Create a LazyGroupCompressFactory
+        """Create a _LazyGroupCompressFactory
 
         :param key: The key of just this record
         :param parents: The parents of this key (possibly None)
@@ -517,7 +519,7 @@
 
 
 class _LazyGroupContentManager(object):
-    """This manages a group of LazyGroupCompressFactory objects."""
+    """This manages a group of _LazyGroupCompressFactory objects."""
 
     def __init__(self, block):
         self._block = block
@@ -530,7 +532,7 @@
         else:
             first = False
         # Note that this creates a reference cycle....
-        factory = LazyGroupCompressFactory(key, parents, self,
+        factory = _LazyGroupCompressFactory(key, parents, self,
             start, end, first=first)
         self._factories.append(factory)
 
@@ -541,6 +543,74 @@
         # TODO: Consider setting self._factories = None after the above loop,
         #       as it will break the reference cycle
 
+    def _trim_block(self, last_byte):
+        """Create a new GroupCompressBlock, with just some of the content."""
+        # None of the factories need to be adjusted, because the content is
+        # located in an identical place. Just that some of the unreferenced
+        # trailing bytes are stripped
+        trace.mutter('stripping trailing bytes from groupcompress block'
+                     ' %d => %d', self._block._content_length, last_byte)
+        new_block = GroupCompressBlock()
+        self._block._ensure_content(last_byte)
+        new_block.set_content(self._block._content[:last_byte])
+        self._block = new_block
+
+    def _rebuild_block(self):
+        """Create a new GroupCompressBlock with only the referenced texts."""
+        compressor = GroupCompressor()
+        tstart = time.time()
+        old_length = self._block._content_length
+        cur_endpoint = 0
+        for factory in self._factories:
+            bytes = factory.get_bytes_as('fulltext')
+            (found_sha1, end_point, type,
+             length) = compressor.compress(factory.key, bytes, factory.sha1)
+            # Now update this factory with the new offsets, etc
+            factory.sha1 = found_sha1
+            factory._start = cur_endpoint
+            factory._end = end_point
+            cur_endpoint = end_point
+        new_block = compressor.flush()
+        # TODO: Should we check that new_block really *is* smaller than the old
+        #       block? It seems hard to come up with a method that it would
+        #       expand, since we do full compression again. Perhaps based on a
+        #       request that ends up poorly ordered?
+        delta = time.time() - tstart
+        self._block = new_block
+        trace.mutter('creating new compressed block on-the-fly in %.3fs'
+                     ' %d bytes => %d bytes', delta, old_length,
+                     self._block._content_length)
+
+    def _check_rebuild_block(self):
+        """Check to see if our block should be repacked."""
+        total_bytes_used = 0
+        last_byte_used = 0
+        for factory in self._factories:
+            total_bytes_used += factory._end - factory._start
+            last_byte_used = max(last_byte_used, factory._end)
+        # If we are using most of the bytes from the block, we have nothing
+        # else to check (currently more than 1/2)
+        if total_bytes_used * 2 >= self._block._content_length:
+            return
+        # Can we just strip off the trailing bytes? If we are going to be
+        # transmitting more than 50% of the front of the content, go ahead
+        if total_bytes_used * 2 > last_byte_used:
+            self._trim_block(last_byte_used)
+            return
+
+        # We are using a small amount of the data, and it isn't just packed
+        # nicely at the front, so rebuild the content.
+        # Note: This would be *nicer* as a strip-data-from-group, rather than
+        #       building it up again from scratch.
+        #       It might be reasonable to consider the fulltext sizes for
+        #       different bits when deciding this, too. As you may have a small
+        #       fulltext, and a trivial delta, and you are just trading around
+        #       for another fulltext. If we do a simple 'prune' you may end up
+        #       expanding many deltas into fulltexts, as well.
+        #       If we build a cheap enough 'strip', then we could try a strip,
+        #       if that expands the content, we then rebuild.
+        self._rebuild_block()
+
     def _wire_bytes(self):
         """Return a byte stream suitable for transmitting over the wire."""
         # TODO: this might be a really good time to determine that we want to
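
For reference, the repack heuristic added in _check_rebuild_block() boils down
to two 50% thresholds. Below is a minimal standalone sketch of that decision
logic with worked examples. Note that choose_action() and its (start, end)
tuple interface are invented for this note and are not bzrlib API; the real
method walks _LazyGroupCompressFactory objects and mutates self._block in
place.

    # Sketch only: choose_action() is a hypothetical helper, not bzrlib code.
    def choose_action(ranges, content_length):
        """Pick 'keep', 'trim', or 'rebuild' for a block of content_length
        bytes, given the (start, end) byte ranges actually referenced."""
        total_bytes_used = sum(end - start for start, end in ranges)
        last_byte_used = max(end for start, end in ranges)
        if total_bytes_used * 2 >= content_length:
            return 'keep'     # at least half the block is wanted: send as-is
        if total_bytes_used * 2 > last_byte_used:
            return 'trim'     # wanted bytes cluster at the front: strip the tail
        return 'rebuild'      # sparse usage: recompress just the wanted texts

    # Worked examples against a 1000-byte block:
    print(choose_action([(0, 600)], 1000))    # 'keep'    (600 * 2 >= 1000)
    print(choose_action([(0, 300)], 1000))    # 'trim'    (600 < 1000, but 600 > 300)
    print(choose_action([(700, 900)], 1000))  # 'rebuild' (400 < 1000 and 400 <= 900)
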
@@ -811,7 +881,7 @@
         content = ''.join(self.lines)
         self.lines = None
         self._block.set_content(content)
-        return self._block.to_bytes()
+        return self._block
 
     def output_chunks(self, new_chunks):
         """Output some chunks.
@@ -1328,7 +1398,7 @@
         keys_to_add = []
         basis_end = 0
         def flush():
-            bytes = self._compressor.flush()
+            bytes = self._compressor.flush().to_bytes()
             index, start, length = self._access.add_raw_records(
                 [(None, len(bytes))], bytes)[0]
             nodes = []
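
The last two hunks are two sides of one interface change: GroupCompressor's
flush() now returns the GroupCompressBlock itself instead of its serialized
bytes, so callers that want bytes must call to_bytes() themselves. That is
what gives a manager the chance to run _check_rebuild_block() on the block
before it is serialized. A sketch of the changed call pattern, using stub
classes rather than the real bzrlib ones:

    # Stubs for illustration only; the real classes live in
    # bzrlib/groupcompress.py and do actual delta compression.
    class StubBlock(object):
        def __init__(self, content):
            self._content = content
        def to_bytes(self):
            # The real GroupCompressBlock.to_bytes() adds a header and
            # compresses the content; this stub just returns it raw.
            return self._content

    class StubCompressor(object):
        def __init__(self):
            self._chunks = ['some text\n']
        def flush(self):
            # New contract: hand back the block object, not its bytes, so the
            # caller can inspect, trim, or rebuild it before serializing.
            return StubBlock(''.join(self._chunks))

    compressor = StubCompressor()
    block = compressor.flush()      # old code: bytes = compressor.flush()
    bytes = block.to_bytes()        # new code serializes explicitly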