~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/iterablefile.py

  • Committer: Robert Collins
  • Date: 2005-10-02 22:47:02 UTC
  • mto: This revision was merged to the branch mainline in revision 1397.
  • Revision ID: robertc@robertcollins.net-20051002224701-8a8b20b90de559a6
support ghosts in commits

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Aaron Bentley
2
 
# <aaron.bentley@utoronto.ca>
3
 
#
4
 
#    This program is free software; you can redistribute it and/or modify
5
 
#    it under the terms of the GNU General Public License as published by
6
 
#    the Free Software Foundation; either version 2 of the License, or
7
 
#    (at your option) any later version.
8
 
#
9
 
#    This program is distributed in the hope that it will be useful,
10
 
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 
#    GNU General Public License for more details.
13
 
#
14
 
#    You should have received a copy of the GNU General Public License
15
 
#    along with this program; if not, write to the Free Software
16
 
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
 
 
18
 
class IterableFileBase(object):
19
 
    """Create a file-like object from any iterable"""
20
 
 
21
 
    def __init__(self, iterable):
22
 
        object.__init__(self)
23
 
        self._iter = iterable.__iter__()
24
 
        self._buffer = ""
25
 
        self.done = False
26
 
 
27
 
    def read_n(self, length):
28
 
        """
29
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
30
 
        'This is '
31
 
        """
32
 
        def test_length(result):
33
 
            if len(result) >= length:
34
 
                return length
35
 
            else:
36
 
                return None
37
 
        return self._read(test_length)
38
 
 
39
 
    def read_to(self, sequence, length=None):
40
 
        """
41
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
42
 
        >>> f.read_to('\\n')
43
 
        'Th\\n'
44
 
        >>> f.read_to('\\n')
45
 
        'is is \\n'
46
 
        """
47
 
        def test_contents(result):
48
 
            if length is not None:
49
 
                if len(result) >= length:
50
 
                    return length
51
 
            try:
52
 
                return result.index(sequence)+len(sequence)
53
 
            except ValueError:
54
 
                return None
55
 
        return self._read(test_contents)
56
 
 
57
 
    def _read(self, result_length):
58
 
        """
59
 
        Read data until result satisfies the condition result_length.
60
 
        result_length is a callable that returns None until the condition
61
 
        is satisfied, and returns the length of the result to use when
62
 
        the condition is satisfied.  (i.e. it returns the length of the
63
 
        subset of the first condition match.)
64
 
        """
65
 
        result = self._buffer
66
 
        while result_length(result) is None:
67
 
            try:
68
 
                result += self._iter.next()
69
 
            except StopIteration:
70
 
                self.done = True
71
 
                self._buffer = ""
72
 
                return result
73
 
        output_length = result_length(result)
74
 
        self._buffer = result[output_length:]
75
 
        return result[:output_length]
76
 
 
77
 
    def read_all(self):
78
 
        """
79
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
80
 
        'This is a test.'
81
 
        """
82
 
        def no_stop(result):
83
 
            return None
84
 
        return self._read(no_stop)
85
 
 
86
 
 
87
 
    def push_back(self, contents):
88
 
        """
89
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
90
 
        >>> f.read_to('\\n')
91
 
        'Th\\n'
92
 
        >>> f.push_back("Sh")
93
 
        >>> f.read_all()
94
 
        'Shis is \\na te\\nst.'
95
 
        """
96
 
        self._buffer = contents + self._buffer
97
 
 
98
 
 
99
 
class IterableFile(object):
100
 
    """This class supplies all File methods that can be implemented cheaply."""
101
 
    def __init__(self, iterable):
102
 
        object.__init__(self)
103
 
        self._file_base = IterableFileBase(iterable)
104
 
        self._iter = self._make_iterator()
105
 
        self._closed = False
106
 
        self.softspace = 0
107
 
 
108
 
    def _make_iterator(self):
109
 
        while not self._file_base.done:
110
 
            self._check_closed()
111
 
            result = self._file_base.read_to('\n')
112
 
            if result != '':
113
 
                yield result
114
 
 
115
 
    def _check_closed(self):
116
 
        if self.closed:
117
 
            raise ValueError("File is closed.")
118
 
 
119
 
    def close(self):
120
 
        """
121
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
122
 
        >>> f.closed
123
 
        False
124
 
        >>> f.close()
125
 
        >>> f.closed
126
 
        True
127
 
        """
128
 
        self._file_base.done = True
129
 
        self._closed = True
130
 
 
131
 
    closed = property(lambda x: x._closed)
132
 
 
133
 
    def flush(self):
134
 
        """No-op for standard compliance.
135
 
        >>> f = IterableFile([])
136
 
        >>> f.close()
137
 
        >>> f.flush()
138
 
        Traceback (most recent call last):
139
 
        ValueError: File is closed.
140
 
        """
141
 
        self._check_closed()
142
 
 
143
 
    def next(self):
144
 
        """Implementation of the iterator protocol's next()
145
 
 
146
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
147
 
        >>> f.next()
148
 
        'This \\n'
149
 
        >>> f.close()
150
 
        >>> f.next()
151
 
        Traceback (most recent call last):
152
 
        ValueError: File is closed.
153
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
154
 
        >>> f.next()
155
 
        'This \\n'
156
 
        >>> f.next()
157
 
        'is a test.\\n'
158
 
        >>> f.next()
159
 
        Traceback (most recent call last):
160
 
        StopIteration
161
 
        """
162
 
        self._check_closed()
163
 
        return self._iter.next()
164
 
 
165
 
    def __iter__(self):
166
 
        """
167
 
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
168
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
169
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
170
 
        >>> f.close()
171
 
        >>> list(f)
172
 
        Traceback (most recent call last):
173
 
        ValueError: File is closed.
174
 
        """
175
 
        return self
176
 
 
177
 
    def read(self, length=None):
178
 
        """
179
 
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
180
 
        'This is a test.'
181
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
182
 
        >>> f.read(10)
183
 
        'This is a '
184
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
185
 
        >>> f.close()
186
 
        >>> f.read(10)
187
 
        Traceback (most recent call last):
188
 
        ValueError: File is closed.
189
 
        """
190
 
        self._check_closed()
191
 
        if length is None:
192
 
            return self._file_base.read_all()
193
 
        else:
194
 
            return self._file_base.read_n(length)
195
 
 
196
 
    def read_to(self, sequence, size=None):
197
 
        """
198
 
        Read characters until a sequence is found, with optional max size.
199
 
        The specified sequence, if found, will be included in the result
200
 
 
201
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
202
 
        >>> f.read_to('i')
203
 
        'Th\\ni'
204
 
        >>> f.read_to('i')
205
 
        's i'
206
 
        >>> f.close()
207
 
        >>> f.read_to('i')
208
 
        Traceback (most recent call last):
209
 
        ValueError: File is closed.
210
 
        """
211
 
        self._check_closed()
212
 
        return self._file_base.read_to(sequence, size)
213
 
 
214
 
    def readline(self, size=None):
215
 
        """
216
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
217
 
        >>> f.readline()
218
 
        'Th\\n'
219
 
        >>> f.readline(4)
220
 
        'is i'
221
 
        >>> f.close()
222
 
        >>> f.readline()
223
 
        Traceback (most recent call last):
224
 
        ValueError: File is closed.
225
 
        """
226
 
        return self.read_to('\n', size)
227
 
 
228
 
    def readlines(self, sizehint=None):
229
 
        """
230
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
231
 
        >>> f.readlines()
232
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
233
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
234
 
        >>> f.close()
235
 
        >>> f.readlines()
236
 
        Traceback (most recent call last):
237
 
        ValueError: File is closed.
238
 
        """
239
 
        lines = []
240
 
        while True:
241
 
            line = self.readline()
242
 
            if line == "":
243
 
                return lines
244
 
            if sizehint is None:
245
 
                lines.append(line)
246
 
            elif len(line) < sizehint:
247
 
                lines.append(line)
248
 
                sizehint -= len(line)
249
 
            else:
250
 
                self._file_base.push_back(line)
251
 
                return lines
252
 
 
253
 
        
254
 
if __name__ == "__main__":
255
 
    import doctest
256
 
    doctest.testmod()