22
22
from bzrlib import pack, errors, tests
25
class TestContainerSerialiser(tests.TestCase):
26
"""Tests for the ContainerSerialiser class."""
28
def test_construct(self):
29
"""Test constructing a ContainerSerialiser."""
30
pack.ContainerSerialiser()
33
serialiser = pack.ContainerSerialiser()
34
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
38
serialiser = pack.ContainerSerialiser()
39
self.assertEqual('E', serialiser.end())
41
def test_bytes_record_no_name(self):
42
serialiser = pack.ContainerSerialiser()
43
record = serialiser.bytes_record('bytes', [])
44
self.assertEqual('B5\n\nbytes', record)
46
def test_bytes_record_one_name_with_one_part(self):
47
serialiser = pack.ContainerSerialiser()
48
record = serialiser.bytes_record('bytes', [('name',)])
49
self.assertEqual('B5\nname\n\nbytes', record)
51
def test_bytes_record_one_name_with_two_parts(self):
52
serialiser = pack.ContainerSerialiser()
53
record = serialiser.bytes_record('bytes', [('part1', 'part2')])
54
self.assertEqual('B5\npart1\x00part2\n\nbytes', record)
56
def test_bytes_record_two_names(self):
57
serialiser = pack.ContainerSerialiser()
58
record = serialiser.bytes_record('bytes', [('name1',), ('name2',)])
59
self.assertEqual('B5\nname1\nname2\n\nbytes', record)
61
def test_bytes_record_whitespace_in_name_part(self):
62
serialiser = pack.ContainerSerialiser()
64
errors.InvalidRecordError,
65
serialiser.bytes_record, 'bytes', [('bad name',)])
25
68
class TestContainerWriter(tests.TestCase):
71
self.output = StringIO()
72
self.writer = pack.ContainerWriter(self.output.write)
74
def assertOutput(self, expected_output):
75
"""Assert that the output of self.writer ContainerWriter is equal to
78
self.assertEqual(expected_output, self.output.getvalue())
27
80
def test_construct(self):
28
81
"""Test constructing a ContainerWriter.
30
This uses None as the output stream to show that the constructor doesn't
31
try to use the output stream.
83
This uses None as the output stream to show that the constructor
84
doesn't try to use the output stream.
33
86
writer = pack.ContainerWriter(None)
35
88
def test_begin(self):
36
89
"""The begin() method writes the container format marker line."""
38
writer = pack.ContainerWriter(output.write)
40
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
91
self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\n')
43
93
def test_zero_records_written_after_begin(self):
44
94
"""After begin is written, 0 records have been written."""
46
writer = pack.ContainerWriter(output.write)
48
self.assertEqual(0, writer.records_written)
96
self.assertEqual(0, self.writer.records_written)
50
98
def test_end(self):
51
99
"""The end() method writes an End Marker record."""
53
writer = pack.ContainerWriter(output.write)
56
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nE',
102
self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\nE')
59
104
def test_empty_end_does_not_add_a_record_to_records_written(self):
60
105
"""The end() method does not count towards the records written."""
62
writer = pack.ContainerWriter(output.write)
65
self.assertEqual(0, writer.records_written)
108
self.assertEqual(0, self.writer.records_written)
67
110
def test_non_empty_end_does_not_add_a_record_to_records_written(self):
68
111
"""The end() method does not count towards the records written."""
70
writer = pack.ContainerWriter(output.write)
72
writer.add_bytes_record('foo', names=[])
74
self.assertEqual(1, writer.records_written)
113
self.writer.add_bytes_record('foo', names=[])
115
self.assertEqual(1, self.writer.records_written)
76
117
def test_add_bytes_record_no_name(self):
77
118
"""Add a bytes record with no name."""
79
writer = pack.ContainerWriter(output.write)
81
offset, length = writer.add_bytes_record('abc', names=[])
120
offset, length = self.writer.add_bytes_record('abc', names=[])
82
121
self.assertEqual((42, 7), (offset, length))
83
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc',
123
'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
86
125
def test_add_bytes_record_one_name(self):
87
126
"""Add a bytes record with one name."""
89
writer = pack.ContainerWriter(output.write)
91
offset, length = writer.add_bytes_record('abc', names=[('name1', )])
128
offset, length = self.writer.add_bytes_record(
129
'abc', names=[('name1', )])
92
130
self.assertEqual((42, 13), (offset, length))
94
'Bazaar pack format 1 (introduced in 0.18)\n'
98
def test_add_bytes_record_two_names(self):
99
"""Add a bytes record with two names."""
101
writer = pack.ContainerWriter(output.write)
103
offset, length = writer.add_bytes_record('abc', names=[('name1', ), ('name2', )])
104
self.assertEqual((42, 19), (offset, length))
106
'Bazaar pack format 1 (introduced in 0.18)\n'
107
'B3\nname1\nname2\n\nabc',
110
def test_add_bytes_record_two_names(self):
111
"""Add a bytes record with two names."""
113
writer = pack.ContainerWriter(output.write)
115
offset, length = writer.add_bytes_record('abc', names=[('name1', ), ('name2', )])
116
self.assertEqual((42, 19), (offset, length))
118
'Bazaar pack format 1 (introduced in 0.18)\n'
119
'B3\nname1\nname2\n\nabc',
132
'Bazaar pack format 1 (introduced in 0.18)\n'
135
def test_add_bytes_record_two_names(self):
136
"""Add a bytes record with two names."""
138
offset, length = self.writer.add_bytes_record(
139
'abc', names=[('name1', ), ('name2', )])
140
self.assertEqual((42, 19), (offset, length))
142
'Bazaar pack format 1 (introduced in 0.18)\n'
143
'B3\nname1\nname2\n\nabc')
145
def test_add_bytes_record_two_names(self):
146
"""Add a bytes record with two names."""
148
offset, length = self.writer.add_bytes_record(
149
'abc', names=[('name1', ), ('name2', )])
150
self.assertEqual((42, 19), (offset, length))
152
'Bazaar pack format 1 (introduced in 0.18)\n'
153
'B3\nname1\nname2\n\nabc')
122
155
def test_add_bytes_record_two_element_name(self):
123
156
"""Add a bytes record with a two-element name."""
125
writer = pack.ContainerWriter(output.write)
127
offset, length = writer.add_bytes_record('abc', names=[('name1', 'name2')])
158
offset, length = self.writer.add_bytes_record(
159
'abc', names=[('name1', 'name2')])
128
160
self.assertEqual((42, 19), (offset, length))
130
162
'Bazaar pack format 1 (introduced in 0.18)\n'
131
'B3\nname1\x00name2\n\nabc',
163
'B3\nname1\x00name2\n\nabc')
134
165
def test_add_second_bytes_record_gets_higher_offset(self):
136
writer = pack.ContainerWriter(output.write)
138
writer.add_bytes_record('abc', names=[])
139
offset, length = writer.add_bytes_record('abc', names=[])
167
self.writer.add_bytes_record('abc', names=[])
168
offset, length = self.writer.add_bytes_record('abc', names=[])
140
169
self.assertEqual((49, 7), (offset, length))
142
171
'Bazaar pack format 1 (introduced in 0.18)\n'
147
175
def test_add_bytes_record_invalid_name(self):
148
176
"""Adding a Bytes record with a name with whitespace in it raises
149
177
InvalidRecordError.
152
writer = pack.ContainerWriter(output.write)
154
180
self.assertRaises(
155
181
errors.InvalidRecordError,
156
writer.add_bytes_record, 'abc', names=[('bad name', )])
182
self.writer.add_bytes_record, 'abc', names=[('bad name', )])
158
184
def test_add_bytes_records_add_to_records_written(self):
159
185
"""Adding a Bytes record increments the records_written counter."""
161
writer = pack.ContainerWriter(output.write)
163
writer.add_bytes_record('foo', names=[])
164
self.assertEqual(1, writer.records_written)
165
writer.add_bytes_record('foo', names=[])
166
self.assertEqual(2, writer.records_written)
187
self.writer.add_bytes_record('foo', names=[])
188
self.assertEqual(1, self.writer.records_written)
189
self.writer.add_bytes_record('foo', names=[])
190
self.assertEqual(2, self.writer.records_written)
169
193
class TestContainerReader(tests.TestCase):
194
"""Tests for the ContainerReader.
196
The ContainerReader reads format 1 containers, so these tests explicitly
197
test how it reacts to format 1 data. If a new version of the format is
198
added, then separate tests for that format should be added.
171
201
def get_reader_for(self, bytes):
172
202
stream = StringIO(bytes)
523
559
results.append(f.readline())
524
560
results.append(f.read(4))
525
561
self.assertEqual(['0', '\n', '2\n4\n'], results)
564
class PushParserTestCase(tests.TestCase):
565
"""Base class for TestCases involving ContainerPushParser."""
567
def make_parser_expecting_record_type(self):
568
parser = pack.ContainerPushParser()
569
parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\n")
572
def make_parser_expecting_bytes_record(self):
573
parser = pack.ContainerPushParser()
574
parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\nB")
577
def assertRecordParsing(self, expected_record, bytes):
578
"""Assert that 'bytes' is parsed as a given bytes record.
580
:param expected_record: A tuple of (names, bytes).
582
parser = self.make_parser_expecting_bytes_record()
583
parser.accept_bytes(bytes)
584
parsed_records = parser.read_pending_records()
585
self.assertEqual([expected_record], parsed_records)
588
class TestContainerPushParser(PushParserTestCase):
589
"""Tests for ContainerPushParser.
591
The ContainerPushParser reads format 1 containers, so these tests
592
explicitly test how it reacts to format 1 data. If a new version of the
593
format is added, then separate tests for that format should be added.
596
def test_construct(self):
597
"""ContainerPushParser can be constructed."""
598
pack.ContainerPushParser()
600
def test_multiple_records_at_once(self):
601
"""If multiple records worth of data are fed to the parser in one
602
string, the parser will correctly parse all the records.
604
(A naive implementation might stop after parsing the first record.)
606
parser = self.make_parser_expecting_record_type()
607
parser.accept_bytes("B5\nname1\n\nbody1B5\nname2\n\nbody2")
609
[([('name1',)], 'body1'), ([('name2',)], 'body2')],
610
parser.read_pending_records())
613
class TestContainerPushParserBytesParsing(PushParserTestCase):
614
"""Tests for reading Bytes records with ContainerPushParser.
616
The ContainerPushParser reads format 1 containers, so these tests
617
explicitly test how it reacts to format 1 data. If a new version of the
618
format is added, then separate tests for that format should be added.
621
def test_record_with_no_name(self):
622
"""Reading a Bytes record with no name returns an empty list of
625
self.assertRecordParsing(([], 'aaaaa'), "5\n\naaaaa")
627
def test_record_with_one_name(self):
628
"""Reading a Bytes record with one name returns a list of just that
631
self.assertRecordParsing(
632
([('name1', )], 'aaaaa'),
635
def test_record_with_two_names(self):
636
"""Reading a Bytes record with two names returns a list of both names.
638
self.assertRecordParsing(
639
([('name1', ), ('name2', )], 'aaaaa'),
640
"5\nname1\nname2\n\naaaaa")
642
def test_record_with_two_part_names(self):
643
"""Reading a Bytes record with a two_part name reads both."""
644
self.assertRecordParsing(
645
([('name1', 'name2')], 'aaaaa'),
646
"5\nname1\x00name2\n\naaaaa")
648
def test_invalid_length(self):
649
"""If the length-prefix is not a number, parsing raises
652
parser = self.make_parser_expecting_bytes_record()
654
errors.InvalidRecordError, parser.accept_bytes, "not a number\n")
656
def test_incomplete_record(self):
657
"""If the bytes seen so far don't form a complete record, then there
658
will be nothing returned by read_pending_records.
660
parser = self.make_parser_expecting_bytes_record()
661
parser.accept_bytes("5\n\nabcd")
662
self.assertEqual([], parser.read_pending_records())
664
def test_accept_nothing(self):
665
"""The edge case of parsing an empty string causes no error."""
666
parser = self.make_parser_expecting_bytes_record()
667
parser.accept_bytes("")
669
def assertInvalidRecord(self, bytes):
670
"""Assert that parsing the given bytes will raise an
673
parser = self.make_parser_expecting_bytes_record()
675
errors.InvalidRecordError, parser.accept_bytes, bytes)
677
def test_read_invalid_name_whitespace(self):
678
"""Names must have no whitespace."""
679
# A name with a space.
680
self.assertInvalidRecord("0\nbad name\n\n")
683
self.assertInvalidRecord("0\nbad\tname\n\n")
685
# A name with a vertical tab.
686
self.assertInvalidRecord("0\nbad\vname\n\n")
688
def test_repeated_read_pending_records(self):
689
"""read_pending_records will not return the same record twice."""
690
parser = self.make_parser_expecting_bytes_record()
691
parser.accept_bytes("6\n\nabcdef")
692
self.assertEqual([([], 'abcdef')], parser.read_pending_records())
693
self.assertEqual([], parser.read_pending_records())