# Copyright (C) 2008 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#

"""Tests for writing fixed size chunks with compression."""

import zlib

from bzrlib import chunk_writer
from bzrlib.tests import TestCaseWithTransport
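
# For orientation, a minimal sketch of the ChunkWriter API as these tests
# exercise it (the behaviour noted is exactly what the tests below assert):
#
#   writer = chunk_writer.ChunkWriter(4096)        # target chunk size in bytes
#   writer.write("some bytes\n")                   # truthy once the chunk is full
#   bytes_list, unused, padding = writer.finish()  # padded to exactly 4096 bytes
#   content = zlib.decompress(''.join(bytes_list))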


class TestWriter(TestCaseWithTransport):
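    """Tests for bzrlib.chunk_writer.ChunkWriter."""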

    def check_chunk(self, bytes_list, size):
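        """Join bytes_list, check it is exactly size bytes, and return the
        decompressed content.
        """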
        bytes = ''.join(bytes_list)
        self.assertEqual(size, len(bytes))
        return zlib.decompress(bytes)

    def test_chunk_writer_empty(self):
        writer = chunk_writer.ChunkWriter(4096)
        bytes_list, unused, padding = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        self.assertEqual("", node_bytes)
        self.assertEqual(None, unused)
        # Only the empty zlib stream (8 bytes); everything else is padding.
        self.assertEqual(4088, padding)

    def test_optimize_for_speed(self):
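        # Both the set_optimize() call and the constructor flag should
        # select the (max_repack, max_zsync) pair tuned for speed.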
        writer = chunk_writer.ChunkWriter(4096)
        writer.set_optimize(for_size=False)
        self.assertEqual(chunk_writer.ChunkWriter._repack_opts_for_speed,
                         (writer._max_repack, writer._max_zsync))
        writer = chunk_writer.ChunkWriter(4096, optimize_for_size=False)
        self.assertEqual(chunk_writer.ChunkWriter._repack_opts_for_speed,
                         (writer._max_repack, writer._max_zsync))

    def test_optimize_for_size(self):
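        # Both the set_optimize() call and the constructor flag should
        # select the (max_repack, max_zsync) pair tuned for size.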
        writer = chunk_writer.ChunkWriter(4096)
        writer.set_optimize(for_size=True)
        self.assertEqual(chunk_writer.ChunkWriter._repack_opts_for_size,
                         (writer._max_repack, writer._max_zsync))
        writer = chunk_writer.ChunkWriter(4096, optimize_for_size=True)
        self.assertEqual(chunk_writer.ChunkWriter._repack_opts_for_size,
                         (writer._max_repack, writer._max_zsync))

    def test_some_data(self):
        writer = chunk_writer.ChunkWriter(4096)
        writer.write("foo bar baz quux\n")
        bytes_list, unused, padding = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        self.assertEqual("foo bar baz quux\n", node_bytes)
        self.assertEqual(None, unused)
        # More than just the zlib header this time.
        self.assertEqual(4073, padding)

    def test_too_much_data_does_not_exceed_size(self):
        # Generate enough data to exceed 4K
        lines = []
        for group in range(48):
            offset = group * 50
            numbers = range(offset, offset + 50)
            # Create a line with this group
            lines.append(''.join(map(str, numbers)) + '\n')
        writer = chunk_writer.ChunkWriter(4096)
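        # write() returns a truthy value once a line no longer fits; finish()
        # then hands that line back as 'unused'.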
        for idx, line in enumerate(lines):
            if writer.write(line):
                self.assertEqual(46, idx)
                break
        else:
            self.fail('We were able to write all lines')
        bytes_list, unused, _ = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        # The first 46 lines should have been added.
        expected_bytes = ''.join(lines[:46])
        self.assertEqualDiff(expected_bytes, node_bytes)
        # And the line that failed should have been saved for us
        self.assertEqual(lines[46], unused)

    def test_too_much_data_preserves_reserve_space(self):
        # Generate enough data to exceed 4K
        lines = []
        for group in range(48):
            offset = group * 50
            numbers = range(offset, offset + 50)
            # Create a line with this group
            lines.append(''.join(map(str, numbers)) + '\n')
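        # The second constructor argument holds back 256 bytes for reserved
        # writes only.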
        writer = chunk_writer.ChunkWriter(4096, 256)
        for idx, line in enumerate(lines):
            if writer.write(line):
                self.assertEqual(44, idx)
                break
        else:
            self.fail('We were able to write all lines')
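        # A write flagged reserved=True may still succeed, consuming the
        # space held back above.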
        self.assertFalse(writer.write("A"*256, reserved=True))
        bytes_list, unused, _ = writer.finish()
        node_bytes = self.check_chunk(bytes_list, 4096)
        # The first 44 lines, plus the reserved write, should have been added.
        expected_bytes = ''.join(lines[:44]) + "A"*256
        self.assertEqualDiff(expected_bytes, node_bytes)
        # And the line that failed should have been saved for us
        self.assertEqual(lines[44], unused)