# Copyright (C) 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Tests for bzrlib.pack."""
20
from cStringIO import StringIO
22
from bzrlib import pack, errors, tests
25
class TestContainerWriter(tests.TestCase):
    """Tests for writing containers with pack.ContainerWriter."""

    # NOTE(review): this class was rebuilt from a damaged extraction that had
    # lost its indentation and its short lines (e.g. ``output = StringIO()``,
    # ``writer.begin()``, ``output.getvalue())``).  The restored lines are
    # inferred from the surrounding code -- confirm against upstream.

    def test_construct(self):
        """Test constructing a ContainerWriter.

        This uses None as the output stream to show that the constructor
        doesn't try to use the output stream.
        """
        pack.ContainerWriter(None)

    def test_begin(self):
        """The begin() method writes the container format marker line."""
        output = StringIO()
        writer = pack.ContainerWriter(output.write)
        writer.begin()
        self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
                         output.getvalue())

    def test_zero_records_written_after_begin(self):
        """After begin is written, 0 records have been written."""
        output = StringIO()
        writer = pack.ContainerWriter(output.write)
        writer.begin()
        self.assertEqual(0, writer.records_written)

    def test_end(self):
        """The end() method writes an End Marker record."""
        output = StringIO()
        writer = pack.ContainerWriter(output.write)
        writer.begin()
        writer.end()
        self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nE',
                         output.getvalue())

    def test_empty_end_does_not_add_a_record_to_records_written(self):
        """The end() method does not count towards the records written."""
        output = StringIO()
        writer = pack.ContainerWriter(output.write)
        writer.begin()
        writer.end()
        self.assertEqual(0, writer.records_written)

    def test_non_empty_end_does_not_add_a_record_to_records_written(self):
        """The end() method does not count towards the records written."""
        output = StringIO()
        writer = pack.ContainerWriter(output.write)
        writer.begin()
        writer.add_bytes_record('foo', names=[])
        writer.end()
        self.assertEqual(1, writer.records_written)

    def test_add_bytes_record_no_name(self):
        """Add a bytes record with no name."""
        output = StringIO()
        writer = pack.ContainerWriter(output.write)
        writer.begin()
        offset, length = writer.add_bytes_record('abc', names=[])
        # The format marker line is 42 bytes long, so the first record
        # starts at offset 42.
        self.assertEqual((42, 7), (offset, length))
        self.assertEqual(
            'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc',
            output.getvalue())

    def test_add_bytes_record_one_name(self):
        """Add a bytes record with one name."""
        output = StringIO()
        writer = pack.ContainerWriter(output.write)
        writer.begin()
        offset, length = writer.add_bytes_record('abc', names=[('name1', )])
        self.assertEqual((42, 13), (offset, length))
        self.assertEqual(
            'Bazaar pack format 1 (introduced in 0.18)\n'
            'B3\nname1\n\nabc',
            output.getvalue())

    # This test used to be defined twice with identical bodies; the second
    # definition silently replaced the first, so only one copy is kept.
    def test_add_bytes_record_two_names(self):
        """Add a bytes record with two names."""
        output = StringIO()
        writer = pack.ContainerWriter(output.write)
        writer.begin()
        offset, length = writer.add_bytes_record(
            'abc', names=[('name1', ), ('name2', )])
        self.assertEqual((42, 19), (offset, length))
        self.assertEqual(
            'Bazaar pack format 1 (introduced in 0.18)\n'
            'B3\nname1\nname2\n\nabc',
            output.getvalue())

    def test_add_bytes_record_two_element_name(self):
        """Add a bytes record with a two-element name."""
        output = StringIO()
        writer = pack.ContainerWriter(output.write)
        writer.begin()
        offset, length = writer.add_bytes_record(
            'abc', names=[('name1', 'name2')])
        self.assertEqual((42, 19), (offset, length))
        # The two elements of the name are joined by a NUL byte.
        self.assertEqual(
            'Bazaar pack format 1 (introduced in 0.18)\n'
            'B3\nname1\x00name2\n\nabc',
            output.getvalue())

    def test_add_second_bytes_record_gets_higher_offset(self):
        """A second Bytes record is reported at an offset past the first."""
        output = StringIO()
        writer = pack.ContainerWriter(output.write)
        writer.begin()
        writer.add_bytes_record('abc', names=[])
        offset, length = writer.add_bytes_record('abc', names=[])
        # 42 bytes of format marker + 7 bytes for the first record.
        self.assertEqual((49, 7), (offset, length))
        self.assertEqual(
            'Bazaar pack format 1 (introduced in 0.18)\n'
            'B3\n\nabc'
            'B3\n\nabc',
            output.getvalue())

    def test_add_bytes_record_invalid_name(self):
        """Adding a Bytes record with a name with whitespace in it raises
        InvalidRecordError.
        """
        output = StringIO()
        writer = pack.ContainerWriter(output.write)
        writer.begin()
        self.assertRaises(
            errors.InvalidRecordError,
            writer.add_bytes_record, 'abc', names=[('bad name', )])

    def test_add_bytes_records_add_to_records_written(self):
        """Adding a Bytes record increments the records_written counter."""
        output = StringIO()
        writer = pack.ContainerWriter(output.write)
        writer.begin()
        writer.add_bytes_record('foo', names=[])
        self.assertEqual(1, writer.records_written)
        writer.add_bytes_record('foo', names=[])
        self.assertEqual(2, writer.records_written)


class TestContainerReader(tests.TestCase):
    """Tests for reading and validating containers with ContainerReader."""

    # NOTE(review): this class was rebuilt from a damaged extraction that had
    # lost its indentation and its short lines (docstring terminators,
    # ``return reader``, ``self.assertRaises(``, ``reader.validate()``).  The
    # restored lines are inferred from the surrounding code -- confirm
    # against upstream.

    def get_reader_for(self, bytes):
        """Return a ContainerReader reading from a StringIO of `bytes`."""
        stream = StringIO(bytes)
        reader = pack.ContainerReader(stream)
        return reader

    def test_construct(self):
        """Test constructing a ContainerReader.

        This uses None as the input stream to show that the constructor
        doesn't try to use the input stream.
        """
        pack.ContainerReader(None)

    def test_empty_container(self):
        """Read an empty container."""
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nE")
        self.assertEqual([], list(reader.iter_records()))

    def test_unknown_format(self):
        """Unrecognised container formats raise UnknownContainerFormatError."""
        reader = self.get_reader_for("unknown format\n")
        self.assertRaises(
            errors.UnknownContainerFormatError, reader.iter_records)

    def test_unexpected_end_of_container(self):
        """Containers that don't end with an End Marker record should cause
        UnexpectedEndOfContainerError to be raised.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\n")
        iterator = reader.iter_records()
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, iterator.next)

    def test_unknown_record_type(self):
        """Unknown record types cause UnknownRecordTypeError to be raised."""
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nX")
        iterator = reader.iter_records()
        self.assertRaises(
            errors.UnknownRecordTypeError, iterator.next)

    def test_container_with_one_unnamed_record(self):
        """Read a container with one Bytes record.

        Parsing Bytes records is more thoroughly exercised by
        TestBytesRecordReader.  This test is here to ensure that
        ContainerReader's integration with BytesRecordReader is working.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
        expected_records = [([], 'aaaaa')]
        self.assertEqual(
            expected_records,
            [(names, read_bytes(None))
             for (names, read_bytes) in reader.iter_records()])

    def test_validate_empty_container(self):
        """validate does not raise an error for a container with no records."""
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nE")
        # No exception raised
        reader.validate()

    def test_validate_non_empty_valid_container(self):
        """validate does not raise an error for a container with a valid record.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
        # No exception raised
        reader.validate()

    def test_validate_bad_format(self):
        """validate raises an error for unrecognised format strings.

        It may raise either UnexpectedEndOfContainerError or
        UnknownContainerFormatError, depending on exactly what the string is.
        """
        bad_inputs = [
            "",
            "x",
            "Bazaar pack format 1 (introduced in 0.18)",
            "bad\n",
            ]
        for bad_input in bad_inputs:
            reader = self.get_reader_for(bad_input)
            self.assertRaises(
                (errors.UnexpectedEndOfContainerError,
                 errors.UnknownContainerFormatError),
                reader.validate)

    def test_validate_bad_record_marker(self):
        """validate raises UnknownRecordTypeError for unrecognised record
        markers.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nX")
        self.assertRaises(errors.UnknownRecordTypeError, reader.validate)

    def test_validate_data_after_end_marker(self):
        """validate raises ContainerHasExcessDataError if there are any bytes
        after the end of the container.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nEcrud")
        self.assertRaises(
            errors.ContainerHasExcessDataError, reader.validate)

    def test_validate_no_end_marker(self):
        """validate raises UnexpectedEndOfContainerError if there's no end of
        container marker, even if the container up to this point has been
        valid.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_duplicate_name(self):
        """validate raises DuplicateRecordNameError if the same name occurs
        multiple times in the container.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\n"
            "B0\nname\n\n"
            "B0\nname\n\n"
            "E")
        self.assertRaises(errors.DuplicateRecordNameError, reader.validate)

    def test_validate_undecodeable_name(self):
        """Names that aren't valid UTF-8 cause validate to fail."""
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE")
        self.assertRaises(errors.InvalidRecordError, reader.validate)


class TestBytesRecordReader(tests.TestCase):
    """Tests for reading and validating Bytes records with BytesRecordReader."""

    # NOTE(review): this class was rebuilt from a damaged extraction that had
    # lost its indentation and its short lines (docstring terminators,
    # ``return reader``, the ``try:``/``else:`` scaffolding in
    # test_early_eof, ``self.assertRaises(``).  The restored lines are
    # inferred from the surrounding code -- confirm against upstream.

    def get_reader_for(self, bytes):
        """Return a BytesRecordReader reading from a StringIO of `bytes`."""
        stream = StringIO(bytes)
        reader = pack.BytesRecordReader(stream)
        return reader

    def test_record_with_no_name(self):
        """Reading a Bytes record with no name returns an empty list of
        names.
        """
        reader = self.get_reader_for("5\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([], names)
        self.assertEqual('aaaaa', get_bytes(None))

    def test_record_with_one_name(self):
        """Reading a Bytes record with one name returns a list of just that
        name.
        """
        reader = self.get_reader_for("5\nname1\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([('name1', )], names)
        self.assertEqual('aaaaa', get_bytes(None))

    def test_record_with_two_names(self):
        """Reading a Bytes record with two names returns a list of both names.
        """
        reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([('name1', ), ('name2', )], names)
        self.assertEqual('aaaaa', get_bytes(None))

    def test_record_with_two_part_names(self):
        """Reading a Bytes record with a two_part name reads both."""
        reader = self.get_reader_for("5\nname1\x00name2\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([('name1', 'name2', )], names)
        self.assertEqual('aaaaa', get_bytes(None))

    def test_invalid_length(self):
        """If the length-prefix is not a number, parsing raises
        InvalidRecordError.
        """
        reader = self.get_reader_for("not a number\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

    def test_early_eof(self):
        """Tests for premature EOF occurring during parsing Bytes records with
        BytesRecordReader.

        An incomplete container might be interrupted at any point.  The
        BytesRecordReader needs to cope with the input stream running out no
        matter where it is in the parsing process.

        In all cases, UnexpectedEndOfContainerError should be raised.
        """
        complete_record = "6\nname\n\nabcdef"
        # Try every proper prefix of the record, from empty up to one byte
        # short of complete.
        for count in range(len(complete_record)):
            incomplete_record = complete_record[:count]
            reader = self.get_reader_for(incomplete_record)
            # We don't use assertRaises to make diagnosing failures easier
            # (assertRaises doesn't allow a custom failure message).
            try:
                names, read_bytes = reader.read()
                read_bytes(None)
            except errors.UnexpectedEndOfContainerError:
                pass
            else:
                self.fail(
                    "UnexpectedEndOfContainerError not raised when parsing %r"
                    % (incomplete_record,))

    def test_initial_eof(self):
        """EOF before any bytes read at all."""
        reader = self.get_reader_for("")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_eof_after_length(self):
        """EOF after reading the length and before reading name(s)."""
        reader = self.get_reader_for("123\n")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_eof_during_name(self):
        """EOF during reading a name."""
        reader = self.get_reader_for("123\nname")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_read_invalid_name_whitespace(self):
        """Names must have no whitespace."""
        # A name with a space.
        reader = self.get_reader_for("0\nbad name\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

        # A name with a tab.
        reader = self.get_reader_for("0\nbad\tname\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

        # A name with a vertical tab.
        reader = self.get_reader_for("0\nbad\vname\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

    def test_validate_whitespace_in_name(self):
        """Names must have no whitespace."""
        reader = self.get_reader_for("0\nbad name\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.validate)

    def test_validate_interrupted_prelude(self):
        """EOF during reading a record's prelude causes validate to fail."""
        reader = self.get_reader_for("")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_interrupted_body(self):
        """EOF during reading a record's body causes validate to fail."""
        reader = self.get_reader_for("1\n\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_unparseable_length(self):
        """An unparseable record length causes validate to fail."""
        reader = self.get_reader_for("\n\n")
        self.assertRaises(
            errors.InvalidRecordError, reader.validate)

    def test_validate_undecodeable_name(self):
        """Names that aren't valid UTF-8 cause validate to fail."""
        reader = self.get_reader_for("0\n\xcc\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.validate)

    def test_read_max_length(self):
        """If the max_length passed to the callable returned by read is not
        None, then no more than that many bytes will be read.
        """
        reader = self.get_reader_for("6\n\nabcdef")
        names, get_bytes = reader.read()
        self.assertEqual('abc', get_bytes(3))

    def test_read_no_max_length(self):
        """If the max_length passed to the callable returned by read is None,
        then all the bytes in the record will be read.
        """
        reader = self.get_reader_for("6\n\nabcdef")
        names, get_bytes = reader.read()
        self.assertEqual('abcdef', get_bytes(None))

    def test_repeated_read_calls(self):
        """Repeated calls to the callable returned from BytesRecordReader.read
        will not read beyond the end of the record.
        """
        reader = self.get_reader_for("6\n\nabcdefB3\nnext-record\nXXX")
        names, get_bytes = reader.read()
        self.assertEqual('abcdef', get_bytes(None))
        self.assertEqual('', get_bytes(None))
        self.assertEqual('', get_bytes(99))


class TestMakeReadvReader(tests.TestCaseWithTransport):
    """Tests for pack.make_readv_reader."""

    # NOTE(review): this class was rebuilt from a damaged extraction that had
    # lost its indentation and its short lines (``writer.begin()``,
    # ``writer.end()`` and the initialisation of the ``memos`` and ``result``
    # lists).  The restored lines are inferred from the surrounding code --
    # confirm against upstream.

    def test_read_skipping_records(self):
        """Only the requested records are yielded; the others are skipped."""
        pack_data = StringIO()
        writer = pack.ContainerWriter(pack_data.write)
        writer.begin()
        # Each memo is whatever add_bytes_record returned for that record;
        # the list literal evaluates the calls in order, so the records are
        # written in this order too.
        memos = [
            writer.add_bytes_record('abc', names=[]),
            writer.add_bytes_record('def', names=[('name1', )]),
            writer.add_bytes_record('ghi', names=[('name2', )]),
            writer.add_bytes_record('jkl', names=[]),
            ]
        writer.end()
        transport = self.get_transport()
        transport.put_bytes('mypack', pack_data.getvalue())
        # Ask for the first and third records only.
        requested_records = [memos[0], memos[2]]
        reader = pack.make_readv_reader(transport, 'mypack', requested_records)
        result = [(names, reader_func(None))
                  for names, reader_func in reader.iter_records()]
        self.assertEqual([([], 'abc'), ([('name2', )], 'ghi')], result)


class TestReadvFile(tests.TestCaseWithTransport):
    """Tests of the ReadVFile class.

    Error cases are deliberately undefined: this code adapts the underlying
    transport interface to a single 'streaming read' interface as
    ContainerReader needs.
    """

    # NOTE(review): this class was rebuilt from a damaged extraction that had
    # lost its indentation and its short lines (docstring terminators and the
    # initialisation of the ``results`` lists).  The restored lines are
    # inferred from the surrounding code -- confirm against upstream.

    def test_read_bytes(self):
        """Test reading of both single bytes and all bytes in a hunk."""
        transport = self.get_transport()
        transport.put_bytes('sample', '0123456789')
        f = pack.ReadVFile(
            transport.readv('sample', [(0, 1), (1, 2), (4, 1), (6, 2)]))
        # List items are evaluated left to right, so the reads happen in the
        # order written.
        results = [f.read(1), f.read(2), f.read(1), f.read(1), f.read(1)]
        self.assertEqual(['0', '12', '4', '6', '7'], results)

    def test_readline(self):
        """Test using readline() as ContainerReader does.

        This is always within a readv hunk, never across it.
        """
        transport = self.get_transport()
        transport.put_bytes('sample', '0\n2\n4\n')
        f = pack.ReadVFile(transport.readv('sample', [(0, 2), (2, 4)]))
        results = [f.readline(), f.readline(), f.readline()]
        self.assertEqual(['0\n', '2\n', '4\n'], results)

    def test_readline_and_read(self):
        """Test exercising one byte reads, readline, and then read again."""
        transport = self.get_transport()
        transport.put_bytes('sample', '0\n2\n4\n')
        f = pack.ReadVFile(transport.readv('sample', [(0, 6)]))
        results = [f.read(1), f.readline(), f.read(4)]
        self.assertEqual(['0', '\n', '2\n4\n'], results)