~bzr-pqm/bzr/bzr.dev

2052.3.3 by John Arbash Meinel
Add (c) Canonical to files that Aaron has approved
1
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
1185.67.6 by Aaron Bentley
Added tests and fixes for LockableFiles.put_utf8(); imported IterableFile
2
# <aaron.bentley@utoronto.ca>
3
#
2052.3.1 by John Arbash Meinel
Add tests to cleanup the copyright of all source files
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
8
#
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
# GNU General Public License for more details.
13
#
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
4183.7.1 by Sabin Iacob
update FSF mailing address
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
1185.67.6 by Aaron Bentley
Added tests and fixes for LockableFiles.put_utf8(); imported IterableFile
17
2052.3.3 by John Arbash Meinel
Add (c) Canonical to files that Aaron has approved
18
1185.67.6 by Aaron Bentley
Added tests and fixes for LockableFiles.put_utf8(); imported IterableFile
19
class IterableFileBase(object):
20
    """Create a file-like object from any iterable"""
1185.82.123 by Aaron Bentley
Cleanups to prepare for review
21
1185.67.6 by Aaron Bentley
Added tests and fixes for LockableFiles.put_utf8(); imported IterableFile
22
    def __init__(self, iterable):
23
        object.__init__(self)
24
        self._iter = iterable.__iter__()
25
        self._buffer = ""
26
        self.done = False
27
28
    def read_n(self, length):
29
        """
30
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
31
        'This is '
32
        """
33
        def test_length(result):
34
            if len(result) >= length:
35
                return length
36
            else:
37
                return None
38
        return self._read(test_length)
39
40
    def read_to(self, sequence, length=None):
41
        """
42
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
43
        >>> f.read_to('\\n')
44
        'Th\\n'
45
        >>> f.read_to('\\n')
46
        'is is \\n'
47
        """
48
        def test_contents(result):
49
            if length is not None:
50
                if len(result) >= length:
51
                    return length
52
            try:
53
                return result.index(sequence)+len(sequence)
54
            except ValueError:
55
                return None
56
        return self._read(test_contents)
57
58
    def _read(self, result_length):
59
        """
60
        Read data until result satisfies the condition result_length.
61
        result_length is a callable that returns None until the condition
62
        is satisfied, and returns the length of the result to use when
63
        the condition is satisfied.  (i.e. it returns the length of the
64
        subset of the first condition match.)
65
        """
66
        result = self._buffer
67
        while result_length(result) is None:
68
            try:
69
                result += self._iter.next()
70
            except StopIteration:
71
                self.done = True
72
                self._buffer = ""
73
                return result
74
        output_length = result_length(result)
75
        self._buffer = result[output_length:]
76
        return result[:output_length]
77
78
    def read_all(self):
79
        """
80
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
81
        'This is a test.'
82
        """
83
        def no_stop(result):
84
            return None
85
        return self._read(no_stop)
86
87
88
    def push_back(self, contents):
89
        """
90
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
91
        >>> f.read_to('\\n')
92
        'Th\\n'
93
        >>> f.push_back("Sh")
94
        >>> f.read_all()
95
        'Shis is \\na te\\nst.'
96
        """
97
        self._buffer = contents + self._buffer
98
99
100
class IterableFile(object):
101
    """This class supplies all File methods that can be implemented cheaply."""
102
    def __init__(self, iterable):
103
        object.__init__(self)
104
        self._file_base = IterableFileBase(iterable)
105
        self._iter = self._make_iterator()
106
        self._closed = False
107
        self.softspace = 0
108
109
    def _make_iterator(self):
110
        while not self._file_base.done:
111
            self._check_closed()
112
            result = self._file_base.read_to('\n')
113
            if result != '':
114
                yield result
115
116
    def _check_closed(self):
117
        if self.closed:
118
            raise ValueError("File is closed.")
119
120
    def close(self):
121
        """
122
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
123
        >>> f.closed
124
        False
125
        >>> f.close()
126
        >>> f.closed
127
        True
128
        """
129
        self._file_base.done = True
130
        self._closed = True
131
132
    closed = property(lambda x: x._closed)
133
134
    def flush(self):
135
        """No-op for standard compliance.
136
        >>> f = IterableFile([])
137
        >>> f.close()
138
        >>> f.flush()
139
        Traceback (most recent call last):
140
        ValueError: File is closed.
141
        """
142
        self._check_closed()
143
144
    def next(self):
145
        """Implementation of the iterator protocol's next()
146
147
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
148
        >>> f.next()
149
        'This \\n'
150
        >>> f.close()
151
        >>> f.next()
152
        Traceback (most recent call last):
153
        ValueError: File is closed.
154
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
155
        >>> f.next()
156
        'This \\n'
157
        >>> f.next()
158
        'is a test.\\n'
159
        >>> f.next()
160
        Traceback (most recent call last):
161
        StopIteration
162
        """
163
        self._check_closed()
164
        return self._iter.next()
165
166
    def __iter__(self):
167
        """
168
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
169
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
170
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
171
        >>> f.close()
172
        >>> list(f)
173
        Traceback (most recent call last):
174
        ValueError: File is closed.
175
        """
176
        return self
177
178
    def read(self, length=None):
179
        """
180
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
181
        'This is a test.'
182
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
183
        >>> f.read(10)
184
        'This is a '
185
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
186
        >>> f.close()
187
        >>> f.read(10)
188
        Traceback (most recent call last):
189
        ValueError: File is closed.
190
        """
191
        self._check_closed()
192
        if length is None:
193
            return self._file_base.read_all()
194
        else:
195
            return self._file_base.read_n(length)
196
197
    def read_to(self, sequence, size=None):
198
        """
199
        Read characters until a sequence is found, with optional max size.
200
        The specified sequence, if found, will be included in the result
201
202
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
203
        >>> f.read_to('i')
204
        'Th\\ni'
205
        >>> f.read_to('i')
206
        's i'
207
        >>> f.close()
208
        >>> f.read_to('i')
209
        Traceback (most recent call last):
210
        ValueError: File is closed.
211
        """
212
        self._check_closed()
213
        return self._file_base.read_to(sequence, size)
214
215
    def readline(self, size=None):
216
        """
217
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
218
        >>> f.readline()
219
        'Th\\n'
220
        >>> f.readline(4)
221
        'is i'
222
        >>> f.close()
223
        >>> f.readline()
224
        Traceback (most recent call last):
225
        ValueError: File is closed.
226
        """
227
        return self.read_to('\n', size)
228
229
    def readlines(self, sizehint=None):
230
        """
231
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
232
        >>> f.readlines()
233
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
234
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
235
        >>> f.close()
236
        >>> f.readlines()
237
        Traceback (most recent call last):
238
        ValueError: File is closed.
239
        """
240
        lines = []
241
        while True:
242
            line = self.readline()
243
            if line == "":
244
                return lines
245
            if sizehint is None:
246
                lines.append(line)
247
            elif len(line) < sizehint:
248
                lines.append(line)
249
                sizehint -= len(line)
250
            else:
251
                self._file_base.push_back(line)
252
                return lines
253
3943.8.1 by Marius Kruger
remove all trailing whitespace from bzr source
254
1185.67.6 by Aaron Bentley
Added tests and fixes for LockableFiles.put_utf8(); imported IterableFile
255
if __name__ == "__main__":
1551.6.38 by Aaron Bentley
Move doctest import to increase speed
256
    import doctest
1185.67.6 by Aaron Bentley
Added tests and fixes for LockableFiles.put_utf8(); imported IterableFile
257
    doctest.testmod()