# Copyright (C) 2007, 2009, 2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Container format for Bazaar data.
50
48
def _check_name_encoding(name):
51
49
"""Check that 'name' is valid UTF-8.
53
51
This is separate from _check_name because UTF-8 decoding is relatively
54
52
expensive, and we usually want to avoid it.
64
62
class ContainerSerialiser(object):
    """A helper class for serialising containers.

    It simply returns bytes from method calls to 'begin', 'end' and
    'bytes_record'.  You may find ContainerWriter to be a more convenient
    interface.
    """

    def end(self):
        """Return the bytes to finish a container."""
        return "E"

    def bytes_header(self, length, names):
        """Return the header for a Bytes record."""
        byte_sections = ["B"]
        byte_sections.append(str(length) + "\n")
        for name_tuple in names:
            # Make sure we're writing valid names.  Note that we will leave a
            # half-written record if a name is bad!
            for name in name_tuple:
                _check_name(name)
            byte_sections.append('\x00'.join(name_tuple) + "\n")
        byte_sections.append("\n")
        return ''.join(byte_sections)

    def bytes_record(self, bytes, names):
        """Return the bytes for a Bytes record with the given name and
        contents.

        If the content may be large, construct the header separately and then
        stream out the contents.
        """
        # XXX: Joining the header and the contents causes a memory copy of
        # len(bytes) in size, but is usually faster than making two write
        # calls (12 vs 13 seconds to output a gig of 1k records).  Results
        # may differ for significantly larger records, like .iso's, but
        # those should be rare; the biggest issue there is extreme memory
        # pressure.  One possible improvement is to check the size of the
        # content before deciding to join here vs. writing twice.
        return self.bytes_header(len(bytes), names) + bytes
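

# Illustrative usage sketch (not part of the original module): how the
# serialiser's methods combine into a complete container, and how
# bytes_header() lets a caller stream large content instead of building one
# big joined string.  'write_func' and the record names/contents below are
# made up for the example.
def _example_serialise(write_func, large_content):
    serialiser = ContainerSerialiser()
    # begin() (elided from this excerpt) returns the container format marker.
    write_func(serialiser.begin())
    # Small record: header and contents joined into a single string.
    write_func(serialiser.bytes_record('small payload', [('small-name',)]))
    # Large record: emit the header, then stream the contents separately.
    write_func(serialiser.bytes_header(len(large_content), [('large-name',)]))
    write_func(large_content)
    write_func(serialiser.end())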


class ContainerWriter(object):
    """A class for writing containers to a file.

    :attribute records_written: The number of user records added to the
        container.  This does not count the prelude or suffix of the container
        introduced by the begin() and end() methods.
    """

    # Join up headers with the body if writing fewer than this many bytes:
    # trades off memory usage and copying to do less IO ops.
    _JOIN_WRITES_THRESHOLD = 100000

    def __init__(self, write_func):

    def add_bytes_record(self, bytes, names):
        """Add a Bytes record with the given names.

        :param bytes: The bytes to insert.
        :param names: The names to give the inserted bytes. Each name is
            a tuple of bytestrings. The bytestrings may not contain
            whitespace.
        :return: An (offset, length) memo of where the record was written,
            to allow later random access.  The offset and length are only
            meaningful to the pack interface and thus are only suitable for
            use by a ContainerReader.
        """
        current_offset = self.current_offset
        length = len(bytes)
        if length < self._JOIN_WRITES_THRESHOLD:
            self.write_func(self._serialiser.bytes_header(length, names)
                + bytes)
        else:
            self.write_func(self._serialiser.bytes_header(length, names))
            self.write_func(bytes)
        self.records_written += 1
        # return a memo of where we wrote data to allow random access.
        return current_offset, self.current_offset - current_offset
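

# Illustrative usage sketch (not part of the original module): writing records
# with ContainerWriter while keeping the (offset, length) memos returned by
# add_bytes_record() for later random access through a ContainerReader.
# begin()/end() and the offset bookkeeping are elided from this excerpt.
def _example_write_records(records):
    chunks = []
    writer = ContainerWriter(chunks.append)
    writer.begin()
    memos = []
    for names, record_bytes in records:
        # Records under _JOIN_WRITES_THRESHOLD cost one write call; larger
        # ones are written as two calls: header, then body.
        memos.append(writer.add_bytes_record(record_bytes, names))
    writer.end()
    return ''.join(chunks), memos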


class ReadVFile(object):
    """Adapt a readv result iterator to a file like protocol.

    The readv result must support the iterator protocol returning (offset,
    data) pairs.
    """

    # XXX: This could be a generic transport class, as other code may want to
    # gradually consume the readv result.

    def __init__(self, readv_result):
        """Construct a new ReadVFile wrapper.

        :seealso: make_readv_reader

        :param readv_result: the most recent readv result - list or generator
        """
        # readv can return a sequence or an iterator, but we require an
        # iterator to know how much has been consumed.
        readv_result = iter(readv_result)
        self.readv_result = readv_result
        # the most recent readv result block
        self._string = None

    def _next(self):
        if (self._string is None or
            self._string.tell() == self._string_length):
            offset, data = self.readv_result.next()
            self._string_length = len(data)
            self._string = StringIO(data)

    def read(self, length):
        self._next()
        result = self._string.read(length)
        if len(result) < length:
            raise errors.BzrError('wanted %d bytes but next '
                'hunk only contains %d: %r...' %
                (length, len(result), result[:20]))
        return result

    def readline(self):
        self._next()
        result = self._string.readline()
        if self._string.tell() == self._string_length and result[-1] != '\n':
            raise errors.BzrError('short readline in the readvfile hunk: %r'
                % (result, ))
        return result
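

# Illustrative usage sketch (not part of the original module): ReadVFile
# presents a readv result - an iterable of (offset, data) pairs - as a
# file-like object.  The pairs below are made up; real callers get them from
# a transport's readv().  Note that read() and readline() do not cross
# readv hunk boundaries.
def _example_readv_file():
    pairs = iter([(0, 'B12\n'), (4, 'name1\n\nsome record body here....')])
    f = ReadVFile(pairs)
    kind_and_length = f.readline()  # 'B12\n', served from the first hunk
    name_line = f.read(5)           # 'name1', served from the second hunk
    return kind_and_length, name_line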

    def iter_records(self):
        """Iterate over the container, yielding each record as it is read.

        Each yielded record will be a 2-tuple of (names, callable), where names
        is a ``list`` and bytes is a function that takes one argument,
        ``max_length``.

        You **must not** call the callable after advancing the iterator to the
        next record.  That is, this code is invalid::

            record_iter = container.iter_records()
            names1, callable1 = record_iter.next()
            names2, callable2 = record_iter.next()
            bytes1 = callable1(None)

        As it will give incorrect results and invalidate the state of the
        ContainerReader.

        :raises ContainerError: if any sort of container corruption is
            detected, e.g. UnknownContainerFormatError if the format of the
            container is unrecognised.
        :seealso: ContainerReader.read
        """
        self._read_format()
        return self._iter_records()
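
    # Illustrative sketch (not part of the original module): the safe pattern
    # for iter_records() is to consume each record's bytes before advancing
    # the iterator, e.g.:
    #
    #     for names, read_bytes in reader.iter_records():
    #         all_bytes = read_bytes(None)   # read fully *before* the next record
    #         process(names, all_bytes)
    #
    # 'reader' and 'process' are hypothetical names used only for this example.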

    def iter_record_objects(self):
        """Iterate over the container, yielding each record as it is read.

        Each yielded record will be an object with ``names`` and ``read``
        methods.  Like with iter_records, it is not safe to use a record
        object after advancing the iterator to yield the next record.

        :raises ContainerError: if any sort of container corruption is
            detected, e.g. UnknownContainerFormatError if the format of the
            container is unrecognised.
        :seealso: iter_records
        """
        self._read_format()
        return self._iter_record_objects()

    def _iter_records(self):
        for record in self._iter_record_objects():
            yield record.read()

                # Check that the name is unique.  Note that Python will refuse
                # to decode non-shortest forms of UTF-8 encoding, so there is no
                # risk that the same unicode string has been encoded two
                # different ways.
                if name_tuple in all_names:
                    raise errors.DuplicateRecordNameError(name_tuple[0])
                all_names.add(name_tuple)
        excess_bytes = self.reader_func(1)
        if excess_bytes != '':

        last_buffer_length = None
        cur_buffer_length = len(self._buffer)
        last_state_handler = None
        while (cur_buffer_length != last_buffer_length
               or last_state_handler != self._state_handler):
            last_buffer_length = cur_buffer_length
            last_state_handler = self._state_handler
            self._state_handler()
            cur_buffer_length = len(self._buffer)

    def read_pending_records(self, max=None):
        if max:
            records = self._parsed_records[:max]
            del self._parsed_records[:max]
            return records
        else:
            records = self._parsed_records
            self._parsed_records = []
            return records
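
    # Illustrative sketch (not part of the original module): a push parser is
    # driven by feeding it bytes as they arrive and periodically draining the
    # records parsed so far.  The enclosing class and its accept_bytes method
    # are elided from this excerpt; the loop below is hypothetical, e.g.:
    #
    #     parser = ContainerPushParser()
    #     for data in incoming_chunks:       # 'incoming_chunks' is hypothetical
    #         parser.accept_bytes(data)
    #         for names, body_bytes in parser.read_pending_records():
    #             handle_record(names, body_bytes)   # hypothetical handler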

    def _consume_line(self):
        """Take a line out of the buffer, and return the line.

            for name_part in name_parts:
                _check_name(name_part)
            self._current_record_names.append(name_parts)

    def _state_expecting_body(self):
        if len(self._buffer) >= self._current_record_length:
            body_bytes = self._buffer[:self._current_record_length]