~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/iterablefile.py

  • Committer: John Arbash Meinel
  • Date: 2006-07-09 15:47:02 UTC
  • mto: This revision was merged to the branch mainline in revision 1846.
  • Revision ID: john@arbash-meinel.com-20060709154702-304436a47a55e265
Renaming LockHelpers.py to lock_helpers.py

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Aaron Bentley
 
2
# <aaron.bentley@utoronto.ca>
 
3
#
 
4
#    This program is free software; you can redistribute it and/or modify
 
5
#    it under the terms of the GNU General Public License as published by
 
6
#    the Free Software Foundation; either version 2 of the License, or
 
7
#    (at your option) any later version.
 
8
#
 
9
#    This program is distributed in the hope that it will be useful,
 
10
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
#    GNU General Public License for more details.
 
13
#
 
14
#    You should have received a copy of the GNU General Public License
 
15
#    along with this program; if not, write to the Free Software
 
16
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
17
 
 
18
class IterableFileBase(object):
 
19
    """Create a file-like object from any iterable"""
 
20
 
 
21
    def __init__(self, iterable):
 
22
        object.__init__(self)
 
23
        self._iter = iterable.__iter__()
 
24
        self._buffer = ""
 
25
        self.done = False
 
26
 
 
27
    def read_n(self, length):
 
28
        """
 
29
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
 
30
        'This is '
 
31
        """
 
32
        def test_length(result):
 
33
            if len(result) >= length:
 
34
                return length
 
35
            else:
 
36
                return None
 
37
        return self._read(test_length)
 
38
 
 
39
    def read_to(self, sequence, length=None):
 
40
        """
 
41
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
42
        >>> f.read_to('\\n')
 
43
        'Th\\n'
 
44
        >>> f.read_to('\\n')
 
45
        'is is \\n'
 
46
        """
 
47
        def test_contents(result):
 
48
            if length is not None:
 
49
                if len(result) >= length:
 
50
                    return length
 
51
            try:
 
52
                return result.index(sequence)+len(sequence)
 
53
            except ValueError:
 
54
                return None
 
55
        return self._read(test_contents)
 
56
 
 
57
    def _read(self, result_length):
 
58
        """
 
59
        Read data until result satisfies the condition result_length.
 
60
        result_length is a callable that returns None until the condition
 
61
        is satisfied, and returns the length of the result to use when
 
62
        the condition is satisfied.  (i.e. it returns the length of the
 
63
        subset of the first condition match.)
 
64
        """
 
65
        result = self._buffer
 
66
        while result_length(result) is None:
 
67
            try:
 
68
                result += self._iter.next()
 
69
            except StopIteration:
 
70
                self.done = True
 
71
                self._buffer = ""
 
72
                return result
 
73
        output_length = result_length(result)
 
74
        self._buffer = result[output_length:]
 
75
        return result[:output_length]
 
76
 
 
77
    def read_all(self):
 
78
        """
 
79
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
 
80
        'This is a test.'
 
81
        """
 
82
        def no_stop(result):
 
83
            return None
 
84
        return self._read(no_stop)
 
85
 
 
86
 
 
87
    def push_back(self, contents):
 
88
        """
 
89
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
90
        >>> f.read_to('\\n')
 
91
        'Th\\n'
 
92
        >>> f.push_back("Sh")
 
93
        >>> f.read_all()
 
94
        'Shis is \\na te\\nst.'
 
95
        """
 
96
        self._buffer = contents + self._buffer
 
97
 
 
98
 
 
99
class IterableFile(object):
 
100
    """This class supplies all File methods that can be implemented cheaply."""
 
101
    def __init__(self, iterable):
 
102
        object.__init__(self)
 
103
        self._file_base = IterableFileBase(iterable)
 
104
        self._iter = self._make_iterator()
 
105
        self._closed = False
 
106
        self.softspace = 0
 
107
 
 
108
    def _make_iterator(self):
 
109
        while not self._file_base.done:
 
110
            self._check_closed()
 
111
            result = self._file_base.read_to('\n')
 
112
            if result != '':
 
113
                yield result
 
114
 
 
115
    def _check_closed(self):
 
116
        if self.closed:
 
117
            raise ValueError("File is closed.")
 
118
 
 
119
    def close(self):
 
120
        """
 
121
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
122
        >>> f.closed
 
123
        False
 
124
        >>> f.close()
 
125
        >>> f.closed
 
126
        True
 
127
        """
 
128
        self._file_base.done = True
 
129
        self._closed = True
 
130
 
 
131
    closed = property(lambda x: x._closed)
 
132
 
 
133
    def flush(self):
 
134
        """No-op for standard compliance.
 
135
        >>> f = IterableFile([])
 
136
        >>> f.close()
 
137
        >>> f.flush()
 
138
        Traceback (most recent call last):
 
139
        ValueError: File is closed.
 
140
        """
 
141
        self._check_closed()
 
142
 
 
143
    def next(self):
 
144
        """Implementation of the iterator protocol's next()
 
145
 
 
146
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
 
147
        >>> f.next()
 
148
        'This \\n'
 
149
        >>> f.close()
 
150
        >>> f.next()
 
151
        Traceback (most recent call last):
 
152
        ValueError: File is closed.
 
153
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
 
154
        >>> f.next()
 
155
        'This \\n'
 
156
        >>> f.next()
 
157
        'is a test.\\n'
 
158
        >>> f.next()
 
159
        Traceback (most recent call last):
 
160
        StopIteration
 
161
        """
 
162
        self._check_closed()
 
163
        return self._iter.next()
 
164
 
 
165
    def __iter__(self):
 
166
        """
 
167
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
 
168
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
169
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
170
        >>> f.close()
 
171
        >>> list(f)
 
172
        Traceback (most recent call last):
 
173
        ValueError: File is closed.
 
174
        """
 
175
        return self
 
176
 
 
177
    def read(self, length=None):
 
178
        """
 
179
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
 
180
        'This is a test.'
 
181
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
182
        >>> f.read(10)
 
183
        'This is a '
 
184
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
 
185
        >>> f.close()
 
186
        >>> f.read(10)
 
187
        Traceback (most recent call last):
 
188
        ValueError: File is closed.
 
189
        """
 
190
        self._check_closed()
 
191
        if length is None:
 
192
            return self._file_base.read_all()
 
193
        else:
 
194
            return self._file_base.read_n(length)
 
195
 
 
196
    def read_to(self, sequence, size=None):
 
197
        """
 
198
        Read characters until a sequence is found, with optional max size.
 
199
        The specified sequence, if found, will be included in the result
 
200
 
 
201
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
202
        >>> f.read_to('i')
 
203
        'Th\\ni'
 
204
        >>> f.read_to('i')
 
205
        's i'
 
206
        >>> f.close()
 
207
        >>> f.read_to('i')
 
208
        Traceback (most recent call last):
 
209
        ValueError: File is closed.
 
210
        """
 
211
        self._check_closed()
 
212
        return self._file_base.read_to(sequence, size)
 
213
 
 
214
    def readline(self, size=None):
 
215
        """
 
216
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
217
        >>> f.readline()
 
218
        'Th\\n'
 
219
        >>> f.readline(4)
 
220
        'is i'
 
221
        >>> f.close()
 
222
        >>> f.readline()
 
223
        Traceback (most recent call last):
 
224
        ValueError: File is closed.
 
225
        """
 
226
        return self.read_to('\n', size)
 
227
 
 
228
    def readlines(self, sizehint=None):
 
229
        """
 
230
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
231
        >>> f.readlines()
 
232
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
 
233
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
 
234
        >>> f.close()
 
235
        >>> f.readlines()
 
236
        Traceback (most recent call last):
 
237
        ValueError: File is closed.
 
238
        """
 
239
        lines = []
 
240
        while True:
 
241
            line = self.readline()
 
242
            if line == "":
 
243
                return lines
 
244
            if sizehint is None:
 
245
                lines.append(line)
 
246
            elif len(line) < sizehint:
 
247
                lines.append(line)
 
248
                sizehint -= len(line)
 
249
            else:
 
250
                self._file_base.push_back(line)
 
251
                return lines
 
252
 
 
253
        
 
254
if __name__ == "__main__":
 
255
    import doctest
 
256
    doctest.testmod()