22
22
from bzrlib import pack, errors, tests
25
class TestContainerSerialiser(tests.TestCase):
26
"""Tests for the ContainerSerialiser class."""
28
def test_construct(self):
29
"""Test constructing a ContainerSerialiser."""
30
pack.ContainerSerialiser()
33
serialiser = pack.ContainerSerialiser()
34
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
38
serialiser = pack.ContainerSerialiser()
39
self.assertEqual('E', serialiser.end())
41
def test_bytes_record_no_name(self):
42
serialiser = pack.ContainerSerialiser()
43
record = serialiser.bytes_record('bytes', [])
44
self.assertEqual('B5\n\nbytes', record)
46
def test_bytes_record_one_name_with_one_part(self):
47
serialiser = pack.ContainerSerialiser()
48
record = serialiser.bytes_record('bytes', [('name',)])
49
self.assertEqual('B5\nname\n\nbytes', record)
51
def test_bytes_record_one_name_with_two_parts(self):
52
serialiser = pack.ContainerSerialiser()
53
record = serialiser.bytes_record('bytes', [('part1', 'part2')])
54
self.assertEqual('B5\npart1\x00part2\n\nbytes', record)
56
def test_bytes_record_two_names(self):
57
serialiser = pack.ContainerSerialiser()
58
record = serialiser.bytes_record('bytes', [('name1',), ('name2',)])
59
self.assertEqual('B5\nname1\nname2\n\nbytes', record)
61
def test_bytes_record_whitespace_in_name_part(self):
62
serialiser = pack.ContainerSerialiser()
64
errors.InvalidRecordError,
65
serialiser.bytes_record, 'bytes', [('bad name',)])
68
25
class TestContainerWriter(tests.TestCase):
71
tests.TestCase.setUp(self)
72
self.output = StringIO()
73
self.writer = pack.ContainerWriter(self.output.write)
75
def assertOutput(self, expected_output):
76
"""Assert that the output of self.writer ContainerWriter is equal to
79
self.assertEqual(expected_output, self.output.getvalue())
81
27
def test_construct(self):
82
28
"""Test constructing a ContainerWriter.
84
This uses None as the output stream to show that the constructor
85
doesn't try to use the output stream.
30
This uses None as the output stream to show that the constructor doesn't
31
try to use the output stream.
87
33
writer = pack.ContainerWriter(None)
89
35
def test_begin(self):
90
36
"""The begin() method writes the container format marker line."""
92
self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\n')
38
writer = pack.ContainerWriter(output.write)
40
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
94
43
def test_zero_records_written_after_begin(self):
95
44
"""After begin is written, 0 records have been written."""
97
self.assertEqual(0, self.writer.records_written)
46
writer = pack.ContainerWriter(output.write)
48
self.assertEqual(0, writer.records_written)
99
50
def test_end(self):
100
51
"""The end() method writes an End Marker record."""
103
self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\nE')
53
writer = pack.ContainerWriter(output.write)
56
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nE',
105
59
def test_empty_end_does_not_add_a_record_to_records_written(self):
106
60
"""The end() method does not count towards the records written."""
109
self.assertEqual(0, self.writer.records_written)
62
writer = pack.ContainerWriter(output.write)
65
self.assertEqual(0, writer.records_written)
111
67
def test_non_empty_end_does_not_add_a_record_to_records_written(self):
112
68
"""The end() method does not count towards the records written."""
114
self.writer.add_bytes_record('foo', names=[])
116
self.assertEqual(1, self.writer.records_written)
70
writer = pack.ContainerWriter(output.write)
72
writer.add_bytes_record('foo', names=[])
74
self.assertEqual(1, writer.records_written)
118
76
def test_add_bytes_record_no_name(self):
119
77
"""Add a bytes record with no name."""
121
offset, length = self.writer.add_bytes_record('abc', names=[])
79
writer = pack.ContainerWriter(output.write)
81
offset, length = writer.add_bytes_record('abc', names=[])
122
82
self.assertEqual((42, 7), (offset, length))
124
'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
83
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc',
126
86
def test_add_bytes_record_one_name(self):
127
87
"""Add a bytes record with one name."""
129
offset, length = self.writer.add_bytes_record(
130
'abc', names=[('name1', )])
89
writer = pack.ContainerWriter(output.write)
91
offset, length = writer.add_bytes_record('abc', names=[('name1', )])
131
92
self.assertEqual((42, 13), (offset, length))
133
'Bazaar pack format 1 (introduced in 0.18)\n'
136
def test_add_bytes_record_two_names(self):
137
"""Add a bytes record with two names."""
139
offset, length = self.writer.add_bytes_record(
140
'abc', names=[('name1', ), ('name2', )])
141
self.assertEqual((42, 19), (offset, length))
143
'Bazaar pack format 1 (introduced in 0.18)\n'
144
'B3\nname1\nname2\n\nabc')
146
def test_add_bytes_record_two_names(self):
147
"""Add a bytes record with two names."""
149
offset, length = self.writer.add_bytes_record(
150
'abc', names=[('name1', ), ('name2', )])
151
self.assertEqual((42, 19), (offset, length))
153
'Bazaar pack format 1 (introduced in 0.18)\n'
154
'B3\nname1\nname2\n\nabc')
94
'Bazaar pack format 1 (introduced in 0.18)\n'
98
def test_add_bytes_record_two_names(self):
99
"""Add a bytes record with two names."""
101
writer = pack.ContainerWriter(output.write)
103
offset, length = writer.add_bytes_record('abc', names=[('name1', ), ('name2', )])
104
self.assertEqual((42, 19), (offset, length))
106
'Bazaar pack format 1 (introduced in 0.18)\n'
107
'B3\nname1\nname2\n\nabc',
110
def test_add_bytes_record_two_names(self):
111
"""Add a bytes record with two names."""
113
writer = pack.ContainerWriter(output.write)
115
offset, length = writer.add_bytes_record('abc', names=[('name1', ), ('name2', )])
116
self.assertEqual((42, 19), (offset, length))
118
'Bazaar pack format 1 (introduced in 0.18)\n'
119
'B3\nname1\nname2\n\nabc',
156
122
def test_add_bytes_record_two_element_name(self):
157
123
"""Add a bytes record with a two-element name."""
159
offset, length = self.writer.add_bytes_record(
160
'abc', names=[('name1', 'name2')])
125
writer = pack.ContainerWriter(output.write)
127
offset, length = writer.add_bytes_record('abc', names=[('name1', 'name2')])
161
128
self.assertEqual((42, 19), (offset, length))
163
130
'Bazaar pack format 1 (introduced in 0.18)\n'
164
'B3\nname1\x00name2\n\nabc')
131
'B3\nname1\x00name2\n\nabc',
166
134
def test_add_second_bytes_record_gets_higher_offset(self):
168
self.writer.add_bytes_record('abc', names=[])
169
offset, length = self.writer.add_bytes_record('abc', names=[])
136
writer = pack.ContainerWriter(output.write)
138
writer.add_bytes_record('abc', names=[])
139
offset, length = writer.add_bytes_record('abc', names=[])
170
140
self.assertEqual((49, 7), (offset, length))
172
142
'Bazaar pack format 1 (introduced in 0.18)\n'
176
147
def test_add_bytes_record_invalid_name(self):
177
148
"""Adding a Bytes record with a name with whitespace in it raises
178
149
InvalidRecordError.
152
writer = pack.ContainerWriter(output.write)
181
154
self.assertRaises(
182
155
errors.InvalidRecordError,
183
self.writer.add_bytes_record, 'abc', names=[('bad name', )])
156
writer.add_bytes_record, 'abc', names=[('bad name', )])
185
158
def test_add_bytes_records_add_to_records_written(self):
186
159
"""Adding a Bytes record increments the records_written counter."""
188
self.writer.add_bytes_record('foo', names=[])
189
self.assertEqual(1, self.writer.records_written)
190
self.writer.add_bytes_record('foo', names=[])
191
self.assertEqual(2, self.writer.records_written)
161
writer = pack.ContainerWriter(output.write)
163
writer.add_bytes_record('foo', names=[])
164
self.assertEqual(1, writer.records_written)
165
writer.add_bytes_record('foo', names=[])
166
self.assertEqual(2, writer.records_written)
194
169
class TestContainerReader(tests.TestCase):
195
"""Tests for the ContainerReader.
197
The ContainerReader reads format 1 containers, so these tests explicitly
198
test how it reacts to format 1 data. If a new version of the format is
199
added, then separate tests for that format should be added.
202
171
def get_reader_for(self, bytes):
203
172
stream = StringIO(bytes)
560
523
results.append(f.readline())
561
524
results.append(f.read(4))
562
525
self.assertEqual(['0', '\n', '2\n4\n'], results)
565
class PushParserTestCase(tests.TestCase):
566
"""Base class for TestCases involving ContainerPushParser."""
568
def make_parser_expecting_record_type(self):
569
parser = pack.ContainerPushParser()
570
parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\n")
573
def make_parser_expecting_bytes_record(self):
574
parser = pack.ContainerPushParser()
575
parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\nB")
578
def assertRecordParsing(self, expected_record, bytes):
579
"""Assert that 'bytes' is parsed as a given bytes record.
581
:param expected_record: A tuple of (names, bytes).
583
parser = self.make_parser_expecting_bytes_record()
584
parser.accept_bytes(bytes)
585
parsed_records = parser.read_pending_records()
586
self.assertEqual([expected_record], parsed_records)
589
class TestContainerPushParser(PushParserTestCase):
590
"""Tests for ContainerPushParser.
592
The ContainerPushParser reads format 1 containers, so these tests
593
explicitly test how it reacts to format 1 data. If a new version of the
594
format is added, then separate tests for that format should be added.
597
def test_construct(self):
598
"""ContainerPushParser can be constructed."""
599
pack.ContainerPushParser()
601
def test_multiple_records_at_once(self):
602
"""If multiple records worth of data are fed to the parser in one
603
string, the parser will correctly parse all the records.
605
(A naive implementation might stop after parsing the first record.)
607
parser = self.make_parser_expecting_record_type()
608
parser.accept_bytes("B5\nname1\n\nbody1B5\nname2\n\nbody2")
610
[([('name1',)], 'body1'), ([('name2',)], 'body2')],
611
parser.read_pending_records())
613
def test_multiple_empty_records_at_once(self):
614
"""If multiple empty records worth of data are fed to the parser in one
615
string, the parser will correctly parse all the records.
617
(A naive implementation might stop after parsing the first empty
618
record, because the buffer size had not changed.)
620
parser = self.make_parser_expecting_record_type()
621
parser.accept_bytes("B0\nname1\n\nB0\nname2\n\n")
623
[([('name1',)], ''), ([('name2',)], '')],
624
parser.read_pending_records())
627
class TestContainerPushParserBytesParsing(PushParserTestCase):
628
"""Tests for reading Bytes records with ContainerPushParser.
630
The ContainerPushParser reads format 1 containers, so these tests
631
explicitly test how it reacts to format 1 data. If a new version of the
632
format is added, then separate tests for that format should be added.
635
def test_record_with_no_name(self):
636
"""Reading a Bytes record with no name returns an empty list of
639
self.assertRecordParsing(([], 'aaaaa'), "5\n\naaaaa")
641
def test_record_with_one_name(self):
642
"""Reading a Bytes record with one name returns a list of just that
645
self.assertRecordParsing(
646
([('name1', )], 'aaaaa'),
649
def test_record_with_two_names(self):
650
"""Reading a Bytes record with two names returns a list of both names.
652
self.assertRecordParsing(
653
([('name1', ), ('name2', )], 'aaaaa'),
654
"5\nname1\nname2\n\naaaaa")
656
def test_record_with_two_part_names(self):
657
"""Reading a Bytes record with a two_part name reads both."""
658
self.assertRecordParsing(
659
([('name1', 'name2')], 'aaaaa'),
660
"5\nname1\x00name2\n\naaaaa")
662
def test_invalid_length(self):
663
"""If the length-prefix is not a number, parsing raises
666
parser = self.make_parser_expecting_bytes_record()
668
errors.InvalidRecordError, parser.accept_bytes, "not a number\n")
670
def test_incomplete_record(self):
671
"""If the bytes seen so far don't form a complete record, then there
672
will be nothing returned by read_pending_records.
674
parser = self.make_parser_expecting_bytes_record()
675
parser.accept_bytes("5\n\nabcd")
676
self.assertEqual([], parser.read_pending_records())
678
def test_accept_nothing(self):
679
"""The edge case of parsing an empty string causes no error."""
680
parser = self.make_parser_expecting_bytes_record()
681
parser.accept_bytes("")
683
def assertInvalidRecord(self, bytes):
684
"""Assert that parsing the given bytes will raise an
687
parser = self.make_parser_expecting_bytes_record()
689
errors.InvalidRecordError, parser.accept_bytes, bytes)
691
def test_read_invalid_name_whitespace(self):
692
"""Names must have no whitespace."""
693
# A name with a space.
694
self.assertInvalidRecord("0\nbad name\n\n")
697
self.assertInvalidRecord("0\nbad\tname\n\n")
699
# A name with a vertical tab.
700
self.assertInvalidRecord("0\nbad\vname\n\n")
702
def test_repeated_read_pending_records(self):
703
"""read_pending_records will not return the same record twice."""
704
parser = self.make_parser_expecting_bytes_record()
705
parser.accept_bytes("6\n\nabcdef")
706
self.assertEqual([([], 'abcdef')], parser.read_pending_records())
707
self.assertEqual([], parser.read_pending_records())