# Copyright (C) 2005 Aaron Bentley
# <aaron.bentley@utoronto.ca>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

class IterableFileBase(object):
    """Create a file-like object from any iterable.

    The iterable is expected to yield string chunks.  Chunks are pulled
    lazily and concatenated; whatever has been pulled but not yet handed
    out is kept in ``self._buffer``.  ``self.done`` becomes True once the
    underlying iterator has been exhausted.
    """

    def __init__(self, iterable):
        # iter() accepts anything iterable, including objects that only
        # implement the sequence protocol.
        self._iter = iter(iterable)
        self._buffer = ""
        self.done = False

    def read_n(self, length):
        """Read up to ``length`` characters.

        Fewer characters are returned only when the iterable runs out.

        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
        'This is '
        """
        def test_length(result):
            if len(result) >= length:
                return length
            return None
        return self._read(test_length)

    def read_to(self, sequence, length=None):
        """Read up to and including ``sequence``.

        If ``length`` is given, at most ``length`` characters are returned.
        An occurrence of ``sequence`` that ends within the first ``length``
        characters still terminates the read, so the result never extends
        past the delimiter.

        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('\\n')
        'Th\\n'
        >>> f.read_to('\\n')
        'is is \\n'
        >>> IterableFileBase(['a\\nbcdef']).read_to('\\n', 4)
        'a\\n'
        """
        def test_contents(result):
            position = result.find(sequence)
            if position != -1:
                end = position + len(sequence)
                # A match inside the cap wins over the cap itself.
                if length is None or end <= length:
                    return end
            if length is not None and len(result) >= length:
                return length
            return None
        return self._read(test_contents)

    def _read(self, result_length):
        """
        Read data until result satisfies the condition result_length.
        result_length is a callable that returns None until the condition
        is satisfied, and returns the length of the result to use when
        the condition is satisfied.  (i.e. it returns the length of the
        subset of the first condition match.)

        If the iterable is exhausted first, everything read so far is
        returned, the buffer is emptied and ``self.done`` is set.
        """
        result = self._buffer
        while True:
            output_length = result_length(result)
            if output_length is not None:
                break
            try:
                # next() builtin works on both Python 2.6+ and Python 3.
                result += next(self._iter)
            except StopIteration:
                self.done = True
                self._buffer = ""
                return result
        # Keep the unread tail for the next call.
        self._buffer = result[output_length:]
        return result[:output_length]

    def read_all(self):
        """Read everything that remains.

        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
        'This is a test.'
        """
        def no_stop(result):
            # Never satisfied: forces _read to drain the iterator.
            return None
        return self._read(no_stop)

    def push_back(self, contents):
        """Put ``contents`` back in front of the unread data.

        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('\\n')
        'Th\\n'
        >>> f.push_back("Sh")
        >>> f.read_all()
        'Shis is \\na te\\nst.'
        """
        self._buffer = contents + self._buffer
99
class IterableFile(object):
    """This class supplies all File methods that can be implemented cheaply.

    It wraps an iterable of string chunks in an IterableFileBase and
    exposes read-only file methods on top of it.  Every reading method
    raises ValueError("File is closed.") after close() has been called.
    """

    def __init__(self, iterable):
        self._file_base = IterableFileBase(iterable)
        self._iter = self._make_iterator()
        self._closed = False
        # File attribute consulted by the Python 2 print statement.
        self.softspace = 0

    def _make_iterator(self):
        """Generate the remaining lines, each including its newline."""
        while not self._file_base.done:
            self._check_closed()
            result = self._file_base.read_to('\n')
            # An empty read only happens when the data ran out exactly.
            if result != '':
                yield result

    def _check_closed(self):
        """Raise ValueError if the file has been closed."""
        if self.closed:
            raise ValueError("File is closed.")

    def close(self):
        """Mark the file closed; later reads raise ValueError.

        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.closed
        False
        >>> f.close()
        >>> f.closed
        True
        """
        self._file_base.done = True
        self._closed = True

    closed = property(lambda x: x._closed)

    def flush(self):
        """No-op for standard compliance.

        >>> f = IterableFile([])
        >>> f.close()
        >>> f.flush()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()

    def next(self):
        """Implementation of the iterator protocol's next()

        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
        >>> f.next()
        'This \\n'
        >>> f.close()
        >>> f.next()
        Traceback (most recent call last):
        ValueError: File is closed.
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
        >>> f.next()
        'This \\n'
        >>> f.next()
        'is a test.\\n'
        >>> f.next()
        Traceback (most recent call last):
        StopIteration
        """
        self._check_closed()
        # next() builtin works on both Python 2.6+ and Python 3.
        return next(self._iter)

    # Python 3 spells the iterator protocol method __next__; next() is
    # kept so existing callers keep working.
    __next__ = next

    def __iter__(self):
        """Return self; iteration yields lines.

        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.close()
        >>> list(f)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        return self

    def read(self, length=None):
        """Read ``length`` characters, or everything if ``length`` is None.

        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
        'This is a test.'
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.read(10)
        'This is a '
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.close()
        >>> f.read(10)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        if length is None:
            return self._file_base.read_all()
        return self._file_base.read_n(length)

    def read_to(self, sequence, size=None):
        """
        Read characters until a sequence is found, with optional max size.
        The specified sequence, if found, will be included in the result

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('i')
        'Th\\ni'
        >>> f.read_to('i')
        's i'
        >>> f.close()
        >>> f.read_to('i')
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        return self._file_base.read_to(sequence, size)

    def readline(self, size=None):
        """Read one line, including its newline, up to ``size`` characters.

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.readline()
        'Th\\n'
        >>> f.readline(4)
        'is i'
        >>> f.close()
        >>> f.readline()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        # read_to performs the closed check.
        return self.read_to('\n', size)

    def readlines(self, sizehint=None):
        """Read the remaining lines into a list.

        With ``sizehint``, lines are collected while each one is shorter
        than the remaining hint; the first line that is not is pushed back
        unread and the lines gathered so far are returned.

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.readlines()
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.close()
        >>> f.readlines()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        lines = []
        while True:
            line = self.readline()
            if line == "":
                return lines
            if sizehint is None:
                lines.append(line)
            elif len(line) < sizehint:
                lines.append(line)
                sizehint -= len(line)
            else:
                # Over budget: leave this line for the next reader.
                self._file_base.push_back(line)
                return lines
254
if __name__ == "__main__":