~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_chunk_writer.py

  • Committer: Jelmer Vernooij
  • Date: 2012-07-06 11:27:16 UTC
  • mfrom: (6534 +trunk)
  • mto: This revision was merged to the branch mainline in revision 6535.
  • Revision ID: jelmer@samba.org-20120706112716-jcun7vvbpgmcplgz
Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
#
 
17
 
 
18
"""Tests for writing fixed size chunks with compression."""
 
19
 
 
20
import zlib
 
21
 
 
22
from bzrlib import chunk_writer
 
23
from bzrlib.tests import TestCaseWithTransport
 
24
 
 
25
 
 
26
class TestWriter(TestCaseWithTransport):
 
27
 
 
28
    def check_chunk(self, bytes_list, size):
 
29
        bytes = ''.join(bytes_list)
 
30
        self.assertEqual(size, len(bytes))
 
31
        return zlib.decompress(bytes)
 
32
 
 
33
    def test_chunk_writer_empty(self):
 
34
        writer = chunk_writer.ChunkWriter(4096)
 
35
        bytes_list, unused, padding = writer.finish()
 
36
        node_bytes = self.check_chunk(bytes_list, 4096)
 
37
        self.assertEqual("", node_bytes)
 
38
        self.assertEqual(None, unused)
 
39
        # Only a zlib header.
 
40
        self.assertEqual(4088, padding)
 
41
 
 
42
    def test_optimize_for_speed(self):
 
43
        writer = chunk_writer.ChunkWriter(4096)
 
44
        writer.set_optimize(for_size=False)
 
45
        self.assertEqual(chunk_writer.ChunkWriter._repack_opts_for_speed,
 
46
                         (writer._max_repack, writer._max_zsync))
 
47
        writer = chunk_writer.ChunkWriter(4096, optimize_for_size=False)
 
48
        self.assertEqual(chunk_writer.ChunkWriter._repack_opts_for_speed,
 
49
                         (writer._max_repack, writer._max_zsync))
 
50
 
 
51
    def test_optimize_for_size(self):
 
52
        writer = chunk_writer.ChunkWriter(4096)
 
53
        writer.set_optimize(for_size=True)
 
54
        self.assertEqual(chunk_writer.ChunkWriter._repack_opts_for_size,
 
55
                         (writer._max_repack, writer._max_zsync))
 
56
        writer = chunk_writer.ChunkWriter(4096, optimize_for_size=True)
 
57
        self.assertEqual(chunk_writer.ChunkWriter._repack_opts_for_size,
 
58
                         (writer._max_repack, writer._max_zsync))
 
59
 
 
60
    def test_some_data(self):
 
61
        writer = chunk_writer.ChunkWriter(4096)
 
62
        writer.write("foo bar baz quux\n")
 
63
        bytes_list, unused, padding = writer.finish()
 
64
        node_bytes = self.check_chunk(bytes_list, 4096)
 
65
        self.assertEqual("foo bar baz quux\n", node_bytes)
 
66
        self.assertEqual(None, unused)
 
67
        # More than just the header..
 
68
        self.assertEqual(4073, padding)
 
69
 
 
70
    def test_too_much_data_does_not_exceed_size(self):
 
71
        # Generate enough data to exceed 4K
 
72
        lines = []
 
73
        for group in range(48):
 
74
            offset = group * 50
 
75
            numbers = range(offset, offset + 50)
 
76
            # Create a line with this group
 
77
            lines.append(''.join(map(str, numbers)) + '\n')
 
78
        writer = chunk_writer.ChunkWriter(4096)
 
79
        for idx, line in enumerate(lines):
 
80
            if writer.write(line):
 
81
                self.assertEqual(46, idx)
 
82
                break
 
83
        bytes_list, unused, _ = writer.finish()
 
84
        node_bytes = self.check_chunk(bytes_list, 4096)
 
85
        # the first 46 lines should have been added
 
86
        expected_bytes = ''.join(lines[:46])
 
87
        self.assertEqualDiff(expected_bytes, node_bytes)
 
88
        # And the line that failed should have been saved for us
 
89
        self.assertEqual(lines[46], unused)
 
90
 
 
91
    def test_too_much_data_preserves_reserve_space(self):
 
92
        # Generate enough data to exceed 4K
 
93
        lines = []
 
94
        for group in range(48):
 
95
            offset = group * 50
 
96
            numbers = range(offset, offset + 50)
 
97
            # Create a line with this group
 
98
            lines.append(''.join(map(str, numbers)) + '\n')
 
99
        writer = chunk_writer.ChunkWriter(4096, 256)
 
100
        for idx, line in enumerate(lines):
 
101
            if writer.write(line):
 
102
                self.assertEqual(44, idx)
 
103
                break
 
104
        else:
 
105
            self.fail('We were able to write all lines')
 
106
        self.assertFalse(writer.write("A"*256, reserved=True))
 
107
        bytes_list, unused, _ = writer.finish()
 
108
        node_bytes = self.check_chunk(bytes_list, 4096)
 
109
        # the first 44 lines should have been added
 
110
        expected_bytes = ''.join(lines[:44]) + "A"*256
 
111
        self.assertEqualDiff(expected_bytes, node_bytes)
 
112
        # And the line that failed should have been saved for us
 
113
        self.assertEqual(lines[44], unused)