~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_chunk_writer.py

Merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2008 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
#
 
17
 
 
18
"""Tests for writing fixed size chunks with compression."""
 
19
 
 
20
import zlib
 
21
 
 
22
from bzrlib import chunk_writer
 
23
from bzrlib.tests import TestCaseWithTransport
 
24
 
 
25
 
 
26
class TestWriter(TestCaseWithTransport):
 
27
 
 
28
    def check_chunk(self, bytes_list, size):
 
29
        bytes = ''.join(bytes_list)
 
30
        self.assertEqual(size, len(bytes))
 
31
        return zlib.decompress(bytes)
 
32
 
 
33
    def test_chunk_writer_empty(self):
 
34
        writer = chunk_writer.ChunkWriter(4096)
 
35
        bytes_list, unused, padding = writer.finish()
 
36
        node_bytes = self.check_chunk(bytes_list, 4096)
 
37
        self.assertEqual("", node_bytes)
 
38
        self.assertEqual(None, unused)
 
39
        # Only a zlib header.
 
40
        self.assertEqual(4088, padding)
 
41
 
 
42
    def test_some_data(self):
 
43
        writer = chunk_writer.ChunkWriter(4096)
 
44
        writer.write("foo bar baz quux\n")
 
45
        bytes_list, unused, padding = writer.finish()
 
46
        node_bytes = self.check_chunk(bytes_list, 4096)
 
47
        self.assertEqual("foo bar baz quux\n", node_bytes)
 
48
        self.assertEqual(None, unused)
 
49
        # More than just the header..
 
50
        self.assertEqual(4073, padding)
 
51
 
 
52
    def test_too_much_data_does_not_exceed_size(self):
 
53
        # Generate enough data to exceed 4K
 
54
        lines = []
 
55
        for group in range(48):
 
56
            offset = group * 50
 
57
            numbers = range(offset, offset + 50)
 
58
            # Create a line with this group
 
59
            lines.append(''.join(map(str, numbers)) + '\n')
 
60
        writer = chunk_writer.ChunkWriter(4096)
 
61
        for idx, line in enumerate(lines):
 
62
            if writer.write(line):
 
63
                self.assertEqual(46, idx)
 
64
                break
 
65
        bytes_list, unused, _ = writer.finish()
 
66
        node_bytes = self.check_chunk(bytes_list, 4096)
 
67
        # the first 46 lines should have been added
 
68
        expected_bytes = ''.join(lines[:46])
 
69
        self.assertEqualDiff(expected_bytes, node_bytes)
 
70
        # And the line that failed should have been saved for us
 
71
        self.assertEqual(lines[46], unused)
 
72
 
 
73
    def test_too_much_data_preserves_reserve_space(self):
 
74
        # Generate enough data to exceed 4K
 
75
        lines = []
 
76
        for group in range(48):
 
77
            offset = group * 50
 
78
            numbers = range(offset, offset + 50)
 
79
            # Create a line with this group
 
80
            lines.append(''.join(map(str, numbers)) + '\n')
 
81
        writer = chunk_writer.ChunkWriter(4096, 256)
 
82
        for idx, line in enumerate(lines):
 
83
            if writer.write(line):
 
84
                self.assertEqual(44, idx)
 
85
                break
 
86
        else:
 
87
            self.fail('We were able to write all lines')
 
88
        self.assertFalse(writer.write("A"*256, reserved=True))
 
89
        bytes_list, unused, _ = writer.finish()
 
90
        node_bytes = self.check_chunk(bytes_list, 4096)
 
91
        # the first 44 lines should have been added
 
92
        expected_bytes = ''.join(lines[:44]) + "A"*256
 
93
        self.assertEqualDiff(expected_bytes, node_bytes)
 
94
        # And the line that failed should have been saved for us
 
95
        self.assertEqual(lines[44], unused)