~bzr-pqm/bzr/bzr.dev

2052.3.3 by John Arbash Meinel
Add (c) Canonical to files that Aaron has approved
1
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
1185.67.6 by Aaron Bentley
Added tests and fixes for LockableFiles.put_utf8(); imported IterableFile
2
# <aaron.bentley@utoronto.ca>
3
#
2052.3.1 by John Arbash Meinel
Add tests to cleanup the copyright of all source files
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU General Public License for more details.
13
#
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
4183.7.1 by Sabin Iacob
update FSF mailing address
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
1185.67.6 by Aaron Bentley
Added tests and fixes for LockableFiles.put_utf8(); imported IterableFile
17
6379.6.3 by Jelmer Vernooij
Use absolute_import.
18
from __future__ import absolute_import
2052.3.3 by John Arbash Meinel
Add (c) Canonical to files that Aaron has approved
19
1185.67.6 by Aaron Bentley
Added tests and fixes for LockableFiles.put_utf8(); imported IterableFile
20
class IterableFileBase(object):
    """Create a file-like object from any iterable.

    The iterable must yield strings.  Chunks are pulled from it lazily and
    concatenated; whatever has been pulled but not yet handed to the caller
    is kept in an internal buffer.

    Attributes:
        done: True once the underlying iterator has been exhausted.
    """

    def __init__(self, iterable):
        """Wrap *iterable*; no data is consumed until a read is requested."""
        object.__init__(self)
        # iter() rather than iterable.__iter__() so that anything supporting
        # the iteration protocol is accepted.
        self._iter = iter(iterable)
        self._buffer = ""
        self.done = False

    def read_n(self, length):
        """Return the next *length* characters (fewer if the data runs out).

        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
        'This is '
        """
        def test_length(result):
            if len(result) >= length:
                return length
            return None
        return self._read(test_length)

    def read_to(self, sequence, length=None):
        """Read up to and including the first occurrence of *sequence*.

        If *length* is given, at most *length* characters are returned.  A
        match that ends within the first *length* characters takes
        precedence over the length limit, so the result never extends past
        the end of the first match.  If the data runs out before either
        condition is met, everything remaining is returned.

        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('\\n')
        'Th\\n'
        >>> f.read_to('\\n')
        'is is \\n'
        >>> f = IterableFileBase(['a\\nbcdef'])
        >>> f.read_to('\\n', 4)
        'a\\n'
        >>> f.read_to('\\n', 4)
        'bcde'
        """
        def test_contents(result):
            try:
                end = result.index(sequence) + len(sequence)
            except ValueError:
                end = None
            # Prefer the delimiter when it fits inside the length limit;
            # checking the limit first would return text beyond the match.
            if end is not None and (length is None or end <= length):
                return end
            if length is not None and len(result) >= length:
                return length
            return None
        return self._read(test_contents)

    def _read(self, result_length):
        """Read data until result satisfies the condition result_length.

        result_length is a callable that returns None until the condition
        is satisfied, and returns the length of the result to use when
        the condition is satisfied.  (i.e. it returns the length of the
        subset of the first condition match.)

        If the iterator is exhausted first, ``done`` is set, the buffer is
        emptied and everything accumulated so far is returned.
        """
        result = self._buffer
        output_length = result_length(result)
        while output_length is None:
            try:
                # The next() builtin works on both Python 2.6+ and Python 3,
                # unlike the Python 2-only iterator.next() method.
                result += next(self._iter)
            except StopIteration:
                self.done = True
                self._buffer = ""
                return result
            output_length = result_length(result)
        # Keep whatever lies beyond the requested portion for the next read.
        self._buffer = result[output_length:]
        return result[:output_length]

    def read_all(self):
        """Return everything that remains, exhausting the iterable.

        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
        'This is a test.'
        """
        def no_stop(result):
            return None
        return self._read(no_stop)

    def push_back(self, contents):
        """Put *contents* at the front of the buffer so it is read next.

        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('\\n')
        'Th\\n'
        >>> f.push_back("Sh")
        >>> f.read_all()
        'Shis is \\na te\\nst.'
        """
        self._buffer = contents + self._buffer
99
100
101
class IterableFile(object):
    """This class supplies all File methods that can be implemented cheaply.

    It presents an iterable of strings as a read-only file-like object,
    delegating buffering to an IterableFileBase.  Once close() has been
    called, every other operation raises ValueError.
    """

    def __init__(self, iterable):
        """Wrap *iterable*; nothing is consumed until data is requested."""
        object.__init__(self)
        self._file_base = IterableFileBase(iterable)
        # Creating the generator runs none of its body, so it is safe to
        # build it before _closed has been assigned.
        self._iter = self._make_iterator()
        self._closed = False
        self.softspace = 0

    def _make_iterator(self):
        """Generate the remaining lines, each including its trailing newline."""
        while not self._file_base.done:
            self._check_closed()
            result = self._file_base.read_to('\n')
            # An empty read means the data ended exactly on a line boundary.
            if result != '':
                yield result

    def _check_closed(self):
        """Raise ValueError if the file has been closed."""
        if self.closed:
            raise ValueError("File is closed.")

    def close(self):
        """Mark the file closed; later operations raise ValueError.

        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.closed
        False
        >>> f.close()
        >>> f.closed
        True
        """
        self._file_base.done = True
        self._closed = True

    closed = property(lambda x: x._closed)

    def flush(self):
        """No-op for standard compliance.

        >>> f = IterableFile([])
        >>> f.close()
        >>> f.flush()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()

    def next(self):
        """Implementation of the iterator protocol's next()

        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
        >>> f.next()
        'This \\n'
        >>> f.close()
        >>> f.next()
        Traceback (most recent call last):
        ValueError: File is closed.
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
        >>> f.next()
        'This \\n'
        >>> f.next()
        'is a test.\\n'
        >>> f.next()
        Traceback (most recent call last):
        StopIteration
        """
        self._check_closed()
        # The next() builtin works on both Python 2.6+ and Python 3.
        return next(self._iter)

    # Python 3 spells the iterator protocol method __next__; without this
    # alias iter(f) would hand back an object that cannot be iterated there.
    __next__ = next

    def __iter__(self):
        """Return self; iteration yields lines.

        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.close()
        >>> list(f)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        return self

    def read(self, length=None):
        """Read up to *length* characters, or everything if it is omitted.

        A negative *length* is treated like None (read everything),
        following the usual file API convention.

        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
        'This is a test.'
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.read(10)
        'This is a '
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.close()
        >>> f.read(10)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        # A negative length used as a slice bound would silently drop data.
        if length is None or length < 0:
            return self._file_base.read_all()
        return self._file_base.read_n(length)

    def read_to(self, sequence, size=None):
        """
        Read characters until a sequence is found, with optional max size.
        The specified sequence, if found, will be included in the result

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('i')
        'Th\\ni'
        >>> f.read_to('i')
        's i'
        >>> f.close()
        >>> f.read_to('i')
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        return self._file_base.read_to(sequence, size)

    def readline(self, size=None):
        """Read one line, including its newline, of at most *size* characters.

        A negative *size* is treated like None (no limit).  An empty
        string is returned once the data is exhausted.

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.readline()
        'Th\\n'
        >>> f.readline(4)
        'is i'
        >>> f.close()
        >>> f.readline()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        # A negative size used as a slice bound would yield a bogus short
        # (possibly empty) result that callers would mistake for EOF.
        if size is not None and size < 0:
            size = None
        return self.read_to('\n', size)

    def readlines(self, sizehint=None):
        """Return the remaining lines as a list.

        If *sizehint* is given, lines are collected only while each next
        line is shorter than the remaining budget; the first line that
        would reach the budget is pushed back unread and the list gathered
        so far is returned (which may be empty).

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.readlines()
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.close()
        >>> f.readlines()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        lines = []
        while True:
            line = self.readline()
            if line == "":
                return lines
            if sizehint is not None:
                if len(line) >= sizehint:
                    # Budget exhausted: leave this line for a later read.
                    self._file_base.push_back(line)
                    return lines
                sizehint -= len(line)
            lines.append(line)
254
3943.8.1 by Marius Kruger
remove all trailing whitespace from bzr source
255
1185.67.6 by Aaron Bentley
Added tests and fixes for LockableFiles.put_utf8(); imported IterableFile
256
if __name__ == "__main__":
    # Executed as a script: run the doctests embedded in this module.
    # The import stays inside the guard so that merely importing this
    # module does not pay the cost of loading doctest.
    from doctest import testmod
    testmod()