1
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
2
# <aaron.bentley@utoronto.ca>
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19
class IterableFileBase(object):
20
"""Create a file-like object from any iterable"""
22
def __init__(self, iterable):
24
self._iter = iterable.__iter__()
28
def read_n(self, length):
30
>>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
33
def test_length(result):
34
if len(result) >= length:
38
return self._read(test_length)
40
def read_to(self, sequence, length=None):
42
>>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
48
def test_contents(result):
49
if length is not None:
50
if len(result) >= length:
53
return result.index(sequence)+len(sequence)
56
return self._read(test_contents)
58
def _read(self, result_length):
60
Read data until result satisfies the condition result_length.
61
result_length is a callable that returns None until the condition
62
is satisfied, and returns the length of the result to use when
63
the condition is satisfied. (i.e. it returns the length of the
64
subset of the first condition match.)
67
while result_length(result) is None:
69
result += self._iter.next()
74
output_length = result_length(result)
75
self._buffer = result[output_length:]
76
return result[:output_length]
80
>>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
85
return self._read(no_stop)
88
def push_back(self, contents):
90
>>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
95
'Shis is \\na te\\nst.'
97
self._buffer = contents + self._buffer
100
class IterableFile(object):
101
"""This class supplies all File methods that can be implemented cheaply."""
102
def __init__(self, iterable):
103
object.__init__(self)
104
self._file_base = IterableFileBase(iterable)
105
self._iter = self._make_iterator()
109
def _make_iterator(self):
110
while not self._file_base.done:
112
result = self._file_base.read_to('\n')
116
def _check_closed(self):
118
raise ValueError("File is closed.")
122
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
129
self._file_base.done = True
132
closed = property(lambda x: x._closed)
135
"""No-op for standard compliance.
136
>>> f = IterableFile([])
139
Traceback (most recent call last):
140
ValueError: File is closed.
145
"""Implementation of the iterator protocol's next()
147
>>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
152
Traceback (most recent call last):
153
ValueError: File is closed.
154
>>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
160
Traceback (most recent call last):
164
return self._iter.next()
168
>>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
169
['Th\\n', 'is is \\n', 'a te\\n', 'st.']
170
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
173
Traceback (most recent call last):
174
ValueError: File is closed.
178
def read(self, length=None):
180
>>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
182
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
185
>>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
188
Traceback (most recent call last):
189
ValueError: File is closed.
193
return self._file_base.read_all()
195
return self._file_base.read_n(length)
197
def read_to(self, sequence, size=None):
199
Read characters until a sequence is found, with optional max size.
200
The specified sequence, if found, will be included in the result
202
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
209
Traceback (most recent call last):
210
ValueError: File is closed.
213
return self._file_base.read_to(sequence, size)
215
def readline(self, size=None):
217
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
224
Traceback (most recent call last):
225
ValueError: File is closed.
227
return self.read_to('\n', size)
229
def readlines(self, sizehint=None):
231
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
233
['Th\\n', 'is is \\n', 'a te\\n', 'st.']
234
>>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
237
Traceback (most recent call last):
238
ValueError: File is closed.
242
line = self.readline()
247
elif len(line) < sizehint:
249
sizehint -= len(line)
251
self._file_base.push_back(line)
255
if __name__ == "__main__":