~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_pack.py

Add validate method to ContainerReader and BytesRecordReader.

Show diffs side-by-side

added added

removed removed

Lines of Context:
87
87
 
88
88
class TestContainerReader(tests.TestCase):
89
89
 
 
90
    def get_reader_for(self, bytes):
 
91
        stream = StringIO(bytes)
 
92
        reader = pack.ContainerReader(stream.read)
 
93
        return reader
 
94
 
90
95
    def test_construct(self):
91
96
        """Test constructing a ContainerReader.
92
97
        
97
102
 
98
103
    def test_empty_container(self):
99
104
        """Read an empty container."""
100
 
        input = StringIO("Bazaar pack format 1\nE")
101
 
        reader = pack.ContainerReader(input.read)
 
105
        reader = self.get_reader_for("Bazaar pack format 1\nE")
102
106
        self.assertEqual([], list(reader.iter_records()))
103
107
 
104
108
    def test_unknown_format(self):
105
109
        """Unrecognised container formats raise UnknownContainerFormatError."""
106
 
        input = StringIO("unknown format\n")
107
 
        reader = pack.ContainerReader(input.read)
 
110
        reader = self.get_reader_for("unknown format\n")
108
111
        self.assertRaises(
109
112
            errors.UnknownContainerFormatError, reader.iter_records)
110
113
 
112
115
        """Containers that don't end with an End Marker record should cause
113
116
        UnexpectedEndOfContainerError to be raised.
114
117
        """
115
 
        input = StringIO("Bazaar pack format 1\n")
116
 
        reader = pack.ContainerReader(input.read)
 
118
        reader = self.get_reader_for("Bazaar pack format 1\n")
117
119
        iterator = reader.iter_records()
118
120
        self.assertRaises(
119
121
            errors.UnexpectedEndOfContainerError, iterator.next)
120
122
 
121
123
    def test_unknown_record_type(self):
122
124
        """Unknown record types cause UnknownRecordTypeError to be raised."""
123
 
        input = StringIO("Bazaar pack format 1\nX")
124
 
        reader = pack.ContainerReader(input.read)
 
125
        reader = self.get_reader_for("Bazaar pack format 1\nX")
125
126
        iterator = reader.iter_records()
126
127
        self.assertRaises(
127
128
            errors.UnknownRecordTypeError, iterator.next)
133
134
        TestBytesRecordReader.  This test is here to ensure that
134
135
        ContainerReader's integration with BytesRecordReader is working.
135
136
        """
136
 
        input = StringIO("Bazaar pack format 1\nB5\n\naaaaaE")
137
 
        reader = pack.ContainerReader(input.read)
 
137
        reader = self.get_reader_for("Bazaar pack format 1\nB5\n\naaaaaE")
138
138
        expected_records = [([], 'aaaaa')]
139
139
        self.assertEqual(expected_records, list(reader.iter_records()))
140
140
 
 
141
    def test_validate_empty_container(self):
 
142
        reader = self.get_reader_for("Bazaar pack format 1\nE")
 
143
        # No exception raised
 
144
        reader.validate()
 
145
 
 
146
    def test_validate_non_empty_valid_container(self):
 
147
        reader = self.get_reader_for("Bazaar pack format 1\nB3\nname\n\nabcE")
 
148
        # No exception raised
 
149
        reader.validate()
 
150
 
 
151
    def test_validate_bad_format(self):
 
152
        inputs = ["", "x", "Bazaar pack format 1", "bad\n"]
 
153
        for input in inputs:
 
154
            reader = self.get_reader_for(input)
 
155
            self.assertRaises(
 
156
                (errors.UnexpectedEndOfContainerError,
 
157
                 errors.UnknownContainerFormatError),
 
158
                reader.validate)
 
159
 
 
160
    def test_validate_bad_record_marker(self):
 
161
        reader = self.get_reader_for("Bazaar pack format 1\nX")
 
162
        self.assertRaises(errors.UnknownRecordTypeError, reader.validate)
 
163
 
 
164
    def test_validate_data_after_end_marker(self):
 
165
        reader = self.get_reader_for("Bazaar pack format 1\nEcrud")
 
166
        self.assertRaises(
 
167
            errors.ContainerHasExcessDataError, reader.validate)
 
168
 
 
169
    def test_validate_no_end_marker(self):
 
170
        reader = self.get_reader_for("Bazaar pack format 1\n")
 
171
        self.assertRaises(
 
172
            errors.UnexpectedEndOfContainerError, reader.validate)
 
173
 
141
174
 
142
175
class TestBytesRecordReader(tests.TestCase):
143
176
    """Tests for parsing Bytes records with BytesRecordReader."""
144
177
 
 
178
    def get_reader_for(self, bytes):
 
179
        stream = StringIO(bytes)
 
180
        reader = pack.BytesRecordReader(stream.read)
 
181
        return reader
 
182
 
145
183
    def test_record_with_no_name(self):
146
184
        """Reading a Bytes record with no name returns an empty list of
147
185
        names.
148
186
        """
149
 
        input = StringIO("5\n\naaaaa")
150
 
        reader = pack.BytesRecordReader(input.read)
 
187
        reader = self.get_reader_for("5\n\naaaaa")
151
188
        names, bytes = reader.read()
152
189
        self.assertEqual([], names)
153
190
        self.assertEqual('aaaaa', bytes)
156
193
        """Reading a Bytes record with one name returns a list of just that
157
194
        name.
158
195
        """
159
 
        input = StringIO("5\nname1\n\naaaaa")
160
 
        reader = pack.BytesRecordReader(input.read)
 
196
        reader = self.get_reader_for("5\nname1\n\naaaaa")
161
197
        names, bytes = reader.read()
162
198
        self.assertEqual(['name1'], names)
163
199
        self.assertEqual('aaaaa', bytes)
165
201
    def test_record_with_two_names(self):
166
202
        """Reading a Bytes record with two names returns a list of both names.
167
203
        """
168
 
        input = StringIO("5\nname1\nname2\n\naaaaa")
169
 
        reader = pack.BytesRecordReader(input.read)
 
204
        reader = self.get_reader_for("5\nname1\nname2\n\naaaaa")
170
205
        names, bytes = reader.read()
171
206
        self.assertEqual(['name1', 'name2'], names)
172
207
        self.assertEqual('aaaaa', bytes)
175
210
        """If the length-prefix is not a number, parsing raises
176
211
        InvalidRecordError.
177
212
        """
178
 
        input = StringIO("not a number\n")
179
 
        reader = pack.BytesRecordReader(input.read)
 
213
        reader = self.get_reader_for("not a number\n")
180
214
        self.assertRaises(errors.InvalidRecordError, reader.read)
181
215
 
182
216
    def test_early_eof(self):
191
225
        """
192
226
        complete_record = "6\nname\n\nabcdef"
193
227
        for count in range(0, len(complete_record)):
194
 
            input = StringIO(complete_record[:count])
195
 
            reader = pack.BytesRecordReader(input.read)
 
228
            reader = self.get_reader_for(complete_record[:count])
196
229
            # We don't use assertRaises to make diagnosing failures easier.
197
230
            try:
198
231
                reader.read()
205
238
 
206
239
    def test_initial_eof(self):
207
240
        """EOF before any bytes read at all."""
208
 
        input = StringIO("")
209
 
        reader = pack.BytesRecordReader(input.read)
 
241
        reader = self.get_reader_for("")
210
242
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
211
243
 
212
244
    def test_eof_after_length(self):
213
245
        """EOF after reading the length and before reading name(s)."""
214
 
        input = StringIO("123\n")
215
 
        reader = pack.BytesRecordReader(input.read)
 
246
        reader = self.get_reader_for("123\n")
216
247
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
217
248
 
218
249
    def test_eof_during_name(self):
219
250
        """EOF during reading a name."""
220
 
        input = StringIO("123\nname")
221
 
        reader = pack.BytesRecordReader(input.read)
 
251
        reader = self.get_reader_for("123\nname")
222
252
        self.assertRaises(errors.UnexpectedEndOfContainerError, reader.read)
223
253
 
224
254
    def test_invalid_name_whitespace(self):
225
255
        """Names must have no whitespace."""
226
256
        # A name with a space.
227
 
        input = StringIO("0\nbad name\n\n")
228
 
        reader = pack.BytesRecordReader(input.read)
 
257
        reader = self.get_reader_for("0\nbad name\n\n")
229
258
        self.assertRaises(errors.InvalidRecordError, reader.read)
230
259
 
231
260
        # A name with a tab.
232
 
        input = StringIO("0\nbad\tname\n\n")
233
 
        reader = pack.BytesRecordReader(input.read)
 
261
        reader = self.get_reader_for("0\nbad\tname\n\n")
234
262
        self.assertRaises(errors.InvalidRecordError, reader.read)
235
263
 
236
264
        # A name with a vertical tab.
237
 
        input = StringIO("0\nbad\vname\n\n")
238
 
        reader = pack.BytesRecordReader(input.read)
 
265
        reader = self.get_reader_for("0\nbad\vname\n\n")
239
266
        self.assertRaises(errors.InvalidRecordError, reader.read)
240
267
 
241
 
                
 
268
    def test_validate_whitespace_in_name(self):
 
269
        reader = self.get_reader_for("0\nbad name\n\nE")
 
270
        self.assertRaises(errors.InvalidRecordError, reader.validate)
 
271
 
 
272
    def test_validate_interrupted_prelude(self):
 
273
        reader = self.get_reader_for("")
 
274
        self.assertRaises(
 
275
            errors.UnexpectedEndOfContainerError, reader.validate)
 
276
 
 
277
    def test_validate_interrupted_body(self):
 
278
        reader = self.get_reader_for("1\n\n")
 
279
        self.assertRaises(
 
280
            errors.UnexpectedEndOfContainerError, reader.validate)
 
281
 
 
282
    def test_validate_unparseable_length(self):
 
283
        reader = self.get_reader_for("\n\n")
 
284
        self.assertRaises(
 
285
            errors.InvalidRecordError, reader.validate)
 
286