~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_pack.py

  • Committer: Aaron Bentley
  • Date: 2007-06-20 22:06:22 UTC
  • mto: (2520.5.2 bzr.mpbundle)
  • mto: This revision was merged to the branch mainline in revision 2631.
  • Revision ID: abentley@panoramicfeedback.com-20070620220622-9lasxr96rr0e0xvn
Use a fresh versionedfile each time

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for bzrlib.pack."""
 
18
 
 
19
 
 
20
from cStringIO import StringIO
 
21
 
 
22
from bzrlib import pack, errors, tests
 
23
 
 
24
 
 
25
class TestContainerWriter(tests.TestCase):
 
26
 
 
27
    def test_construct(self):
 
28
        """Test constructing a ContainerWriter.
 
29
        
 
30
        This uses None as the output stream to show that the constructor doesn't
 
31
        try to use the output stream.
 
32
        """
 
33
        writer = pack.ContainerWriter(None)
 
34
 
 
35
    def test_begin(self):
 
36
        """The begin() method writes the container format marker line."""
 
37
        output = StringIO()
 
38
        writer = pack.ContainerWriter(output.write)
 
39
        writer.begin()
 
40
        self.assertEqual('Bazaar pack format 1\n', output.getvalue())
 
41
 
 
42
    def test_end(self):
 
43
        """The end() method writes an End Marker record."""
 
44
        output = StringIO()
 
45
        writer = pack.ContainerWriter(output.write)
 
46
        writer.begin()
 
47
        writer.end()
 
48
        self.assertEqual('Bazaar pack format 1\nE', output.getvalue())
 
49
 
 
50
    def test_add_bytes_record_no_name(self):
 
51
        """Add a bytes record with no name."""
 
52
        output = StringIO()
 
53
        writer = pack.ContainerWriter(output.write)
 
54
        writer.begin()
 
55
        writer.add_bytes_record('abc', names=[])
 
56
        self.assertEqual('Bazaar pack format 1\nB3\n\nabc', output.getvalue())
 
57
 
 
58
    def test_add_bytes_record_one_name(self):
 
59
        """Add a bytes record with one name."""
 
60
        output = StringIO()
 
61
        writer = pack.ContainerWriter(output.write)
 
62
        writer.begin()
 
63
        writer.add_bytes_record('abc', names=['name1'])
 
64
        self.assertEqual('Bazaar pack format 1\nB3\nname1\n\nabc',
 
65
                         output.getvalue())
 
66
 
 
67
    def test_add_bytes_record_two_names(self):
 
68
        """Add a bytes record with two names."""
 
69
        output = StringIO()
 
70
        writer = pack.ContainerWriter(output.write)
 
71
        writer.begin()
 
72
        writer.add_bytes_record('abc', names=['name1', 'name2'])
 
73
        self.assertEqual('Bazaar pack format 1\nB3\nname1\nname2\n\nabc',
 
74
                         output.getvalue())
 
75
 
 
76
    def test_add_bytes_record_invalid_name(self):
 
77
        """Adding a Bytes record with a name with whitespace in it raises
 
78
        InvalidRecordError.
 
79
        """
 
80
        output = StringIO()
 
81
        writer = pack.ContainerWriter(output.write)
 
82
        writer.begin()
 
83
        self.assertRaises(
 
84
            errors.InvalidRecordError,
 
85
            writer.add_bytes_record, 'abc', names=['bad name'])
 
86
 
 
87
 
 
88
class TestContainerReader(tests.TestCase):
 
89
 
 
90
    def get_reader_for(self, bytes):
 
91
        stream = StringIO(bytes)
 
92
        reader = pack.ContainerReader(stream.read)
 
93
        return reader
 
94
 
 
95
    def test_construct(self):
 
96
        """Test constructing a ContainerReader.
 
97
        
 
98
        This uses None as the output stream to show that the constructor doesn't
 
99
        try to use the input stream.
 
100
        """
 
101
        reader = pack.ContainerReader(None)
 
102
 
 
103
    def test_empty_container(self):
 
104
        """Read an empty container."""
 
105
        reader = self.get_reader_for("Bazaar pack format 1\nE")
 
106
        self.assertEqual([], list(reader.iter_records()))
 
107
 
 
108
    def test_unknown_format(self):
 
109
        """Unrecognised container formats raise UnknownContainerFormatError."""
 
110
        reader = self.get_reader_for("unknown format\n")
 
111
        self.assertRaises(
 
112
            errors.UnknownContainerFormatError, reader.iter_records)
 
113
 
 
114
    def test_unexpected_end_of_container(self):
 
115
        """Containers that don't end with an End Marker record should cause
 
116
        UnexpectedEndOfContainerError to be raised.
 
117
        """
 
118
        reader = self.get_reader_for("Bazaar pack format 1\n")
 
119
        iterator = reader.iter_records()
 
120
        self.assertRaises(
 
121
            errors.UnexpectedEndOfContainerError, iterator.next)
 
122
 
 
123
    def test_unknown_record_type(self):
 
124
        """Unknown record types cause UnknownRecordTypeError to be raised."""
 
125
        reader = self.get_reader_for("Bazaar pack format 1\nX")
 
126
        iterator = reader.iter_records()
 
127
        self.assertRaises(
 
128
            errors.UnknownRecordTypeError, iterator.next)
 
129
 
 
130
    def test_container_with_one_unnamed_record(self):
 
131
        """Read a container with one Bytes record.
 
132
        
 
133
        Parsing Bytes records is more thoroughly exercised by
 
134
        TestBytesRecordReader.  This test is here to ensure that
 
135
        ContainerReader's integration with BytesRecordReader is working.
 
136
        """
 
137
        reader = self.get_reader_for("Bazaar pack format 1\nB5\n\naaaaaE")
 
138
        expected_records = [([], 'aaaaa')]
 
139
        self.assertEqual(
 
140
            expected_records,
 
141
            [(names, read_bytes(None))
 
142
             for (names, read_bytes) in reader.iter_records()])
 
143
 
 
144
    def test_validate_empty_container(self):
 
145
        """validate does not raise an error for a container with no records."""
 
146
        reader = self.get_reader_for("Bazaar pack format 1\nE")
 
147
        # No exception raised
 
148
        reader.validate()
 
149
 
 
150
    def test_validate_non_empty_valid_container(self):
 
151
        """validate does not raise an error for a container with a valid record.
 
152
        """
 
153
        reader = self.get_reader_for("Bazaar pack format 1\nB3\nname\n\nabcE")
 
154
        # No exception raised
 
155
        reader.validate()
 
156
 
 
157
    def test_validate_bad_format(self):
 
158
        """validate raises an error for unrecognised format strings.
 
159
 
 
160
        It may raise either UnexpectedEndOfContainerError or
 
161
        UnknownContainerFormatError, depending on exactly what the string is.
 
162
        """
 
163
        inputs = ["", "x", "Bazaar pack format 1", "bad\n"]
 
164
        for input in inputs:
 
165
            reader = self.get_reader_for(input)
 
166
            self.assertRaises(
 
167
                (errors.UnexpectedEndOfContainerError,
 
168
                 errors.UnknownContainerFormatError),
 
169
                reader.validate)
 
170
 
 
171
    def test_validate_bad_record_marker(self):
 
172
        """validate raises UnknownRecordTypeError for unrecognised record
 
173
        types.
 
174
        """
 
175
        reader = self.get_reader_for("Bazaar pack format 1\nX")
 
176
        self.assertRaises(errors.UnknownRecordTypeError, reader.validate)
 
177
 
 
178
    def test_validate_data_after_end_marker(self):
 
179
        """validate raises ContainerHasExcessDataError if there are any bytes
 
180
        after the end of the container.
 
181
        """
 
182
        reader = self.get_reader_for("Bazaar pack format 1\nEcrud")
 
183
        self.assertRaises(
 
184
            errors.ContainerHasExcessDataError, reader.validate)
 
185
 
 
186
    def test_validate_no_end_marker(self):
 
187
        """validate raises UnexpectedEndOfContainerError if there's no end of
 
188
        container marker, even if the container up to this point has been valid.
 
189
        """
 
190
        reader = self.get_reader_for("Bazaar pack format 1\n")
 
191
        self.assertRaises(
 
192
            errors.UnexpectedEndOfContainerError, reader.validate)
 
193
 
 
194
    def test_validate_duplicate_name(self):
 
195
        """validate raises DuplicateRecordNameError if the same name occurs
 
196
        multiple times in the container.
 
197
        """
 
198
        reader = self.get_reader_for(
 
199
            "Bazaar pack format 1\n"
 
200
            "B0\nname\n\n"
 
201
            "B0\nname\n\n"
 
202
            "E")
 
203
        self.assertRaises(errors.DuplicateRecordNameError, reader.validate)
 
204
 
 
205
    def test_validate_undecodeable_name(self):
 
206
        """Names that aren't valid UTF-8 cause validate to fail."""
 
207
        reader = self.get_reader_for("Bazaar pack format 1\nB0\n\xcc\n\nE")
 
208
        self.assertRaises(errors.InvalidRecordError, reader.validate)
 
209
        
 
210
 
 
211
class TestBytesRecordReader(tests.TestCase):
 
212
    """Tests for reading and validating Bytes records with BytesRecordReader."""
 
213
 
 
214
    def get_reader_for(self, bytes):
 
215
        stream = StringIO(bytes)
 
216
        reader = pack.BytesRecordReader(stream.read)
 
217
        return reader
 
218
 
 
219
    def test_record_with_no_name(self):
 
220
        """Reading a Bytes record with no name returns an empty list of
 
221
        names.
 
222
        """
 
223
        reader = self.get_reader_for("5\n\naaaaa")
 
224
        names, get_bytes = reader.read()
 
225
        self.assertEqual([], names)
 
226
        self.assertEqual('aaaaa', get_bytes(None))
 
227
 
 
228
    def test_record_with_one_name(self):
 
229
        """Reading a Bytes record with one name returns a list of just that
 
230
        name.
 
231
        """
 
232
        reader = self.get_reader_for("5\nname1\n\naaaaa")
 
233
        names, get_bytes = reader.read()
 
234
        self.assertEqual(['name1'], names)
 
235
        self.assertEqual('aaaaa', get_bytes(None))
 
236
 
 
237
    def test_record_with_two_names(self):
 
238
        """Reading a Bytes record with two names returns a list of both names.
 
239
        """
 
240
        reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
 
241
        names, get_bytes = reader.read()
 
242
        self.assertEqual(['name1', 'name2'], names)
 
243
        self.assertEqual('aaaaa', get_bytes(None))
 
244
 
 
245
    def test_invalid_length(self):
 
246
        """If the length-prefix is not a number, parsing raises
 
247
        InvalidRecordError.
 
248
        """
 
249
        reader = self.get_reader_for("not a number\n")
 
250
        self.assertRaises(errors.InvalidRecordError, reader.read)
 
251
 
 
252
    def test_early_eof(self):
 
253
        """Tests for premature EOF occuring during parsing Bytes records with
 
254
        BytesRecordReader.
 
255
        
 
256
        A incomplete container might be interrupted at any point.  The
 
257
        BytesRecordReader needs to cope with the input stream running out no
 
258
        matter where it is in the parsing process.
 
259
 
 
260
        In all cases, UnexpectedEndOfContainerError should be raised.
 
261
        """
 
262
        complete_record = "6\nname\n\nabcdef"
 
263
        for count in range(0, len(complete_record)):
 
264
            incomplete_record = complete_record[:count]
 
265
            reader = self.get_reader_for(incomplete_record)
 
266
            # We don't use assertRaises to make diagnosing failures easier
 
267
            # (assertRaises doesn't allow a custom failure message).
 
268
            try:
 
269
                names, read_bytes = reader.read()
 
270
                read_bytes(None)
 
271
            except errors.UnexpectedEndOfContainerError:
 
272
                pass
 
273
            else:
 
274
                self.fail(
 
275
                    "UnexpectedEndOfContainerError not raised when parsing %r"
 
276
                    % (incomplete_record,))
 
277
 
 
278
    def test_initial_eof(self):
 
279
        """EOF before any bytes read at all."""
 
280
        reader = self.get_reader_for("")
 
281
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
 
282
 
 
283
    def test_eof_after_length(self):
 
284
        """EOF after reading the length and before reading name(s)."""
 
285
        reader = self.get_reader_for("123\n")
 
286
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
 
287
 
 
288
    def test_eof_during_name(self):
 
289
        """EOF during reading a name."""
 
290
        reader = self.get_reader_for("123\nname")
 
291
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
 
292
 
 
293
    def test_read_invalid_name_whitespace(self):
 
294
        """Names must have no whitespace."""
 
295
        # A name with a space.
 
296
        reader = self.get_reader_for("0\nbad name\n\n")
 
297
        self.assertRaises(errors.InvalidRecordError, reader.read)
 
298
 
 
299
        # A name with a tab.
 
300
        reader = self.get_reader_for("0\nbad\tname\n\n")
 
301
        self.assertRaises(errors.InvalidRecordError, reader.read)
 
302
 
 
303
        # A name with a vertical tab.
 
304
        reader = self.get_reader_for("0\nbad\vname\n\n")
 
305
        self.assertRaises(errors.InvalidRecordError, reader.read)
 
306
 
 
307
    def test_validate_whitespace_in_name(self):
 
308
        """Names must have no whitespace."""
 
309
        reader = self.get_reader_for("0\nbad name\n\n")
 
310
        self.assertRaises(errors.InvalidRecordError, reader.validate)
 
311
 
 
312
    def test_validate_interrupted_prelude(self):
 
313
        """EOF during reading a record's prelude causes validate to fail."""
 
314
        reader = self.get_reader_for("")
 
315
        self.assertRaises(
 
316
            errors.UnexpectedEndOfContainerError, reader.validate)
 
317
 
 
318
    def test_validate_interrupted_body(self):
 
319
        """EOF during reading a record's body causes validate to fail."""
 
320
        reader = self.get_reader_for("1\n\n")
 
321
        self.assertRaises(
 
322
            errors.UnexpectedEndOfContainerError, reader.validate)
 
323
 
 
324
    def test_validate_unparseable_length(self):
 
325
        """An unparseable record length causes validate to fail."""
 
326
        reader = self.get_reader_for("\n\n")
 
327
        self.assertRaises(
 
328
            errors.InvalidRecordError, reader.validate)
 
329
 
 
330
    def test_validate_undecodeable_name(self):
 
331
        """Names that aren't valid UTF-8 cause validate to fail."""
 
332
        reader = self.get_reader_for("0\n\xcc\n\n")
 
333
        self.assertRaises(errors.InvalidRecordError, reader.validate)
 
334
 
 
335
    def test_read_max_length(self):
 
336
        """If the max_length passed to the callable returned by read is not
 
337
        None, then no more than that many bytes will be read.
 
338
        """
 
339
        reader = self.get_reader_for("6\n\nabcdef")
 
340
        names, get_bytes = reader.read()
 
341
        self.assertEqual('abc', get_bytes(3))
 
342
 
 
343
    def test_read_no_max_length(self):
 
344
        """If the max_length passed to the callable returned by read is None,
 
345
        then all the bytes in the record will be read.
 
346
        """
 
347
        reader = self.get_reader_for("6\n\nabcdef")
 
348
        names, get_bytes = reader.read()
 
349
        self.assertEqual('abcdef', get_bytes(None))
 
350
 
 
351
    def test_repeated_read_calls(self):
 
352
        """Repeated calls to the callable returned from BytesRecordReader.read
 
353
        will not read beyond the end of the record.
 
354
        """
 
355
        reader = self.get_reader_for("6\n\nabcdefB3\nnext-record\nXXX")
 
356
        names, get_bytes = reader.read()
 
357
        self.assertEqual('abcdef', get_bytes(None))
 
358
        self.assertEqual('', get_bytes(None))
 
359
        self.assertEqual('', get_bytes(99))
 
360
 
 
361