# ~bzr-pqm/bzr/bzr.dev
#
# 1185.67.6 by Aaron Bentley
# Added tests and fixes for LockableFiles.put_utf8(); imported IterableFile

# Copyright (C) 2005 Aaron Bentley
# <aaron.bentley@utoronto.ca>
#
#    This program is free software; you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation; either version 2 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program; if not, write to the Free Software
#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

class IterableFileBase(object):
    """Create a file-like object from any iterable.

    Chunks are pulled from the iterable on demand and concatenated.  Data
    that has been pulled but not yet returned to a caller is kept in an
    internal buffer, so each read continues where the previous one stopped.

    The ``done`` attribute becomes True once a read has exhausted the
    underlying iterator.
    """

    def __init__(self, iterable):
        """Wrap *iterable*; its items are concatenated onto a str buffer."""
        # iter() rather than iterable.__iter__() so that any iterable
        # accepted by the language is accepted here too.
        self._iter = iter(iterable)
        self._buffer = ""
        self.done = False

    def read_n(self, length):
        """Read *length* characters, or fewer if the data runs out first.

        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
        'This is '
        """
        def test_length(result):
            if len(result) >= length:
                return length
            return None
        return self._read(test_length)

    def read_to(self, sequence, length=None):
        """Read up to and including the first occurrence of *sequence*.

        If *length* is given, at most *length* characters are returned; the
        read stops at whichever comes first, the end of *sequence* or the
        *length* limit.  If the data runs out before either condition is
        met, everything remaining is returned.

        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('\\n')
        'Th\\n'
        >>> f.read_to('\\n')
        'is is \\n'
        >>> f = IterableFileBase(['a\\nbcdef'])
        >>> f.read_to('\\n', 4)
        'a\\n'
        >>> f.read_to('\\n', 4)
        'bcde'
        """
        def test_contents(result):
            position = result.find(sequence)
            if position != -1:
                end = position + len(sequence)
                # A match that fits inside the size limit wins over the
                # limit itself; otherwise the limit truncates the read.
                if length is None or end <= length:
                    return end
            if length is not None and len(result) >= length:
                return length
            return None
        return self._read(test_contents)

    def _read(self, result_length):
        """Read data until result satisfies the condition result_length.

        result_length is a callable that returns None until the condition
        is satisfied, and returns the length of the result to use when
        the condition is satisfied.  (i.e. it returns the length of the
        subset of the first condition match.)

        If the iterator is exhausted before the condition is satisfied,
        all accumulated data is returned, the buffer is emptied and
        ``done`` is set.
        """
        result = self._buffer
        while True:
            output_length = result_length(result)
            if output_length is not None:
                break
            try:
                result += next(self._iter)
            except StopIteration:
                self.done = True
                self._buffer = ""
                return result
        # Keep whatever lies beyond the requested prefix for the next read.
        self._buffer = result[output_length:]
        return result[:output_length]

    def read_all(self):
        """Return all remaining data and mark the file as done.

        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
        'This is a test.'
        """
        result = self._buffer + "".join(self._iter)
        self._buffer = ""
        self.done = True
        return result

    def push_back(self, contents):
        """Put *contents* back in front of the unread data.

        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('\\n')
        'Th\\n'
        >>> f.push_back("Sh")
        >>> f.read_all()
        'Shis is \\na te\\nst.'
        """
        self._buffer = contents + self._buffer
97
98
99
class IterableFile(object):
    """This class supplies all File methods that can be implemented cheaply.

    It wraps an IterableFileBase and adds the closed-file checks, the
    iterator protocol and the line-oriented read methods of a file object.
    """

    def __init__(self, iterable):
        """Create a read-only file-like object over *iterable*."""
        self._file_base = IterableFileBase(iterable)
        self._iter = self._make_iterator()
        self._closed = False
        self.softspace = 0

    def _make_iterator(self):
        """Yield successive lines until the underlying data is exhausted."""
        while not self._file_base.done:
            self._check_closed()
            result = self._file_base.read_to('\n')
            # The final read may come back empty when the data ended
            # exactly on a line boundary; that is not a line.
            if result != '':
                yield result

    def _check_closed(self):
        """Raise ValueError if the file has been closed."""
        if self.closed:
            raise ValueError("File is closed.")

    def close(self):
        """Close the file; later reads raise ValueError.

        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.closed
        False
        >>> f.close()
        >>> f.closed
        True
        """
        self._file_base.done = True
        self._closed = True

    @property
    def closed(self):
        """True once close() has been called."""
        return self._closed

    def flush(self):
        """No-op for standard compliance.

        >>> f = IterableFile([])
        >>> f.close()
        >>> f.flush()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()

    def next(self):
        """Implementation of the iterator protocol's next()

        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
        >>> f.next()
        'This \\n'
        >>> f.close()
        >>> f.next()
        Traceback (most recent call last):
        ValueError: File is closed.
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
        >>> f.next()
        'This \\n'
        >>> f.next()
        'is a test.\\n'
        >>> f.next()
        Traceback (most recent call last):
        StopIteration
        """
        self._check_closed()
        return next(self._iter)

    # Python 3 spells the iterator protocol method __next__.
    __next__ = next

    def __iter__(self):
        """Return self; the file is its own line iterator.

        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.close()
        >>> list(f)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        return self

    def read(self, length=None):
        """Read *length* characters, or everything if length is None or < 0.

        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
        'This is a test.'
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.read(10)
        'This is a '
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read(-1)
        'This is a test.'
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
        >>> f.close()
        >>> f.read(10)
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        # A negative size means "read everything", as for real files;
        # passing it on would be treated as a negative slice bound.
        if length is None or length < 0:
            return self._file_base.read_all()
        return self._file_base.read_n(length)

    def read_to(self, sequence, size=None):
        """
        Read characters until a sequence is found, with optional max size.
        The specified sequence, if found, will be included in the result

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.read_to('i')
        'Th\\ni'
        >>> f.read_to('i')
        's i'
        >>> f.close()
        >>> f.read_to('i')
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        self._check_closed()
        return self._file_base.read_to(sequence, size)

    def readline(self, size=None):
        """Read one line, including its newline, limited to *size* chars.

        A *size* of None or a negative number means no limit.

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.readline()
        'Th\\n'
        >>> f.readline(4)
        'is i'
        >>> f.close()
        >>> f.readline()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        if size is not None and size < 0:
            size = None
        return self.read_to('\n', size)

    def readlines(self, sizehint=None):
        """Return the remaining lines as a list.

        With *sizehint*, lines are collected only while each next line is
        shorter than the remaining budget; the first line that does not fit
        is pushed back unread.  Note that this yields an empty list when
        the very first line is at least *sizehint* characters long.

        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.readlines()
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
        >>> f.close()
        >>> f.readlines()
        Traceback (most recent call last):
        ValueError: File is closed.
        """
        lines = []
        while True:
            line = self.readline()
            if line == "":
                return lines
            if sizehint is not None:
                if len(line) >= sizehint:
                    self._file_base.push_back(line)
                    return lines
                sizehint -= len(line)
            lines.append(line)
252
253
        
254
if __name__ == "__main__":
    # Imported here rather than at module level so that merely importing
    # this module does not pay the cost of loading doctest.
    import doctest
    doctest.testmod()