# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
# <aaron.bentley@utoronto.ca>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

from __future__ import absolute_import
20
class IterableFileBase(object):
    """Create a file-like object from any iterable.

    Chunks are pulled from the iterable on demand and concatenated.  Data
    that has been pulled but not yet returned to the caller is held in
    ``self._buffer``; ``self.done`` is set to True once the underlying
    iterator has raised StopIteration.
    """

    def __init__(self, iterable):
        """Wrap ``iterable``, whose items are the successive string chunks."""
        object.__init__(self)
        self._iter = iter(iterable)
        self._buffer = ""
        self.done = False

    def read_n(self, length):
        """Return the next ``length`` characters (fewer at end of data).

        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
        'This is '
        """
        def test_length(result):
            if len(result) >= length:
                return length
            return None
        return self._read(test_length)

    def read_to(self, sequence, length=None):
        """Read up to and including ``sequence``, or at most ``length`` chars.

        The result ends right after the first occurrence of ``sequence``.
        If ``length`` is given the result is additionally capped at
        ``length`` characters, whichever limit is hit first.  At end of data
        whatever is left is returned.

        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('\\n')
        'Th\\n'
        >>> f.read_to('\\n')
        'is is \\n'
        >>> IterableFileBase(['a\\nbcdef']).read_to('\\n', 3)
        'a\\n'
        """
        def test_contents(result):
            found = result.find(sequence)
            if found != -1:
                end = found + len(sequence)
                # Look for the sequence *before* applying the cap, so a
                # chunk that both contains the sequence and reaches the cap
                # does not overrun the sequence.
                if length is not None and end > length:
                    return length
                return end
            if length is not None and len(result) >= length:
                return length
            return None
        return self._read(test_contents)

    def _read(self, result_length):
        """
        Read data until result satisfies the condition result_length.
        result_length is a callable that returns None until the condition
        is satisfied, and returns the length of the result to use when
        the condition is satisfied.  (i.e. it returns the length of the
        subset of the first condition match.)

        If the iterator runs out before the condition is satisfied, all
        accumulated data is returned, the buffer is emptied and ``done``
        is set.
        """
        result = self._buffer
        output_length = result_length(result)
        while output_length is None:
            try:
                # next() builtin rather than .next() so that this works on
                # both Python 2 and Python 3 iterators.
                result += next(self._iter)
            except StopIteration:
                self.done = True
                self._buffer = ""
                return result
            output_length = result_length(result)
        # Keep the surplus for the next read.
        self._buffer = result[output_length:]
        return result[:output_length]

    def read_all(self):
        """Return everything that is left, exhausting the iterable.

        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
        'This is a test.'
        """
        def no_stop(result):
            # Never satisfied: forces _read to run until StopIteration.
            return None
        return self._read(no_stop)

    def push_back(self, contents):
        """Put ``contents`` back at the front of the unread data.

        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('\\n')
        'Th\\n'
        >>> f.push_back("Sh")
        >>> f.read_all()
        'Shis is \\na te\\nst.'
        """
        self._buffer = contents + self._buffer
101
class IterableFile(object):
    """This class supplies all File methods that can be implemented cheaply."""

    def __init__(self, iterable):
        """Present ``iterable`` (a sequence of string chunks) as a file."""
        object.__init__(self)
        self._file_base = IterableFileBase(iterable)
        self._iter = self._make_iterator()
        self._closed = False
        self.softspace = 0

    def _make_iterator(self):
        """Generate the file's lines, one per iteration.

        Empty reads (which happen once the data runs out) are not yielded.
        """
        while not self._file_base.done:
            self._check_closed()
            result = self._file_base.read_to('\n')
            if result != '':
                yield result

    def _check_closed(self):
        """Raise ValueError if the file has been closed."""
        if self.closed:
            raise ValueError("File is closed.")

    def close(self):
        """Mark the file closed; later reads raise ValueError.

        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.closed
        False
        >>> f.close()
        >>> f.closed
        True
        """
        self._file_base.done = True
        self._closed = True

    # Read-only view of the closed flag.
    closed = property(lambda x: x._closed)

    def flush(self):
        """No-op for standard compliance.

        >>> f = IterableFile([])
        >>> f.close()
        >>> f.flush()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()

    def next(self):
        """Implementation of the iterator protocol's next()

        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
        >>> f.next()
        'This \\n'
        >>> f.close()
        >>> f.next()
        Traceback (most recent call last):
        ValueError: File is closed.
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
        >>> f.next()
        'This \\n'
        >>> f.next()
        'is a test.\\n'
        >>> f.next()
        Traceback (most recent call last):
        StopIteration
        """
        self._check_closed()
        # next() builtin works with both Python 2 and Python 3 generators.
        return next(self._iter)

    # Python 3 spelling of the iterator protocol; ``next`` is kept for
    # existing callers.
    __next__ = next

    def __iter__(self):
        """Iterate over the lines of the file.

        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.close()
        >>> list(f)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        return self

    def read(self, length=None):
        """Read ``length`` characters, or everything if ``length`` is None.

        A negative ``length`` also reads everything, following the usual
        file-object convention.

        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
        'This is a test.'
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.read(10)
        'This is a '
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.close()
        >>> f.read(10)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        if length is None or length < 0:
            return self._file_base.read_all()
        return self._file_base.read_n(length)

    def read_to(self, sequence, size=None):
        """
        Read characters until a sequence is found, with optional max size.
        The specified sequence, if found, will be included in the result

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('i')
        'Th\\ni'
        >>> f.read_to('i')
        's i'
        >>> f.close()
        >>> f.read_to('i')
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        return self._file_base.read_to(sequence, size)

    def readline(self, size=None):
        """Read one line (including its newline), at most ``size`` chars.

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.readline()
        'Th\\n'
        >>> f.readline(4)
        'is i'
        >>> f.close()
        >>> f.readline()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        return self.read_to('\n', size)

    def readlines(self, sizehint=None):
        """Return the remaining lines as a list.

        With ``sizehint``, lines are collected only while each one is
        shorter than the remaining budget; the first line that does not fit
        is pushed back unread and the lines gathered so far are returned.

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.readlines()
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.close()
        >>> f.readlines()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        lines = []
        while True:
            line = self.readline()
            if line == "":
                # End of data.
                return lines
            if sizehint is None:
                lines.append(line)
            elif len(line) < sizehint:
                lines.append(line)
                sizehint -= len(line)
            else:
                # Over budget: leave this line for the next reader.
                self._file_base.push_back(line)
                return lines
256
if __name__ == "__main__":