~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/iterablefile.py

  • Committer: Martin Pool
  • Date: 2005-04-28 07:24:55 UTC
  • Revision ID: mbp@sourcefrog.net-20050428072453-7b99afa993a1e549
todo

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
2
 
# <aaron.bentley@utoronto.ca>
3
 
#
4
 
# This program is free software; you can redistribute it and/or modify
5
 
# it under the terms of the GNU General Public License as published by
6
 
# the Free Software Foundation; either version 2 of the License, or
7
 
# (at your option) any later version.
8
 
#
9
 
# This program is distributed in the hope that it will be useful,
10
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 
# GNU General Public License for more details.
13
 
#
14
 
# You should have received a copy of the GNU General Public License
15
 
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
 
 
18
 
from __future__ import absolute_import
19
 
 
20
 
class IterableFileBase(object):
21
 
    """Create a file-like object from any iterable"""
22
 
 
23
 
    def __init__(self, iterable):
24
 
        object.__init__(self)
25
 
        self._iter = iterable.__iter__()
26
 
        self._buffer = ""
27
 
        self.done = False
28
 
 
29
 
    def read_n(self, length):
30
 
        """
31
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
32
 
        'This is '
33
 
        """
34
 
        def test_length(result):
35
 
            if len(result) >= length:
36
 
                return length
37
 
            else:
38
 
                return None
39
 
        return self._read(test_length)
40
 
 
41
 
    def read_to(self, sequence, length=None):
42
 
        """
43
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
44
 
        >>> f.read_to('\\n')
45
 
        'Th\\n'
46
 
        >>> f.read_to('\\n')
47
 
        'is is \\n'
48
 
        """
49
 
        def test_contents(result):
50
 
            if length is not None:
51
 
                if len(result) >= length:
52
 
                    return length
53
 
            try:
54
 
                return result.index(sequence)+len(sequence)
55
 
            except ValueError:
56
 
                return None
57
 
        return self._read(test_contents)
58
 
 
59
 
    def _read(self, result_length):
60
 
        """
61
 
        Read data until result satisfies the condition result_length.
62
 
        result_length is a callable that returns None until the condition
63
 
        is satisfied, and returns the length of the result to use when
64
 
        the condition is satisfied.  (i.e. it returns the length of the
65
 
        subset of the first condition match.)
66
 
        """
67
 
        result = self._buffer
68
 
        while result_length(result) is None:
69
 
            try:
70
 
                result += self._iter.next()
71
 
            except StopIteration:
72
 
                self.done = True
73
 
                self._buffer = ""
74
 
                return result
75
 
        output_length = result_length(result)
76
 
        self._buffer = result[output_length:]
77
 
        return result[:output_length]
78
 
 
79
 
    def read_all(self):
80
 
        """
81
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
82
 
        'This is a test.'
83
 
        """
84
 
        def no_stop(result):
85
 
            return None
86
 
        return self._read(no_stop)
87
 
 
88
 
 
89
 
    def push_back(self, contents):
90
 
        """
91
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
92
 
        >>> f.read_to('\\n')
93
 
        'Th\\n'
94
 
        >>> f.push_back("Sh")
95
 
        >>> f.read_all()
96
 
        'Shis is \\na te\\nst.'
97
 
        """
98
 
        self._buffer = contents + self._buffer
99
 
 
100
 
 
101
 
class IterableFile(object):
102
 
    """This class supplies all File methods that can be implemented cheaply."""
103
 
    def __init__(self, iterable):
104
 
        object.__init__(self)
105
 
        self._file_base = IterableFileBase(iterable)
106
 
        self._iter = self._make_iterator()
107
 
        self._closed = False
108
 
        self.softspace = 0
109
 
 
110
 
    def _make_iterator(self):
111
 
        while not self._file_base.done:
112
 
            self._check_closed()
113
 
            result = self._file_base.read_to('\n')
114
 
            if result != '':
115
 
                yield result
116
 
 
117
 
    def _check_closed(self):
118
 
        if self.closed:
119
 
            raise ValueError("File is closed.")
120
 
 
121
 
    def close(self):
122
 
        """
123
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
124
 
        >>> f.closed
125
 
        False
126
 
        >>> f.close()
127
 
        >>> f.closed
128
 
        True
129
 
        """
130
 
        self._file_base.done = True
131
 
        self._closed = True
132
 
 
133
 
    closed = property(lambda x: x._closed)
134
 
 
135
 
    def flush(self):
136
 
        """No-op for standard compliance.
137
 
        >>> f = IterableFile([])
138
 
        >>> f.close()
139
 
        >>> f.flush()
140
 
        Traceback (most recent call last):
141
 
        ValueError: File is closed.
142
 
        """
143
 
        self._check_closed()
144
 
 
145
 
    def next(self):
146
 
        """Implementation of the iterator protocol's next()
147
 
 
148
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
149
 
        >>> f.next()
150
 
        'This \\n'
151
 
        >>> f.close()
152
 
        >>> f.next()
153
 
        Traceback (most recent call last):
154
 
        ValueError: File is closed.
155
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
156
 
        >>> f.next()
157
 
        'This \\n'
158
 
        >>> f.next()
159
 
        'is a test.\\n'
160
 
        >>> f.next()
161
 
        Traceback (most recent call last):
162
 
        StopIteration
163
 
        """
164
 
        self._check_closed()
165
 
        return self._iter.next()
166
 
 
167
 
    def __iter__(self):
168
 
        """
169
 
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
170
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
171
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
172
 
        >>> f.close()
173
 
        >>> list(f)
174
 
        Traceback (most recent call last):
175
 
        ValueError: File is closed.
176
 
        """
177
 
        return self
178
 
 
179
 
    def read(self, length=None):
180
 
        """
181
 
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
182
 
        'This is a test.'
183
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
184
 
        >>> f.read(10)
185
 
        'This is a '
186
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
187
 
        >>> f.close()
188
 
        >>> f.read(10)
189
 
        Traceback (most recent call last):
190
 
        ValueError: File is closed.
191
 
        """
192
 
        self._check_closed()
193
 
        if length is None:
194
 
            return self._file_base.read_all()
195
 
        else:
196
 
            return self._file_base.read_n(length)
197
 
 
198
 
    def read_to(self, sequence, size=None):
199
 
        """
200
 
        Read characters until a sequence is found, with optional max size.
201
 
        The specified sequence, if found, will be included in the result
202
 
 
203
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
204
 
        >>> f.read_to('i')
205
 
        'Th\\ni'
206
 
        >>> f.read_to('i')
207
 
        's i'
208
 
        >>> f.close()
209
 
        >>> f.read_to('i')
210
 
        Traceback (most recent call last):
211
 
        ValueError: File is closed.
212
 
        """
213
 
        self._check_closed()
214
 
        return self._file_base.read_to(sequence, size)
215
 
 
216
 
    def readline(self, size=None):
217
 
        """
218
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
219
 
        >>> f.readline()
220
 
        'Th\\n'
221
 
        >>> f.readline(4)
222
 
        'is i'
223
 
        >>> f.close()
224
 
        >>> f.readline()
225
 
        Traceback (most recent call last):
226
 
        ValueError: File is closed.
227
 
        """
228
 
        return self.read_to('\n', size)
229
 
 
230
 
    def readlines(self, sizehint=None):
231
 
        """
232
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
233
 
        >>> f.readlines()
234
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
235
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
236
 
        >>> f.close()
237
 
        >>> f.readlines()
238
 
        Traceback (most recent call last):
239
 
        ValueError: File is closed.
240
 
        """
241
 
        lines = []
242
 
        while True:
243
 
            line = self.readline()
244
 
            if line == "":
245
 
                return lines
246
 
            if sizehint is None:
247
 
                lines.append(line)
248
 
            elif len(line) < sizehint:
249
 
                lines.append(line)
250
 
                sizehint -= len(line)
251
 
            else:
252
 
                self._file_base.push_back(line)
253
 
                return lines
254
 
 
255
 
 
256
 
if __name__ == "__main__":
257
 
    import doctest
258
 
    doctest.testmod()