22
22
from bzrlib import pack, errors, tests
25
class TestContainerSerialiser(tests.TestCase):
26
"""Tests for the ContainerSerialiser class."""
28
def test_construct(self):
29
"""Test constructing a ContainerSerialiser."""
30
pack.ContainerSerialiser()
33
serialiser = pack.ContainerSerialiser()
34
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
38
serialiser = pack.ContainerSerialiser()
39
self.assertEqual('E', serialiser.end())
41
def test_bytes_record_no_name(self):
42
serialiser = pack.ContainerSerialiser()
43
record = serialiser.bytes_record('bytes', [])
44
self.assertEqual('B5\n\nbytes', record)
46
def test_bytes_record_one_name_with_one_part(self):
47
serialiser = pack.ContainerSerialiser()
48
record = serialiser.bytes_record('bytes', [('name',)])
49
self.assertEqual('B5\nname\n\nbytes', record)
51
def test_bytes_record_one_name_with_two_parts(self):
52
serialiser = pack.ContainerSerialiser()
53
record = serialiser.bytes_record('bytes', [('part1', 'part2')])
54
self.assertEqual('B5\npart1\x00part2\n\nbytes', record)
56
def test_bytes_record_two_names(self):
57
serialiser = pack.ContainerSerialiser()
58
record = serialiser.bytes_record('bytes', [('name1',), ('name2',)])
59
self.assertEqual('B5\nname1\nname2\n\nbytes', record)
61
def test_bytes_record_whitespace_in_name_part(self):
62
serialiser = pack.ContainerSerialiser()
64
errors.InvalidRecordError,
65
serialiser.bytes_record, 'bytes', [('bad name',)])
68
25
class TestContainerWriter(tests.TestCase):
71
tests.TestCase.setUp(self)
72
self.output = StringIO()
73
self.writer = pack.ContainerWriter(self.output.write)
75
def assertOutput(self, expected_output):
76
"""Assert that the output of self.writer ContainerWriter is equal to
79
self.assertEqual(expected_output, self.output.getvalue())
81
27
def test_construct(self):
82
28
"""Test constructing a ContainerWriter.
84
This uses None as the output stream to show that the constructor
85
doesn't try to use the output stream.
30
This uses None as the output stream to show that the constructor doesn't
31
try to use the output stream.
87
33
writer = pack.ContainerWriter(None)
89
35
def test_begin(self):
90
36
"""The begin() method writes the container format marker line."""
92
self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\n')
94
def test_zero_records_written_after_begin(self):
95
"""After begin is written, 0 records have been written."""
97
self.assertEqual(0, self.writer.records_written)
38
writer = pack.ContainerWriter(output.write)
40
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
99
43
def test_end(self):
100
44
"""The end() method writes an End Marker record."""
103
self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\nE')
105
def test_empty_end_does_not_add_a_record_to_records_written(self):
106
"""The end() method does not count towards the records written."""
109
self.assertEqual(0, self.writer.records_written)
111
def test_non_empty_end_does_not_add_a_record_to_records_written(self):
112
"""The end() method does not count towards the records written."""
114
self.writer.add_bytes_record('foo', names=[])
116
self.assertEqual(1, self.writer.records_written)
46
writer = pack.ContainerWriter(output.write)
49
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nE',
118
52
def test_add_bytes_record_no_name(self):
119
53
"""Add a bytes record with no name."""
121
offset, length = self.writer.add_bytes_record('abc', names=[])
55
writer = pack.ContainerWriter(output.write)
57
offset, length = writer.add_bytes_record('abc', names=[])
122
58
self.assertEqual((42, 7), (offset, length))
124
'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
59
self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc',
126
62
def test_add_bytes_record_one_name(self):
127
63
"""Add a bytes record with one name."""
129
offset, length = self.writer.add_bytes_record(
130
'abc', names=[('name1', )])
65
writer = pack.ContainerWriter(output.write)
67
offset, length = writer.add_bytes_record('abc', names=[('name1', )])
131
68
self.assertEqual((42, 13), (offset, length))
133
'Bazaar pack format 1 (introduced in 0.18)\n'
136
def test_add_bytes_record_two_names(self):
137
"""Add a bytes record with two names."""
139
offset, length = self.writer.add_bytes_record(
140
'abc', names=[('name1', ), ('name2', )])
141
self.assertEqual((42, 19), (offset, length))
143
'Bazaar pack format 1 (introduced in 0.18)\n'
144
'B3\nname1\nname2\n\nabc')
146
def test_add_bytes_record_two_names(self):
147
"""Add a bytes record with two names."""
149
offset, length = self.writer.add_bytes_record(
150
'abc', names=[('name1', ), ('name2', )])
151
self.assertEqual((42, 19), (offset, length))
153
'Bazaar pack format 1 (introduced in 0.18)\n'
154
'B3\nname1\nname2\n\nabc')
70
'Bazaar pack format 1 (introduced in 0.18)\n'
74
def test_add_bytes_record_two_names(self):
75
"""Add a bytes record with two names."""
77
writer = pack.ContainerWriter(output.write)
79
offset, length = writer.add_bytes_record('abc', names=[('name1', ), ('name2', )])
80
self.assertEqual((42, 19), (offset, length))
82
'Bazaar pack format 1 (introduced in 0.18)\n'
83
'B3\nname1\nname2\n\nabc',
86
def test_add_bytes_record_two_names(self):
87
"""Add a bytes record with two names."""
89
writer = pack.ContainerWriter(output.write)
91
offset, length = writer.add_bytes_record('abc', names=[('name1', ), ('name2', )])
92
self.assertEqual((42, 19), (offset, length))
94
'Bazaar pack format 1 (introduced in 0.18)\n'
95
'B3\nname1\nname2\n\nabc',
156
98
def test_add_bytes_record_two_element_name(self):
157
99
"""Add a bytes record with a two-element name."""
159
offset, length = self.writer.add_bytes_record(
160
'abc', names=[('name1', 'name2')])
101
writer = pack.ContainerWriter(output.write)
103
offset, length = writer.add_bytes_record('abc', names=[('name1', 'name2')])
161
104
self.assertEqual((42, 19), (offset, length))
163
106
'Bazaar pack format 1 (introduced in 0.18)\n'
164
'B3\nname1\x00name2\n\nabc')
107
'B3\nname1\x00name2\n\nabc',
166
110
def test_add_second_bytes_record_gets_higher_offset(self):
168
self.writer.add_bytes_record('abc', names=[])
169
offset, length = self.writer.add_bytes_record('abc', names=[])
112
writer = pack.ContainerWriter(output.write)
114
writer.add_bytes_record('abc', names=[])
115
offset, length = writer.add_bytes_record('abc', names=[])
170
116
self.assertEqual((49, 7), (offset, length))
172
118
'Bazaar pack format 1 (introduced in 0.18)\n'
176
123
def test_add_bytes_record_invalid_name(self):
177
124
"""Adding a Bytes record with a name with whitespace in it raises
178
125
InvalidRecordError.
128
writer = pack.ContainerWriter(output.write)
181
130
self.assertRaises(
182
131
errors.InvalidRecordError,
183
self.writer.add_bytes_record, 'abc', names=[('bad name', )])
185
def test_add_bytes_records_add_to_records_written(self):
186
"""Adding a Bytes record increments the records_written counter."""
188
self.writer.add_bytes_record('foo', names=[])
189
self.assertEqual(1, self.writer.records_written)
190
self.writer.add_bytes_record('foo', names=[])
191
self.assertEqual(2, self.writer.records_written)
132
writer.add_bytes_record, 'abc', names=[('bad name', )])
194
135
class TestContainerReader(tests.TestCase):
195
"""Tests for the ContainerReader.
197
The ContainerReader reads format 1 containers, so these tests explicitly
198
test how it reacts to format 1 data. If a new version of the format is
199
added, then separate tests for that format should be added.
202
137
def get_reader_for(self, bytes):
203
138
stream = StringIO(bytes)
560
489
results.append(f.readline())
561
490
results.append(f.read(4))
562
491
self.assertEqual(['0', '\n', '2\n4\n'], results)
565
class PushParserTestCase(tests.TestCase):
566
"""Base class for TestCases involving ContainerPushParser."""
568
def make_parser_expecting_record_type(self):
569
parser = pack.ContainerPushParser()
570
parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\n")
573
def make_parser_expecting_bytes_record(self):
574
parser = pack.ContainerPushParser()
575
parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\nB")
578
def assertRecordParsing(self, expected_record, bytes):
579
"""Assert that 'bytes' is parsed as a given bytes record.
581
:param expected_record: A tuple of (names, bytes).
583
parser = self.make_parser_expecting_bytes_record()
584
parser.accept_bytes(bytes)
585
parsed_records = parser.read_pending_records()
586
self.assertEqual([expected_record], parsed_records)
589
class TestContainerPushParser(PushParserTestCase):
590
"""Tests for ContainerPushParser.
592
The ContainerPushParser reads format 1 containers, so these tests
593
explicitly test how it reacts to format 1 data. If a new version of the
594
format is added, then separate tests for that format should be added.
597
def test_construct(self):
598
"""ContainerPushParser can be constructed."""
599
pack.ContainerPushParser()
601
def test_multiple_records_at_once(self):
602
"""If multiple records worth of data are fed to the parser in one
603
string, the parser will correctly parse all the records.
605
(A naive implementation might stop after parsing the first record.)
607
parser = self.make_parser_expecting_record_type()
608
parser.accept_bytes("B5\nname1\n\nbody1B5\nname2\n\nbody2")
610
[([('name1',)], 'body1'), ([('name2',)], 'body2')],
611
parser.read_pending_records())
614
class TestContainerPushParserBytesParsing(PushParserTestCase):
615
"""Tests for reading Bytes records with ContainerPushParser.
617
The ContainerPushParser reads format 1 containers, so these tests
618
explicitly test how it reacts to format 1 data. If a new version of the
619
format is added, then separate tests for that format should be added.
622
def test_record_with_no_name(self):
623
"""Reading a Bytes record with no name returns an empty list of
626
self.assertRecordParsing(([], 'aaaaa'), "5\n\naaaaa")
628
def test_record_with_one_name(self):
629
"""Reading a Bytes record with one name returns a list of just that
632
self.assertRecordParsing(
633
([('name1', )], 'aaaaa'),
636
def test_record_with_two_names(self):
637
"""Reading a Bytes record with two names returns a list of both names.
639
self.assertRecordParsing(
640
([('name1', ), ('name2', )], 'aaaaa'),
641
"5\nname1\nname2\n\naaaaa")
643
def test_record_with_two_part_names(self):
644
"""Reading a Bytes record with a two_part name reads both."""
645
self.assertRecordParsing(
646
([('name1', 'name2')], 'aaaaa'),
647
"5\nname1\x00name2\n\naaaaa")
649
def test_invalid_length(self):
650
"""If the length-prefix is not a number, parsing raises
653
parser = self.make_parser_expecting_bytes_record()
655
errors.InvalidRecordError, parser.accept_bytes, "not a number\n")
657
def test_incomplete_record(self):
658
"""If the bytes seen so far don't form a complete record, then there
659
will be nothing returned by read_pending_records.
661
parser = self.make_parser_expecting_bytes_record()
662
parser.accept_bytes("5\n\nabcd")
663
self.assertEqual([], parser.read_pending_records())
665
def test_accept_nothing(self):
666
"""The edge case of parsing an empty string causes no error."""
667
parser = self.make_parser_expecting_bytes_record()
668
parser.accept_bytes("")
670
def assertInvalidRecord(self, bytes):
671
"""Assert that parsing the given bytes will raise an
674
parser = self.make_parser_expecting_bytes_record()
676
errors.InvalidRecordError, parser.accept_bytes, bytes)
678
def test_read_invalid_name_whitespace(self):
679
"""Names must have no whitespace."""
680
# A name with a space.
681
self.assertInvalidRecord("0\nbad name\n\n")
684
self.assertInvalidRecord("0\nbad\tname\n\n")
686
# A name with a vertical tab.
687
self.assertInvalidRecord("0\nbad\vname\n\n")
689
def test_repeated_read_pending_records(self):
690
"""read_pending_records will not return the same record twice."""
691
parser = self.make_parser_expecting_bytes_record()
692
parser.accept_bytes("6\n\nabcdef")
693
self.assertEqual([([], 'abcdef')], parser.read_pending_records())
694
self.assertEqual([], parser.read_pending_records())