# Copyright (C) 2007 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for bzrlib.pack."""
20
from cStringIO import StringIO
22
from bzrlib import pack, errors, tests
25
class TestContainerSerialiser(tests.TestCase):
    """Tests for the ContainerSerialiser class."""

    def test_construct(self):
        """Test constructing a ContainerSerialiser."""
        pack.ContainerSerialiser()

    def test_begin(self):
        """begin() returns the container format marker line."""
        serialiser = pack.ContainerSerialiser()
        self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
                         serialiser.begin())

    def test_end(self):
        """end() returns the End Marker record."""
        serialiser = pack.ContainerSerialiser()
        self.assertEqual('E', serialiser.end())

    def test_bytes_record_no_name(self):
        """A Bytes record with no names has an empty name list."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_record('bytes', [])
        self.assertEqual('B5\n\nbytes', record)

    def test_bytes_record_one_name_with_one_part(self):
        """A single one-part name is written on its own line."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_record('bytes', [('name',)])
        self.assertEqual('B5\nname\n\nbytes', record)

    def test_bytes_record_one_name_with_two_parts(self):
        """The parts of a multi-part name are joined with a NUL byte."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_record('bytes', [('part1', 'part2')])
        self.assertEqual('B5\npart1\x00part2\n\nbytes', record)

    def test_bytes_record_two_names(self):
        """Two names are written one per line, in order."""
        serialiser = pack.ContainerSerialiser()
        record = serialiser.bytes_record('bytes', [('name1',), ('name2',)])
        self.assertEqual('B5\nname1\nname2\n\nbytes', record)

    def test_bytes_record_whitespace_in_name_part(self):
        """A name part containing whitespace raises InvalidRecordError."""
        serialiser = pack.ContainerSerialiser()
        self.assertRaises(
            errors.InvalidRecordError,
            serialiser.bytes_record, 'bytes', [('bad name',)])
68
class TestContainerWriter(tests.TestCase):
    """Tests for the ContainerWriter class.

    Each test writes through ``self.writer`` into the in-memory
    ``self.output`` buffer and inspects what was written.
    """

    def setUp(self):
        """Create a ContainerWriter that writes to an in-memory buffer."""
        # Run the base fixture first so the framework's own per-test setup
        # happens before ours.
        tests.TestCase.setUp(self)
        self.output = StringIO()
        self.writer = pack.ContainerWriter(self.output.write)

    def assertOutput(self, expected_output):
        """Assert that the output of self.writer ContainerWriter is equal to
        expected_output.
        """
        self.assertEqual(expected_output, self.output.getvalue())

    def test_construct(self):
        """Test constructing a ContainerWriter.

        This uses None as the output stream to show that the constructor
        doesn't try to use the output stream.
        """
        pack.ContainerWriter(None)

    def test_begin(self):
        """The begin() method writes the container format marker line."""
        self.writer.begin()
        self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\n')

    def test_zero_records_written_after_begin(self):
        """After begin is written, 0 records have been written."""
        self.writer.begin()
        self.assertEqual(0, self.writer.records_written)

    def test_end(self):
        """The end() method writes an End Marker record."""
        self.writer.begin()
        self.writer.end()
        self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\nE')

    def test_empty_end_does_not_add_a_record_to_records_written(self):
        """The end() method does not count towards the records written."""
        self.writer.begin()
        self.writer.end()
        self.assertEqual(0, self.writer.records_written)

    def test_non_empty_end_does_not_add_a_record_to_records_written(self):
        """The end() method does not count towards the records written."""
        self.writer.begin()
        self.writer.add_bytes_record('foo', names=[])
        self.writer.end()
        self.assertEqual(1, self.writer.records_written)

    def test_add_bytes_record_no_name(self):
        """Add a bytes record with no name."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record('abc', names=[])
        # 42 is the length of the format marker line written by begin().
        self.assertEqual((42, 7), (offset, length))
        self.assertOutput(
            'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')

    def test_add_bytes_record_one_name(self):
        """Add a bytes record with one name."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            'abc', names=[('name1', )])
        self.assertEqual((42, 13), (offset, length))
        self.assertOutput(
            'Bazaar pack format 1 (introduced in 0.18)\n'
            'B3\nname1\n\nabc')

    # NOTE: this test used to be defined twice with identical bodies; the
    # second definition silently replaced the first, so only one is kept.
    def test_add_bytes_record_two_names(self):
        """Add a bytes record with two names."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            'abc', names=[('name1', ), ('name2', )])
        self.assertEqual((42, 19), (offset, length))
        self.assertOutput(
            'Bazaar pack format 1 (introduced in 0.18)\n'
            'B3\nname1\nname2\n\nabc')

    def test_add_bytes_record_two_element_name(self):
        """Add a bytes record with a two-element name."""
        self.writer.begin()
        offset, length = self.writer.add_bytes_record(
            'abc', names=[('name1', 'name2')])
        self.assertEqual((42, 19), (offset, length))
        self.assertOutput(
            'Bazaar pack format 1 (introduced in 0.18)\n'
            'B3\nname1\x00name2\n\nabc')

    def test_add_second_bytes_record_gets_higher_offset(self):
        """The second record starts where the first one ended."""
        self.writer.begin()
        self.writer.add_bytes_record('abc', names=[])
        offset, length = self.writer.add_bytes_record('abc', names=[])
        # 49 == 42 (format marker line) + 7 (first record).
        self.assertEqual((49, 7), (offset, length))
        self.assertOutput(
            'Bazaar pack format 1 (introduced in 0.18)\n'
            'B3\n\nabc'
            'B3\n\nabc')

    def test_add_bytes_record_invalid_name(self):
        """Adding a Bytes record with a name with whitespace in it raises
        InvalidRecordError.
        """
        self.writer.begin()
        self.assertRaises(
            errors.InvalidRecordError,
            self.writer.add_bytes_record, 'abc', names=[('bad name', )])

    def test_add_bytes_records_add_to_records_written(self):
        """Adding a Bytes record increments the records_written counter."""
        self.writer.begin()
        self.writer.add_bytes_record('foo', names=[])
        self.assertEqual(1, self.writer.records_written)
        self.writer.add_bytes_record('foo', names=[])
        self.assertEqual(2, self.writer.records_written)
193
class TestContainerReader(tests.TestCase):
    """Tests for the ContainerReader.

    The ContainerReader reads format 1 containers, so these tests explicitly
    test how it reacts to format 1 data. If a new version of the format is
    added, then separate tests for that format should be added.
    """

    def get_reader_for(self, bytes):
        """Return a ContainerReader reading from an in-memory copy of bytes."""
        return pack.ContainerReader(StringIO(bytes))

    def test_construct(self):
        """Test constructing a ContainerReader.

        This uses None as the input stream to show that the constructor
        doesn't try to use the input stream.
        """
        pack.ContainerReader(None)

    def test_empty_container(self):
        """Read an empty container."""
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nE")
        self.assertEqual([], list(reader.iter_records()))

    def test_unknown_format(self):
        """Unrecognised container formats raise UnknownContainerFormatError."""
        reader = self.get_reader_for("unknown format\n")
        self.assertRaises(
            errors.UnknownContainerFormatError, reader.iter_records)

    def test_unexpected_end_of_container(self):
        """Containers that don't end with an End Marker record should cause
        UnexpectedEndOfContainerError to be raised.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\n")
        iterator = reader.iter_records()
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, iterator.next)

    def test_unknown_record_type(self):
        """Unknown record types cause UnknownRecordTypeError to be raised."""
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nX")
        iterator = reader.iter_records()
        self.assertRaises(
            errors.UnknownRecordTypeError, iterator.next)

    def test_container_with_one_unnamed_record(self):
        """Read a container with one Bytes record.

        Parsing Bytes records is more thoroughly exercised by
        TestBytesRecordReader. This test is here to ensure that
        ContainerReader's integration with BytesRecordReader is working.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nB5\n\naaaaaE")
        expected_records = [([], 'aaaaa')]
        self.assertEqual(
            expected_records,
            [(names, read_bytes(None))
             for (names, read_bytes) in reader.iter_records()])

    def test_validate_empty_container(self):
        """validate does not raise an error for a container with no records."""
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nE")
        # No exception raised
        reader.validate()

    def test_validate_non_empty_valid_container(self):
        """validate does not raise an error for a container with a valid record.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nB3\nname\n\nabcE")
        # No exception raised
        reader.validate()

    def test_validate_bad_format(self):
        """validate raises an error for unrecognised format strings.

        It may raise either UnexpectedEndOfContainerError or
        UnknownContainerFormatError, depending on exactly what the string is.
        """
        inputs = ["", "x", "Bazaar pack format 1 (introduced in 0.18)", "bad\n"]
        for bad_format in inputs:
            reader = self.get_reader_for(bad_format)
            self.assertRaises(
                (errors.UnexpectedEndOfContainerError,
                 errors.UnknownContainerFormatError),
                reader.validate)

    def test_validate_bad_record_marker(self):
        """validate raises UnknownRecordTypeError for unrecognised record
        types.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nX")
        self.assertRaises(errors.UnknownRecordTypeError, reader.validate)

    def test_validate_data_after_end_marker(self):
        """validate raises ContainerHasExcessDataError if there are any bytes
        after the end of the container.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nEcrud")
        self.assertRaises(
            errors.ContainerHasExcessDataError, reader.validate)

    def test_validate_no_end_marker(self):
        """validate raises UnexpectedEndOfContainerError if there's no end of
        container marker, even if the container up to this point has been valid.
        """
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_duplicate_name(self):
        """validate raises DuplicateRecordNameError if the same name occurs
        multiple times in the container.
        """
        # NOTE(review): the record lines below were lost from the damaged
        # source and are reconstructed as two empty records sharing one
        # name -- confirm against the upstream file.
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\n"
            "B0\nname\n\n"
            "B0\nname\n\n"
            "E")
        self.assertRaises(errors.DuplicateRecordNameError, reader.validate)

    def test_validate_undecodeable_name(self):
        """Names that aren't valid UTF-8 cause validate to fail."""
        reader = self.get_reader_for(
            "Bazaar pack format 1 (introduced in 0.18)\nB0\n\xcc\n\nE")
        self.assertRaises(errors.InvalidRecordError, reader.validate)
331
class TestBytesRecordReader(tests.TestCase):
    """Tests for reading and validating Bytes records with
    BytesRecordReader.

    Like TestContainerReader, this explicitly tests the reading of format 1
    data. If a new version of the format is added, then a separate set of
    tests for reading that format should be added.
    """

    def get_reader_for(self, bytes):
        """Return a BytesRecordReader reading from an in-memory copy of bytes.
        """
        return pack.BytesRecordReader(StringIO(bytes))

    def test_record_with_no_name(self):
        """Reading a Bytes record with no name returns an empty list of
        names.
        """
        reader = self.get_reader_for("5\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([], names)
        self.assertEqual('aaaaa', get_bytes(None))

    def test_record_with_one_name(self):
        """Reading a Bytes record with one name returns a list of just that
        name.
        """
        reader = self.get_reader_for("5\nname1\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([('name1', )], names)
        self.assertEqual('aaaaa', get_bytes(None))

    def test_record_with_two_names(self):
        """Reading a Bytes record with two names returns a list of both names.
        """
        reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([('name1', ), ('name2', )], names)
        self.assertEqual('aaaaa', get_bytes(None))

    def test_record_with_two_part_names(self):
        """Reading a Bytes record with a two_part name reads both."""
        reader = self.get_reader_for("5\nname1\x00name2\n\naaaaa")
        names, get_bytes = reader.read()
        self.assertEqual([('name1', 'name2', )], names)
        self.assertEqual('aaaaa', get_bytes(None))

    def test_invalid_length(self):
        """If the length-prefix is not a number, parsing raises
        InvalidRecordError.
        """
        reader = self.get_reader_for("not a number\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

    def test_early_eof(self):
        """Tests for premature EOF occurring during parsing Bytes records with
        BytesRecordReader.

        An incomplete container might be interrupted at any point. The
        BytesRecordReader needs to cope with the input stream running out no
        matter where it is in the parsing process.

        In all cases, UnexpectedEndOfContainerError should be raised.
        """
        complete_record = "6\nname\n\nabcdef"
        # Try every proper prefix of the record, including the empty one.
        for count in range(len(complete_record)):
            incomplete_record = complete_record[:count]
            reader = self.get_reader_for(incomplete_record)
            # We don't use assertRaises to make diagnosing failures easier
            # (assertRaises doesn't allow a custom failure message).
            try:
                _unused_names, read_bytes = reader.read()
                read_bytes(None)
            except errors.UnexpectedEndOfContainerError:
                pass
            else:
                self.fail(
                    "UnexpectedEndOfContainerError not raised when parsing %r"
                    % (incomplete_record,))

    def test_initial_eof(self):
        """EOF before any bytes read at all."""
        reader = self.get_reader_for("")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_eof_after_length(self):
        """EOF after reading the length and before reading name(s)."""
        reader = self.get_reader_for("123\n")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_eof_during_name(self):
        """EOF during reading a name."""
        reader = self.get_reader_for("123\nname")
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)

    def test_read_invalid_name_whitespace(self):
        """Names must have no whitespace."""
        # A name with a space.
        reader = self.get_reader_for("0\nbad name\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

        # A name with a tab.
        reader = self.get_reader_for("0\nbad\tname\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

        # A name with a vertical tab.
        reader = self.get_reader_for("0\nbad\vname\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.read)

    def test_validate_whitespace_in_name(self):
        """Names must have no whitespace."""
        reader = self.get_reader_for("0\nbad name\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.validate)

    def test_validate_interrupted_prelude(self):
        """EOF during reading a record's prelude causes validate to fail."""
        reader = self.get_reader_for("")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_interrupted_body(self):
        """EOF during reading a record's body causes validate to fail."""
        reader = self.get_reader_for("1\n\n")
        self.assertRaises(
            errors.UnexpectedEndOfContainerError, reader.validate)

    def test_validate_unparseable_length(self):
        """An unparseable record length causes validate to fail."""
        reader = self.get_reader_for("\n\n")
        self.assertRaises(
            errors.InvalidRecordError, reader.validate)

    def test_validate_undecodeable_name(self):
        """Names that aren't valid UTF-8 cause validate to fail."""
        reader = self.get_reader_for("0\n\xcc\n\n")
        self.assertRaises(errors.InvalidRecordError, reader.validate)

    def test_read_max_length(self):
        """If the max_length passed to the callable returned by read is not
        None, then no more than that many bytes will be read.
        """
        reader = self.get_reader_for("6\n\nabcdef")
        names, get_bytes = reader.read()
        self.assertEqual('abc', get_bytes(3))

    def test_read_no_max_length(self):
        """If the max_length passed to the callable returned by read is None,
        then all the bytes in the record will be read.
        """
        reader = self.get_reader_for("6\n\nabcdef")
        names, get_bytes = reader.read()
        self.assertEqual('abcdef', get_bytes(None))

    def test_repeated_read_calls(self):
        """Repeated calls to the callable returned from BytesRecordReader.read
        will not read beyond the end of the record.
        """
        # The trailing data after "abcdef" must never be returned.
        reader = self.get_reader_for("6\n\nabcdefB3\nnext-record\nXXX")
        names, get_bytes = reader.read()
        self.assertEqual('abcdef', get_bytes(None))
        self.assertEqual('', get_bytes(None))
        self.assertEqual('', get_bytes(99))
495
class TestMakeReadvReader(tests.TestCaseWithTransport):
    """Tests for pack.make_readv_reader."""

    def test_read_skipping_records(self):
        """Only the requested records are yielded; the others are skipped."""
        # Build a four-record container in memory, remembering the memo
        # that add_bytes_record returns for each record.
        pack_data = StringIO()
        writer = pack.ContainerWriter(pack_data.write)
        writer.begin()
        memos = []
        memos.append(writer.add_bytes_record('abc', names=[]))
        memos.append(writer.add_bytes_record('def', names=[('name1', )]))
        memos.append(writer.add_bytes_record('ghi', names=[('name2', )]))
        memos.append(writer.add_bytes_record('jkl', names=[]))
        writer.end()
        transport = self.get_transport()
        transport.put_bytes('mypack', pack_data.getvalue())
        # Ask for the first and third records only.
        requested_records = [memos[0], memos[2]]
        reader = pack.make_readv_reader(transport, 'mypack', requested_records)
        result = []
        for names, reader_func in reader.iter_records():
            result.append((names, reader_func(None)))
        self.assertEqual([([], 'abc'), ([('name2', )], 'ghi')], result)
517
class TestReadvFile(tests.TestCaseWithTransport):
    """Tests of the ReadVFile class.

    Error cases are deliberately undefined: this code adapts the underlying
    transport interface to a single 'streaming read' interface as
    ContainerReader needs.
    """

    def test_read_bytes(self):
        """Test reading of both single bytes and all bytes in a hunk."""
        transport = self.get_transport()
        transport.put_bytes('sample', '0123456789')
        f = pack.ReadVFile(
            transport.readv('sample', [(0, 1), (1, 2), (4, 1), (6, 2)]))
        # The reads are order-dependent, so they are issued one at a time.
        results = []
        results.append(f.read(1))
        results.append(f.read(2))
        results.append(f.read(1))
        results.append(f.read(1))
        results.append(f.read(1))
        self.assertEqual(['0', '12', '4', '6', '7'], results)

    def test_readline(self):
        """Test using readline() as ContainerReader does.

        This is always within a readv hunk, never across it.
        """
        transport = self.get_transport()
        transport.put_bytes('sample', '0\n2\n4\n')
        f = pack.ReadVFile(transport.readv('sample', [(0, 2), (2, 4)]))
        results = []
        results.append(f.readline())
        results.append(f.readline())
        results.append(f.readline())
        self.assertEqual(['0\n', '2\n', '4\n'], results)

    def test_readline_and_read(self):
        """Test exercising one byte reads, readline, and then read again."""
        transport = self.get_transport()
        transport.put_bytes('sample', '0\n2\n4\n')
        f = pack.ReadVFile(transport.readv('sample', [(0, 6)]))
        results = []
        results.append(f.read(1))
        results.append(f.readline())
        results.append(f.read(4))
        self.assertEqual(['0', '\n', '2\n4\n'], results)
564
class PushParserTestCase(tests.TestCase):
    """Base class for TestCases involving ContainerPushParser."""

    def make_parser_expecting_record_type(self):
        """Return a parser that has been fed the format marker line."""
        parser = pack.ContainerPushParser()
        parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\n")
        return parser

    def make_parser_expecting_bytes_record(self):
        """Return a parser that has been fed the format marker line and the
        'B' record-type marker.
        """
        parser = pack.ContainerPushParser()
        parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\nB")
        return parser

    def assertRecordParsing(self, expected_record, bytes):
        """Assert that 'bytes' is parsed as a given bytes record.

        :param expected_record: A tuple of (names, bytes).
        :param bytes: The serialised record, without its leading 'B' marker
            (the parser has already been fed that marker).
        """
        parser = self.make_parser_expecting_bytes_record()
        parser.accept_bytes(bytes)
        parsed_records = parser.read_pending_records()
        self.assertEqual([expected_record], parsed_records)
588
class TestContainerPushParser(PushParserTestCase):
    """Tests for ContainerPushParser.

    The ContainerPushParser reads format 1 containers, so these tests
    explicitly test how it reacts to format 1 data. If a new version of the
    format is added, then separate tests for that format should be added.
    """

    def test_construct(self):
        """ContainerPushParser can be constructed."""
        pack.ContainerPushParser()

    def test_multiple_records_at_once(self):
        """If multiple records worth of data are fed to the parser in one
        string, the parser will correctly parse all the records.

        (A naive implementation might stop after parsing the first record.)
        """
        parser = self.make_parser_expecting_record_type()
        parser.accept_bytes("B5\nname1\n\nbody1B5\nname2\n\nbody2")
        self.assertEqual(
            [([('name1',)], 'body1'), ([('name2',)], 'body2')],
            parser.read_pending_records())
613
class TestContainerPushParserBytesParsing(PushParserTestCase):
    """Tests for reading Bytes records with ContainerPushParser.

    The ContainerPushParser reads format 1 containers, so these tests
    explicitly test how it reacts to format 1 data. If a new version of the
    format is added, then separate tests for that format should be added.
    """

    def test_record_with_no_name(self):
        """Reading a Bytes record with no name returns an empty list of
        names.
        """
        self.assertRecordParsing(([], 'aaaaa'), "5\n\naaaaa")

    def test_record_with_one_name(self):
        """Reading a Bytes record with one name returns a list of just that
        name.
        """
        self.assertRecordParsing(
            ([('name1', )], 'aaaaa'),
            "5\nname1\n\naaaaa")

    def test_record_with_two_names(self):
        """Reading a Bytes record with two names returns a list of both names.
        """
        self.assertRecordParsing(
            ([('name1', ), ('name2', )], 'aaaaa'),
            "5\nname1\nname2\n\naaaaa")

    def test_record_with_two_part_names(self):
        """Reading a Bytes record with a two_part name reads both."""
        self.assertRecordParsing(
            ([('name1', 'name2')], 'aaaaa'),
            "5\nname1\x00name2\n\naaaaa")

    def test_invalid_length(self):
        """If the length-prefix is not a number, parsing raises
        InvalidRecordError.
        """
        parser = self.make_parser_expecting_bytes_record()
        self.assertRaises(
            errors.InvalidRecordError, parser.accept_bytes, "not a number\n")

    def test_incomplete_record(self):
        """If the bytes seen so far don't form a complete record, then there
        will be nothing returned by read_pending_records.
        """
        parser = self.make_parser_expecting_bytes_record()
        # The prelude promises five body bytes but only four are supplied.
        parser.accept_bytes("5\n\nabcd")
        self.assertEqual([], parser.read_pending_records())

    def test_accept_nothing(self):
        """The edge case of parsing an empty string causes no error."""
        parser = self.make_parser_expecting_bytes_record()
        parser.accept_bytes("")

    def assertInvalidRecord(self, bytes):
        """Assert that parsing the given bytes will raise an
        InvalidRecordError.
        """
        parser = self.make_parser_expecting_bytes_record()
        self.assertRaises(
            errors.InvalidRecordError, parser.accept_bytes, bytes)

    def test_read_invalid_name_whitespace(self):
        """Names must have no whitespace."""
        # A name with a space.
        self.assertInvalidRecord("0\nbad name\n\n")

        # A name with a tab.
        self.assertInvalidRecord("0\nbad\tname\n\n")

        # A name with a vertical tab.
        self.assertInvalidRecord("0\nbad\vname\n\n")

    def test_repeated_read_pending_records(self):
        """read_pending_records will not return the same record twice."""
        parser = self.make_parser_expecting_bytes_record()
        parser.accept_bytes("6\n\nabcdef")
        self.assertEqual([([], 'abcdef')], parser.read_pending_records())
        self.assertEqual([], parser.read_pending_records())