~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_rio.py

Merge bzr.dev to resolve conflicts

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2009, 2010, 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for rio serialization
 
18
 
 
19
A simple, reproducible structured IO format.
 
20
 
 
21
rio itself works in Unicode strings.  It is typically encoded to UTF-8,
 
22
but this depends on the transport.
 
23
"""
 
24
 
 
25
import re
 
26
from tempfile import TemporaryFile
 
27
 
 
28
from bzrlib import (
 
29
    rio,
 
30
    )
 
31
from bzrlib.tests import TestCase
 
32
from bzrlib.rio import (
 
33
    RioReader,
 
34
    Stanza,
 
35
    read_stanza,
 
36
    read_stanzas,
 
37
    rio_file,
 
38
    )
 
39
 
 
40
 
 
41
class TestRio(TestCase):
 
42
 
 
43
    def test_stanza(self):
 
44
        """Construct rio stanza in memory"""
 
45
        s = Stanza(number='42', name="fred")
 
46
        self.assertTrue('number' in s)
 
47
        self.assertFalse('color' in s)
 
48
        self.assertFalse('42' in s)
 
49
        self.assertEquals(list(s.iter_pairs()),
 
50
                [('name', 'fred'), ('number', '42')])
 
51
        self.assertEquals(s.get('number'), '42')
 
52
        self.assertEquals(s.get('name'), 'fred')
 
53
 
 
54
    def test_value_checks(self):
 
55
        """rio checks types on construction"""
 
56
        # these aren't enforced at construction time
 
57
        ## self.assertRaises(ValueError,
 
58
        ##        Stanza, complex=42 + 3j)
 
59
        ## self.assertRaises(ValueError,
 
60
        ##        Stanza, several=range(10))
 
61
 
 
62
    def test_empty_value(self):
 
63
        """Serialize stanza with empty field"""
 
64
        s = Stanza(empty='')
 
65
        self.assertEqualDiff(s.to_string(),
 
66
                "empty: \n")
 
67
 
 
68
    def test_to_lines(self):
 
69
        """Write simple rio stanza to string"""
 
70
        s = Stanza(number='42', name='fred')
 
71
        self.assertEquals(list(s.to_lines()),
 
72
                ['name: fred\n',
 
73
                 'number: 42\n'])
 
74
 
 
75
    def test_as_dict(self):
 
76
        """Convert rio Stanza to dictionary"""
 
77
        s = Stanza(number='42', name='fred')
 
78
        sd = s.as_dict()
 
79
        self.assertEquals(sd, dict(number='42', name='fred'))
 
80
 
 
81
    def test_to_file(self):
 
82
        """Write rio to file"""
 
83
        tmpf = TemporaryFile()
 
84
        s = Stanza(a_thing='something with "quotes like \\"this\\""', number='42', name='fred')
 
85
        s.write(tmpf)
 
86
        tmpf.seek(0)
 
87
        self.assertEqualDiff(tmpf.read(), r'''
 
88
a_thing: something with "quotes like \"this\""
 
89
name: fred
 
90
number: 42
 
91
'''[1:])
 
92
 
 
93
    def test_multiline_string(self):
 
94
        tmpf = TemporaryFile()
 
95
        s = Stanza(motto="war is peace\nfreedom is slavery\nignorance is strength")
 
96
        s.write(tmpf)
 
97
        tmpf.seek(0)
 
98
        self.assertEqualDiff(tmpf.read(), '''\
 
99
motto: war is peace
 
100
\tfreedom is slavery
 
101
\tignorance is strength
 
102
''')
 
103
        tmpf.seek(0)
 
104
        s2 = read_stanza(tmpf)
 
105
        self.assertEquals(s, s2)
 
106
 
 
107
    def test_read_stanza(self):
 
108
        """Load stanza from string"""
 
109
        lines = """\
 
110
revision: mbp@sourcefrog.net-123-abc
 
111
timestamp: 1130653962
 
112
timezone: 36000
 
113
committer: Martin Pool <mbp@test.sourcefrog.net>
 
114
""".splitlines(True)
 
115
        s = read_stanza(lines)
 
116
        self.assertTrue('revision' in s)
 
117
        self.assertEqualDiff(s.get('revision'), 'mbp@sourcefrog.net-123-abc')
 
118
        self.assertEquals(list(s.iter_pairs()),
 
119
                [('revision', 'mbp@sourcefrog.net-123-abc'),
 
120
                 ('timestamp', '1130653962'),
 
121
                 ('timezone', '36000'),
 
122
                 ('committer', "Martin Pool <mbp@test.sourcefrog.net>")])
 
123
        self.assertEquals(len(s), 4)
 
124
 
 
125
    def test_repeated_field(self):
 
126
        """Repeated field in rio"""
 
127
        s = Stanza()
 
128
        for k, v in [('a', '10'), ('b', '20'), ('a', '100'), ('b', '200'),
 
129
                     ('a', '1000'), ('b', '2000')]:
 
130
            s.add(k, v)
 
131
        s2 = read_stanza(s.to_lines())
 
132
        self.assertEquals(s, s2)
 
133
        self.assertEquals(s.get_all('a'), map(str, [10, 100, 1000]))
 
134
        self.assertEquals(s.get_all('b'), map(str, [20, 200, 2000]))
 
135
 
 
136
    def test_backslash(self):
 
137
        s = Stanza(q='\\')
 
138
        t = s.to_string()
 
139
        self.assertEqualDiff(t, 'q: \\\n')
 
140
        s2 = read_stanza(s.to_lines())
 
141
        self.assertEquals(s, s2)
 
142
 
 
143
    def test_blank_line(self):
 
144
        s = Stanza(none='', one='\n', two='\n\n')
 
145
        self.assertEqualDiff(s.to_string(), """\
 
146
none:\x20
 
147
one:\x20
 
148
\t
 
149
two:\x20
 
150
\t
 
151
\t
 
152
""")
 
153
        s2 = read_stanza(s.to_lines())
 
154
        self.assertEquals(s, s2)
 
155
 
 
156
    def test_whitespace_value(self):
 
157
        s = Stanza(space=' ', tabs='\t\t\t', combo='\n\t\t\n')
 
158
        self.assertEqualDiff(s.to_string(), """\
 
159
combo:\x20
 
160
\t\t\t
 
161
\t
 
162
space:\x20\x20
 
163
tabs: \t\t\t
 
164
""")
 
165
        s2 = read_stanza(s.to_lines())
 
166
        self.assertEquals(s, s2)
 
167
        self.rio_file_stanzas([s])
 
168
 
 
169
    def test_quoted(self):
 
170
        """rio quoted string cases"""
 
171
        s = Stanza(q1='"hello"',
 
172
                   q2=' "for',
 
173
                   q3='\n\n"for"\n',
 
174
                   q4='for\n"\nfor',
 
175
                   q5='\n',
 
176
                   q6='"',
 
177
                   q7='""',
 
178
                   q8='\\',
 
179
                   q9='\\"\\"',
 
180
                   )
 
181
        s2 = read_stanza(s.to_lines())
 
182
        self.assertEquals(s, s2)
 
183
        # apparent bug in read_stanza
 
184
        # s3 = read_stanza(self.stanzas_to_str([s]))
 
185
        # self.assertEquals(s, s3)
 
186
 
 
187
    def test_read_empty(self):
 
188
        """Detect end of rio file"""
 
189
        s = read_stanza([])
 
190
        self.assertEqual(s, None)
 
191
        self.assertTrue(s is None)
 
192
 
 
193
    def test_read_nul_byte(self):
 
194
        """File consisting of a nul byte causes an error."""
 
195
        self.assertRaises(ValueError, read_stanza, ['\0'])
 
196
 
 
197
    def test_read_nul_bytes(self):
 
198
        """File consisting of many nul bytes causes an error."""
 
199
        self.assertRaises(ValueError, read_stanza, ['\0' * 100])
 
200
 
 
201
    def test_read_iter(self):
 
202
        """Read several stanzas from file"""
 
203
        tmpf = TemporaryFile()
 
204
        tmpf.write("""\
 
205
version_header: 1
 
206
 
 
207
name: foo
 
208
val: 123
 
209
 
 
210
name: bar
 
211
val: 129319
 
212
""")
 
213
        tmpf.seek(0)
 
214
        reader = read_stanzas(tmpf)
 
215
        read_iter = iter(reader)
 
216
        stuff = list(reader)
 
217
        self.assertEqual(stuff,
 
218
                [ Stanza(version_header='1'),
 
219
                  Stanza(name="foo", val='123'),
 
220
                  Stanza(name="bar", val='129319'), ])
 
221
 
 
222
    def test_read_several(self):
 
223
        """Read several stanzas from file"""
 
224
        tmpf = TemporaryFile()
 
225
        tmpf.write("""\
 
226
version_header: 1
 
227
 
 
228
name: foo
 
229
val: 123
 
230
 
 
231
name: quoted
 
232
address:   "Willowglen"
 
233
\t  42 Wallaby Way
 
234
\t  Sydney
 
235
 
 
236
name: bar
 
237
val: 129319
 
238
""")
 
239
        tmpf.seek(0)
 
240
        s = read_stanza(tmpf)
 
241
        self.assertEquals(s, Stanza(version_header='1'))
 
242
        s = read_stanza(tmpf)
 
243
        self.assertEquals(s, Stanza(name="foo", val='123'))
 
244
        s = read_stanza(tmpf)
 
245
        self.assertEqualDiff(s.get('name'), 'quoted')
 
246
        self.assertEqualDiff(s.get('address'), '  "Willowglen"\n  42 Wallaby Way\n  Sydney')
 
247
        s = read_stanza(tmpf)
 
248
        self.assertEquals(s, Stanza(name="bar", val='129319'))
 
249
        s = read_stanza(tmpf)
 
250
        self.assertEquals(s, None)
 
251
        self.check_rio_file(tmpf)
 
252
 
 
253
    def check_rio_file(self, real_file):
 
254
        real_file.seek(0)
 
255
        read_write = rio_file(RioReader(real_file)).read()
 
256
        real_file.seek(0)
 
257
        self.assertEquals(read_write, real_file.read())
 
258
 
 
259
    @staticmethod
 
260
    def stanzas_to_str(stanzas):
 
261
        return rio_file(stanzas).read()
 
262
 
 
263
    def rio_file_stanzas(self, stanzas):
 
264
        new_stanzas = list(RioReader(rio_file(stanzas)))
 
265
        self.assertEqual(new_stanzas, stanzas)
 
266
 
 
267
    def test_tricky_quoted(self):
 
268
        tmpf = TemporaryFile()
 
269
        tmpf.write('''\
 
270
s: "one"
 
271
 
 
272
s:\x20
 
273
\t"one"
 
274
\t
 
275
 
 
276
s: "
 
277
 
 
278
s: ""
 
279
 
 
280
s: """
 
281
 
 
282
s:\x20
 
283
\t
 
284
 
 
285
s: \\
 
286
 
 
287
s:\x20
 
288
\t\\
 
289
\t\\\\
 
290
\t
 
291
 
 
292
s: word\\
 
293
 
 
294
s: quote"
 
295
 
 
296
s: backslashes\\\\\\
 
297
 
 
298
s: both\\\"
 
299
 
 
300
''')
 
301
        tmpf.seek(0)
 
302
        expected_vals = ['"one"',
 
303
            '\n"one"\n',
 
304
            '"',
 
305
            '""',
 
306
            '"""',
 
307
            '\n',
 
308
            '\\',
 
309
            '\n\\\n\\\\\n',
 
310
            'word\\',
 
311
            'quote\"',
 
312
            'backslashes\\\\\\',
 
313
            'both\\\"',
 
314
            ]
 
315
        for expected in expected_vals:
 
316
            stanza = read_stanza(tmpf)
 
317
            self.rio_file_stanzas([stanza])
 
318
            self.assertEquals(len(stanza), 1)
 
319
            self.assertEqualDiff(stanza.get('s'), expected)
 
320
 
 
321
    def test_write_empty_stanza(self):
 
322
        """Write empty stanza"""
 
323
        l = list(Stanza().to_lines())
 
324
        self.assertEquals(l, [])
 
325
 
 
326
    def test_rio_raises_type_error(self):
 
327
        """TypeError on adding invalid type to Stanza"""
 
328
        s = Stanza()
 
329
        self.assertRaises(TypeError, s.add, 'foo', {})
 
330
 
 
331
    def test_rio_raises_type_error_key(self):
 
332
        """TypeError on adding invalid type to Stanza"""
 
333
        s = Stanza()
 
334
        self.assertRaises(TypeError, s.add, 10, {})
 
335
 
 
336
    def test_rio_unicode(self):
 
337
        uni_data = u'\N{KATAKANA LETTER O}'
 
338
        s = Stanza(foo=uni_data)
 
339
        self.assertEquals(s.get('foo'), uni_data)
 
340
        raw_lines = s.to_lines()
 
341
        self.assertEquals(raw_lines,
 
342
                ['foo: ' + uni_data.encode('utf-8') + '\n'])
 
343
        new_s = read_stanza(raw_lines)
 
344
        self.assertEquals(new_s.get('foo'), uni_data)
 
345
 
 
346
    def test_rio_to_unicode(self):
 
347
        uni_data = u'\N{KATAKANA LETTER O}'
 
348
        s = Stanza(foo=uni_data)
 
349
        unicode_str = s.to_unicode()
 
350
        self.assertEqual(u'foo: %s\n' % (uni_data,), unicode_str)
 
351
        new_s = rio.read_stanza_unicode(unicode_str.splitlines(True))
 
352
        self.assertEqual(uni_data, new_s.get('foo'))
 
353
 
 
354
    def test_nested_rio_unicode(self):
 
355
        uni_data = u'\N{KATAKANA LETTER O}'
 
356
        s = Stanza(foo=uni_data)
 
357
        parent_stanza = Stanza(child=s.to_unicode())
 
358
        raw_lines = parent_stanza.to_lines()
 
359
        self.assertEqual(['child: foo: ' + uni_data.encode('utf-8') + '\n',
 
360
                          '\t\n',
 
361
                         ], raw_lines)
 
362
        new_parent = read_stanza(raw_lines)
 
363
        child_text = new_parent.get('child')
 
364
        self.assertEqual(u'foo: %s\n' % uni_data, child_text)
 
365
        new_child = rio.read_stanza_unicode(child_text.splitlines(True))
 
366
        self.assertEqual(uni_data, new_child.get('foo'))
 
367
 
 
368
    def mail_munge(self, lines, dos_nl=True):
 
369
        new_lines = []
 
370
        for line in lines:
 
371
            line = re.sub(' *\n', '\n', line)
 
372
            if dos_nl:
 
373
                line = re.sub('([^\r])\n', '\\1\r\n', line)
 
374
            new_lines.append(line)
 
375
        return new_lines
 
376
 
 
377
    def test_patch_rio(self):
 
378
        stanza = Stanza(data='#\n\r\\r ', space=' ' * 255, hash='#' * 255)
 
379
        lines = rio.to_patch_lines(stanza)
 
380
        for line in lines:
 
381
            self.assertContainsRe(line, '^# ')
 
382
            self.assertTrue(72 >= len(line))
 
383
        for line in rio.to_patch_lines(stanza, max_width=12):
 
384
            self.assertTrue(12 >= len(line))
 
385
        new_stanza = rio.read_patch_stanza(self.mail_munge(lines,
 
386
                                                           dos_nl=False))
 
387
        lines = self.mail_munge(lines)
 
388
        new_stanza = rio.read_patch_stanza(lines)
 
389
        self.assertEqual('#\n\r\\r ', new_stanza.get('data'))
 
390
        self.assertEqual(' '* 255, new_stanza.get('space'))
 
391
        self.assertEqual('#'* 255, new_stanza.get('hash'))
 
392
 
 
393
    def test_patch_rio_linebreaks(self):
 
394
        stanza = Stanza(breaktest='linebreak -/'*30)
 
395
        self.assertContainsRe(rio.to_patch_lines(stanza, 71)[0],
 
396
                              'linebreak\\\\\n')
 
397
        stanza = Stanza(breaktest='linebreak-/'*30)
 
398
        self.assertContainsRe(rio.to_patch_lines(stanza, 70)[0],
 
399
                              'linebreak-\\\\\n')
 
400
        stanza = Stanza(breaktest='linebreak/'*30)
 
401
        self.assertContainsRe(rio.to_patch_lines(stanza, 70)[0],
 
402
                              'linebreak\\\\\n')