22
22
from bzrlib import pack, errors, tests
25
class TestContainerSerialiser(tests.TestCase):
26
"""Tests for the ContainerSerialiser class."""
28
def test_construct(self):
    """A ContainerSerialiser can be instantiated without arguments."""
    # Only construction is exercised here; the instance is discarded.
    pack.ContainerSerialiser()
33
serialiser = pack.ContainerSerialiser()
34
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
38
serialiser = pack.ContainerSerialiser()
39
self.assertEqual('E', serialiser.end())
41
def test_bytes_record_no_name(self):
    """bytes_record() given an empty name list returns 'B5', an empty
    line, then the body.
    """
    serialised = pack.ContainerSerialiser().bytes_record('bytes', [])
    self.assertEqual('B5\n\nbytes', serialised)
46
def test_bytes_record_one_name_with_one_part(self):
    """bytes_record() given one single-part name puts that name on its
    own line between the 'B5' line and the empty line before the body.
    """
    names = [('name',)]
    serialised = pack.ContainerSerialiser().bytes_record('bytes', names)
    self.assertEqual('B5\nname\n\nbytes', serialised)
51
def test_bytes_record_one_name_with_two_parts(self):
    """bytes_record() given one two-part name writes both parts on a
    single line, separated by a NUL byte.
    """
    names = [('part1', 'part2')]
    serialised = pack.ContainerSerialiser().bytes_record('bytes', names)
    self.assertEqual('B5\npart1\x00part2\n\nbytes', serialised)
56
def test_bytes_record_two_names(self):
    """bytes_record() given two single-part names writes one name per
    line, in the order supplied.
    """
    names = [('name1',), ('name2',)]
    serialised = pack.ContainerSerialiser().bytes_record('bytes', names)
    self.assertEqual('B5\nname1\nname2\n\nbytes', serialised)
61
def test_bytes_record_whitespace_in_name_part(self):
62
serialiser = pack.ContainerSerialiser()
64
errors.InvalidRecordError,
65
serialiser.bytes_record, 'bytes', [('bad name',)])
68
25
class TestContainerWriter(tests.TestCase):
71
tests.TestCase.setUp(self)
72
self.output = StringIO()
73
self.writer = pack.ContainerWriter(self.output.write)
75
def assertOutput(self, expected_output):
    """Check that the current contents of self.output equal
    expected_output.

    :param expected_output: The string that self.output.getvalue() is
        compared against.
    """
    written = self.output.getvalue()
    self.assertEqual(expected_output, written)
81
27
def test_construct(self):
82
28
"""Test constructing a ContainerWriter.
84
This uses None as the output stream to show that the constructor
85
doesn't try to use the output stream.
30
This uses None as the output stream to show that the constructor doesn't
31
try to use the output stream.
87
33
writer = pack.ContainerWriter(None)
89
35
def test_begin(self):
90
36
"""The begin() method writes the container format marker line."""
92
self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\n')
94
def test_zero_records_written_after_begin(self):
95
"""After begin is written, 0 records have been written."""
97
self.assertEqual(0, self.writer.records_written)
38
writer = pack.ContainerWriter(output.write)
40
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
99
43
def test_end(self):
100
44
"""The end() method writes an End Marker record."""
103
self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\nE')
105
def test_empty_end_does_not_add_a_record_to_records_written(self):
106
"""The end() method does not count towards the records written."""
109
self.assertEqual(0, self.writer.records_written)
111
def test_non_empty_end_does_not_add_a_record_to_records_written(self):
112
"""The end() method does not count towards the records written."""
114
self.writer.add_bytes_record('foo', names=[])
116
self.assertEqual(1, self.writer.records_written)
46
writer = pack.ContainerWriter(output.write)
49
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nE',
118
52
def test_add_bytes_record_no_name(self):
119
53
"""Add a bytes record with no name."""
121
offset, length = self.writer.add_bytes_record('abc', names=[])
55
writer = pack.ContainerWriter(output.write)
57
offset, length = writer.add_bytes_record('abc', names=[])
122
58
self.assertEqual((42, 7), (offset, length))
124
'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
59
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc',
126
62
def test_add_bytes_record_one_name(self):
127
63
"""Add a bytes record with one name."""
129
offset, length = self.writer.add_bytes_record(
130
'abc', names=[('name1', )])
65
writer = pack.ContainerWriter(output.write)
67
offset, length = writer.add_bytes_record('abc', names=['name1'])
131
68
self.assertEqual((42, 13), (offset, length))
133
'Bazaar pack format 1 (introduced in 0.18)\n'
136
def test_add_bytes_record_two_names(self):
137
"""Add a bytes record with two names."""
139
offset, length = self.writer.add_bytes_record(
140
'abc', names=[('name1', ), ('name2', )])
141
self.assertEqual((42, 19), (offset, length))
143
'Bazaar pack format 1 (introduced in 0.18)\n'
144
'B3\nname1\nname2\n\nabc')
146
def test_add_bytes_record_two_names(self):
147
"""Add a bytes record with two names."""
149
offset, length = self.writer.add_bytes_record(
150
'abc', names=[('name1', ), ('name2', )])
151
self.assertEqual((42, 19), (offset, length))
153
'Bazaar pack format 1 (introduced in 0.18)\n'
154
'B3\nname1\nname2\n\nabc')
156
def test_add_bytes_record_two_element_name(self):
157
"""Add a bytes record with a two-element name."""
159
offset, length = self.writer.add_bytes_record(
160
'abc', names=[('name1', 'name2')])
161
self.assertEqual((42, 19), (offset, length))
163
'Bazaar pack format 1 (introduced in 0.18)\n'
164
'B3\nname1\x00name2\n\nabc')
70
'Bazaar pack format 1 (introduced in 0.18)\n'
74
def test_add_bytes_record_two_names(self):
75
"""Add a bytes record with two names."""
77
writer = pack.ContainerWriter(output.write)
79
offset, length = writer.add_bytes_record('abc', names=['name1', 'name2'])
80
self.assertEqual((42, 19), (offset, length))
82
'Bazaar pack format 1 (introduced in 0.18)\n'
83
'B3\nname1\nname2\n\nabc',
166
86
def test_add_second_bytes_record_gets_higher_offset(self):
168
self.writer.add_bytes_record('abc', names=[])
169
offset, length = self.writer.add_bytes_record('abc', names=[])
88
writer = pack.ContainerWriter(output.write)
90
writer.add_bytes_record('abc', names=[])
91
offset, length = writer.add_bytes_record('abc', names=[])
170
92
self.assertEqual((49, 7), (offset, length))
172
94
'Bazaar pack format 1 (introduced in 0.18)\n'
176
99
def test_add_bytes_record_invalid_name(self):
177
100
"""Adding a Bytes record with a name with whitespace in it raises
178
101
InvalidRecordError.
104
writer = pack.ContainerWriter(output.write)
181
106
self.assertRaises(
182
107
errors.InvalidRecordError,
183
self.writer.add_bytes_record, 'abc', names=[('bad name', )])
185
def test_add_bytes_records_add_to_records_written(self):
186
"""Adding a Bytes record increments the records_written counter."""
188
self.writer.add_bytes_record('foo', names=[])
189
self.assertEqual(1, self.writer.records_written)
190
self.writer.add_bytes_record('foo', names=[])
191
self.assertEqual(2, self.writer.records_written)
108
writer.add_bytes_record, 'abc', names=['bad name'])
194
111
class TestContainerReader(tests.TestCase):
195
"""Tests for the ContainerReader.
197
The ContainerReader reads format 1 containers, so these tests explicitly
198
test how it reacts to format 1 data. If a new version of the format is
199
added, then separate tests for that format should be added.
202
113
def get_reader_for(self, bytes):
203
114
stream = StringIO(bytes)
560
458
results.append(f.readline())
561
459
results.append(f.read(4))
562
460
self.assertEqual(['0', '\n', '2\n4\n'], results)
565
class PushParserTestCase(tests.TestCase):
566
"""Base class for TestCases involving ContainerPushParser."""
568
def make_parser_expecting_record_type(self):
569
parser = pack.ContainerPushParser()
570
parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\n")
573
def make_parser_expecting_bytes_record(self):
574
parser = pack.ContainerPushParser()
575
parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\nB")
578
def assertRecordParsing(self, expected_record, bytes):
    """Check that feeding 'bytes' to a parser yields exactly one record.

    The parser comes from make_parser_expecting_bytes_record(); its
    pending records after accepting 'bytes' must be [expected_record].

    :param expected_record: A tuple of (names, bytes).
    :param bytes: The data handed to the parser's accept_bytes().
    """
    push_parser = self.make_parser_expecting_bytes_record()
    push_parser.accept_bytes(bytes)
    self.assertEqual([expected_record], push_parser.read_pending_records())
589
class TestContainerPushParser(PushParserTestCase):
590
"""Tests for ContainerPushParser.
592
The ContainerPushParser reads format 1 containers, so these tests
593
explicitly test how it reacts to format 1 data. If a new version of the
594
format is added, then separate tests for that format should be added.
597
def test_construct(self):
    """A ContainerPushParser can be instantiated without arguments."""
    # Only construction is exercised here; the instance is discarded.
    pack.ContainerPushParser()
601
def test_multiple_records_at_once(self):
602
"""If multiple records worth of data are fed to the parser in one
603
string, the parser will correctly parse all the records.
605
(A naive implementation might stop after parsing the first record.)
607
parser = self.make_parser_expecting_record_type()
608
parser.accept_bytes("B5\nname1\n\nbody1B5\nname2\n\nbody2")
610
[([('name1',)], 'body1'), ([('name2',)], 'body2')],
611
parser.read_pending_records())
613
def test_multiple_empty_records_at_once(self):
614
"""If multiple empty records worth of data are fed to the parser in one
615
string, the parser will correctly parse all the records.
617
(A naive implementation might stop after parsing the first empty
618
record, because the buffer size had not changed.)
620
parser = self.make_parser_expecting_record_type()
621
parser.accept_bytes("B0\nname1\n\nB0\nname2\n\n")
623
[([('name1',)], ''), ([('name2',)], '')],
624
parser.read_pending_records())
627
class TestContainerPushParserBytesParsing(PushParserTestCase):
628
"""Tests for reading Bytes records with ContainerPushParser.
630
The ContainerPushParser reads format 1 containers, so these tests
631
explicitly test how it reacts to format 1 data. If a new version of the
632
format is added, then separate tests for that format should be added.
635
def test_record_with_no_name(self):
    """A Bytes record carrying no names parses to an empty name list
    together with its body.
    """
    expected_record = ([], 'aaaaa')
    self.assertRecordParsing(expected_record, "5\n\naaaaa")
641
def test_record_with_one_name(self):
642
"""Reading a Bytes record with one name returns a list of just that
645
self.assertRecordParsing(
646
([('name1', )], 'aaaaa'),
649
def test_record_with_two_names(self):
    """A Bytes record carrying two names parses to a list holding both
    names, in order.
    """
    expected_record = ([('name1', ), ('name2', )], 'aaaaa')
    self.assertRecordParsing(expected_record, "5\nname1\nname2\n\naaaaa")
656
def test_record_with_two_part_names(self):
    """A NUL-separated name line parses to a single two-part name."""
    expected_record = ([('name1', 'name2')], 'aaaaa')
    self.assertRecordParsing(expected_record, "5\nname1\x00name2\n\naaaaa")
662
def test_invalid_length(self):
663
"""If the length-prefix is not a number, parsing raises
666
parser = self.make_parser_expecting_bytes_record()
668
errors.InvalidRecordError, parser.accept_bytes, "not a number\n")
670
def test_incomplete_record(self):
    """While the accepted bytes fall short of a whole record,
    read_pending_records returns an empty list.
    """
    push_parser = self.make_parser_expecting_bytes_record()
    # The header announces 5 body bytes but only 4 are supplied.
    push_parser.accept_bytes("5\n\nabcd")
    pending = push_parser.read_pending_records()
    self.assertEqual([], pending)
678
def test_accept_nothing(self):
    """Passing an empty string to accept_bytes must not raise."""
    push_parser = self.make_parser_expecting_bytes_record()
    push_parser.accept_bytes("")
683
def assertInvalidRecord(self, bytes):
684
"""Assert that parsing the given bytes will raise an
687
parser = self.make_parser_expecting_bytes_record()
689
errors.InvalidRecordError, parser.accept_bytes, bytes)
691
def test_read_invalid_name_whitespace(self):
    """Names must have no whitespace."""
    records_with_bad_names = (
        "0\nbad name\n\n",    # A name with a space.
        "0\nbad\tname\n\n",   # A name with a tab.
        "0\nbad\vname\n\n",   # A name with a vertical tab.
    )
    for bad_record in records_with_bad_names:
        self.assertInvalidRecord(bad_record)
702
def test_repeated_read_pending_records(self):
    """A record handed out by read_pending_records is not handed out
    again by a later call.
    """
    push_parser = self.make_parser_expecting_bytes_record()
    push_parser.accept_bytes("6\n\nabcdef")
    first_read = push_parser.read_pending_records()
    self.assertEqual([([], 'abcdef')], first_read)
    # The second read happens only after the first one has been checked.
    second_read = push_parser.read_pending_records()
    self.assertEqual([], second_read)