~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/groupcompress.py

Bring the groupcompress plugin into the brisbane-core branch.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# groupcompress, a bzr plugin providing new compression logic.
 
2
# Copyright (C) 2008 Canonical Limited.
 
3
 
4
# This program is free software; you can redistribute it and/or modify
 
5
# it under the terms of the GNU General Public License version 2 as published
 
6
# by the Free Software Foundation.
 
7
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
 
16
 
17
 
 
18
"""Core compression logic for compressing streams of related files."""
 
19
 
 
20
from itertools import izip
 
21
from cStringIO import StringIO
 
22
import struct
 
23
import zlib
 
24
try:
 
25
    import pylzma
 
26
except ImportError:
 
27
    pylzma = None
 
28
 
 
29
from bzrlib import (
 
30
    annotate,
 
31
    debug,
 
32
    diff,
 
33
    errors,
 
34
    graph as _mod_graph,
 
35
    osutils,
 
36
    pack,
 
37
    patiencediff,
 
38
    )
 
39
from bzrlib.graph import Graph
 
40
from bzrlib.knit import _DirectPackAccess
 
41
from bzrlib.osutils import (
 
42
    contains_whitespace,
 
43
    sha_string,
 
44
    split_lines,
 
45
    )
 
46
from bzrlib.btree_index import BTreeBuilder
 
47
from bzrlib.lru_cache import LRUSizeCache
 
48
from bzrlib.tsort import topo_sort
 
49
from bzrlib.versionedfile import (
 
50
    adapter_registry,
 
51
    AbsentContentFactory,
 
52
    ChunkedContentFactory,
 
53
    FulltextContentFactory,
 
54
    VersionedFiles,
 
55
    )
 
56
 
 
57
_USE_LZMA = False and (pylzma is not None)
 
58
_NO_LABELS = False
 
59
_FAST = False
 
60
 
 
61
def encode_base128_int(val):
 
62
    """Convert an integer into a 7-bit lsb encoding."""
 
63
    bytes = []
 
64
    count = 0
 
65
    while val >= 0x80:
 
66
        bytes.append(chr((val | 0x80) & 0xFF))
 
67
        val >>= 7
 
68
    bytes.append(chr(val))
 
69
    return ''.join(bytes)
 
70
 
 
71
 
 
72
def decode_base128_int(bytes):
 
73
    """Decode an integer from a 7-bit lsb encoding."""
 
74
    offset = 0
 
75
    val = 0
 
76
    shift = 0
 
77
    bval = ord(bytes[offset])
 
78
    while bval >= 0x80:
 
79
        val |= (bval & 0x7F) << shift
 
80
        shift += 7
 
81
        offset += 1
 
82
        bval = ord(bytes[offset])
 
83
    val |= bval << shift
 
84
    offset += 1
 
85
    return val, offset
 
86
 
 
87
 
 
88
def sort_gc_optimal(parent_map):
 
89
    """Sort and group the keys in parent_map into gc-optimal order.
 
90
 
 
91
    gc-optimal is defined (currently) as reverse-topological order, grouped by
 
92
    the key prefix.
 
93
 
 
94
    :return: A sorted-list of keys
 
95
    """
 
96
    # gc-optimal ordering is approximately reverse topological,
 
97
    # properly grouped by file-id.
 
98
    per_prefix_map = {}
 
99
    for item in parent_map.iteritems():
 
100
        key = item[0]
 
101
        if isinstance(key, str) or len(key) == 1:
 
102
            prefix = ''
 
103
        else:
 
104
            prefix = key[0]
 
105
        try:
 
106
            per_prefix_map[prefix].append(item)
 
107
        except KeyError:
 
108
            per_prefix_map[prefix] = [item]
 
109
 
 
110
    present_keys = []
 
111
    for prefix in sorted(per_prefix_map):
 
112
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
 
113
    return present_keys
 
114
 
 
115
 
 
116
class GroupCompressBlockEntry(object):
 
117
    """Track the information about a single object inside a GC group.
 
118
 
 
119
    This is generally just the dumb data structure.
 
120
    """
 
121
 
 
122
    def __init__(self, key, type, sha1, start, length):
 
123
        self.key = key
 
124
        self.type = type # delta, fulltext, external?
 
125
        self.sha1 = sha1 # Sha1 of content
 
126
        self.start = start # Byte offset to start of data
 
127
        self.length = length # Length of content
 
128
 
 
129
    def __repr__(self):
 
130
        return '%s(%s, %s, %s, %s, %s)' % (
 
131
            self.__class__.__name__,
 
132
            self.key, self.type, self.sha1, self.start, self.length
 
133
            )
 
134
 
 
135
 
 
136
class GroupCompressBlock(object):
 
137
    """An object which maintains the internal structure of the compressed data.
 
138
 
 
139
    This tracks the meta info (start of text, length, type, etc.)
 
140
    """
 
141
 
 
142
    # Group Compress Block v1 Zlib
 
143
    GCB_HEADER = 'gcb1z\n'
 
144
    GCB_LZ_HEADER = 'gcb1l\n'
 
145
 
 
146
    def __init__(self):
 
147
        # map by key? or just order in file?
 
148
        self._entries = {}
 
149
        self._content = None
 
150
        self._size = 0
 
151
 
 
152
    def _parse_header(self):
 
153
        """Parse the meta-info from the stream."""
 
154
 
 
155
    def __len__(self):
 
156
        return self._size
 
157
 
 
158
    @classmethod
 
159
    def from_bytes(cls, bytes):
 
160
        out = cls()
 
161
        if bytes[:6] not in (cls.GCB_HEADER, cls.GCB_LZ_HEADER):
 
162
            raise ValueError('bytes did not start with %r' % (cls.GCB_HEADER,))
 
163
        if bytes[4] == 'z':
 
164
            decomp = zlib.decompress
 
165
        elif bytes[4] == 'l':
 
166
            decomp = pylzma.decompress
 
167
        else:
 
168
            assert False, 'unknown compressor: %r' % (bytes,)
 
169
        pos = bytes.index('\n', 6)
 
170
        z_header_length = int(bytes[6:pos])
 
171
        pos += 1
 
172
        pos2 = bytes.index('\n', pos)
 
173
        header_length = int(bytes[pos:pos2])
 
174
        if z_header_length == 0:
 
175
            assert header_length == 0
 
176
            zcontent = bytes[pos2+1:]
 
177
            if zcontent:
 
178
                out._content = decomp(zcontent)
 
179
                out._size = len(out._content)
 
180
            return out
 
181
        pos = pos2 + 1
 
182
        pos2 = pos + z_header_length
 
183
        z_header_bytes = bytes[pos:pos2]
 
184
        assert len(z_header_bytes) == z_header_length
 
185
        header_bytes = decomp(z_header_bytes)
 
186
        assert len(header_bytes) == header_length
 
187
        del z_header_bytes
 
188
        lines = header_bytes.split('\n')
 
189
        header_len = len(header_bytes)
 
190
        del header_bytes
 
191
        info_dict = {}
 
192
        for line in lines:
 
193
            if not line: #End of record
 
194
                if not info_dict:
 
195
                    break
 
196
                out.add_entry(**info_dict)
 
197
                info_dict = {}
 
198
                continue
 
199
            key, value = line.split(':', 1)
 
200
            if key == 'key':
 
201
                value = tuple(map(intern, value.split('\x00')))
 
202
            elif key in ('start', 'length'):
 
203
                value = int(value)
 
204
            elif key == 'type':
 
205
                value = intern(value)
 
206
            info_dict[key] = value
 
207
        zcontent = bytes[pos2:]
 
208
        if zcontent:
 
209
            out._content = decomp(zcontent)
 
210
            out._size = header_len + len(out._content)
 
211
        return out
 
212
 
 
213
    def extract(self, key, index_memo, sha1=None):
 
214
        """Extract the text for a specific key.
 
215
 
 
216
        :param key: The label used for this content
 
217
        :param sha1: TODO (should we validate only when sha1 is supplied?)
 
218
        :return: The bytes for the content
 
219
        """
 
220
        if _NO_LABELS or not self._entries:
 
221
            start, end = index_memo[3:5]
 
222
            # The bytes are 'f' or 'd' for the type, then a variable-length
 
223
            # base128 integer for the content size, then the actual content
 
224
            # We know that the variable-length integer won't be longer than 10
 
225
            # bytes (it only takes 5 bytes to encode 2^32)
 
226
            c = self._content[start]
 
227
            if c == 'f':
 
228
                type = 'fulltext'
 
229
            else:
 
230
                assert c == 'd'
 
231
                type = 'delta'
 
232
            entry = GroupCompressBlockEntry(key, type, sha1=None,
 
233
                                            start=start, length=end-start)
 
234
        else:
 
235
            entry = self._entries[key]
 
236
            c = self._content[entry.start]
 
237
            if entry.type == 'fulltext':
 
238
                assert c == 'f'
 
239
            elif entry.type == 'delta':
 
240
                assert c == 'd'
 
241
            start = entry.start
 
242
        content_len, len_len = decode_base128_int(
 
243
                            self._content[entry.start + 1:entry.start + 11])
 
244
        assert entry.length == content_len + 1 + len_len
 
245
        content_start = entry.start + 1 + len_len
 
246
        end = entry.start + entry.length
 
247
        content = self._content[content_start:end]
 
248
        if c == 'f':
 
249
            bytes = content
 
250
        elif c == 'd':
 
251
            bytes = _groupcompress_pyx.apply_delta(self._content, content)
 
252
        if entry.sha1 is None:
 
253
            entry.sha1 = sha_string(bytes)
 
254
        return entry, bytes
 
255
 
 
256
    def add_entry(self, key, type, sha1, start, length):
 
257
        """Add new meta info about an entry.
 
258
 
 
259
        :param key: The key for the new content
 
260
        :param type: Whether this is a delta or fulltext entry (external?)
 
261
        :param sha1: sha1sum of the fulltext of this entry
 
262
        :param start: where the encoded bytes start
 
263
        :param length: total number of bytes in the encoded form
 
264
        :return: The entry?
 
265
        """
 
266
        entry = GroupCompressBlockEntry(key, type, sha1, start, length)
 
267
        assert key not in self._entries
 
268
        self._entries[key] = entry
 
269
        return entry
 
270
 
 
271
    def to_bytes(self, content=''):
 
272
        """Encode the information into a byte stream."""
 
273
        compress = zlib.compress
 
274
        if _USE_LZMA:
 
275
            compress = pylzma.compress
 
276
        chunks = []
 
277
        for key in sorted(self._entries):
 
278
            entry = self._entries[key]
 
279
            chunk = ('key:%s\n'
 
280
                     'sha1:%s\n'
 
281
                     'type:%s\n'
 
282
                     'start:%s\n'
 
283
                     'length:%s\n'
 
284
                     '\n'
 
285
                     ) % ('\x00'.join(entry.key),
 
286
                          entry.sha1,
 
287
                          entry.type,
 
288
                          entry.start,
 
289
                          entry.length,
 
290
                          )
 
291
            chunks.append(chunk)
 
292
        bytes = ''.join(chunks)
 
293
        info_len = len(bytes)
 
294
        z_bytes = []
 
295
        z_bytes.append(compress(bytes))
 
296
        del bytes
 
297
        # TODO: we may want to have the header compressed in the same chain
 
298
        #       as the data, or we may not, evaulate it
 
299
        #       having them compressed together is probably a win for
 
300
        #       revisions and the 'inv' portion of chk inventories. As the
 
301
        #       label in the header is duplicated in the text.
 
302
        #       For chk pages and real bytes, I would guess this is not
 
303
        #       true.
 
304
        z_len = sum(map(len, z_bytes))
 
305
        c_len = len(content)
 
306
        if _NO_LABELS:
 
307
            z_bytes = []
 
308
            z_len = 0
 
309
            info_len = 0
 
310
        z_bytes.append(compress(content))
 
311
        if _USE_LZMA:
 
312
            header = self.GCB_LZ_HEADER
 
313
        else:
 
314
            header = self.GCB_HEADER
 
315
        chunks = [header,
 
316
                  '%d\n' % (z_len,),
 
317
                  '%d\n' % (info_len,),
 
318
                  #'%d\n' % (c_len,),
 
319
                 ]
 
320
        chunks.extend(z_bytes)
 
321
        return ''.join(chunks)
 
322
 
 
323
 
 
324
class GroupCompressor(object):
 
325
    """Produce a serialised group of compressed texts.
 
326
 
 
327
    It contains code very similar to SequenceMatcher because of having a similar
 
328
    task. However some key differences apply:
 
329
     - there is no junk, we want a minimal edit not a human readable diff.
 
330
     - we don't filter very common lines (because we don't know where a good
 
331
       range will start, and after the first text we want to be emitting minmal
 
332
       edits only.
 
333
     - we chain the left side, not the right side
 
334
     - we incrementally update the adjacency matrix as new lines are provided.
 
335
     - we look for matches in all of the left side, so the routine which does
 
336
       the analagous task of find_longest_match does not need to filter on the
 
337
       left side.
 
338
    """
 
339
 
 
340
    def __init__(self, delta=True):
 
341
        """Create a GroupCompressor.
 
342
 
 
343
        :param delta: If False, do not compress records.
 
344
        """
 
345
        # Consider seeding the lines with some sort of GC Start flag, or
 
346
        # putting it as part of the output stream, rather than in the
 
347
        # compressed bytes.
 
348
        self.lines = []
 
349
        self.endpoint = 0
 
350
        self.input_bytes = 0
 
351
        self.num_keys = 0
 
352
        self.labels_deltas = {}
 
353
        self._last = None
 
354
        self._delta_index = _groupcompress_pyx.DeltaIndex()
 
355
        self._block = GroupCompressBlock()
 
356
 
 
357
    def compress(self, key, bytes, expected_sha, soft=False):
 
358
        """Compress lines with label key.
 
359
 
 
360
        :param key: A key tuple. It is stored in the output
 
361
            for identification of the text during decompression. If the last
 
362
            element is 'None' it is replaced with the sha1 of the text -
 
363
            e.g. sha1:xxxxxxx.
 
364
        :param bytes: The bytes to be compressed
 
365
        :param expected_sha: If non-None, the sha the lines are believed to
 
366
            have. During compression the sha is calculated; a mismatch will
 
367
            cause an error.
 
368
        :param soft: Do a 'soft' compression. This means that we require larger
 
369
            ranges to match to be considered for a copy command.
 
370
        :return: The sha1 of lines, and the number of bytes accumulated in
 
371
            the group output so far.
 
372
        """
 
373
        if not _FAST or expected_sha is None:
 
374
            sha1 = sha_string(bytes)
 
375
        else:
 
376
            sha1 = expected_sha
 
377
        if key[-1] is None:
 
378
            key = key[:-1] + ('sha1:' + sha1,)
 
379
        input_len = len(bytes)
 
380
        # By having action/label/sha1/len, we can parse the group if the index
 
381
        # was ever destroyed, we have the key in 'label', we know the final
 
382
        # bytes are valid from sha1, and we know where to find the end of this
 
383
        # record because of 'len'. (the delta record itself will store the
 
384
        # total length for the expanded record)
 
385
        # 'len: %d\n' costs approximately 1% increase in total data
 
386
        # Having the labels at all costs us 9-10% increase, 38% increase for
 
387
        # inventory pages, and 5.8% increase for text pages
 
388
        # new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
 
389
        if self._delta_index._source_offset != self.endpoint:
 
390
            raise AssertionError('_source_offset != endpoint'
 
391
                ' somehow the DeltaIndex got out of sync with'
 
392
                ' the output lines')
 
393
        max_delta_size = len(bytes) / 2
 
394
        delta = self._delta_index.make_delta(bytes, max_delta_size)
 
395
        if (delta is None):
 
396
            type = 'fulltext'
 
397
            enc_length = encode_base128_int(len(bytes))
 
398
            len_mini_header = 1 + len(enc_length)
 
399
            length = len(bytes) + len_mini_header
 
400
            self._delta_index.add_source(bytes, len_mini_header)
 
401
            new_chunks = ['f', enc_length, bytes]
 
402
        else:
 
403
            type = 'delta'
 
404
            enc_length = encode_base128_int(len(delta))
 
405
            len_mini_header = 1 + len(enc_length)
 
406
            length = len(delta) + len_mini_header
 
407
            new_chunks = ['d', enc_length, delta]
 
408
            if _FAST:
 
409
                self._delta_index._source_offset += length
 
410
            else:
 
411
                self._delta_index.add_delta_source(delta, len_mini_header)
 
412
        self._block.add_entry(key, type=type, sha1=sha1,
 
413
                              start=self.endpoint, length=length)
 
414
        delta_start = (self.endpoint, len(self.lines))
 
415
        self.num_keys += 1
 
416
        self.output_chunks(new_chunks)
 
417
        self.input_bytes += input_len
 
418
        delta_end = (self.endpoint, len(self.lines))
 
419
        self.labels_deltas[key] = (delta_start, delta_end)
 
420
        if not self._delta_index._source_offset == self.endpoint:
 
421
            raise AssertionError('the delta index is out of sync'
 
422
                'with the output lines %s != %s'
 
423
                % (self._delta_index._source_offset, self.endpoint))
 
424
        return sha1, self.endpoint, type, length
 
425
 
 
426
    def extract(self, key):
 
427
        """Extract a key previously added to the compressor.
 
428
 
 
429
        :param key: The key to extract.
 
430
        :return: An iterable over bytes and the sha1.
 
431
        """
 
432
        delta_details = self.labels_deltas[key]
 
433
        delta_chunks = self.lines[delta_details[0][1]:delta_details[1][1]]
 
434
        stored_bytes = ''.join(delta_chunks)
 
435
        # TODO: Fix this, we shouldn't really be peeking here
 
436
        entry = self._block._entries[key]
 
437
        if entry.type == 'fulltext':
 
438
            assert stored_bytes[0] == 'f'
 
439
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
 
440
            assert fulltext_len + 1 + offset == len(stored_bytes)
 
441
            bytes = stored_bytes[offset + 1:]
 
442
        else:
 
443
            assert entry.type == 'delta'
 
444
            # XXX: This is inefficient at best
 
445
            source = ''.join(self.lines)
 
446
            assert stored_bytes[0] == 'd'
 
447
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
 
448
            assert delta_len + 1 + offset == len(stored_bytes)
 
449
            bytes = _groupcompress_pyx.apply_delta(source,
 
450
                                                   stored_bytes[offset + 1:])
 
451
            assert entry.sha1 == sha_string(bytes)
 
452
        return bytes, entry.sha1
 
453
 
 
454
    def output_chunks(self, new_chunks):
 
455
        """Output some chunks.
 
456
 
 
457
        :param new_chunks: The chunks to output.
 
458
        """
 
459
        self._last = (len(self.lines), self.endpoint)
 
460
        endpoint = self.endpoint
 
461
        self.lines.extend(new_chunks)
 
462
        endpoint += sum(map(len, new_chunks))
 
463
        self.endpoint = endpoint
 
464
 
 
465
    def pop_last(self):
 
466
        """Call this if you want to 'revoke' the last compression.
 
467
 
 
468
        After this, the data structures will be rolled back, but you cannot do
 
469
        more compression.
 
470
        """
 
471
        self._delta_index = None
 
472
        del self.lines[self._last[0]:]
 
473
        self.endpoint = self._last[1]
 
474
        self._last = None
 
475
 
 
476
    def ratio(self):
 
477
        """Return the overall compression ratio."""
 
478
        return float(self.input_bytes) / float(self.endpoint)
 
479
 
 
480
 
 
481
def make_pack_factory(graph, delta, keylength):
 
482
    """Create a factory for creating a pack based groupcompress.
 
483
 
 
484
    This is only functional enough to run interface tests, it doesn't try to
 
485
    provide a full pack environment.
 
486
    
 
487
    :param graph: Store a graph.
 
488
    :param delta: Delta compress contents.
 
489
    :param keylength: How long should keys be.
 
490
    """
 
491
    def factory(transport):
 
492
        parents = graph or delta
 
493
        ref_length = 0
 
494
        if graph:
 
495
            ref_length = 1
 
496
        graph_index = BTreeBuilder(reference_lists=ref_length,
 
497
            key_elements=keylength)
 
498
        stream = transport.open_write_stream('newpack')
 
499
        writer = pack.ContainerWriter(stream.write)
 
500
        writer.begin()
 
501
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
 
502
            add_callback=graph_index.add_nodes)
 
503
        access = _DirectPackAccess({})
 
504
        access.set_writer(writer, graph_index, (transport, 'newpack'))
 
505
        result = GroupCompressVersionedFiles(index, access, delta)
 
506
        result.stream = stream
 
507
        result.writer = writer
 
508
        return result
 
509
    return factory
 
510
 
 
511
 
 
512
def cleanup_pack_group(versioned_files):
 
513
    versioned_files.writer.end()
 
514
    versioned_files.stream.close()
 
515
 
 
516
 
 
517
class GroupCompressVersionedFiles(VersionedFiles):
 
518
    """A group-compress based VersionedFiles implementation."""
 
519
 
 
520
    def __init__(self, index, access, delta=True):
 
521
        """Create a GroupCompressVersionedFiles object.
 
522
 
 
523
        :param index: The index object storing access and graph data.
 
524
        :param access: The access object storing raw data.
 
525
        :param delta: Whether to delta compress or just entropy compress.
 
526
        """
 
527
        self._index = index
 
528
        self._access = access
 
529
        self._delta = delta
 
530
        self._unadded_refs = {}
 
531
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
 
532
 
 
533
    def add_lines(self, key, parents, lines, parent_texts=None,
 
534
        left_matching_blocks=None, nostore_sha=None, random_id=False,
 
535
        check_content=True):
 
536
        """Add a text to the store.
 
537
 
 
538
        :param key: The key tuple of the text to add.
 
539
        :param parents: The parents key tuples of the text to add.
 
540
        :param lines: A list of lines. Each line must be a bytestring. And all
 
541
            of them except the last must be terminated with \n and contain no
 
542
            other \n's. The last line may either contain no \n's or a single
 
543
            terminating \n. If the lines list does meet this constraint the add
 
544
            routine may error or may succeed - but you will be unable to read
 
545
            the data back accurately. (Checking the lines have been split
 
546
            correctly is expensive and extremely unlikely to catch bugs so it
 
547
            is not done at runtime unless check_content is True.)
 
548
        :param parent_texts: An optional dictionary containing the opaque 
 
549
            representations of some or all of the parents of version_id to
 
550
            allow delta optimisations.  VERY IMPORTANT: the texts must be those
 
551
            returned by add_lines or data corruption can be caused.
 
552
        :param left_matching_blocks: a hint about which areas are common
 
553
            between the text and its left-hand-parent.  The format is
 
554
            the SequenceMatcher.get_matching_blocks format.
 
555
        :param nostore_sha: Raise ExistingContent and do not add the lines to
 
556
            the versioned file if the digest of the lines matches this.
 
557
        :param random_id: If True a random id has been selected rather than
 
558
            an id determined by some deterministic process such as a converter
 
559
            from a foreign VCS. When True the backend may choose not to check
 
560
            for uniqueness of the resulting key within the versioned file, so
 
561
            this should only be done when the result is expected to be unique
 
562
            anyway.
 
563
        :param check_content: If True, the lines supplied are verified to be
 
564
            bytestrings that are correctly formed lines.
 
565
        :return: The text sha1, the number of bytes in the text, and an opaque
 
566
                 representation of the inserted version which can be provided
 
567
                 back to future add_lines calls in the parent_texts dictionary.
 
568
        """
 
569
        self._index._check_write_ok()
 
570
        self._check_add(key, lines, random_id, check_content)
 
571
        if parents is None:
 
572
            # The caller might pass None if there is no graph data, but kndx
 
573
            # indexes can't directly store that, so we give them
 
574
            # an empty tuple instead.
 
575
            parents = ()
 
576
        # double handling for now. Make it work until then.
 
577
        length = sum(map(len, lines))
 
578
        record = ChunkedContentFactory(key, parents, None, lines)
 
579
        sha1 = list(self._insert_record_stream([record], random_id=random_id))[0]
 
580
        return sha1, length, None
 
581
 
 
582
    def annotate(self, key):
 
583
        """See VersionedFiles.annotate."""
 
584
        graph = Graph(self)
 
585
        parent_map = self.get_parent_map([key])
 
586
        if not parent_map:
 
587
            raise errors.RevisionNotPresent(key, self)
 
588
        if parent_map[key] is not None:
 
589
            search = graph._make_breadth_first_searcher([key])
 
590
            keys = set()
 
591
            while True:
 
592
                try:
 
593
                    present, ghosts = search.next_with_ghosts()
 
594
                except StopIteration:
 
595
                    break
 
596
                keys.update(present)
 
597
            parent_map = self.get_parent_map(keys)
 
598
        else:
 
599
            keys = [key]
 
600
            parent_map = {key:()}
 
601
        head_cache = _mod_graph.FrozenHeadsCache(graph)
 
602
        parent_cache = {}
 
603
        reannotate = annotate.reannotate
 
604
        for record in self.get_record_stream(keys, 'topological', True):
 
605
            key = record.key
 
606
            chunks = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
607
            parent_lines = [parent_cache[parent] for parent in parent_map[key]]
 
608
            parent_cache[key] = list(
 
609
                reannotate(parent_lines, chunks, key, None, head_cache))
 
610
        return parent_cache[key]
 
611
 
 
612
    def check(self, progress_bar=None):
 
613
        """See VersionedFiles.check()."""
 
614
        keys = self.keys()
 
615
        for record in self.get_record_stream(keys, 'unordered', True):
 
616
            record.get_bytes_as('fulltext')
 
617
 
 
618
    def _check_add(self, key, lines, random_id, check_content):
 
619
        """check that version_id and lines are safe to add."""
 
620
        version_id = key[-1]
 
621
        if version_id is not None:
 
622
            if contains_whitespace(version_id):
 
623
                raise errors.InvalidRevisionId(version_id, self)
 
624
        self.check_not_reserved_id(version_id)
 
625
        # TODO: If random_id==False and the key is already present, we should
 
626
        # probably check that the existing content is identical to what is
 
627
        # being inserted, and otherwise raise an exception.  This would make
 
628
        # the bundle code simpler.
 
629
        if check_content:
 
630
            self._check_lines_not_unicode(lines)
 
631
            self._check_lines_are_lines(lines)
 
632
 
 
633
    def get_parent_map(self, keys):
 
634
        """Get a map of the parents of keys.
 
635
 
 
636
        :param keys: The keys to look up parents for.
 
637
        :return: A mapping from keys to parents. Absent keys are absent from
 
638
            the mapping.
 
639
        """
 
640
        result = {}
 
641
        sources = [self._index]
 
642
        source_results = []
 
643
        missing = set(keys)
 
644
        for source in sources:
 
645
            if not missing:
 
646
                break
 
647
            new_result = source.get_parent_map(missing)
 
648
            source_results.append(new_result)
 
649
            result.update(new_result)
 
650
            missing.difference_update(set(new_result))
 
651
        if self._unadded_refs:
 
652
            for key in missing:
 
653
                if key in self._unadded_refs:
 
654
                    result[key] = self._unadded_refs[key]
 
655
        return result
 
656
 
 
657
    def _get_block(self, index_memo):
 
658
        read_memo = index_memo[0:3]
 
659
        # get the group:
 
660
        try:
 
661
            block = self._group_cache[read_memo]
 
662
        except KeyError:
 
663
            # read the group
 
664
            zdata = self._access.get_raw_records([read_memo]).next()
 
665
            # decompress - whole thing - this is not a bug, as it
 
666
            # permits caching. We might want to store the partially
 
667
            # decompresed group and decompress object, so that recent
 
668
            # texts are not penalised by big groups.
 
669
            block = GroupCompressBlock.from_bytes(zdata)
 
670
            self._group_cache[read_memo] = block
 
671
        # cheapo debugging:
 
672
        # print len(zdata), len(plain)
 
673
        # parse - requires split_lines, better to have byte offsets
 
674
        # here (but not by much - we only split the region for the
 
675
        # recipe, and we often want to end up with lines anyway.
 
676
        return block
 
677
 
 
678
    def get_missing_compression_parent_keys(self):
 
679
        """Return the keys of missing compression parents.
 
680
 
 
681
        Missing compression parents occur when a record stream was missing
 
682
        basis texts, or a index was scanned that had missing basis texts.
 
683
        """
 
684
        # GroupCompress cannot currently reference texts that are not in the
 
685
        # group, so this is valid for now
 
686
        return frozenset()
 
687
 
 
688
    def get_record_stream(self, keys, ordering, include_delta_closure):
 
689
        """Get a stream of records for keys.
 
690
 
 
691
        :param keys: The keys to include.
 
692
        :param ordering: Either 'unordered' or 'topological'. A topologically
 
693
            sorted stream has compression parents strictly before their
 
694
            children.
 
695
        :param include_delta_closure: If True then the closure across any
 
696
            compression parents will be included (in the opaque data).
 
697
        :return: An iterator of ContentFactory objects, each of which is only
 
698
            valid until the iterator is advanced.
 
699
        """
 
700
        # keys might be a generator
 
701
        orig_keys = list(keys)
 
702
        keys = set(orig_keys)
 
703
        if not keys:
 
704
            return
 
705
        if (not self._index.has_graph
 
706
            and ordering in ('topological', 'gc-optimal')):
 
707
            # Cannot topological order when no graph has been stored.
 
708
            ordering = 'unordered'
 
709
        # Cheap: iterate
 
710
        locations = self._index.get_build_details(keys)
 
711
        local_keys = frozenset(keys).intersection(set(self._unadded_refs))
 
712
        if ordering == 'topological':
 
713
            # would be better to not globally sort initially but instead
 
714
            # start with one key, recurse to its oldest parent, then grab
 
715
            # everything in the same group, etc.
 
716
            parent_map = dict((key, details[2]) for key, details in
 
717
                locations.iteritems())
 
718
            for key in local_keys:
 
719
                parent_map[key] = self._unadded_refs[key]
 
720
            present_keys = topo_sort(parent_map)
 
721
            # Now group by source:
 
722
        elif ordering == 'gc-optimal':
 
723
            parent_map = dict((key, details[2]) for key, details in
 
724
                              locations.iteritems())
 
725
            for key in local_keys:
 
726
                parent_map[key] = self._unadded_refs[key]
 
727
            # XXX: This only optimizes for the target ordering. We may need to
 
728
            #      balance that with the time it takes to extract ordering, by
 
729
            #      somehow grouping based on locations[key][0:3]
 
730
            present_keys = sort_gc_optimal(parent_map)
 
731
        elif ordering == 'as-requested':
 
732
            present_keys = [key for key in orig_keys if key in locations
 
733
                            or key in local_keys]
 
734
        else:
 
735
            # We want to yield the keys in a semi-optimal (read-wise) ordering.
 
736
            # Otherwise we thrash the _group_cache and destroy performance
 
737
            def get_group(key):
 
738
                # This is the group the bytes are stored in, followed by the
 
739
                # location in the group
 
740
                return locations[key][0]
 
741
            present_keys = sorted(locations.iterkeys(), key=get_group)
 
742
            # We don't have an ordering for keys in the in-memory object, but
 
743
            # lets process the in-memory ones first.
 
744
            present_keys = list(local_keys) + present_keys
 
745
        locations.update((key, None) for key in local_keys)
 
746
        absent_keys = keys.difference(set(locations))
 
747
        for key in absent_keys:
 
748
            yield AbsentContentFactory(key)
 
749
        for key in present_keys:
 
750
            if key in self._unadded_refs:
 
751
                bytes, sha1 = self._compressor.extract(key)
 
752
                parents = self._unadded_refs[key]
 
753
            else:
 
754
                index_memo, _, parents, (method, _) = locations[key]
 
755
                block = self._get_block(index_memo)
 
756
                entry, bytes = block.extract(key, index_memo)
 
757
                sha1 = entry.sha1
 
758
                # TODO: If we don't have labels, then the sha1 here is computed
 
759
                #       from the data, so we don't want to re-sha the string.
 
760
                if not _FAST and sha_string(bytes) != sha1:
 
761
                    raise AssertionError('sha1 sum did not match')
 
762
            yield FulltextContentFactory(key, parents, sha1, bytes)
 
763
 
 
764
    def get_sha1s(self, keys):
 
765
        """See VersionedFiles.get_sha1s()."""
 
766
        result = {}
 
767
        for record in self.get_record_stream(keys, 'unordered', True):
 
768
            if record.sha1 != None:
 
769
                result[record.key] = record.sha1
 
770
            else:
 
771
                if record.storage_kind != 'absent':
 
772
                    result[record.key] == sha_string(record.get_bytes_as(
 
773
                        'fulltext'))
 
774
        return result
 
775
 
 
776
    def insert_record_stream(self, stream):
 
777
        """Insert a record stream into this container.
 
778
 
 
779
        :param stream: A stream of records to insert. 
 
780
        :return: None
 
781
        :seealso VersionedFiles.get_record_stream:
 
782
        """
 
783
        for _ in self._insert_record_stream(stream):
 
784
            pass
 
785
 
 
786
    def _insert_record_stream(self, stream, random_id=False):
 
787
        """Internal core to insert a record stream into this container.
 
788
 
 
789
        This helper function has a different interface than insert_record_stream
 
790
        to allow add_lines to be minimal, but still return the needed data.
 
791
 
 
792
        :param stream: A stream of records to insert. 
 
793
        :return: An iterator over the sha1 of the inserted records.
 
794
        :seealso insert_record_stream:
 
795
        :seealso add_lines:
 
796
        """
 
797
        adapters = {}
 
798
        def get_adapter(adapter_key):
 
799
            try:
 
800
                return adapters[adapter_key]
 
801
            except KeyError:
 
802
                adapter_factory = adapter_registry.get(adapter_key)
 
803
                adapter = adapter_factory(self)
 
804
                adapters[adapter_key] = adapter
 
805
                return adapter
 
806
        # This will go up to fulltexts for gc to gc fetching, which isn't
 
807
        # ideal.
 
808
        self._compressor = GroupCompressor(self._delta)
 
809
        self._unadded_refs = {}
 
810
        keys_to_add = []
 
811
        basis_end = 0
 
812
        def flush():
 
813
            bytes = self._compressor._block.to_bytes(
 
814
                ''.join(self._compressor.lines))
 
815
            index, start, length = self._access.add_raw_records(
 
816
                [(None, len(bytes))], bytes)[0]
 
817
            nodes = []
 
818
            for key, reads, refs in keys_to_add:
 
819
                nodes.append((key, "%d %d %s" % (start, length, reads), refs))
 
820
            self._index.add_records(nodes, random_id=random_id)
 
821
            self._unadded_refs = {}
 
822
            del keys_to_add[:]
 
823
            self._compressor = GroupCompressor(self._delta)
 
824
 
 
825
        last_prefix = None
 
826
        last_fulltext_len = None
 
827
        max_fulltext_len = 0
 
828
        max_fulltext_prefix = None
 
829
        for record in stream:
 
830
            # Raise an error when a record is missing.
 
831
            if record.storage_kind == 'absent':
 
832
                raise errors.RevisionNotPresent(record.key, self)
 
833
            try:
 
834
                bytes = record.get_bytes_as('fulltext')
 
835
            except errors.UnavailableRepresentation:
 
836
                adapter_key = record.storage_kind, 'fulltext'
 
837
                adapter = get_adapter(adapter_key)
 
838
                bytes = adapter.get_bytes(record)
 
839
            if len(record.key) > 1:
 
840
                prefix = record.key[0]
 
841
                soft = (prefix == last_prefix)
 
842
            else:
 
843
                prefix = None
 
844
                soft = False
 
845
            if max_fulltext_len < len(bytes):
 
846
                max_fulltext_len = len(bytes)
 
847
                max_fulltext_prefix = prefix
 
848
            (found_sha1, end_point, type,
 
849
             length) = self._compressor.compress(record.key,
 
850
                bytes, record.sha1, soft=soft)
 
851
            # delta_ratio = float(len(bytes)) / length
 
852
            # Check if we want to continue to include that text
 
853
            if (prefix == max_fulltext_prefix
 
854
                and end_point < 2 * max_fulltext_len):
 
855
                # As long as we are on the same file_id, we will fill at least
 
856
                # 2 * max_fulltext_len
 
857
                start_new_block = False
 
858
            elif end_point > 4*1024*1024:
 
859
                start_new_block = True
 
860
            elif (prefix is not None and prefix != last_prefix
 
861
                  and end_point > 2*1024*1024):
 
862
                start_new_block = True
 
863
            else:
 
864
                start_new_block = False
 
865
            # if type == 'fulltext':
 
866
            #     # If this is the first text, we don't do anything
 
867
            #     if self._compressor.num_keys > 1:
 
868
            #         if prefix is not None and prefix != last_prefix:
 
869
            #             # We just inserted a fulltext for a different prefix
 
870
            #             # (aka file-id).
 
871
            #             if end_point > 512 * 1024:
 
872
            #                 start_new_block = True
 
873
            #             # TODO: Consider packing several small texts together
 
874
            #             #       maybe only flush if end_point > some threshold
 
875
            #             # if end_point > 512 * 1024 or len(bytes) <
 
876
            #             #     start_new_block = true
 
877
            #         else:
 
878
            #             # We just added a fulltext, part of the same file-id
 
879
            #             if (end_point > 2*1024*1024
 
880
            #                 and end_point > 5*max_fulltext_len):
 
881
            #                 start_new_block = True
 
882
            #     last_fulltext_len = len(bytes)
 
883
            # else:
 
884
            #     delta_ratio = float(len(bytes)) / length
 
885
            #     if delta_ratio < 3: # Not much compression
 
886
            #         if end_point > 1*1024*1024:
 
887
            #             start_new_block = True
 
888
            #     elif delta_ratio < 10: # 10:1 compression
 
889
            #         if end_point > 4*1024*1024:
 
890
            #             start_new_block = True
 
891
            last_prefix = prefix
 
892
            if start_new_block:
 
893
                self._compressor.pop_last()
 
894
                flush()
 
895
                basis_end = 0
 
896
                max_fulltext_len = len(bytes)
 
897
                (found_sha1, end_point, type,
 
898
                 length) = self._compressor.compress(record.key,
 
899
                    bytes, record.sha1)
 
900
                assert type == 'fulltext'
 
901
                last_fulltext_len = length
 
902
            if record.key[-1] is None:
 
903
                key = record.key[:-1] + ('sha1:' + found_sha1,)
 
904
            else:
 
905
                key = record.key
 
906
            self._unadded_refs[key] = record.parents
 
907
            yield found_sha1
 
908
            keys_to_add.append((key, '%d %d' % (basis_end, end_point),
 
909
                (record.parents,)))
 
910
            basis_end = end_point
 
911
        if len(keys_to_add):
 
912
            flush()
 
913
        self._compressor = None
 
914
        assert self._unadded_refs == {}
 
915
 
 
916
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
 
917
        """Iterate over the lines in the versioned files from keys.
 
918
 
 
919
        This may return lines from other keys. Each item the returned
 
920
        iterator yields is a tuple of a line and a text version that that line
 
921
        is present in (not introduced in).
 
922
 
 
923
        Ordering of results is in whatever order is most suitable for the
 
924
        underlying storage format.
 
925
 
 
926
        If a progress bar is supplied, it may be used to indicate progress.
 
927
        The caller is responsible for cleaning up progress bars (because this
 
928
        is an iterator).
 
929
 
 
930
        NOTES:
 
931
         * Lines are normalised by the underlying store: they will all have \n
 
932
           terminators.
 
933
         * Lines are returned in arbitrary order.
 
934
 
 
935
        :return: An iterator over (line, key).
 
936
        """
 
937
        if pb is None:
 
938
            pb = progress.DummyProgress()
 
939
        keys = set(keys)
 
940
        total = len(keys)
 
941
        # we don't care about inclusions, the caller cares.
 
942
        # but we need to setup a list of records to visit.
 
943
        # we need key, position, length
 
944
        for key_idx, record in enumerate(self.get_record_stream(keys,
 
945
            'unordered', True)):
 
946
            # XXX: todo - optimise to use less than full texts.
 
947
            key = record.key
 
948
            pb.update('Walking content.', key_idx, total)
 
949
            if record.storage_kind == 'absent':
 
950
                raise errors.RevisionNotPresent(key, self)
 
951
            lines = split_lines(record.get_bytes_as('fulltext'))
 
952
            for line in lines:
 
953
                yield line, key
 
954
        pb.update('Walking content.', total, total)
 
955
 
 
956
    def keys(self):
 
957
        """See VersionedFiles.keys."""
 
958
        if 'evil' in debug.debug_flags:
 
959
            trace.mutter_callsite(2, "keys scales with size of history")
 
960
        sources = [self._index]
 
961
        result = set()
 
962
        for source in sources:
 
963
            result.update(source.keys())
 
964
        return result
 
965
 
 
966
 
 
967
class _GCGraphIndex(object):
 
968
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
 
969
 
 
970
    def __init__(self, graph_index, is_locked, parents=True,
 
971
        add_callback=None):
 
972
        """Construct a _GCGraphIndex on a graph_index.
 
973
 
 
974
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
 
975
        :param is_locked: A callback, returns True if the index is locked and
 
976
            thus usable.
 
977
        :param parents: If True, record knits parents, if not do not record 
 
978
            parents.
 
979
        :param add_callback: If not None, allow additions to the index and call
 
980
            this callback with a list of added GraphIndex nodes:
 
981
            [(node, value, node_refs), ...]
 
982
        """
 
983
        self._add_callback = add_callback
 
984
        self._graph_index = graph_index
 
985
        self._parents = parents
 
986
        self.has_graph = parents
 
987
        self._is_locked = is_locked
 
988
 
 
989
    def add_records(self, records, random_id=False):
 
990
        """Add multiple records to the index.
 
991
        
 
992
        This function does not insert data into the Immutable GraphIndex
 
993
        backing the KnitGraphIndex, instead it prepares data for insertion by
 
994
        the caller and checks that it is safe to insert then calls
 
995
        self._add_callback with the prepared GraphIndex nodes.
 
996
 
 
997
        :param records: a list of tuples:
 
998
                         (key, options, access_memo, parents).
 
999
        :param random_id: If True the ids being added were randomly generated
 
1000
            and no check for existence will be performed.
 
1001
        """
 
1002
        if not self._add_callback:
 
1003
            raise errors.ReadOnlyError(self)
 
1004
        # we hope there are no repositories with inconsistent parentage
 
1005
        # anymore.
 
1006
 
 
1007
        changed = False
 
1008
        keys = {}
 
1009
        for (key, value, refs) in records:
 
1010
            if not self._parents:
 
1011
                if refs:
 
1012
                    for ref in refs:
 
1013
                        if ref:
 
1014
                            raise KnitCorrupt(self,
 
1015
                                "attempt to add node with parents "
 
1016
                                "in parentless index.")
 
1017
                    refs = ()
 
1018
                    changed = True
 
1019
            keys[key] = (value, refs)
 
1020
        # check for dups
 
1021
        if not random_id:
 
1022
            present_nodes = self._get_entries(keys)
 
1023
            for (index, key, value, node_refs) in present_nodes:
 
1024
                if node_refs != keys[key][1]:
 
1025
                    raise errors.KnitCorrupt(self, "inconsistent details in add_records"
 
1026
                        ": %s %s" % ((value, node_refs), keys[key]))
 
1027
                del keys[key]
 
1028
                changed = True
 
1029
        if changed:
 
1030
            result = []
 
1031
            if self._parents:
 
1032
                for key, (value, node_refs) in keys.iteritems():
 
1033
                    result.append((key, value, node_refs))
 
1034
            else:
 
1035
                for key, (value, node_refs) in keys.iteritems():
 
1036
                    result.append((key, value))
 
1037
            records = result
 
1038
        self._add_callback(records)
 
1039
        
 
1040
    def _check_read(self):
 
1041
        """Raise an exception if reads are not permitted."""
 
1042
        if not self._is_locked():
 
1043
            raise errors.ObjectNotLocked(self)
 
1044
 
 
1045
    def _check_write_ok(self):
 
1046
        """Raise an exception if writes are not permitted."""
 
1047
        if not self._is_locked():
 
1048
            raise errors.ObjectNotLocked(self)
 
1049
 
 
1050
    def _get_entries(self, keys, check_present=False):
 
1051
        """Get the entries for keys.
 
1052
 
 
1053
        Note: Callers are responsible for checking that the index is locked
 
1054
        before calling this method.
 
1055
 
 
1056
        :param keys: An iterable of index key tuples.
 
1057
        """
 
1058
        keys = set(keys)
 
1059
        found_keys = set()
 
1060
        if self._parents:
 
1061
            for node in self._graph_index.iter_entries(keys):
 
1062
                yield node
 
1063
                found_keys.add(node[1])
 
1064
        else:
 
1065
            # adapt parentless index to the rest of the code.
 
1066
            for node in self._graph_index.iter_entries(keys):
 
1067
                yield node[0], node[1], node[2], ()
 
1068
                found_keys.add(node[1])
 
1069
        if check_present:
 
1070
            missing_keys = keys.difference(found_keys)
 
1071
            if missing_keys:
 
1072
                raise RevisionNotPresent(missing_keys.pop(), self)
 
1073
 
 
1074
    def get_parent_map(self, keys):
 
1075
        """Get a map of the parents of keys.
 
1076
 
 
1077
        :param keys: The keys to look up parents for.
 
1078
        :return: A mapping from keys to parents. Absent keys are absent from
 
1079
            the mapping.
 
1080
        """
 
1081
        self._check_read()
 
1082
        nodes = self._get_entries(keys)
 
1083
        result = {}
 
1084
        if self._parents:
 
1085
            for node in nodes:
 
1086
                result[node[1]] = node[3][0]
 
1087
        else:
 
1088
            for node in nodes:
 
1089
                result[node[1]] = None
 
1090
        return result
 
1091
 
 
1092
    def get_build_details(self, keys):
 
1093
        """Get the various build details for keys.
 
1094
 
 
1095
        Ghosts are omitted from the result.
 
1096
 
 
1097
        :param keys: An iterable of keys.
 
1098
        :return: A dict of key:
 
1099
            (index_memo, compression_parent, parents, record_details).
 
1100
            index_memo
 
1101
                opaque structure to pass to read_records to extract the raw
 
1102
                data
 
1103
            compression_parent
 
1104
                Content that this record is built upon, may be None
 
1105
            parents
 
1106
                Logical parents of this node
 
1107
            record_details
 
1108
                extra information about the content which needs to be passed to
 
1109
                Factory.parse_record
 
1110
        """
 
1111
        self._check_read()
 
1112
        result = {}
 
1113
        entries = self._get_entries(keys)
 
1114
        for entry in entries:
 
1115
            key = entry[1]
 
1116
            if not self._parents:
 
1117
                parents = None
 
1118
            else:
 
1119
                parents = entry[3][0]
 
1120
            method = 'group'
 
1121
            result[key] = (self._node_to_position(entry),
 
1122
                                  None, parents, (method, None))
 
1123
        return result
 
1124
    
 
1125
    def keys(self):
 
1126
        """Get all the keys in the collection.
 
1127
        
 
1128
        The keys are not ordered.
 
1129
        """
 
1130
        self._check_read()
 
1131
        return [node[1] for node in self._graph_index.iter_all_entries()]
 
1132
    
 
1133
    def _node_to_position(self, node):
 
1134
        """Convert an index value to position details."""
 
1135
        bits = node[2].split(' ')
 
1136
        # It would be nice not to read the entire gzip.
 
1137
        start = int(bits[0])
 
1138
        stop = int(bits[1])
 
1139
        basis_end = int(bits[2])
 
1140
        delta_end = int(bits[3])
 
1141
        return node[0], start, stop, basis_end, delta_end
 
1142
 
 
1143
 
 
1144
try:
 
1145
    from bzrlib import _groupcompress_pyx
 
1146
except ImportError:
 
1147
    pass