~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_rio.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-09-29 22:03:03 UTC
  • mfrom: (5416.2.6 jam-integration)
  • Revision ID: pqm@pqm.ubuntu.com-20100929220303-cr95h8iwtggco721
(mbp) Add 'break-lock --force'

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007, 2009, 2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for rio serialization
 
18
 
 
19
A simple, reproducible structured IO format.
 
20
 
 
21
rio itself works in Unicode strings.  It is typically encoded to UTF-8,
 
22
but this depends on the transport.
 
23
"""
 
24
 
 
25
import cStringIO
 
26
import os
 
27
import re
 
28
import sys
 
29
from tempfile import TemporaryFile
 
30
 
 
31
from bzrlib import (
 
32
    rio,
 
33
    )
 
34
from bzrlib.tests import TestCaseInTempDir, TestCase
 
35
from bzrlib.rio import (RioWriter, Stanza, read_stanza, read_stanzas, rio_file,
 
36
                        RioReader)
 
37
 
 
38
 
 
39
class TestRio(TestCase):
 
40
 
 
41
    def test_stanza(self):
 
42
        """Construct rio stanza in memory"""
 
43
        s = Stanza(number='42', name="fred")
 
44
        self.assertTrue('number' in s)
 
45
        self.assertFalse('color' in s)
 
46
        self.assertFalse('42' in s)
 
47
        self.assertEquals(list(s.iter_pairs()),
 
48
                [('name', 'fred'), ('number', '42')])
 
49
        self.assertEquals(s.get('number'), '42')
 
50
        self.assertEquals(s.get('name'), 'fred')
 
51
 
 
52
    def test_value_checks(self):
 
53
        """rio checks types on construction"""
 
54
        # these aren't enforced at construction time
 
55
        ## self.assertRaises(ValueError,
 
56
        ##        Stanza, complex=42 + 3j)
 
57
        ## self.assertRaises(ValueError,
 
58
        ##        Stanza, several=range(10))
 
59
 
 
60
    def test_empty_value(self):
 
61
        """Serialize stanza with empty field"""
 
62
        s = Stanza(empty='')
 
63
        self.assertEqualDiff(s.to_string(),
 
64
                "empty: \n")
 
65
 
 
66
    def test_to_lines(self):
 
67
        """Write simple rio stanza to string"""
 
68
        s = Stanza(number='42', name='fred')
 
69
        self.assertEquals(list(s.to_lines()),
 
70
                ['name: fred\n',
 
71
                 'number: 42\n'])
 
72
 
 
73
    def test_as_dict(self):
 
74
        """Convert rio Stanza to dictionary"""
 
75
        s = Stanza(number='42', name='fred')
 
76
        sd = s.as_dict()
 
77
        self.assertEquals(sd, dict(number='42', name='fred'))
 
78
 
 
79
    def test_to_file(self):
 
80
        """Write rio to file"""
 
81
        tmpf = TemporaryFile()
 
82
        s = Stanza(a_thing='something with "quotes like \\"this\\""', number='42', name='fred')
 
83
        s.write(tmpf)
 
84
        tmpf.seek(0)
 
85
        self.assertEqualDiff(tmpf.read(), r'''
 
86
a_thing: something with "quotes like \"this\""
 
87
name: fred
 
88
number: 42
 
89
'''[1:])
 
90
 
 
91
    def test_multiline_string(self):
 
92
        tmpf = TemporaryFile()
 
93
        s = Stanza(motto="war is peace\nfreedom is slavery\nignorance is strength")
 
94
        s.write(tmpf)
 
95
        tmpf.seek(0)
 
96
        self.assertEqualDiff(tmpf.read(), '''\
 
97
motto: war is peace
 
98
\tfreedom is slavery
 
99
\tignorance is strength
 
100
''')
 
101
        tmpf.seek(0)
 
102
        s2 = read_stanza(tmpf)
 
103
        self.assertEquals(s, s2)
 
104
 
 
105
    def test_read_stanza(self):
 
106
        """Load stanza from string"""
 
107
        lines = """\
 
108
revision: mbp@sourcefrog.net-123-abc
 
109
timestamp: 1130653962
 
110
timezone: 36000
 
111
committer: Martin Pool <mbp@test.sourcefrog.net>
 
112
""".splitlines(True)
 
113
        s = read_stanza(lines)
 
114
        self.assertTrue('revision' in s)
 
115
        self.assertEqualDiff(s.get('revision'), 'mbp@sourcefrog.net-123-abc')
 
116
        self.assertEquals(list(s.iter_pairs()),
 
117
                [('revision', 'mbp@sourcefrog.net-123-abc'),
 
118
                 ('timestamp', '1130653962'),
 
119
                 ('timezone', '36000'),
 
120
                 ('committer', "Martin Pool <mbp@test.sourcefrog.net>")])
 
121
        self.assertEquals(len(s), 4)
 
122
 
 
123
    def test_repeated_field(self):
 
124
        """Repeated field in rio"""
 
125
        s = Stanza()
 
126
        for k, v in [('a', '10'), ('b', '20'), ('a', '100'), ('b', '200'),
 
127
                     ('a', '1000'), ('b', '2000')]:
 
128
            s.add(k, v)
 
129
        s2 = read_stanza(s.to_lines())
 
130
        self.assertEquals(s, s2)
 
131
        self.assertEquals(s.get_all('a'), map(str, [10, 100, 1000]))
 
132
        self.assertEquals(s.get_all('b'), map(str, [20, 200, 2000]))
 
133
 
 
134
    def test_backslash(self):
 
135
        s = Stanza(q='\\')
 
136
        t = s.to_string()
 
137
        self.assertEqualDiff(t, 'q: \\\n')
 
138
        s2 = read_stanza(s.to_lines())
 
139
        self.assertEquals(s, s2)
 
140
 
 
141
    def test_blank_line(self):
 
142
        s = Stanza(none='', one='\n', two='\n\n')
 
143
        self.assertEqualDiff(s.to_string(), """\
 
144
none:\x20
 
145
one:\x20
 
146
\t
 
147
two:\x20
 
148
\t
 
149
\t
 
150
""")
 
151
        s2 = read_stanza(s.to_lines())
 
152
        self.assertEquals(s, s2)
 
153
 
 
154
    def test_whitespace_value(self):
 
155
        s = Stanza(space=' ', tabs='\t\t\t', combo='\n\t\t\n')
 
156
        self.assertEqualDiff(s.to_string(), """\
 
157
combo:\x20
 
158
\t\t\t
 
159
\t
 
160
space:\x20\x20
 
161
tabs: \t\t\t
 
162
""")
 
163
        s2 = read_stanza(s.to_lines())
 
164
        self.assertEquals(s, s2)
 
165
        self.rio_file_stanzas([s])
 
166
 
 
167
    def test_quoted(self):
 
168
        """rio quoted string cases"""
 
169
        s = Stanza(q1='"hello"',
 
170
                   q2=' "for',
 
171
                   q3='\n\n"for"\n',
 
172
                   q4='for\n"\nfor',
 
173
                   q5='\n',
 
174
                   q6='"',
 
175
                   q7='""',
 
176
                   q8='\\',
 
177
                   q9='\\"\\"',
 
178
                   )
 
179
        s2 = read_stanza(s.to_lines())
 
180
        self.assertEquals(s, s2)
 
181
        # apparent bug in read_stanza
 
182
        # s3 = read_stanza(self.stanzas_to_str([s]))
 
183
        # self.assertEquals(s, s3)
 
184
 
 
185
    def test_read_empty(self):
 
186
        """Detect end of rio file"""
 
187
        s = read_stanza([])
 
188
        self.assertEqual(s, None)
 
189
        self.assertTrue(s is None)
 
190
 
 
191
    def test_read_nul_byte(self):
 
192
        """File consisting of a nul byte causes an error."""
 
193
        self.assertRaises(ValueError, read_stanza, ['\0'])
 
194
 
 
195
    def test_read_nul_bytes(self):
 
196
        """File consisting of many nul bytes causes an error."""
 
197
        self.assertRaises(ValueError, read_stanza, ['\0' * 100])
 
198
 
 
199
    def test_read_iter(self):
 
200
        """Read several stanzas from file"""
 
201
        tmpf = TemporaryFile()
 
202
        tmpf.write("""\
 
203
version_header: 1
 
204
 
 
205
name: foo
 
206
val: 123
 
207
 
 
208
name: bar
 
209
val: 129319
 
210
""")
 
211
        tmpf.seek(0)
 
212
        reader = read_stanzas(tmpf)
 
213
        read_iter = iter(reader)
 
214
        stuff = list(reader)
 
215
        self.assertEqual(stuff,
 
216
                [ Stanza(version_header='1'),
 
217
                  Stanza(name="foo", val='123'),
 
218
                  Stanza(name="bar", val='129319'), ])
 
219
 
 
220
    def test_read_several(self):
 
221
        """Read several stanzas from file"""
 
222
        tmpf = TemporaryFile()
 
223
        tmpf.write("""\
 
224
version_header: 1
 
225
 
 
226
name: foo
 
227
val: 123
 
228
 
 
229
name: quoted
 
230
address:   "Willowglen"
 
231
\t  42 Wallaby Way
 
232
\t  Sydney
 
233
 
 
234
name: bar
 
235
val: 129319
 
236
""")
 
237
        tmpf.seek(0)
 
238
        s = read_stanza(tmpf)
 
239
        self.assertEquals(s, Stanza(version_header='1'))
 
240
        s = read_stanza(tmpf)
 
241
        self.assertEquals(s, Stanza(name="foo", val='123'))
 
242
        s = read_stanza(tmpf)
 
243
        self.assertEqualDiff(s.get('name'), 'quoted')
 
244
        self.assertEqualDiff(s.get('address'), '  "Willowglen"\n  42 Wallaby Way\n  Sydney')
 
245
        s = read_stanza(tmpf)
 
246
        self.assertEquals(s, Stanza(name="bar", val='129319'))
 
247
        s = read_stanza(tmpf)
 
248
        self.assertEquals(s, None)
 
249
        self.check_rio_file(tmpf)
 
250
 
 
251
    def check_rio_file(self, real_file):
 
252
        real_file.seek(0)
 
253
        read_write = rio_file(RioReader(real_file)).read()
 
254
        real_file.seek(0)
 
255
        self.assertEquals(read_write, real_file.read())
 
256
 
 
257
    @staticmethod
 
258
    def stanzas_to_str(stanzas):
 
259
        return rio_file(stanzas).read()
 
260
 
 
261
    def rio_file_stanzas(self, stanzas):
 
262
        new_stanzas = list(RioReader(rio_file(stanzas)))
 
263
        self.assertEqual(new_stanzas, stanzas)
 
264
 
 
265
    def test_tricky_quoted(self):
 
266
        tmpf = TemporaryFile()
 
267
        tmpf.write('''\
 
268
s: "one"
 
269
 
 
270
s:\x20
 
271
\t"one"
 
272
\t
 
273
 
 
274
s: "
 
275
 
 
276
s: ""
 
277
 
 
278
s: """
 
279
 
 
280
s:\x20
 
281
\t
 
282
 
 
283
s: \\
 
284
 
 
285
s:\x20
 
286
\t\\
 
287
\t\\\\
 
288
\t
 
289
 
 
290
s: word\\
 
291
 
 
292
s: quote"
 
293
 
 
294
s: backslashes\\\\\\
 
295
 
 
296
s: both\\\"
 
297
 
 
298
''')
 
299
        tmpf.seek(0)
 
300
        expected_vals = ['"one"',
 
301
            '\n"one"\n',
 
302
            '"',
 
303
            '""',
 
304
            '"""',
 
305
            '\n',
 
306
            '\\',
 
307
            '\n\\\n\\\\\n',
 
308
            'word\\',
 
309
            'quote\"',
 
310
            'backslashes\\\\\\',
 
311
            'both\\\"',
 
312
            ]
 
313
        for expected in expected_vals:
 
314
            stanza = read_stanza(tmpf)
 
315
            self.rio_file_stanzas([stanza])
 
316
            self.assertEquals(len(stanza), 1)
 
317
            self.assertEqualDiff(stanza.get('s'), expected)
 
318
 
 
319
    def test_write_empty_stanza(self):
 
320
        """Write empty stanza"""
 
321
        l = list(Stanza().to_lines())
 
322
        self.assertEquals(l, [])
 
323
 
 
324
    def test_rio_raises_type_error(self):
 
325
        """TypeError on adding invalid type to Stanza"""
 
326
        s = Stanza()
 
327
        self.assertRaises(TypeError, s.add, 'foo', {})
 
328
 
 
329
    def test_rio_raises_type_error_key(self):
 
330
        """TypeError on adding invalid type to Stanza"""
 
331
        s = Stanza()
 
332
        self.assertRaises(TypeError, s.add, 10, {})
 
333
 
 
334
    def test_rio_unicode(self):
 
335
        uni_data = u'\N{KATAKANA LETTER O}'
 
336
        s = Stanza(foo=uni_data)
 
337
        self.assertEquals(s.get('foo'), uni_data)
 
338
        raw_lines = s.to_lines()
 
339
        self.assertEquals(raw_lines,
 
340
                ['foo: ' + uni_data.encode('utf-8') + '\n'])
 
341
        new_s = read_stanza(raw_lines)
 
342
        self.assertEquals(new_s.get('foo'), uni_data)
 
343
 
 
344
    def test_rio_to_unicode(self):
 
345
        uni_data = u'\N{KATAKANA LETTER O}'
 
346
        s = Stanza(foo=uni_data)
 
347
        unicode_str = s.to_unicode()
 
348
        self.assertEqual(u'foo: %s\n' % (uni_data,), unicode_str)
 
349
        new_s = rio.read_stanza_unicode(unicode_str.splitlines(True))
 
350
        self.assertEqual(uni_data, new_s.get('foo'))
 
351
 
 
352
    def test_nested_rio_unicode(self):
 
353
        uni_data = u'\N{KATAKANA LETTER O}'
 
354
        s = Stanza(foo=uni_data)
 
355
        parent_stanza = Stanza(child=s.to_unicode())
 
356
        raw_lines = parent_stanza.to_lines()
 
357
        self.assertEqual(['child: foo: ' + uni_data.encode('utf-8') + '\n',
 
358
                          '\t\n',
 
359
                         ], raw_lines)
 
360
        new_parent = read_stanza(raw_lines)
 
361
        child_text = new_parent.get('child')
 
362
        self.assertEqual(u'foo: %s\n' % uni_data, child_text)
 
363
        new_child = rio.read_stanza_unicode(child_text.splitlines(True))
 
364
        self.assertEqual(uni_data, new_child.get('foo'))
 
365
 
 
366
    def mail_munge(self, lines, dos_nl=True):
 
367
        new_lines = []
 
368
        for line in lines:
 
369
            line = re.sub(' *\n', '\n', line)
 
370
            if dos_nl:
 
371
                line = re.sub('([^\r])\n', '\\1\r\n', line)
 
372
            new_lines.append(line)
 
373
        return new_lines
 
374
 
 
375
    def test_patch_rio(self):
 
376
        stanza = Stanza(data='#\n\r\\r ', space=' ' * 255, hash='#' * 255)
 
377
        lines = rio.to_patch_lines(stanza)
 
378
        for line in lines:
 
379
            self.assertContainsRe(line, '^# ')
 
380
            self.assertTrue(72 >= len(line))
 
381
        for line in rio.to_patch_lines(stanza, max_width=12):
 
382
            self.assertTrue(12 >= len(line))
 
383
        new_stanza = rio.read_patch_stanza(self.mail_munge(lines,
 
384
                                                           dos_nl=False))
 
385
        lines = self.mail_munge(lines)
 
386
        new_stanza = rio.read_patch_stanza(lines)
 
387
        self.assertEqual('#\n\r\\r ', new_stanza.get('data'))
 
388
        self.assertEqual(' '* 255, new_stanza.get('space'))
 
389
        self.assertEqual('#'* 255, new_stanza.get('hash'))
 
390
 
 
391
    def test_patch_rio_linebreaks(self):
 
392
        stanza = Stanza(breaktest='linebreak -/'*30)
 
393
        self.assertContainsRe(rio.to_patch_lines(stanza, 71)[0],
 
394
                              'linebreak\\\\\n')
 
395
        stanza = Stanza(breaktest='linebreak-/'*30)
 
396
        self.assertContainsRe(rio.to_patch_lines(stanza, 70)[0],
 
397
                              'linebreak-\\\\\n')
 
398
        stanza = Stanza(breaktest='linebreak/'*30)
 
399
        self.assertContainsRe(rio.to_patch_lines(stanza, 70)[0],
 
400
                              'linebreak\\\\\n')