# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Tests for writing fixed size chunks with compression."""

import zlib

from bzrlib import chunk_writer
from bzrlib.tests import TestCaseWithTransport


class TestWriter(TestCaseWithTransport):

    def check_chunk(self, bytes_list, size):
        bytes = ''.join(bytes_list)
        self.assertEqual(size, len(bytes))
        return zlib.decompress(bytes)

    def test_chunk_writer_empty(self):
        writer = chunk_writer.ChunkWriter(4096)
        bytes_list, unused, padding = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        self.assertEqual("", node_bytes)
        self.assertEqual(None, unused)
        # Only 8 bytes are needed for the empty compressed stream; the rest
        # of the 4096-byte chunk is padding.
        self.assertEqual(4088, padding)

    def test_some_data(self):
        writer = chunk_writer.ChunkWriter(4096)
        writer.write("foo bar baz quux\n")
        bytes_list, unused, padding = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        self.assertEqual("foo bar baz quux\n", node_bytes)
        self.assertEqual(None, unused)
        # The compressed line takes more than just the header, so there is
        # less padding than in the empty case.
        self.assertEqual(4073, padding)

    def test_too_much_data_does_not_exceed_size(self):
        # Generate enough data to exceed 4K
        lines = []
        for group in range(48):
            offset = group * 50
            numbers = range(offset, offset + 50)
            # Create a line with this group
            lines.append(''.join(map(str, numbers)) + '\n')
        writer = chunk_writer.ChunkWriter(4096)
        for idx, line in enumerate(lines):
            if writer.write(line):
                # The chunk filled up on the 47th line (index 46)
                self.assertEqual(46, idx)
                break
        bytes_list, unused, _ = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        # the first 46 lines should have been added
        expected_bytes = ''.join(lines[:46])
        self.assertEqualDiff(expected_bytes, node_bytes)
        # And the line that failed should have been saved for us
        self.assertEqual(lines[46], unused)

    def test_too_much_data_preserves_reserve_space(self):
        # Generate enough data to exceed 4K
        lines = []
        for group in range(48):
            offset = group * 50
            numbers = range(offset, offset + 50)
            # Create a line with this group
            lines.append(''.join(map(str, numbers)) + '\n')
        writer = chunk_writer.ChunkWriter(4096, 256)
        for idx, line in enumerate(lines):
            if writer.write(line):
                # With 256 bytes reserved, the chunk fills up two lines earlier
                self.assertEqual(44, idx)
                break
        else:
            self.fail('We were able to write all lines')
        # A reserved write is still allowed to use the reserved space
        self.assertFalse(writer.write("A"*256, reserved=True))
        bytes_list, unused, _ = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        # the first 44 lines should have been added
        expected_bytes = ''.join(lines[:44]) + "A"*256
        self.assertEqualDiff(expected_bytes, node_bytes)
        # And the line that failed should have been saved for us
        self.assertEqual(lines[44], unused)