~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_rio.py

Add a test environment for InterRepository objects, and remove the fetch corner case tests from test_repository.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2005 by Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for rio serialization
18
18
 
22
22
but this depends on the transport.
23
23
"""
24
24
 
25
 
import cStringIO
26
25
import os
27
 
import re
28
26
import sys
29
27
from tempfile import TemporaryFile
30
28
 
31
 
from bzrlib import (
32
 
    rio,
33
 
    )
34
29
from bzrlib.tests import TestCaseInTempDir, TestCase
35
 
from bzrlib.rio import (RioWriter, Stanza, read_stanza, read_stanzas, rio_file,
36
 
                        RioReader)
 
30
from bzrlib.rio import RioWriter, Stanza, read_stanza, read_stanzas
37
31
 
38
32
 
39
33
class TestRio(TestCase):
54
48
        # these aren't enforced at construction time
55
49
        ## self.assertRaises(ValueError,
56
50
        ##        Stanza, complex=42 + 3j)
57
 
        ## self.assertRaises(ValueError,
 
51
        ## self.assertRaises(ValueError, 
58
52
        ##        Stanza, several=range(10))
59
53
 
60
54
    def test_empty_value(self):
70
64
                ['name: fred\n',
71
65
                 'number: 42\n'])
72
66
 
73
 
    def test_as_dict(self):
74
 
        """Convert rio Stanza to dictionary"""
75
 
        s = Stanza(number='42', name='fred')
76
 
        sd = s.as_dict()
77
 
        self.assertEquals(sd, dict(number='42', name='fred'))
78
 
 
79
67
    def test_to_file(self):
80
68
        """Write rio to file"""
81
69
        tmpf = TemporaryFile()
123
111
    def test_repeated_field(self):
124
112
        """Repeated field in rio"""
125
113
        s = Stanza()
126
 
        for k, v in [('a', '10'), ('b', '20'), ('a', '100'), ('b', '200'),
 
114
        for k, v in [('a', '10'), ('b', '20'), ('a', '100'), ('b', '200'), 
127
115
                     ('a', '1000'), ('b', '2000')]:
128
116
            s.add(k, v)
129
117
        s2 = read_stanza(s.to_lines())
141
129
    def test_blank_line(self):
142
130
        s = Stanza(none='', one='\n', two='\n\n')
143
131
        self.assertEqualDiff(s.to_string(), """\
144
 
none:\x20
145
 
one:\x20
 
132
none: 
 
133
one: 
146
134
\t
147
 
two:\x20
 
135
two: 
148
136
\t
149
137
\t
150
138
""")
154
142
    def test_whitespace_value(self):
155
143
        s = Stanza(space=' ', tabs='\t\t\t', combo='\n\t\t\n')
156
144
        self.assertEqualDiff(s.to_string(), """\
157
 
combo:\x20
 
145
combo: 
158
146
\t\t\t
159
147
\t
160
 
space:\x20\x20
 
148
space:  
161
149
tabs: \t\t\t
162
150
""")
163
151
        s2 = read_stanza(s.to_lines())
164
152
        self.assertEquals(s, s2)
165
 
        self.rio_file_stanzas([s])
166
153
 
167
154
    def test_quoted(self):
168
155
        """rio quoted string cases"""
169
 
        s = Stanza(q1='"hello"',
170
 
                   q2=' "for',
 
156
        s = Stanza(q1='"hello"', 
 
157
                   q2=' "for', 
171
158
                   q3='\n\n"for"\n',
172
159
                   q4='for\n"\nfor',
173
160
                   q5='\n',
174
 
                   q6='"',
 
161
                   q6='"', 
175
162
                   q7='""',
176
163
                   q8='\\',
177
164
                   q9='\\"\\"',
178
165
                   )
179
166
        s2 = read_stanza(s.to_lines())
180
167
        self.assertEquals(s, s2)
181
 
        # apparent bug in read_stanza: the stanzas_to_str round-trip check below is left disabled
182
 
        # s3 = read_stanza(self.stanzas_to_str([s]))
183
 
        # self.assertEquals(s, s3)
184
168
 
185
169
    def test_read_empty(self):
186
170
        """Detect end of rio file"""
187
171
        s = read_stanza([])
188
172
        self.assertEqual(s, None)
189
173
        self.assertTrue(s is None)
190
 
 
191
 
    def test_read_nul_byte(self):
192
 
        """File consisting of a nul byte causes an error."""
193
 
        self.assertRaises(ValueError, read_stanza, ['\0'])
194
 
 
195
 
    def test_read_nul_bytes(self):
196
 
        """File consisting of many nul bytes causes an error."""
197
 
        self.assertRaises(ValueError, read_stanza, ['\0' * 100])
198
 
 
 
174
        
199
175
    def test_read_iter(self):
200
176
        """Read several stanzas from file"""
201
177
        tmpf = TemporaryFile()
212
188
        reader = read_stanzas(tmpf)
213
189
        read_iter = iter(reader)
214
190
        stuff = list(reader)
215
 
        self.assertEqual(stuff,
 
191
        self.assertEqual(stuff, 
216
192
                [ Stanza(version_header='1'),
217
193
                  Stanza(name="foo", val='123'),
218
194
                  Stanza(name="bar", val='129319'), ])
246
222
        self.assertEquals(s, Stanza(name="bar", val='129319'))
247
223
        s = read_stanza(tmpf)
248
224
        self.assertEquals(s, None)
249
 
        self.check_rio_file(tmpf)
250
 
 
251
 
    def check_rio_file(self, real_file):
252
 
        real_file.seek(0)
253
 
        read_write = rio_file(RioReader(real_file)).read()
254
 
        real_file.seek(0)
255
 
        self.assertEquals(read_write, real_file.read())
256
 
 
257
 
    @staticmethod
258
 
    def stanzas_to_str(stanzas):
259
 
        return rio_file(stanzas).read()
260
 
 
261
 
    def rio_file_stanzas(self, stanzas):
262
 
        new_stanzas = list(RioReader(rio_file(stanzas)))
263
 
        self.assertEqual(new_stanzas, stanzas)
264
225
 
265
226
    def test_tricky_quoted(self):
266
227
        tmpf = TemporaryFile()
267
228
        tmpf.write('''\
268
229
s: "one"
269
230
 
270
 
s:\x20
 
231
s: 
271
232
\t"one"
272
233
\t
273
234
 
277
238
 
278
239
s: """
279
240
 
280
 
s:\x20
 
241
s: 
281
242
\t
282
243
 
283
244
s: \\
284
245
 
285
 
s:\x20
 
246
s: 
286
247
\t\\
287
248
\t\\\\
288
249
\t
312
273
            ]
313
274
        for expected in expected_vals:
314
275
            stanza = read_stanza(tmpf)
315
 
            self.rio_file_stanzas([stanza])
316
276
            self.assertEquals(len(stanza), 1)
317
277
            self.assertEqualDiff(stanza.get('s'), expected)
318
278
 
320
280
        """Write empty stanza"""
321
281
        l = list(Stanza().to_lines())
322
282
        self.assertEquals(l, [])
323
 
 
324
 
    def test_rio_raises_type_error(self):
325
 
        """TypeError on adding invalid type to Stanza"""
326
 
        s = Stanza()
327
 
        self.assertRaises(TypeError, s.add, 'foo', {})
328
 
 
329
 
    def test_rio_raises_type_error_key(self):
330
 
        """TypeError on adding a key of invalid type to Stanza"""
331
 
        s = Stanza()
332
 
        self.assertRaises(TypeError, s.add, 10, {})
333
 
 
334
 
    def test_rio_unicode(self):
335
 
        uni_data = u'\N{KATAKANA LETTER O}'
336
 
        s = Stanza(foo=uni_data)
337
 
        self.assertEquals(s.get('foo'), uni_data)
338
 
        raw_lines = s.to_lines()
339
 
        self.assertEquals(raw_lines,
340
 
                ['foo: ' + uni_data.encode('utf-8') + '\n'])
341
 
        new_s = read_stanza(raw_lines)
342
 
        self.assertEquals(new_s.get('foo'), uni_data)
343
 
 
344
 
    def test_rio_to_unicode(self):
345
 
        uni_data = u'\N{KATAKANA LETTER O}'
346
 
        s = Stanza(foo=uni_data)
347
 
        unicode_str = s.to_unicode()
348
 
        self.assertEqual(u'foo: %s\n' % (uni_data,), unicode_str)
349
 
        new_s = rio.read_stanza_unicode(unicode_str.splitlines(True))
350
 
        self.assertEqual(uni_data, new_s.get('foo'))
351
 
 
352
 
    def test_nested_rio_unicode(self):
353
 
        uni_data = u'\N{KATAKANA LETTER O}'
354
 
        s = Stanza(foo=uni_data)
355
 
        parent_stanza = Stanza(child=s.to_unicode())
356
 
        raw_lines = parent_stanza.to_lines()
357
 
        self.assertEqual(['child: foo: ' + uni_data.encode('utf-8') + '\n',
358
 
                          '\t\n',
359
 
                         ], raw_lines)
360
 
        new_parent = read_stanza(raw_lines)
361
 
        child_text = new_parent.get('child')
362
 
        self.assertEqual(u'foo: %s\n' % uni_data, child_text)
363
 
        new_child = rio.read_stanza_unicode(child_text.splitlines(True))
364
 
        self.assertEqual(uni_data, new_child.get('foo'))
365
 
 
366
 
    def mail_munge(self, lines, dos_nl=True):
367
 
        new_lines = []
368
 
        for line in lines:
369
 
            line = re.sub(' *\n', '\n', line)
370
 
            if dos_nl:
371
 
                line = re.sub('([^\r])\n', '\\1\r\n', line)
372
 
            new_lines.append(line)
373
 
        return new_lines
374
 
 
375
 
    def test_patch_rio(self):
376
 
        stanza = Stanza(data='#\n\r\\r ', space=' ' * 255, hash='#' * 255)
377
 
        lines = rio.to_patch_lines(stanza)
378
 
        for line in lines:
379
 
            self.assertContainsRe(line, '^# ')
380
 
            self.assertTrue(72 >= len(line))
381
 
        for line in rio.to_patch_lines(stanza, max_width=12):
382
 
            self.assertTrue(12 >= len(line))
383
 
        new_stanza = rio.read_patch_stanza(self.mail_munge(lines,
384
 
                                                           dos_nl=False))
385
 
        lines = self.mail_munge(lines)
386
 
        new_stanza = rio.read_patch_stanza(lines)
387
 
        self.assertEqual('#\n\r\\r ', new_stanza.get('data'))
388
 
        self.assertEqual(' '* 255, new_stanza.get('space'))
389
 
        self.assertEqual('#'* 255, new_stanza.get('hash'))
390
 
 
391
 
    def test_patch_rio_linebreaks(self):
392
 
        stanza = Stanza(breaktest='linebreak -/'*30)
393
 
        self.assertContainsRe(rio.to_patch_lines(stanza, 71)[0],
394
 
                              'linebreak\\\\\n')
395
 
        stanza = Stanza(breaktest='linebreak-/'*30)
396
 
        self.assertContainsRe(rio.to_patch_lines(stanza, 70)[0],
397
 
                              'linebreak-\\\\\n')
398
 
        stanza = Stanza(breaktest='linebreak/'*30)
399
 
        self.assertContainsRe(rio.to_patch_lines(stanza, 70)[0],
400
 
                              'linebreak\\\\\n')