~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_tuned_gzip.py

  • Committer: John Arbash Meinel
  • Date: 2008-07-09 21:42:24 UTC
  • mto: This revision was merged to the branch mainline in revision 3543.
  • Revision ID: john@arbash-meinel.com-20080709214224-r75k87r6a01pfc3h
Restore a real weave merge to 'bzr merge --weave'.

To do so efficiently, we only add the simple LCAs to the final weave
object, unless we run into complexities with the merge graph.
This gives the same effective result as adding all the texts,
with the advantage of not having to extract all of them.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006, 2009, 2010, 2011 Canonical Ltd
 
1
# Copyright (C) 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
18
"""Tests for tuned_gzip."""
21
21
# do not use bzrlib test cases here - this should be suitable for sending
22
22
# upstream.
23
23
from cStringIO import StringIO
 
24
from unittest import TestCase
24
25
import zlib
25
26
 
26
27
 
27
 
from bzrlib import (
28
 
    symbol_versioning,
29
 
    tuned_gzip,
30
 
    tests,
31
 
    )
 
28
from bzrlib import tuned_gzip
32
29
 
33
30
 
34
31
class FakeDecompress(object):
39
36
 
40
37
    def decompress(self, buf):
41
38
        """Return an empty string as though we are at eof."""
42
 
        # note that the zlib module *overwrites* unused data
 
39
        # note that the zlib module *overwrites* unused data 
43
40
        # on writes after EOF.
44
41
        self.unused_data = buf
45
42
        return ''
46
43
 
47
44
 
48
 
class TestFakeDecompress(tests.TestCase):
 
45
class TestFakeDecompress(TestCase):
49
46
    """We use a fake decompressor to test GzipFile.
50
47
 
51
48
    This class tests the behaviours we want from it.
70
67
        self.assertEqual('1234567', decompress.unused_data)
71
68
 
72
69
 
73
 
class TestGzip(tests.TestCase):
 
70
class TestGzip(TestCase):
74
71
 
75
72
    def test__read_short_remainder(self):
76
73
        # a _read call at the end of a compressed hunk should
77
 
        # read more bytes if there is less than 8 bytes (the
 
74
        # read more bytes if there is less than 8 bytes (the 
78
75
        # gzip trailer) unread.
79
76
        stream = StringIO('\0\0\0\0\0\0\0\0')
80
 
        myfile = self.applyDeprecated(
81
 
            symbol_versioning.deprecated_in((2, 3, 0)),
82
 
            tuned_gzip.GzipFile, fileobj=stream)
 
77
        myfile = tuned_gzip.GzipFile(fileobj=stream)
83
78
        # disable the _new_member check, we are microtesting.
84
79
        myfile._new_member = False
85
80
        myfile.crc = zlib.crc32('')
89
84
        # all the data should have been read now
90
85
        self.assertEqual('', stream.read())
91
86
        # and it should be new member time in the stream.
92
 
        self.assertTrue(myfile._new_member)
93
 
 
94
 
    def test_negative_crc(self):
95
 
        """Content with a negative crc should not break when written"""
96
 
        sio = StringIO()
97
 
        gfile = self.applyDeprecated(
98
 
            symbol_versioning.deprecated_in((2, 3, 0)),
99
 
            tuned_gzip.GzipFile, mode="w", fileobj=sio)
100
 
        gfile.write("\xFF")
101
 
        gfile.close()
102
 
        self.assertEqual(gfile.crc & 0xFFFFFFFFL, 0xFF000000L)
103
 
        self.assertEqual(sio.getvalue()[-8:-4], "\x00\x00\x00\xFF")
104
 
 
105
 
 
106
 
class TestToGzip(tests.TestCase):
107
 
 
108
 
    def assertToGzip(self, chunks):
109
 
        raw_bytes = ''.join(chunks)
110
 
        gzfromchunks = tuned_gzip.chunks_to_gzip(chunks)
111
 
        gzfrombytes = tuned_gzip.bytes_to_gzip(raw_bytes)
112
 
        self.assertEqual(gzfrombytes, gzfromchunks)
113
 
        decoded = self.applyDeprecated(
114
 
            symbol_versioning.deprecated_in((2, 3, 0)),
115
 
            tuned_gzip.GzipFile, fileobj=StringIO(gzfromchunks)).read()
116
 
        lraw, ldecoded = len(raw_bytes), len(decoded)
117
 
        self.assertEqual(lraw, ldecoded,
118
 
                         'Expecting data length %d, got %d' % (lraw, ldecoded))
119
 
        self.assertEqual(raw_bytes, decoded)
120
 
 
121
 
    def test_single_chunk(self):
122
 
        self.assertToGzip(['a modest chunk\nwith some various\nbits\n'])
123
 
 
124
 
    def test_simple_text(self):
125
 
        self.assertToGzip(['some\n', 'strings\n', 'to\n', 'process\n'])
126
 
 
127
 
    def test_large_chunks(self):
128
 
        self.assertToGzip(['a large string\n'*1024])
129
 
        self.assertToGzip(['a large string\n']*1024)
130
 
 
131
 
    def test_enormous_chunks(self):
132
 
        self.assertToGzip(['a large string\n'*1024*256])
133
 
        self.assertToGzip(['a large string\n']*1024*256)
 
87
        self.failUnless(myfile._new_member)