~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/pack.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-11-13 00:51:40 UTC
  • mfrom: (2916.2.17 streamable-containers)
  • Revision ID: pqm@pqm.ubuntu.com-20071113005140-mp4owdlrd1ccnqc9
(robertc) Streaming-friendly container APIs. (Andrew Bennetts)

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
 
17
17
"""Container format for Bazaar data.
18
18
 
19
 
"Containers" and "records" are described in doc/developers/container-format.txt.
 
19
"Containers" and "records" are described in
 
20
doc/developers/container-format.txt.
20
21
"""
21
22
 
22
23
from cStringIO import StringIO
58
59
        raise errors.InvalidRecordError(str(e))
59
60
 
60
61
 
 
62
class ContainerSerialiser(object):
 
63
    """A helper class for serialising containers.
 
64
    
 
65
    It simply returns bytes from method calls to 'begin', 'end' and
 
66
    'bytes_record'.  You may find ContainerWriter to be a more convenient
 
67
    interface.
 
68
    """
 
69
 
 
70
    def begin(self):
 
71
        """Return the bytes to begin a container."""
 
72
        return FORMAT_ONE + "\n"
 
73
 
 
74
    def end(self):
 
75
        """Return the bytes to finish a container."""
 
76
        return "E"
 
77
 
 
78
    def bytes_record(self, bytes, names):
 
79
        """Return the bytes for a Bytes record with the given name and
 
80
        contents.
 
81
        """
 
82
        # Kind marker
 
83
        byte_sections = ["B"]
 
84
        # Length
 
85
        byte_sections.append(str(len(bytes)) + "\n")
 
86
        # Names
 
87
        for name_tuple in names:
 
88
            # Make sure we're writing valid names.  Note that we will leave a
 
89
            # half-written record if a name is bad!
 
90
            for name in name_tuple:
 
91
                _check_name(name)
 
92
            byte_sections.append('\x00'.join(name_tuple) + "\n")
 
93
        # End of headers
 
94
        byte_sections.append("\n")
 
95
        # Finally, the contents.
 
96
        byte_sections.append(bytes)
 
97
        # XXX: This causes a memory copy of bytes in size, but is usually
 
98
        # faster than two write calls (12 vs 13 seconds to output a gig of
 
99
        # 1k records.) - results may differ on significantly larger records
 
100
        # like .iso's but as they should be rare in any case and thus not
 
101
        # likely to be the common case. The biggest issue is causing extreme
 
102
        # memory pressure in that case. One possibly improvement here is to
 
103
        # check the size of the content before deciding to join here vs call
 
104
        # write twice.
 
105
        return ''.join(byte_sections)
 
106
 
 
107
 
61
108
class ContainerWriter(object):
62
 
    """A class for writing containers.
 
109
    """A class for writing containers to a file.
63
110
 
64
111
    :attribute records_written: The number of user records added to the
65
112
        container. This does not count the prelude or suffix of the container
75
122
        self._write_func = write_func
76
123
        self.current_offset = 0
77
124
        self.records_written = 0
 
125
        self._serialiser = ContainerSerialiser()
78
126
 
79
127
    def begin(self):
80
128
        """Begin writing a container."""
81
 
        self.write_func(FORMAT_ONE + "\n")
 
129
        self.write_func(self._serialiser.begin())
82
130
 
83
131
    def write_func(self, bytes):
84
132
        self._write_func(bytes)
86
134
 
87
135
    def end(self):
88
136
        """Finish writing a container."""
89
 
        self.write_func("E")
 
137
        self.write_func(self._serialiser.end())
90
138
 
91
139
    def add_bytes_record(self, bytes, names):
92
140
        """Add a Bytes record with the given names.
103
151
            and thus are only suitable for use by a ContainerReader.
104
152
        """
105
153
        current_offset = self.current_offset
106
 
        # Kind marker
107
 
        byte_sections = ["B"]
108
 
        # Length
109
 
        byte_sections.append(str(len(bytes)) + "\n")
110
 
        # Names
111
 
        for name_tuple in names:
112
 
            # Make sure we're writing valid names.  Note that we will leave a
113
 
            # half-written record if a name is bad!
114
 
            for name in name_tuple:
115
 
                _check_name(name)
116
 
            byte_sections.append('\x00'.join(name_tuple) + "\n")
117
 
        # End of headers
118
 
        byte_sections.append("\n")
119
 
        # Finally, the contents.
120
 
        byte_sections.append(bytes)
121
 
        # XXX: This causes a memory copy of bytes in size, but is usually
122
 
        # faster than two write calls (12 vs 13 seconds to output a gig of
123
 
        # 1k records.) - results may differ on significantly larger records
124
 
        # like .iso's but as they should be rare in any case and thus not
125
 
        # likely to be the common case. The biggest issue is causing extreme
126
 
        # memory pressure in that case. One possibly improvement here is to
127
 
        # check the size of the content before deciding to join here vs call
128
 
        # write twice.
129
 
        self.write_func(''.join(byte_sections))
 
154
        serialised_record = self._serialiser.bytes_record(bytes, names)
 
155
        self.write_func(serialised_record)
130
156
        self.records_written += 1
131
157
        # return a memo of where we wrote data to allow random access.
132
158
        return current_offset, self.current_offset - current_offset
355
381
                _check_name_encoding(name)
356
382
        read_bytes(None)
357
383
 
 
384
 
 
385
class ContainerPushParser(object):
 
386
    """A "push" parser for container format 1.
 
387
 
 
388
    It accepts bytes via the ``accept_bytes`` method, and parses them into
 
389
    records which can be retrieved via the ``read_pending_records`` method.
 
390
    """
 
391
 
 
392
    def __init__(self):
 
393
        self._buffer = ''
 
394
        self._state_handler = self._state_expecting_format_line
 
395
        self._parsed_records = []
 
396
        self._reset_current_record()
 
397
        self.finished = False
 
398
 
 
399
    def _reset_current_record(self):
 
400
        self._current_record_length = None
 
401
        self._current_record_names = []
 
402
 
 
403
    def accept_bytes(self, bytes):
 
404
        self._buffer += bytes
 
405
        # Keep iterating the state machine until it stops consuming bytes from
 
406
        # the buffer.
 
407
        last_buffer_length = None
 
408
        cur_buffer_length = len(self._buffer)
 
409
        while cur_buffer_length != last_buffer_length:
 
410
            last_buffer_length = cur_buffer_length
 
411
            self._state_handler()
 
412
            cur_buffer_length = len(self._buffer)
 
413
 
 
414
    def read_pending_records(self):
 
415
        records = self._parsed_records
 
416
        self._parsed_records = []
 
417
        return records
 
418
    
 
419
    def _consume_line(self):
 
420
        """Take a line out of the buffer, and return the line.
 
421
 
 
422
        If a newline byte is not found in the buffer, the buffer is
 
423
        unchanged and this returns None instead.
 
424
        """
 
425
        newline_pos = self._buffer.find('\n')
 
426
        if newline_pos != -1:
 
427
            line = self._buffer[:newline_pos]
 
428
            self._buffer = self._buffer[newline_pos+1:]
 
429
            return line
 
430
        else:
 
431
            return None
 
432
 
 
433
    def _state_expecting_format_line(self):
 
434
        line = self._consume_line()
 
435
        if line is not None:
 
436
            if line != FORMAT_ONE:
 
437
                raise errors.UnknownContainerFormatError(line)
 
438
            self._state_handler = self._state_expecting_record_type
 
439
 
 
440
    def _state_expecting_record_type(self):
 
441
        if len(self._buffer) >= 1:
 
442
            record_type = self._buffer[0]
 
443
            self._buffer = self._buffer[1:]
 
444
            if record_type == 'B':
 
445
                self._state_handler = self._state_expecting_length
 
446
            elif record_type == 'E':
 
447
                self.finished = True
 
448
                self._state_handler = self._state_expecting_nothing
 
449
            else:
 
450
                raise errors.UnknownRecordTypeError(record_type)
 
451
 
 
452
    def _state_expecting_length(self):
 
453
        line = self._consume_line()
 
454
        if line is not None:
 
455
            try:
 
456
                self._current_record_length = int(line)
 
457
            except ValueError:
 
458
                raise errors.InvalidRecordError(
 
459
                    "%r is not a valid length." % (line,))
 
460
            self._state_handler = self._state_expecting_name
 
461
 
 
462
    def _state_expecting_name(self):
 
463
        encoded_name_parts = self._consume_line()
 
464
        if encoded_name_parts == '':
 
465
            self._state_handler = self._state_expecting_body
 
466
        elif encoded_name_parts:
 
467
            name_parts = tuple(encoded_name_parts.split('\x00'))
 
468
            for name_part in name_parts:
 
469
                _check_name(name_part)
 
470
            self._current_record_names.append(name_parts)
 
471
            
 
472
    def _state_expecting_body(self):
 
473
        if len(self._buffer) >= self._current_record_length:
 
474
            body_bytes = self._buffer[:self._current_record_length]
 
475
            self._buffer = self._buffer[self._current_record_length:]
 
476
            record = (self._current_record_names, body_bytes)
 
477
            self._parsed_records.append(record)
 
478
            self._reset_current_record()
 
479
            self._state_handler = self._state_expecting_record_type
 
480
 
 
481
    def _state_expecting_nothing(self):
 
482
        pass
 
483
 
 
484
    def read_size_hint(self):
 
485
        hint = 16384
 
486
        if self._state_handler == self._state_expecting_body:
 
487
            remaining = self._current_record_length - len(self._buffer)
 
488
            if remaining < 0:
 
489
                remaining = 0
 
490
            return max(hint, remaining)
 
491
        return hint
 
492
 
 
493
 
 
494
def iter_records_from_file(source_file):
 
495
    parser = ContainerPushParser()
 
496
    while True:
 
497
        bytes = source_file.read(parser.read_size_hint())
 
498
        parser.accept_bytes(bytes)
 
499
        for record in parser.read_pending_records():
 
500
            yield record
 
501
        if parser.finished:
 
502
            break
 
503