# Copyright (C) 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for bzrlib.pack."""
20
from cStringIO import StringIO
22
from bzrlib import pack, errors, tests
25
class TestContainerSerialiser(tests.TestCase):
    """Tests for the ContainerSerialiser class."""

    # NOTE(review): restored from a listing that had lost its indentation
    # and several short lines. The test_begin/test_end headers, the
    # serialiser.begin() call and the assertRaises( opener were
    # reconstructed -- confirm against the upstream file.

    def test_construct(self):
        """Test constructing a ContainerSerialiser."""
        pack.ContainerSerialiser()

    def test_begin(self):
        """begin() is expected to return the format marker line."""
        serialiser = pack.ContainerSerialiser()
        self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
                         serialiser.begin())

    def test_end(self):
        """end() returns the End Marker, 'E'."""
        serialiser = pack.ContainerSerialiser()
        self.assertEqual('E', serialiser.end())

    def test_bytes_record_no_name(self):
        """A record with no names has an empty name list before the body."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_record('bytes', [])
        self.assertEqual('B5\n\nbytes', record)

    def test_bytes_record_one_name_with_one_part(self):
        """A single one-part name is written on its own line."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_record('bytes', [('name',)])
        self.assertEqual('B5\nname\n\nbytes', record)

    def test_bytes_record_one_name_with_two_parts(self):
        """The parts of a multi-part name are joined with a NUL byte."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_record('bytes', [('part1', 'part2')])
        self.assertEqual('B5\npart1\x00part2\n\nbytes', record)

    def test_bytes_record_two_names(self):
        """Each name of a record is written on its own line."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_record('bytes', [('name1',), ('name2',)])
        self.assertEqual('B5\nname1\nname2\n\nbytes', record)

    def test_bytes_record_whitespace_in_name_part(self):
        """A name part containing whitespace raises InvalidRecordError."""
        serialiser = pack.ContainerSerialiser()
        self.assertRaises(
            errors.InvalidRecordError,
            serialiser.bytes_record, 'bytes', [('bad name',)])
68
class TestContainerWriter(tests.TestCase):
    """Tests for the ContainerWriter class.

    Each test writes through ``self.writer`` into the in-memory buffer
    ``self.output`` created by setUp.
    """

    # NOTE(review): restored from a listing that had lost its indentation
    # and several short lines. The setUp/test_begin/test_end headers and the
    # self.writer.begin() / self.writer.end() / self.assertOutput( lines
    # were reconstructed -- confirm against the upstream file. The asserted
    # offset of 42 equals the length of the format marker line, which is
    # consistent with begin() having been called first.

    def setUp(self):
        """Create a ContainerWriter that writes into a StringIO buffer."""
        tests.TestCase.setUp(self)
        self.output = StringIO()
        self.writer = pack.ContainerWriter(self.output.write)

    def assertOutput(self, expected_output):
        """Assert that the output of self.writer ContainerWriter is equal to
        expected_output.
        """
        self.assertEqual(expected_output, self.output.getvalue())

    def test_construct(self):
        """Test constructing a ContainerWriter.

        This uses None as the output stream to show that the constructor
        doesn't try to use the output stream.
        """
        pack.ContainerWriter(None)

    def test_begin(self):
        """The begin() method writes the container format marker line."""
        self.writer.begin()
        self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\n')

    def test_zero_records_written_after_begin(self):
        """After begin is written, 0 records have been written."""
        self.writer.begin()
        self.assertEqual(0, self.writer.records_written)

    def test_end(self):
        """The end() method writes an End Marker record."""
        self.writer.begin()
        self.writer.end()
        self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\nE')

    def test_empty_end_does_not_add_a_record_to_records_written(self):
        """The end() method does not count towards the records written."""
        self.writer.begin()
        self.writer.end()
        self.assertEqual(0, self.writer.records_written)

    def test_non_empty_end_does_not_add_a_record_to_records_written(self):
        """The end() method does not count towards the records written."""
        self.writer.begin()
        self.writer.add_bytes_record('foo', names=[])
        self.writer.end()
        self.assertEqual(1, self.writer.records_written)

    def test_add_bytes_record_no_name(self):
        """Add a bytes record with no name."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record('abc', names=[])
        self.assertEqual((42, 7), (offset, length))
        self.assertOutput(
            'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')

    def test_add_bytes_record_one_name(self):
        """Add a bytes record with one name."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            'abc', names=[('name1', )])
        self.assertEqual((42, 13), (offset, length))
        self.assertOutput(
            'Bazaar pack format 1 (introduced in 0.18)\n'
            'B3\nname1\n\nabc')

    # The listing defined test_add_bytes_record_two_names twice with
    # identical bodies; the second definition shadowed the first, so only
    # one copy is kept.
    def test_add_bytes_record_two_names(self):
        """Add a bytes record with two names."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            'abc', names=[('name1', ), ('name2', )])
        self.assertEqual((42, 19), (offset, length))
        self.assertOutput(
            'Bazaar pack format 1 (introduced in 0.18)\n'
            'B3\nname1\nname2\n\nabc')

    def test_add_bytes_record_two_element_name(self):
        """Add a bytes record with a two-element name."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            'abc', names=[('name1', 'name2')])
        self.assertEqual((42, 19), (offset, length))
        self.assertOutput(
            'Bazaar pack format 1 (introduced in 0.18)\n'
            'B3\nname1\x00name2\n\nabc')

    def test_add_second_bytes_record_gets_higher_offset(self):
        """The second record's offset follows the end of the first record."""
        self.writer.begin()
        self.writer.add_bytes_record('abc', names=[])
        offset, length = self.writer.add_bytes_record('abc', names=[])
        # 49 == 42 (format marker line) + 7 (first record).
        self.assertEqual((49, 7), (offset, length))
        self.assertOutput(
            'Bazaar pack format 1 (introduced in 0.18)\n'
            'B3\n\nabc'
            'B3\n\nabc')

    def test_add_bytes_record_invalid_name(self):
        """Adding a Bytes record with a name with whitespace in it raises
        InvalidRecordError.
        """
        self.writer.begin()
        self.assertRaises(
            errors.InvalidRecordError,
            self.writer.add_bytes_record, 'abc', names=[('bad name', )])

    def test_add_bytes_records_add_to_records_written(self):
        """Adding a Bytes record increments the records_written counter."""
        self.writer.begin()
        self.writer.add_bytes_record('foo', names=[])
        self.assertEqual(1, self.writer.records_written)
        self.writer.add_bytes_record('foo', names=[])
        self.assertEqual(2, self.writer.records_written)
194
class TestContainerReader(tests.TestCase):
    """Tests for the ContainerReader.

    The ContainerReader reads format 1 containers, so these tests explicitly
    test how it reacts to format 1 data. If a new version of the format is
    added, then separate tests for that format should be added.
    """

    # NOTE(review): restored from a listing that had lost its indentation
    # and several short lines. "return reader", the assertRaises( /
    # assertEqual( openers, the reader.validate() calls, the "for" header in
    # test_validate_bad_format and the record lines of the duplicate-name
    # fixture were reconstructed -- confirm against the upstream file.

    def get_reader_for(self, bytes):
        """Return a ContainerReader reading from a StringIO of `bytes`."""
        stream = StringIO(bytes)
        reader = pack.ContainerReader(stream)
        return reader

    def test_construct(self):
        """Test constructing a ContainerReader.

        This uses None as the input stream to show that the constructor
        doesn't try to use the input stream.
        """
        pack.ContainerReader(None)

    def test_empty_container(self):
        """Read an empty container."""
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nE")
        self.assertEqual([], list(reader.iter_records()))

    def test_unknown_format(self):
        """Unrecognised container formats raise UnknownContainerFormatError."""
        reader = self.get_reader_for("unknown format\n")
        self.assertRaises(
            errors.UnknownContainerFormatError, reader.iter_records)

    def test_unexpected_end_of_container(self):
        """Containers that don't end with an End Marker record should cause
        UnexpectedEndOfContainerError to be raised.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\n")
        iterator = reader.iter_records()
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, iterator.next)

    def test_unknown_record_type(self):
        """Unknown record types cause UnknownRecordTypeError to be raised."""
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nX")
        iterator = reader.iter_records()
        self.assertRaises(
            errors.UnknownRecordTypeError, iterator.next)

    def test_container_with_one_unnamed_record(self):
        """Read a container with one Bytes record.

        Parsing Bytes records is more thoroughly exercised by
        TestBytesRecordReader. This test is here to ensure that
        ContainerReader's integration with BytesRecordReader is working.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
        expected_records = [([], 'aaaaa')]
        self.assertEqual(
            expected_records,
            [(names, read_bytes(None))
             for (names, read_bytes) in reader.iter_records()])

    def test_validate_empty_container(self):
        """validate does not raise an error for a container with no records."""
        reader = self.get_reader_for("Bazaar pack format 1 (introduced in 0.18)\nE")
        # No exception raised
        reader.validate()

    def test_validate_non_empty_valid_container(self):
        """validate does not raise an error for a container with a valid record.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
        # No exception raised
        reader.validate()

    def test_validate_bad_format(self):
        """validate raises an error for unrecognised format strings.

        It may raise either UnexpectedEndOfContainerError or
        UnknownContainerFormatError, depending on exactly what the string is.
        """
        inputs = ["", "x", "Bazaar pack format 1 (introduced in 0.18)", "bad\n"]
        # Named bad_input rather than input to avoid shadowing the builtin.
        for bad_input in inputs:
            reader = self.get_reader_for(bad_input)
            self.assertRaises(
                (errors.UnexpectedEndOfContainerError,
                 errors.UnknownContainerFormatError),
                reader.validate)

    def test_validate_bad_record_marker(self):
        """validate raises UnknownRecordTypeError for unrecognised record
        types.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nX")
        self.assertRaises(errors.UnknownRecordTypeError, reader.validate)

    def test_validate_data_after_end_marker(self):
        """validate raises ContainerHasExcessDataError if there are any bytes
        after the end of the container.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nEcrud")
        self.assertRaises(
            errors.ContainerHasExcessDataError, reader.validate)

    def test_validate_no_end_marker(self):
        """validate raises UnexpectedEndOfContainerError if there's no end of
        container marker, even if the container up to this point has been valid.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_duplicate_name(self):
        """validate raises DuplicateRecordNameError if the same name occurs
        multiple times in the container.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\n"
            "B0\nname\n\n"
            "B0\nname\n\n"
            "E")
        self.assertRaises(errors.DuplicateRecordNameError, reader.validate)

    def test_validate_undecodeable_name(self):
        """Names that aren't valid UTF-8 cause validate to fail."""
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE")
        self.assertRaises(errors.InvalidRecordError, reader.validate)
332
class TestBytesRecordReader(tests.TestCase):
    """Tests for reading and validating Bytes records with
    BytesRecordReader.

    Like TestContainerReader, this explicitly tests the reading of format 1
    data. If a new version of the format is added, then a separate set of
    tests for reading that format should be added.
    """

    # NOTE(review): restored from a listing that had lost its indentation
    # and several short lines. "return reader", the assertRaises( openers,
    # the "A name with a tab" comment and the try/read_bytes(None)/pass/else
    # skeleton of test_early_eof were reconstructed -- confirm against the
    # upstream file.

    def get_reader_for(self, bytes):
        """Return a BytesRecordReader reading from a StringIO of `bytes`."""
        stream = StringIO(bytes)
        reader = pack.BytesRecordReader(stream)
        return reader

    def test_record_with_no_name(self):
        """Reading a Bytes record with no name returns an empty list of
        names.
        """
        reader = self.get_reader_for("5\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([], names)
        self.assertEqual('aaaaa', get_bytes(None))

    def test_record_with_one_name(self):
        """Reading a Bytes record with one name returns a list of just that
        name.
        """
        reader = self.get_reader_for("5\nname1\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([('name1', )], names)
        self.assertEqual('aaaaa', get_bytes(None))

    def test_record_with_two_names(self):
        """Reading a Bytes record with two names returns a list of both names.
        """
        reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([('name1', ), ('name2', )], names)
        self.assertEqual('aaaaa', get_bytes(None))

    def test_record_with_two_part_names(self):
        """Reading a Bytes record with a two_part name reads both."""
        reader = self.get_reader_for("5\nname1\x00name2\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([('name1', 'name2', )], names)
        self.assertEqual('aaaaa', get_bytes(None))

    def test_invalid_length(self):
        """If the length-prefix is not a number, parsing raises
        InvalidRecordError.
        """
        reader = self.get_reader_for("not a number\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

    def test_early_eof(self):
        """Tests for premature EOF occurring during parsing Bytes records with
        BytesRecordReader.

        An incomplete container might be interrupted at any point. The
        BytesRecordReader needs to cope with the input stream running out no
        matter where it is in the parsing process.

        In all cases, UnexpectedEndOfContainerError should be raised.
        """
        complete_record = "6\nname\n\nabcdef"
        # Try every proper prefix of the record, from empty up to one byte
        # short of complete.
        for count in range(len(complete_record)):
            incomplete_record = complete_record[:count]
            reader = self.get_reader_for(incomplete_record)
            # We don't use assertRaises to make diagnosing failures easier
            # (assertRaises doesn't allow a custom failure message).
            try:
                names, read_bytes = reader.read()
                # A prefix that is only cut short inside the body is not
                # detected until the body itself is read.
                read_bytes(None)
            except errors.UnexpectedEndOfContainerError:
                pass
            else:
                self.fail(
                    "UnexpectedEndOfContainerError not raised when parsing %r"
                    % (incomplete_record,))

    def test_initial_eof(self):
        """EOF before any bytes read at all."""
        reader = self.get_reader_for("")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_eof_after_length(self):
        """EOF after reading the length and before reading name(s)."""
        reader = self.get_reader_for("123\n")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_eof_during_name(self):
        """EOF during reading a name."""
        reader = self.get_reader_for("123\nname")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_read_invalid_name_whitespace(self):
        """Names must have no whitespace."""
        # A name with a space.
        reader = self.get_reader_for("0\nbad name\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

        # A name with a tab.
        reader = self.get_reader_for("0\nbad\tname\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

        # A name with a vertical tab.
        reader = self.get_reader_for("0\nbad\vname\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

    def test_validate_whitespace_in_name(self):
        """Names must have no whitespace."""
        reader = self.get_reader_for("0\nbad name\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.validate)

    def test_validate_interrupted_prelude(self):
        """EOF during reading a record's prelude causes validate to fail."""
        reader = self.get_reader_for("")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_interrupted_body(self):
        """EOF during reading a record's body causes validate to fail."""
        reader = self.get_reader_for("1\n\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_unparseable_length(self):
        """An unparseable record length causes validate to fail."""
        reader = self.get_reader_for("\n\n")
        self.assertRaises(
            errors.InvalidRecordError, reader.validate)

    def test_validate_undecodeable_name(self):
        """Names that aren't valid UTF-8 cause validate to fail."""
        reader = self.get_reader_for("0\n\xcc\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.validate)

    def test_read_max_length(self):
        """If the max_length passed to the callable returned by read is not
        None, then no more than that many bytes will be read.
        """
        reader = self.get_reader_for("6\n\nabcdef")
        names, get_bytes = reader.read()
        self.assertEqual('abc', get_bytes(3))

    def test_read_no_max_length(self):
        """If the max_length passed to the callable returned by read is None,
        then all the bytes in the record will be read.
        """
        reader = self.get_reader_for("6\n\nabcdef")
        names, get_bytes = reader.read()
        self.assertEqual('abcdef', get_bytes(None))

    def test_repeated_read_calls(self):
        """Repeated calls to the callable returned from BytesRecordReader.read
        will not read beyond the end of the record.
        """
        reader = self.get_reader_for("6\n\nabcdefB3\nnext-record\nXXX")
        names, get_bytes = reader.read()
        self.assertEqual('abcdef', get_bytes(None))
        self.assertEqual('', get_bytes(None))
        self.assertEqual('', get_bytes(99))
496
class TestMakeReadvReader(tests.TestCaseWithTransport):
    """Tests for pack.make_readv_reader."""

    # NOTE(review): restored from a listing that had lost its indentation
    # and several short lines. writer.begin(), "memos = []", writer.end()
    # and "result = []" were reconstructed -- confirm against the upstream
    # file.

    def test_read_skipping_records(self):
        """Only the requested records are yielded, in the requested order.

        Four records are written, and the memos returned by
        add_bytes_record for the first and third are passed to
        make_readv_reader as the records to read.
        """
        pack_data = StringIO()
        writer = pack.ContainerWriter(pack_data.write)
        writer.begin()
        memos = []
        memos.append(writer.add_bytes_record('abc', names=[]))
        memos.append(writer.add_bytes_record('def', names=[('name1', )]))
        memos.append(writer.add_bytes_record('ghi', names=[('name2', )]))
        memos.append(writer.add_bytes_record('jkl', names=[]))
        writer.end()
        transport = self.get_transport()
        transport.put_bytes('mypack', pack_data.getvalue())
        requested_records = [memos[0], memos[2]]
        reader = pack.make_readv_reader(transport, 'mypack', requested_records)
        result = []
        for names, reader_func in reader.iter_records():
            result.append((names, reader_func(None)))
        self.assertEqual([([], 'abc'), ([('name2', )], 'ghi')], result)
518
class TestReadvFile(tests.TestCaseWithTransport):
    """Tests of the ReadVFile class.

    Error cases are deliberately undefined: this code adapts the underlying
    transport interface to a single 'streaming read' interface as
    ContainerReader needs.
    """

    # NOTE(review): restored from a listing that had lost its indentation
    # and several short lines. The "results = []" initialisers and docstring
    # terminators were reconstructed -- confirm against the upstream file.

    def test_read_bytes(self):
        """Test reading of both single bytes and all bytes in a hunk."""
        transport = self.get_transport()
        transport.put_bytes('sample', '0123456789')
        f = pack.ReadVFile(transport.readv('sample', [(0,1), (1,2), (4,1), (6,2)]))
        results = []
        results.append(f.read(1))
        results.append(f.read(2))
        results.append(f.read(1))
        results.append(f.read(1))
        results.append(f.read(1))
        self.assertEqual(['0', '12', '4', '6', '7'], results)

    def test_readline(self):
        """Test using readline() as ContainerReader does.

        This is always within a readv hunk, never across it.
        """
        transport = self.get_transport()
        transport.put_bytes('sample', '0\n2\n4\n')
        f = pack.ReadVFile(transport.readv('sample', [(0,2), (2,4)]))
        results = []
        results.append(f.readline())
        results.append(f.readline())
        results.append(f.readline())
        self.assertEqual(['0\n', '2\n', '4\n'], results)

    def test_readline_and_read(self):
        """Test exercising one byte reads, readline, and then read again."""
        transport = self.get_transport()
        transport.put_bytes('sample', '0\n2\n4\n')
        f = pack.ReadVFile(transport.readv('sample', [(0,6)]))
        results = []
        results.append(f.read(1))
        results.append(f.readline())
        results.append(f.read(4))
        self.assertEqual(['0', '\n', '2\n4\n'], results)
565
class PushParserTestCase(tests.TestCase):
    """Base class for TestCases involving ContainerPushParser."""

    # NOTE(review): restored from a listing that had lost its indentation
    # and several short lines. The "return parser" lines were reconstructed
    # (callers use the helpers' return values) -- confirm against the
    # upstream file.

    def make_parser_expecting_record_type(self):
        """Return a parser that has been fed the format marker line."""
        parser = pack.ContainerPushParser()
        parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\n")
        return parser

    def make_parser_expecting_bytes_record(self):
        """Return a parser fed the format marker line and a 'B' marker."""
        parser = pack.ContainerPushParser()
        parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\nB")
        return parser

    def assertRecordParsing(self, expected_record, bytes):
        """Assert that 'bytes' is parsed as a given bytes record.

        :param expected_record: A tuple of (names, bytes).
        :param bytes: The serialised record, without the leading 'B'
            marker, which make_parser_expecting_bytes_record has already
            fed to the parser.
        """
        parser = self.make_parser_expecting_bytes_record()
        parser.accept_bytes(bytes)
        parsed_records = parser.read_pending_records()
        self.assertEqual([expected_record], parsed_records)
589
class TestContainerPushParser(PushParserTestCase):
    """Tests for ContainerPushParser.

    The ContainerPushParser reads format 1 containers, so these tests
    explicitly test how it reacts to format 1 data. If a new version of the
    format is added, then separate tests for that format should be added.
    """

    # NOTE(review): restored from a listing that had lost its indentation
    # and several short lines. The self.assertEqual( openers and docstring
    # terminators were reconstructed -- confirm against the upstream file.

    def test_construct(self):
        """ContainerPushParser can be constructed."""
        pack.ContainerPushParser()

    def test_multiple_records_at_once(self):
        """If multiple records worth of data are fed to the parser in one
        string, the parser will correctly parse all the records.

        (A naive implementation might stop after parsing the first record.)
        """
        parser = self.make_parser_expecting_record_type()
        parser.accept_bytes("B5\nname1\n\nbody1B5\nname2\n\nbody2")
        self.assertEqual(
            [([('name1',)], 'body1'), ([('name2',)], 'body2')],
            parser.read_pending_records())

    def test_multiple_empty_records_at_once(self):
        """If multiple empty records worth of data are fed to the parser in one
        string, the parser will correctly parse all the records.

        (A naive implementation might stop after parsing the first empty
        record, because the buffer size had not changed.)
        """
        parser = self.make_parser_expecting_record_type()
        parser.accept_bytes("B0\nname1\n\nB0\nname2\n\n")
        self.assertEqual(
            [([('name1',)], ''), ([('name2',)], '')],
            parser.read_pending_records())
627
class TestContainerPushParserBytesParsing(PushParserTestCase):
    """Tests for reading Bytes records with ContainerPushParser.

    The ContainerPushParser reads format 1 containers, so these tests
    explicitly test how it reacts to format 1 data. If a new version of the
    format is added, then separate tests for that format should be added.
    """

    # NOTE(review): restored from a listing that had lost its indentation
    # and several short lines. The assertRaises( openers, the serialised
    # string in test_record_with_one_name and the "A name with a tab"
    # comment were reconstructed -- confirm against the upstream file.

    def test_record_with_no_name(self):
        """Reading a Bytes record with no name returns an empty list of
        names.
        """
        self.assertRecordParsing(([], 'aaaaa'), "5\n\naaaaa")

    def test_record_with_one_name(self):
        """Reading a Bytes record with one name returns a list of just that
        name.
        """
        self.assertRecordParsing(
            ([('name1', )], 'aaaaa'),
            "5\nname1\n\naaaaa")

    def test_record_with_two_names(self):
        """Reading a Bytes record with two names returns a list of both names.
        """
        self.assertRecordParsing(
            ([('name1', ), ('name2', )], 'aaaaa'),
            "5\nname1\nname2\n\naaaaa")

    def test_record_with_two_part_names(self):
        """Reading a Bytes record with a two_part name reads both."""
        self.assertRecordParsing(
            ([('name1', 'name2')], 'aaaaa'),
            "5\nname1\x00name2\n\naaaaa")

    def test_invalid_length(self):
        """If the length-prefix is not a number, parsing raises
        InvalidRecordError.
        """
        parser = self.make_parser_expecting_bytes_record()
        self.assertRaises(
            errors.InvalidRecordError, parser.accept_bytes, "not a number\n")

    def test_incomplete_record(self):
        """If the bytes seen so far don't form a complete record, then there
        will be nothing returned by read_pending_records.
        """
        parser = self.make_parser_expecting_bytes_record()
        parser.accept_bytes("5\n\nabcd")
        self.assertEqual([], parser.read_pending_records())

    def test_accept_nothing(self):
        """The edge case of parsing an empty string causes no error."""
        parser = self.make_parser_expecting_bytes_record()
        parser.accept_bytes("")

    def assertInvalidRecord(self, bytes):
        """Assert that parsing the given bytes will raise an
        InvalidRecordError.
        """
        parser = self.make_parser_expecting_bytes_record()
        self.assertRaises(
            errors.InvalidRecordError, parser.accept_bytes, bytes)

    def test_read_invalid_name_whitespace(self):
        """Names must have no whitespace."""
        # A name with a space.
        self.assertInvalidRecord("0\nbad name\n\n")

        # A name with a tab.
        self.assertInvalidRecord("0\nbad\tname\n\n")

        # A name with a vertical tab.
        self.assertInvalidRecord("0\nbad\vname\n\n")

    def test_repeated_read_pending_records(self):
        """read_pending_records will not return the same record twice."""
        parser = self.make_parser_expecting_bytes_record()
        parser.accept_bytes("6\n\nabcdef")
        self.assertEqual([([], 'abcdef')], parser.read_pending_records())
        self.assertEqual([], parser.read_pending_records())