~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/pack.py

merge merge tweaks from aaron, which includes latest .dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2007, 2009, 2010 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Container format for Bazaar data.
18
 
 
19
 
"Containers" and "records" are described in
20
 
doc/developers/container-format.txt.
21
 
"""
22
 
 
23
 
from __future__ import absolute_import
24
 
 
25
 
from cStringIO import StringIO
26
 
import re
27
 
 
28
 
from bzrlib import errors
29
 
 
30
 
 
31
 
FORMAT_ONE = "Bazaar pack format 1 (introduced in 0.18)"
32
 
 
33
 
 
34
 
_whitespace_re = re.compile('[\t\n\x0b\x0c\r ]')
35
 
 
36
 
 
37
 
def _check_name(name):
38
 
    """Do some basic checking of 'name'.
39
 
 
40
 
    At the moment, this just checks that there are no whitespace characters in a
41
 
    name.
42
 
 
43
 
    :raises InvalidRecordError: if name is not valid.
44
 
    :seealso: _check_name_encoding
45
 
    """
46
 
    if _whitespace_re.search(name) is not None:
47
 
        raise errors.InvalidRecordError("%r is not a valid name." % (name,))
48
 
 
49
 
 
50
 
def _check_name_encoding(name):
51
 
    """Check that 'name' is valid UTF-8.
52
 
 
53
 
    This is separate from _check_name because UTF-8 decoding is relatively
54
 
    expensive, and we usually want to avoid it.
55
 
 
56
 
    :raises InvalidRecordError: if name is not valid UTF-8.
57
 
    """
58
 
    try:
59
 
        name.decode('utf-8')
60
 
    except UnicodeDecodeError, e:
61
 
        raise errors.InvalidRecordError(str(e))
62
 
 
63
 
 
64
 
class ContainerSerialiser(object):
65
 
    """A helper class for serialising containers.
66
 
 
67
 
    It simply returns bytes from method calls to 'begin', 'end' and
68
 
    'bytes_record'.  You may find ContainerWriter to be a more convenient
69
 
    interface.
70
 
    """
71
 
 
72
 
    def begin(self):
73
 
        """Return the bytes to begin a container."""
74
 
        return FORMAT_ONE + "\n"
75
 
 
76
 
    def end(self):
77
 
        """Return the bytes to finish a container."""
78
 
        return "E"
79
 
 
80
 
    def bytes_header(self, length, names):
81
 
        """Return the header for a Bytes record."""
82
 
        # Kind marker
83
 
        byte_sections = ["B"]
84
 
        # Length
85
 
        byte_sections.append(str(length) + "\n")
86
 
        # Names
87
 
        for name_tuple in names:
88
 
            # Make sure we're writing valid names.  Note that we will leave a
89
 
            # half-written record if a name is bad!
90
 
            for name in name_tuple:
91
 
                _check_name(name)
92
 
            byte_sections.append('\x00'.join(name_tuple) + "\n")
93
 
        # End of headers
94
 
        byte_sections.append("\n")
95
 
        return ''.join(byte_sections)
96
 
 
97
 
    def bytes_record(self, bytes, names):
98
 
        """Return the bytes for a Bytes record with the given name and
99
 
        contents.
100
 
 
101
 
        If the content may be large, construct the header separately and then
102
 
        stream out the contents.
103
 
        """
104
 
        return self.bytes_header(len(bytes), names) + bytes
105
 
 
106
 
 
107
 
class ContainerWriter(object):
108
 
    """A class for writing containers to a file.
109
 
 
110
 
    :attribute records_written: The number of user records added to the
111
 
        container. This does not count the prelude or suffix of the container
112
 
        introduced by the begin() and end() methods.
113
 
    """
114
 
 
115
 
    # Join up headers with the body if writing fewer than this many bytes:
116
 
    # trades off memory usage and copying to do less IO ops.
117
 
    _JOIN_WRITES_THRESHOLD = 100000
118
 
 
119
 
    def __init__(self, write_func):
120
 
        """Constructor.
121
 
 
122
 
        :param write_func: a callable that will be called when this
123
 
            ContainerWriter needs to write some bytes.
124
 
        """
125
 
        self._write_func = write_func
126
 
        self.current_offset = 0
127
 
        self.records_written = 0
128
 
        self._serialiser = ContainerSerialiser()
129
 
 
130
 
    def begin(self):
131
 
        """Begin writing a container."""
132
 
        self.write_func(self._serialiser.begin())
133
 
 
134
 
    def write_func(self, bytes):
135
 
        self._write_func(bytes)
136
 
        self.current_offset += len(bytes)
137
 
 
138
 
    def end(self):
139
 
        """Finish writing a container."""
140
 
        self.write_func(self._serialiser.end())
141
 
 
142
 
    def add_bytes_record(self, bytes, names):
143
 
        """Add a Bytes record with the given names.
144
 
 
145
 
        :param bytes: The bytes to insert.
146
 
        :param names: The names to give the inserted bytes. Each name is
147
 
            a tuple of bytestrings. The bytestrings may not contain
148
 
            whitespace.
149
 
        :return: An offset, length tuple. The offset is the offset
150
 
            of the record within the container, and the length is the
151
 
            length of data that will need to be read to reconstitute the
152
 
            record. These offset and length can only be used with the pack
153
 
            interface - they might be offset by headers or other such details
154
 
            and thus are only suitable for use by a ContainerReader.
155
 
        """
156
 
        current_offset = self.current_offset
157
 
        length = len(bytes)
158
 
        if length < self._JOIN_WRITES_THRESHOLD:
159
 
            self.write_func(self._serialiser.bytes_header(length, names)
160
 
                + bytes)
161
 
        else:
162
 
            self.write_func(self._serialiser.bytes_header(length, names))
163
 
            self.write_func(bytes)
164
 
        self.records_written += 1
165
 
        # return a memo of where we wrote data to allow random access.
166
 
        return current_offset, self.current_offset - current_offset
167
 
 
168
 
 
169
 
class ReadVFile(object):
170
 
    """Adapt a readv result iterator to a file like protocol.
171
 
    
172
 
    The readv result must support the iterator protocol returning (offset,
173
 
    data_bytes) pairs.
174
 
    """
175
 
 
176
 
    # XXX: This could be a generic transport class, as other code may want to
177
 
    # gradually consume the readv result.
178
 
 
179
 
    def __init__(self, readv_result):
180
 
        """Construct a new ReadVFile wrapper.
181
 
 
182
 
        :seealso: make_readv_reader
183
 
 
184
 
        :param readv_result: the most recent readv result - list or generator
185
 
        """
186
 
        # readv can return a sequence or an iterator, but we require an
187
 
        # iterator to know how much has been consumed.
188
 
        readv_result = iter(readv_result)
189
 
        self.readv_result = readv_result
190
 
        self._string = None
191
 
 
192
 
    def _next(self):
193
 
        if (self._string is None or
194
 
            self._string.tell() == self._string_length):
195
 
            offset, data = self.readv_result.next()
196
 
            self._string_length = len(data)
197
 
            self._string = StringIO(data)
198
 
 
199
 
    def read(self, length):
200
 
        self._next()
201
 
        result = self._string.read(length)
202
 
        if len(result) < length:
203
 
            raise errors.BzrError('wanted %d bytes but next '
204
 
                'hunk only contains %d: %r...' %
205
 
                (length, len(result), result[:20]))
206
 
        return result
207
 
 
208
 
    def readline(self):
209
 
        """Note that readline will not cross readv segments."""
210
 
        self._next()
211
 
        result = self._string.readline()
212
 
        if self._string.tell() == self._string_length and result[-1] != '\n':
213
 
            raise errors.BzrError('short readline in the readvfile hunk: %r'
214
 
                % (result, ))
215
 
        return result
216
 
 
217
 
 
218
 
def make_readv_reader(transport, filename, requested_records):
219
 
    """Create a ContainerReader that will read selected records only.
220
 
 
221
 
    :param transport: The transport the pack file is located on.
222
 
    :param filename: The filename of the pack file.
223
 
    :param requested_records: The record offset, length tuples as returned
224
 
        by add_bytes_record for the desired records.
225
 
    """
226
 
    readv_blocks = [(0, len(FORMAT_ONE)+1)]
227
 
    readv_blocks.extend(requested_records)
228
 
    result = ContainerReader(ReadVFile(
229
 
        transport.readv(filename, readv_blocks)))
230
 
    return result
231
 
 
232
 
 
233
 
class BaseReader(object):
234
 
 
235
 
    def __init__(self, source_file):
236
 
        """Constructor.
237
 
 
238
 
        :param source_file: a file-like object with `read` and `readline`
239
 
            methods.
240
 
        """
241
 
        self._source = source_file
242
 
 
243
 
    def reader_func(self, length=None):
244
 
        return self._source.read(length)
245
 
 
246
 
    def _read_line(self):
247
 
        line = self._source.readline()
248
 
        if not line.endswith('\n'):
249
 
            raise errors.UnexpectedEndOfContainerError()
250
 
        return line.rstrip('\n')
251
 
 
252
 
 
253
 
class ContainerReader(BaseReader):
254
 
    """A class for reading Bazaar's container format."""
255
 
 
256
 
    def iter_records(self):
257
 
        """Iterate over the container, yielding each record as it is read.
258
 
 
259
 
        Each yielded record will be a 2-tuple of (names, callable), where names
260
 
        is a ``list`` and bytes is a function that takes one argument,
261
 
        ``max_length``.
262
 
 
263
 
        You **must not** call the callable after advancing the iterator to the
264
 
        next record.  That is, this code is invalid::
265
 
 
266
 
            record_iter = container.iter_records()
267
 
            names1, callable1 = record_iter.next()
268
 
            names2, callable2 = record_iter.next()
269
 
            bytes1 = callable1(None)
270
 
 
271
 
        As it will give incorrect results and invalidate the state of the
272
 
        ContainerReader.
273
 
 
274
 
        :raises ContainerError: if any sort of container corruption is
275
 
            detected, e.g. UnknownContainerFormatError is the format of the
276
 
            container is unrecognised.
277
 
        :seealso: ContainerReader.read
278
 
        """
279
 
        self._read_format()
280
 
        return self._iter_records()
281
 
 
282
 
    def iter_record_objects(self):
283
 
        """Iterate over the container, yielding each record as it is read.
284
 
 
285
 
        Each yielded record will be an object with ``read`` and ``validate``
286
 
        methods.  Like with iter_records, it is not safe to use a record object
287
 
        after advancing the iterator to yield next record.
288
 
 
289
 
        :raises ContainerError: if any sort of container corruption is
290
 
            detected, e.g. UnknownContainerFormatError is the format of the
291
 
            container is unrecognised.
292
 
        :seealso: iter_records
293
 
        """
294
 
        self._read_format()
295
 
        return self._iter_record_objects()
296
 
 
297
 
    def _iter_records(self):
298
 
        for record in self._iter_record_objects():
299
 
            yield record.read()
300
 
 
301
 
    def _iter_record_objects(self):
302
 
        while True:
303
 
            record_kind = self.reader_func(1)
304
 
            if record_kind == 'B':
305
 
                # Bytes record.
306
 
                reader = BytesRecordReader(self._source)
307
 
                yield reader
308
 
            elif record_kind == 'E':
309
 
                # End marker.  There are no more records.
310
 
                return
311
 
            elif record_kind == '':
312
 
                # End of stream encountered, but no End Marker record seen, so
313
 
                # this container is incomplete.
314
 
                raise errors.UnexpectedEndOfContainerError()
315
 
            else:
316
 
                # Unknown record type.
317
 
                raise errors.UnknownRecordTypeError(record_kind)
318
 
 
319
 
    def _read_format(self):
320
 
        format = self._read_line()
321
 
        if format != FORMAT_ONE:
322
 
            raise errors.UnknownContainerFormatError(format)
323
 
 
324
 
    def validate(self):
325
 
        """Validate this container and its records.
326
 
 
327
 
        Validating consumes the data stream just like iter_records and
328
 
        iter_record_objects, so you cannot call it after
329
 
        iter_records/iter_record_objects.
330
 
 
331
 
        :raises ContainerError: if something is invalid.
332
 
        """
333
 
        all_names = set()
334
 
        for record_names, read_bytes in self.iter_records():
335
 
            read_bytes(None)
336
 
            for name_tuple in record_names:
337
 
                for name in name_tuple:
338
 
                    _check_name_encoding(name)
339
 
                # Check that the name is unique.  Note that Python will refuse
340
 
                # to decode non-shortest forms of UTF-8 encoding, so there is no
341
 
                # risk that the same unicode string has been encoded two
342
 
                # different ways.
343
 
                if name_tuple in all_names:
344
 
                    raise errors.DuplicateRecordNameError(name_tuple[0])
345
 
                all_names.add(name_tuple)
346
 
        excess_bytes = self.reader_func(1)
347
 
        if excess_bytes != '':
348
 
            raise errors.ContainerHasExcessDataError(excess_bytes)
349
 
 
350
 
 
351
 
class BytesRecordReader(BaseReader):
352
 
 
353
 
    def read(self):
354
 
        """Read this record.
355
 
 
356
 
        You can either validate or read a record, you can't do both.
357
 
 
358
 
        :returns: A tuple of (names, callable).  The callable can be called
359
 
            repeatedly to obtain the bytes for the record, with a max_length
360
 
            argument.  If max_length is None, returns all the bytes.  Because
361
 
            records can be arbitrarily large, using None is not recommended
362
 
            unless you have reason to believe the content will fit in memory.
363
 
        """
364
 
        # Read the content length.
365
 
        length_line = self._read_line()
366
 
        try:
367
 
            length = int(length_line)
368
 
        except ValueError:
369
 
            raise errors.InvalidRecordError(
370
 
                "%r is not a valid length." % (length_line,))
371
 
 
372
 
        # Read the list of names.
373
 
        names = []
374
 
        while True:
375
 
            name_line = self._read_line()
376
 
            if name_line == '':
377
 
                break
378
 
            name_tuple = tuple(name_line.split('\x00'))
379
 
            for name in name_tuple:
380
 
                _check_name(name)
381
 
            names.append(name_tuple)
382
 
 
383
 
        self._remaining_length = length
384
 
        return names, self._content_reader
385
 
 
386
 
    def _content_reader(self, max_length):
387
 
        if max_length is None:
388
 
            length_to_read = self._remaining_length
389
 
        else:
390
 
            length_to_read = min(max_length, self._remaining_length)
391
 
        self._remaining_length -= length_to_read
392
 
        bytes = self.reader_func(length_to_read)
393
 
        if len(bytes) != length_to_read:
394
 
            raise errors.UnexpectedEndOfContainerError()
395
 
        return bytes
396
 
 
397
 
    def validate(self):
398
 
        """Validate this record.
399
 
 
400
 
        You can either validate or read, you can't do both.
401
 
 
402
 
        :raises ContainerError: if this record is invalid.
403
 
        """
404
 
        names, read_bytes = self.read()
405
 
        for name_tuple in names:
406
 
            for name in name_tuple:
407
 
                _check_name_encoding(name)
408
 
        read_bytes(None)
409
 
 
410
 
 
411
 
class ContainerPushParser(object):
412
 
    """A "push" parser for container format 1.
413
 
 
414
 
    It accepts bytes via the ``accept_bytes`` method, and parses them into
415
 
    records which can be retrieved via the ``read_pending_records`` method.
416
 
    """
417
 
 
418
 
    def __init__(self):
419
 
        self._buffer = ''
420
 
        self._state_handler = self._state_expecting_format_line
421
 
        self._parsed_records = []
422
 
        self._reset_current_record()
423
 
        self.finished = False
424
 
 
425
 
    def _reset_current_record(self):
426
 
        self._current_record_length = None
427
 
        self._current_record_names = []
428
 
 
429
 
    def accept_bytes(self, bytes):
430
 
        self._buffer += bytes
431
 
        # Keep iterating the state machine until it stops consuming bytes from
432
 
        # the buffer.
433
 
        last_buffer_length = None
434
 
        cur_buffer_length = len(self._buffer)
435
 
        last_state_handler = None
436
 
        while (cur_buffer_length != last_buffer_length
437
 
               or last_state_handler != self._state_handler):
438
 
            last_buffer_length = cur_buffer_length
439
 
            last_state_handler = self._state_handler
440
 
            self._state_handler()
441
 
            cur_buffer_length = len(self._buffer)
442
 
 
443
 
    def read_pending_records(self, max=None):
444
 
        if max:
445
 
            records = self._parsed_records[:max]
446
 
            del self._parsed_records[:max]
447
 
            return records
448
 
        else:
449
 
            records = self._parsed_records
450
 
            self._parsed_records = []
451
 
            return records
452
 
 
453
 
    def _consume_line(self):
454
 
        """Take a line out of the buffer, and return the line.
455
 
 
456
 
        If a newline byte is not found in the buffer, the buffer is
457
 
        unchanged and this returns None instead.
458
 
        """
459
 
        newline_pos = self._buffer.find('\n')
460
 
        if newline_pos != -1:
461
 
            line = self._buffer[:newline_pos]
462
 
            self._buffer = self._buffer[newline_pos+1:]
463
 
            return line
464
 
        else:
465
 
            return None
466
 
 
467
 
    def _state_expecting_format_line(self):
468
 
        line = self._consume_line()
469
 
        if line is not None:
470
 
            if line != FORMAT_ONE:
471
 
                raise errors.UnknownContainerFormatError(line)
472
 
            self._state_handler = self._state_expecting_record_type
473
 
 
474
 
    def _state_expecting_record_type(self):
475
 
        if len(self._buffer) >= 1:
476
 
            record_type = self._buffer[0]
477
 
            self._buffer = self._buffer[1:]
478
 
            if record_type == 'B':
479
 
                self._state_handler = self._state_expecting_length
480
 
            elif record_type == 'E':
481
 
                self.finished = True
482
 
                self._state_handler = self._state_expecting_nothing
483
 
            else:
484
 
                raise errors.UnknownRecordTypeError(record_type)
485
 
 
486
 
    def _state_expecting_length(self):
487
 
        line = self._consume_line()
488
 
        if line is not None:
489
 
            try:
490
 
                self._current_record_length = int(line)
491
 
            except ValueError:
492
 
                raise errors.InvalidRecordError(
493
 
                    "%r is not a valid length." % (line,))
494
 
            self._state_handler = self._state_expecting_name
495
 
 
496
 
    def _state_expecting_name(self):
497
 
        encoded_name_parts = self._consume_line()
498
 
        if encoded_name_parts == '':
499
 
            self._state_handler = self._state_expecting_body
500
 
        elif encoded_name_parts:
501
 
            name_parts = tuple(encoded_name_parts.split('\x00'))
502
 
            for name_part in name_parts:
503
 
                _check_name(name_part)
504
 
            self._current_record_names.append(name_parts)
505
 
 
506
 
    def _state_expecting_body(self):
507
 
        if len(self._buffer) >= self._current_record_length:
508
 
            body_bytes = self._buffer[:self._current_record_length]
509
 
            self._buffer = self._buffer[self._current_record_length:]
510
 
            record = (self._current_record_names, body_bytes)
511
 
            self._parsed_records.append(record)
512
 
            self._reset_current_record()
513
 
            self._state_handler = self._state_expecting_record_type
514
 
 
515
 
    def _state_expecting_nothing(self):
516
 
        pass
517
 
 
518
 
    def read_size_hint(self):
519
 
        hint = 16384
520
 
        if self._state_handler == self._state_expecting_body:
521
 
            remaining = self._current_record_length - len(self._buffer)
522
 
            if remaining < 0:
523
 
                remaining = 0
524
 
            return max(hint, remaining)
525
 
        return hint
526
 
 
527
 
 
528
 
def iter_records_from_file(source_file):
529
 
    parser = ContainerPushParser()
530
 
    while True:
531
 
        bytes = source_file.read(parser.read_size_hint())
532
 
        parser.accept_bytes(bytes)
533
 
        for record in parser.read_pending_records():
534
 
            yield record
535
 
        if parser.finished:
536
 
            break
537