~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/iterablefile.py

  • Committer: Aaron Bentley
  • Date: 2008-10-05 00:49:15 UTC
  • mto: (0.15.1 unshelve)
  • mto: This revision was merged to the branch mainline in revision 3820.
  • Revision ID: aaron@aaronbentley.com-20081005004915-po74nfeweenqmjfd
Add pack serialization

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Aaron Bentley, Canonical Ltd
2
 
# <aaron.bentley@utoronto.ca>
3
 
#
4
 
# This program is free software; you can redistribute it and/or modify
5
 
# it under the terms of the GNU General Public License as published by
6
 
# the Free Software Foundation; either version 2 of the License, or
7
 
# (at your option) any later version.
8
 
#
9
 
# This program is distributed in the hope that it will be useful,
10
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 
# GNU General Public License for more details.
13
 
#
14
 
# You should have received a copy of the GNU General Public License
15
 
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
 
 
18
 
 
19
 
class IterableFileBase(object):
20
 
    """Create a file-like object from any iterable"""
21
 
 
22
 
    def __init__(self, iterable):
23
 
        object.__init__(self)
24
 
        self._iter = iterable.__iter__()
25
 
        self._buffer = ""
26
 
        self.done = False
27
 
 
28
 
    def read_n(self, length):
29
 
        """
30
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_n(8)
31
 
        'This is '
32
 
        """
33
 
        def test_length(result):
34
 
            if len(result) >= length:
35
 
                return length
36
 
            else:
37
 
                return None
38
 
        return self._read(test_length)
39
 
 
40
 
    def read_to(self, sequence, length=None):
41
 
        """
42
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
43
 
        >>> f.read_to('\\n')
44
 
        'Th\\n'
45
 
        >>> f.read_to('\\n')
46
 
        'is is \\n'
47
 
        """
48
 
        def test_contents(result):
49
 
            if length is not None:
50
 
                if len(result) >= length:
51
 
                    return length
52
 
            try:
53
 
                return result.index(sequence)+len(sequence)
54
 
            except ValueError:
55
 
                return None
56
 
        return self._read(test_contents)
57
 
 
58
 
    def _read(self, result_length):
59
 
        """
60
 
        Read data until result satisfies the condition result_length.
61
 
        result_length is a callable that returns None until the condition
62
 
        is satisfied, and returns the length of the result to use when
63
 
        the condition is satisfied.  (i.e. it returns the length of the
64
 
        subset of the first condition match.)
65
 
        """
66
 
        result = self._buffer
67
 
        while result_length(result) is None:
68
 
            try:
69
 
                result += self._iter.next()
70
 
            except StopIteration:
71
 
                self.done = True
72
 
                self._buffer = ""
73
 
                return result
74
 
        output_length = result_length(result)
75
 
        self._buffer = result[output_length:]
76
 
        return result[:output_length]
77
 
 
78
 
    def read_all(self):
79
 
        """
80
 
        >>> IterableFileBase(['This ', 'is ', 'a ', 'test.']).read_all()
81
 
        'This is a test.'
82
 
        """
83
 
        def no_stop(result):
84
 
            return None
85
 
        return self._read(no_stop)
86
 
 
87
 
 
88
 
    def push_back(self, contents):
89
 
        """
90
 
        >>> f = IterableFileBase(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
91
 
        >>> f.read_to('\\n')
92
 
        'Th\\n'
93
 
        >>> f.push_back("Sh")
94
 
        >>> f.read_all()
95
 
        'Shis is \\na te\\nst.'
96
 
        """
97
 
        self._buffer = contents + self._buffer
98
 
 
99
 
 
100
 
class IterableFile(object):
101
 
    """This class supplies all File methods that can be implemented cheaply."""
102
 
    def __init__(self, iterable):
103
 
        object.__init__(self)
104
 
        self._file_base = IterableFileBase(iterable)
105
 
        self._iter = self._make_iterator()
106
 
        self._closed = False
107
 
        self.softspace = 0
108
 
 
109
 
    def _make_iterator(self):
110
 
        while not self._file_base.done:
111
 
            self._check_closed()
112
 
            result = self._file_base.read_to('\n')
113
 
            if result != '':
114
 
                yield result
115
 
 
116
 
    def _check_closed(self):
117
 
        if self.closed:
118
 
            raise ValueError("File is closed.")
119
 
 
120
 
    def close(self):
121
 
        """
122
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
123
 
        >>> f.closed
124
 
        False
125
 
        >>> f.close()
126
 
        >>> f.closed
127
 
        True
128
 
        """
129
 
        self._file_base.done = True
130
 
        self._closed = True
131
 
 
132
 
    closed = property(lambda x: x._closed)
133
 
 
134
 
    def flush(self):
135
 
        """No-op for standard compliance.
136
 
        >>> f = IterableFile([])
137
 
        >>> f.close()
138
 
        >>> f.flush()
139
 
        Traceback (most recent call last):
140
 
        ValueError: File is closed.
141
 
        """
142
 
        self._check_closed()
143
 
 
144
 
    def next(self):
145
 
        """Implementation of the iterator protocol's next()
146
 
 
147
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.'])
148
 
        >>> f.next()
149
 
        'This \\n'
150
 
        >>> f.close()
151
 
        >>> f.next()
152
 
        Traceback (most recent call last):
153
 
        ValueError: File is closed.
154
 
        >>> f = IterableFile(['This \\n', 'is ', 'a ', 'test.\\n'])
155
 
        >>> f.next()
156
 
        'This \\n'
157
 
        >>> f.next()
158
 
        'is a test.\\n'
159
 
        >>> f.next()
160
 
        Traceback (most recent call last):
161
 
        StopIteration
162
 
        """
163
 
        self._check_closed()
164
 
        return self._iter.next()
165
 
 
166
 
    def __iter__(self):
167
 
        """
168
 
        >>> list(IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.']))
169
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
170
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
171
 
        >>> f.close()
172
 
        >>> list(f)
173
 
        Traceback (most recent call last):
174
 
        ValueError: File is closed.
175
 
        """
176
 
        return self
177
 
 
178
 
    def read(self, length=None):
179
 
        """
180
 
        >>> IterableFile(['This ', 'is ', 'a ', 'test.']).read()
181
 
        'This is a test.'
182
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
183
 
        >>> f.read(10)
184
 
        'This is a '
185
 
        >>> f = IterableFile(['This ', 'is ', 'a ', 'test.'])
186
 
        >>> f.close()
187
 
        >>> f.read(10)
188
 
        Traceback (most recent call last):
189
 
        ValueError: File is closed.
190
 
        """
191
 
        self._check_closed()
192
 
        if length is None:
193
 
            return self._file_base.read_all()
194
 
        else:
195
 
            return self._file_base.read_n(length)
196
 
 
197
 
    def read_to(self, sequence, size=None):
198
 
        """
199
 
        Read characters until a sequence is found, with optional max size.
200
 
        The specified sequence, if found, will be included in the result
201
 
 
202
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
203
 
        >>> f.read_to('i')
204
 
        'Th\\ni'
205
 
        >>> f.read_to('i')
206
 
        's i'
207
 
        >>> f.close()
208
 
        >>> f.read_to('i')
209
 
        Traceback (most recent call last):
210
 
        ValueError: File is closed.
211
 
        """
212
 
        self._check_closed()
213
 
        return self._file_base.read_to(sequence, size)
214
 
 
215
 
    def readline(self, size=None):
216
 
        """
217
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
218
 
        >>> f.readline()
219
 
        'Th\\n'
220
 
        >>> f.readline(4)
221
 
        'is i'
222
 
        >>> f.close()
223
 
        >>> f.readline()
224
 
        Traceback (most recent call last):
225
 
        ValueError: File is closed.
226
 
        """
227
 
        return self.read_to('\n', size)
228
 
 
229
 
    def readlines(self, sizehint=None):
230
 
        """
231
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
232
 
        >>> f.readlines()
233
 
        ['Th\\n', 'is is \\n', 'a te\\n', 'st.']
234
 
        >>> f = IterableFile(['Th\\nis ', 'is \\n', 'a ', 'te\\nst.'])
235
 
        >>> f.close()
236
 
        >>> f.readlines()
237
 
        Traceback (most recent call last):
238
 
        ValueError: File is closed.
239
 
        """
240
 
        lines = []
241
 
        while True:
242
 
            line = self.readline()
243
 
            if line == "":
244
 
                return lines
245
 
            if sizehint is None:
246
 
                lines.append(line)
247
 
            elif len(line) < sizehint:
248
 
                lines.append(line)
249
 
                sizehint -= len(line)
250
 
            else:
251
 
                self._file_base.push_back(line)
252
 
                return lines
253
 
 
254
 
        
255
 
if __name__ == "__main__":
256
 
    import doctest
257
 
    doctest.testmod()