# Copyright (C) 2008, 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Core compression logic for compressing streams of related files."""

from itertools import izip
from cStringIO import StringIO
import time
import zlib
try:
    import pylzma
except ImportError:
    pylzma = None

from bzrlib import (
    annotate,
    debug,
    errors,
    graph as _mod_graph,
    osutils,
    pack,
    progress,
    trace,
    )
from bzrlib.graph import Graph
from bzrlib.knit import _DirectPackAccess
from bzrlib.btree_index import BTreeBuilder
from bzrlib.lru_cache import LRUSizeCache
from bzrlib.tsort import topo_sort
from bzrlib.versionedfile import (
    adapter_registry,
    AbsentContentFactory,
    ChunkedContentFactory,
    FulltextContentFactory,
    VersionedFiles,
    )

_USE_LZMA = False and (pylzma is not None)

# osutils.sha_string('')
_null_sha1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'

def sort_gc_optimal(parent_map):
    """Sort and group the keys in parent_map into groupcompress order.

    groupcompress is defined (currently) as reverse-topological order, grouped
    by the key prefix.

    :return: A sorted-list of keys
    """
    # groupcompress ordering is approximately reverse topological,
    # properly grouped by file-id.
    per_prefix_map = {}
    for item in parent_map.iteritems():
        key = item[0]
        if isinstance(key, str) or len(key) == 1:
            prefix = ''
        else:
            prefix = key[0]
        try:
            per_prefix_map[prefix].append(item)
        except KeyError:
            per_prefix_map[prefix] = [item]

    present_keys = []
    for prefix in sorted(per_prefix_map):
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
    return present_keys
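# A hedged illustration of the ordering (hypothetical keys, not taken from the
# code above): given
#   parent_map = {('f1', 'rev2'): (('f1', 'rev1'),),
#                 ('f1', 'rev1'): (),
#                 ('f2', 'rev1'): ()}
# the keys are grouped by their 'f1'/'f2' prefix and each group is emitted in
# reverse topological order, so sort_gc_optimal(parent_map) would return
#   [('f1', 'rev2'), ('f1', 'rev1'), ('f2', 'rev1')]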
86
# The max zlib window size is 32kB, so if we set 'max_size' output of the
87
# decompressor to the requested bytes + 32kB, then we should guarantee
88
# num_bytes coming out.
89
_ZLIB_DECOMP_WINDOW = 32*1024
91
class GroupCompressBlock(object):
    """An object which maintains the internal structure of the compressed data.

    This tracks the meta info (start of text, length, type, etc.)
    """

    # Group Compress Block v1 Zlib
    GCB_HEADER = 'gcb1z\n'
    GCB_LZ_HEADER = 'gcb1l\n'

    def __init__(self):
        # map by key? or just order in file?
        self._compressor_name = None
        self._z_content = None
        self._z_content_decompressor = None
        self._z_content_length = None
        self._content_length = None
        self._content = None

    def __len__(self):
        # This is the maximum number of bytes this object will reference if
        # everything is decompressed. However, if we decompress less than
        # everything... (this would cause some problems for LRUSizeCache)
        return self._content_length + self._z_content_length
116
    def _ensure_content(self, num_bytes=None):
        """Make sure that content has been expanded enough.

        :param num_bytes: Ensure that we have extracted at least num_bytes of
            content. If None, consume everything
        """
        # TODO: If we re-use the same content block at different times during
        #       get_record_stream(), it is possible that the first pass will
        #       get inserted, triggering an extract/_ensure_content() which
        #       will get rid of _z_content. And then the next use of the block
        #       will try to access _z_content (to send it over the wire), and
        #       fail because it is already extracted. Consider never releasing
        #       _z_content because of this.
        if num_bytes is None:
            num_bytes = self._content_length
        if self._content_length is not None:
            assert num_bytes <= self._content_length
        if self._content is None:
            assert self._z_content is not None
            if self._z_content == '':
                self._content = ''
            elif self._compressor_name == 'lzma':
                # We don't do partial lzma decomp yet
                self._content = pylzma.decompress(self._z_content)
            else:
                # Start a zlib decompressor
                assert self._compressor_name == 'zlib'
                if num_bytes is None:
                    self._content = zlib.decompress(self._z_content)
                else:
                    self._z_content_decompressor = zlib.decompressobj()
                    # Seed the decompressor with the uncompressed bytes, so
                    # that the rest of the code is simplified
                    self._content = self._z_content_decompressor.decompress(
                        self._z_content, num_bytes + _ZLIB_DECOMP_WINDOW)
                    # Any bytes remaining to be decompressed will be in the
                    # decompressor's 'unconsumed_tail'
        # Do we have enough bytes already?
        if num_bytes is not None and len(self._content) >= num_bytes:
            return
        if num_bytes is None and self._z_content_decompressor is None:
            # We must have already decompressed everything
            return
        # If we got this far, and don't have a decompressor, something is wrong
        assert self._z_content_decompressor is not None
        remaining_decomp = self._z_content_decompressor.unconsumed_tail
        if num_bytes is None:
            # We don't know how much is left, but we'll decompress it all
            self._content += self._z_content_decompressor.decompress(
                remaining_decomp)
            # Note: There is what I consider a bug in zlib.decompressobj
            #       If you pass back in the entire unconsumed_tail, only
            #       this time you don't pass a max-size, it doesn't
            #       change the unconsumed_tail back to None/''.
            #       However, we know we are done with the whole stream
            self._z_content_decompressor = None
            self._content_length = len(self._content)
        else:
            # If we have nothing left to decomp, we ran out of decomp bytes
            assert remaining_decomp
            needed_bytes = num_bytes - len(self._content)
            # We always set max_size to 32kB over the minimum needed, so that
            # zlib will give us as much as we really want.
            # TODO: If this isn't good enough, we could make a loop here,
            #       that keeps expanding the request until we get enough
            self._content += self._z_content_decompressor.decompress(
                remaining_decomp, needed_bytes + _ZLIB_DECOMP_WINDOW)
            assert len(self._content) >= num_bytes
            if not self._z_content_decompressor.unconsumed_tail:
                # The stream is finished
                self._z_content_decompressor = None
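    # A minimal sketch (not part of the original API) of the partial
    # decompression idea _ensure_content relies on: zlib's decompressobj can
    # cap how much output it produces, leaving the rest in 'unconsumed_tail':
    #
    #   d = zlib.decompressobj()
    #   some = d.decompress(z_bytes, 4096 + _ZLIB_DECOMP_WINDOW)
    #   rest = d.decompress(d.unconsumed_tail)  # expand the remainder later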
189
    def _parse_bytes(self, bytes, pos):
        """Read the various lengths from the header.

        This also populates the various 'compressed' buffers.

        :return: The position in bytes just after the last newline
        """
        # At present, we have 2 integers for the compressed and uncompressed
        # content. In base10 (ascii) 14 bytes can represent > 1TB, so to avoid
        # checking too far, cap the search to 14 bytes.
        pos2 = bytes.index('\n', pos, pos + 14)
        self._z_content_length = int(bytes[pos:pos2])
        pos = pos2 + 1
        pos2 = bytes.index('\n', pos, pos + 14)
        self._content_length = int(bytes[pos:pos2])
        pos = pos2 + 1
        assert len(bytes) == (pos + self._z_content_length)
        self._z_content = bytes[pos:]
        assert len(self._z_content) == self._z_content_length
210
    @classmethod
    def from_bytes(cls, bytes):
        out = cls()
        if bytes[:6] not in (cls.GCB_HEADER, cls.GCB_LZ_HEADER):
            raise ValueError('bytes did not start with %r' % (cls.GCB_HEADER,))
        if bytes[4] == 'z':
            out._compressor_name = 'zlib'
        elif bytes[4] == 'l':
            out._compressor_name = 'lzma'
        else:
            raise ValueError('unknown compressor: %r' % (bytes,))
        out._parse_bytes(bytes, 6)
        return out
223
    def extract(self, key, start, end, sha1=None):
        """Extract the text for a specific key.

        :param key: The label used for this content
        :param sha1: TODO (should we validate only when sha1 is supplied?)
        :return: The bytes for the content
        """
        if start == end == 0:
            return ''
        self._ensure_content(end)
        # The bytes are 'f' or 'd' for the type, then a variable-length
        # base128 integer for the content size, then the actual content
        # We know that the variable-length integer won't be longer than 5
        # bytes (it takes 5 bytes to encode 2^32)
        c = self._content[start]
        if c == 'f':
            type = 'fulltext'
        else:
            if c != 'd':
                raise ValueError('Unknown content control code: %s'
                                 % (c,))
            type = 'delta'
        content_len, len_len = decode_base128_int(
            self._content[start + 1:start + 6])
        content_start = start + 1 + len_len
        if end != content_start + content_len:
            raise ValueError('end != len according to field header'
                ' %s != %s' % (end, content_start + content_len))
        if c == 'f':
            bytes = self._content[content_start:end]
        elif c == 'd':
            bytes = apply_delta_to_source(self._content, content_start, end)
        return bytes
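    # Hedged illustration of the record framing handled by extract() (values
    # are made up): a fulltext record for 'some text\n' is stored as
    #   'f' + encode_base128_int(10) + 'some text\n'
    # and a delta record uses 'd' + encode_base128_int(len(delta)) + delta,
    # so 'end' always equals content_start + content_len.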
257
    def set_content(self, content):
        """Set the content of this block."""
        self._content_length = len(content)
        self._content = content
        self._z_content = None

    def to_bytes(self):
        """Encode the information into a byte stream."""
        compress = zlib.compress
        if _USE_LZMA:
            compress = pylzma.compress
        if self._z_content is None:
            assert self._content is not None
            self._z_content = compress(self._content)
            self._z_content_length = len(self._z_content)
        if _USE_LZMA:
            header = self.GCB_LZ_HEADER
        else:
            header = self.GCB_HEADER
        chunks = [header,
                  '%d\n%d\n' % (self._z_content_length, self._content_length),
                  self._z_content,
                 ]
        return ''.join(chunks)
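    # Hedged sketch of the serialised block layout produced by to_bytes():
    #
    #   'gcb1z\n'                 (or 'gcb1l\n' when lzma is used)
    #   '<z_content_length>\n'
    #   '<content_length>\n'
    #   <z_content bytes>
    #
    # from_bytes()/_parse_bytes() above read exactly this layout back.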
283
class _LazyGroupCompressFactory(object):
    """Yield content from a GroupCompressBlock on demand."""

    def __init__(self, key, parents, manager, start, end, first):
        """Create a _LazyGroupCompressFactory

        :param key: The key of just this record
        :param parents: The parents of this key (possibly None)
        :param manager: The _LazyGroupContentManager that holds the
            GroupCompressBlock this record lives in
        :param start: Offset of the first byte for this record in the
            uncompressed content
        :param end: Offset of the byte just after the end of this record
            (ie, bytes = content[start:end])
        :param first: Is this the first Factory for the given block?
        """
        self.key = key
        self.parents = parents
        self.sha1 = None
        # Note: This attribute coupled with Manager._factories creates a
        #       reference cycle. Perhaps we would rather use a weakref(), or
        #       find an appropriate time to release the ref. After the first
        #       get_bytes_as call? After Manager.get_record_stream() returns
        #       the object?
        self._manager = manager
        self._bytes = None
        self.storage_kind = 'groupcompress-block'
        if not first:
            self.storage_kind = 'groupcompress-block-ref'
        self._first = first
        self._start = start
        self._end = end

    def __repr__(self):
        return '%s(%s, first=%s)' % (self.__class__.__name__,
            self.key, self._first)
319
    def get_bytes_as(self, storage_kind):
        if storage_kind == self.storage_kind:
            if self._first:
                # wire bytes, something...
                return self._manager._wire_bytes()
            else:
                return ''
        if storage_kind in ('fulltext', 'chunked'):
            if self._bytes is None:
                # Grab and cache the raw bytes for this entry
                # and break the ref-cycle with _manager since we don't need it
                # anymore
                self._manager._prepare_for_extract()
                block = self._manager._block
                self._bytes = block.extract(self.key, self._start, self._end)
                # XXX: It seems the smart fetch extracts inventories and chk
                #      pages as fulltexts to find the next chk pages, but then
                #      passes them down to be inserted as a
                #      groupcompress-block, so this is not safe to do. Perhaps
                #      we could just change the storage kind to "fulltext" at
                #      this point?
                # self._manager = None
            if storage_kind == 'fulltext':
                return self._bytes
            else:
                return [self._bytes]
        raise errors.UnavailableRepresentation(self.key, storage_kind,
            self.storage_kind)
349
class _LazyGroupContentManager(object):
350
"""This manages a group of _LazyGroupCompressFactory objects."""
352
    def __init__(self, block):
        self._block = block
        # We need to preserve the ordering
        self._factories = []
        self._last_byte = 0

    def add_factory(self, key, parents, start, end):
        if not self._factories:
            first = True
        else:
            first = False
        # Note that this creates a reference cycle....
        factory = _LazyGroupCompressFactory(key, parents, self,
            start, end, first=first)
        # max() works here, but as a function call, doing a compare seems to be
        # significantly faster, timeit says 250ms for max() and 100ms for the
        # comparison
        if end > self._last_byte:
            self._last_byte = end
        self._factories.append(factory)
373
    def get_record_stream(self):
        """Get a record for all keys added so far."""
        for factory in self._factories:
            yield factory
            # Break the ref-cycle
            factory._bytes = None
379
# XXX: this is not safe, the smart fetch code requests the content
380
# as both a 'fulltext', and then later on as a
381
# groupcompress-block. The iter_interesting_nodes code also is
382
# still buffering multiple records and returning them later.
383
# So that code would need to be updated to either re-fetch the
384
# original object, or buffer it somehow.
385
# factory._manager = None
386
# TODO: Consider setting self._factories = None after the above loop,
387
# as it will break the reference cycle
389
def _trim_block(self, last_byte):
390
"""Create a new GroupCompressBlock, with just some of the content."""
391
# None of the factories need to be adjusted, because the content is
392
# located in an identical place. Just that some of the unreferenced
393
# trailing bytes are stripped
394
trace.mutter('stripping trailing bytes from groupcompress block'
395
' %d => %d', self._block._content_length, last_byte)
396
new_block = GroupCompressBlock()
397
self._block._ensure_content(last_byte)
398
new_block.set_content(self._block._content[:last_byte])
399
self._block = new_block
401
def _rebuild_block(self):
402
"""Create a new GroupCompressBlock with only the referenced texts."""
403
compressor = GroupCompressor()
405
old_length = self._block._content_length
407
for factory in self._factories:
408
bytes = factory.get_bytes_as('fulltext')
409
(found_sha1, start_point, end_point, type,
410
length) = compressor.compress(factory.key, bytes, factory.sha1)
411
# Now update this factory with the new offsets, etc
412
factory.sha1 = found_sha1
413
factory._start = start_point
414
factory._end = end_point
415
self._last_byte = end_point
416
new_block = compressor.flush()
417
# TODO: Should we check that new_block really *is* smaller than the old
418
# block? It seems hard to come up with a method that it would
419
# expand, since we do full compression again. Perhaps based on a
420
# request that ends up poorly ordered?
421
delta = time.time() - tstart
422
self._block = new_block
423
trace.mutter('creating new compressed block on-the-fly in %.3fs'
424
' %d bytes => %d bytes', delta, old_length,
425
self._block._content_length)
427
def _prepare_for_extract(self):
428
"""A _LazyGroupCompressFactory is about to extract to fulltext."""
429
# We expect that if one child is going to fulltext, all will be. This
430
# helps prevent all of them from extracting a small amount at a time.
431
# Which in itself isn't terribly expensive, but resizing 2MB 32kB at a
432
# time (self._block._content) is a little expensive.
433
self._block._ensure_content(self._last_byte)
435
    def _check_rebuild_block(self):
        """Check to see if our block should be repacked."""
        total_bytes_used = 0
        last_byte_used = 0
        for factory in self._factories:
            total_bytes_used += factory._end - factory._start
            last_byte_used = max(last_byte_used, factory._end)
        # If we are using most of the bytes from the block, we have nothing
        # else to check (currently more than 1/2)
        if total_bytes_used * 2 >= self._block._content_length:
            return
        # Can we just strip off the trailing bytes? If we are going to be
        # transmitting more than 50% of the front of the content, go ahead
        if total_bytes_used * 2 > last_byte_used:
            self._trim_block(last_byte_used)
            return
452
# We are using a small amount of the data, and it isn't just packed
453
# nicely at the front, so rebuild the content.
454
# Note: This would be *nicer* as a strip-data-from-group, rather than
455
# building it up again from scratch
456
# It might be reasonable to consider the fulltext sizes for
457
# different bits when deciding this, too. As you may have a small
458
# fulltext, and a trivial delta, and you are just trading around
459
# for another fulltext. If we do a simple 'prune' you may end up
460
# expanding many deltas into fulltexts, as well.
461
# If we build a cheap enough 'strip', then we could try a strip,
462
# if that expands the content, we then rebuild.
463
self._rebuild_block()
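        # Illustrative numbers for the checks above (hypothetical): with a 4MB
        # block where the wanted records cover 1MB and the last referenced
        # byte is at 1.5MB, 1MB*2 < 4MB so we keep checking, and 1MB*2 > 1.5MB
        # so we simply trim to 1.5MB; only when the used bytes are both few
        # and spread out do we pay for the full rebuild.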
465
def _wire_bytes(self):
466
"""Return a byte stream suitable for transmitting over the wire."""
467
self._check_rebuild_block()
468
# The outer block starts with:
469
# 'groupcompress-block\n'
470
# <length of compressed key info>\n
471
# <length of uncompressed info>\n
472
# <length of gc block>\n
475
lines = ['groupcompress-block\n']
476
# The minimal info we need is the key, the start offset, and the
477
# parents. The length and type are encoded in the record itself.
478
# However, passing in the other bits makes it easier. The list of
479
# keys, and the start offset, the length
481
# 1 line with parents, '' for ()
482
# 1 line for start offset
483
# 1 line for end byte
485
for factory in self._factories:
486
key_bytes = '\x00'.join(factory.key)
487
parents = factory.parents
489
parent_bytes = 'None:'
491
parent_bytes = '\t'.join('\x00'.join(key) for key in parents)
492
record_header = '%s\n%s\n%d\n%d\n' % (
493
key_bytes, parent_bytes, factory._start, factory._end)
494
header_lines.append(record_header)
495
header_bytes = ''.join(header_lines)
497
header_bytes_len = len(header_bytes)
498
z_header_bytes = zlib.compress(header_bytes)
500
z_header_bytes_len = len(z_header_bytes)
501
block_bytes = self._block.to_bytes()
502
lines.append('%d\n%d\n%d\n' % (z_header_bytes_len, header_bytes_len,
504
lines.append(z_header_bytes)
505
lines.append(block_bytes)
506
del z_header_bytes, block_bytes
507
return ''.join(lines)
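    # Hedged sketch of the wire layout assembled above and parsed again by
    # from_bytes() below:
    #
    #   'groupcompress-block\n'
    #   '<len(z_header)>\n<len(header)>\n<len(block_bytes)>\n'
    #   <z_header>     zlib-compressed; per record: key, parents, start, end,
    #                  one per line ('\x00' joins key elements, '\t' joins
    #                  parent keys, 'None:' means no parents)
    #   <block_bytes>  the serialised GroupCompressBlock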
510
def from_bytes(cls, bytes):
511
# TODO: This does extra string copying, probably better to do it a
513
(storage_kind, z_header_len, header_len,
514
block_len, rest) = bytes.split('\n', 4)
516
if storage_kind != 'groupcompress-block':
517
raise ValueError('Unknown storage kind: %s' % (storage_kind,))
518
z_header_len = int(z_header_len)
519
if len(rest) < z_header_len:
520
raise ValueError('Compressed header len shorter than all bytes')
521
z_header = rest[:z_header_len]
522
header_len = int(header_len)
523
header = zlib.decompress(z_header)
524
if len(header) != header_len:
525
raise ValueError('invalid length for decompressed bytes')
527
block_len = int(block_len)
528
if len(rest) != z_header_len + block_len:
529
raise ValueError('Invalid length for block')
530
block_bytes = rest[z_header_len:]
532
# So now we have a valid GCB, we just need to parse the factories that
534
header_lines = header.split('\n')
536
last = header_lines.pop()
538
raise ValueError('header lines did not end with a trailing'
540
if len(header_lines) % 4 != 0:
541
raise ValueError('The header was not an even multiple of 4 lines')
542
block = GroupCompressBlock.from_bytes(block_bytes)
545
for start in xrange(0, len(header_lines), 4):
547
key = tuple(header_lines[start].split('\x00'))
548
parents_line = header_lines[start+1]
549
if parents_line == 'None:':
552
parents = tuple([tuple(segment.split('\x00'))
553
for segment in parents_line.split('\t')
555
start_offset = int(header_lines[start+2])
556
end_offset = int(header_lines[start+3])
557
result.add_factory(key, parents, start_offset, end_offset)
561
def network_block_to_records(storage_kind, bytes, line_end):
562
if storage_kind != 'groupcompress-block':
563
raise ValueError('Unknown storage kind: %s' % (storage_kind,))
564
manager = _LazyGroupContentManager.from_bytes(bytes)
565
return manager.get_record_stream()
568
class _CommonGroupCompressor(object):

    def __init__(self):
        """Create a GroupCompressor."""
        self.chunks = []
        self._last = None
        self.endpoint = 0
        self.input_bytes = 0
        self.labels_deltas = {}
        self._delta_index = None # Set by the children
        self._block = GroupCompressBlock()
580
def compress(self, key, bytes, expected_sha, nostore_sha=None, soft=False):
581
"""Compress lines with label key.
583
:param key: A key tuple. It is stored in the output
584
for identification of the text during decompression. If the last
585
            element is 'None' it is replaced with the sha1 of the text -
            e.g. sha1:xxxxxxx.
587
:param bytes: The bytes to be compressed
588
:param expected_sha: If non-None, the sha the lines are believed to
589
            have. During compression the sha is calculated; a mismatch will
            cause an error.
591
:param nostore_sha: If the computed sha1 sum matches, we will raise
592
ExistingContent rather than adding the text.
593
:param soft: Do a 'soft' compression. This means that we require larger
594
ranges to match to be considered for a copy command.
596
:return: The sha1 of lines, the start and end offsets in the delta, the
597
type ('fulltext' or 'delta') and the number of bytes accumulated in
598
the group output so far.
600
        :seealso VersionedFiles.add_lines:
        """
        if not bytes: # empty, like a dir entry, etc
            if nostore_sha == _null_sha1:
                raise errors.ExistingContent()
            return _null_sha1, 0, 0, 'fulltext', 0
        # we assume someone knew what they were doing when they passed it in
        if expected_sha is not None:
            sha1 = expected_sha
        else:
            sha1 = osutils.sha_string(bytes)
        if nostore_sha is not None:
            if sha1 == nostore_sha:
                raise errors.ExistingContent()
        if key[-1] is None:
            key = key[:-1] + ('sha1:' + sha1,)

        return self._compress(key, bytes, sha1, len(bytes) / 2, soft)
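    # A hedged usage sketch (names are illustrative, not from the original
    # code): callers keep the returned offsets so the text can be located in
    # the group later on:
    #
    #   sha1, start, end, kind, _ = compressor.compress(
    #       ('file-id', 'rev-id'), text_bytes, None)
    #   # kind is 'fulltext' or 'delta'; [start:end) indexes the group's
    #   # uncompressed content, as used by extract() and the block factories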
619
def _compress(self, key, bytes, sha1, max_delta_size, soft=False):
620
"""Compress lines with label key.
622
:param key: A key tuple. It is stored in the output for identification
623
of the text during decompression.
625
:param bytes: The bytes to be compressed
627
:param sha1: The sha1 for 'bytes'.
629
        :param max_delta_size: The size above which we issue a fulltext instead
            of a delta.
632
:param soft: Do a 'soft' compression. This means that we require larger
633
ranges to match to be considered for a copy command.
635
:return: The sha1 of lines, the start and end offsets in the delta, the
636
type ('fulltext' or 'delta') and the number of bytes accumulated in
637
the group output so far.
639
raise NotImplementedError(self._compress)
641
def extract(self, key):
642
"""Extract a key previously added to the compressor.
644
:param key: The key to extract.
645
:return: An iterable over bytes and the sha1.
647
(start_byte, start_chunk, end_byte, end_chunk) = self.labels_deltas[key]
648
delta_chunks = self.chunks[start_chunk:end_chunk]
649
stored_bytes = ''.join(delta_chunks)
650
if stored_bytes[0] == 'f':
651
fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
652
data_len = fulltext_len + 1 + offset
653
if data_len != len(stored_bytes):
654
                raise ValueError('Index claimed fulltext len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
            bytes = stored_bytes[offset + 1:]
        else:
            # XXX: This is inefficient at best
660
source = ''.join(self.chunks[:start_chunk])
661
if stored_bytes[0] != 'd':
662
raise ValueError('Unknown content kind, bytes claim %s'
663
% (stored_bytes[0],))
664
delta_len, offset = decode_base128_int(stored_bytes[1:10])
665
data_len = delta_len + 1 + offset
666
if data_len != len(stored_bytes):
667
                raise ValueError('Index claimed delta len, but stored bytes'
                                 ' claim %s != %s'
                                 % (len(stored_bytes), data_len))
670
bytes = apply_delta(source, stored_bytes[offset + 1:])
671
bytes_sha1 = osutils.sha_string(bytes)
672
return bytes, bytes_sha1
675
"""Finish this group, creating a formatted stream.
677
After calling this, the compressor should no longer be used
679
content = ''.join(self.chunks)
681
self._delta_index = None
682
self._block.set_content(content)
686
"""Call this if you want to 'revoke' the last compression.
688
After this, the data structures will be rolled back, but you cannot do
691
self._delta_index = None
692
del self.chunks[self._last[0]:]
693
self.endpoint = self._last[1]
697
"""Return the overall compression ratio."""
698
return float(self.input_bytes) / float(self.endpoint)
701
class PythonGroupCompressor(_CommonGroupCompressor):

    def __init__(self):
        """Create a GroupCompressor.

        :param delta: If False, do not compress records.
        """
        super(PythonGroupCompressor, self).__init__()
        self._delta_index = LinesDeltaIndex([])
        # The actual content is managed by LinesDeltaIndex
        self.chunks = self._delta_index.lines
713
def _compress(self, key, bytes, sha1, max_delta_size, soft=False):
714
"""see _CommonGroupCompressor._compress"""
715
bytes_length = len(bytes)
716
new_lines = osutils.split_lines(bytes)
717
out_lines, index_lines = self._delta_index.make_delta(new_lines,
718
bytes_length=bytes_length, soft=soft)
719
delta_length = sum(map(len, out_lines))
720
        if delta_length > max_delta_size:
            # The delta is longer than the fulltext, insert a fulltext
            type = 'fulltext'
            out_lines = ['f', encode_base128_int(bytes_length)]
            out_lines.extend(new_lines)
            index_lines = [False, False]
            index_lines.extend([True] * len(new_lines))
            out_length = len(out_lines[1]) + bytes_length + 1
        else:
            # this is a worthy delta, output it
            type = 'delta'
            out_lines[0] = 'd'
            # Update the delta_length to include those two encoded integers
            out_lines[1] = encode_base128_int(delta_length)
            out_length = len(out_lines[3]) + 1 + delta_length
735
start = self.endpoint # Before insertion
736
chunk_start = len(self._delta_index.lines)
737
self._delta_index.extend_lines(out_lines, index_lines)
738
self.endpoint = self._delta_index.endpoint
739
self.input_bytes += bytes_length
740
chunk_end = len(self._delta_index.lines)
741
self.labels_deltas[key] = (start, chunk_start,
742
self.endpoint, chunk_end)
743
return sha1, start, self.endpoint, type, out_length
746
class PyrexGroupCompressor(_CommonGroupCompressor):
    """Produce a serialised group of compressed texts.

    It contains code very similar to SequenceMatcher because of having a similar
    task. However some key differences apply:
     - there is no junk, we want a minimal edit not a human readable diff.
     - we don't filter very common lines (because we don't know where a good
       range will start, and after the first text we want to be emitting minimal
       edits only.)
     - we chain the left side, not the right side
     - we incrementally update the adjacency matrix as new lines are provided.
     - we look for matches in all of the left side, so the routine which does
       the analogous task of find_longest_match does not need to filter on the
       left side.
    """

    def __init__(self):
        super(PyrexGroupCompressor, self).__init__()
        self._delta_index = DeltaIndex()
766
def _compress(self, key, bytes, sha1, max_delta_size, soft=False):
767
"""see _CommonGroupCompressor._compress"""
768
input_len = len(bytes)
769
# By having action/label/sha1/len, we can parse the group if the index
770
# was ever destroyed, we have the key in 'label', we know the final
771
# bytes are valid from sha1, and we know where to find the end of this
772
# record because of 'len'. (the delta record itself will store the
773
# total length for the expanded record)
774
# 'len: %d\n' costs approximately 1% increase in total data
775
# Having the labels at all costs us 9-10% increase, 38% increase for
776
# inventory pages, and 5.8% increase for text pages
777
# new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
778
if self._delta_index._source_offset != self.endpoint:
779
raise AssertionError('_source_offset != endpoint'
780
' somehow the DeltaIndex got out of sync with'
782
        delta = self._delta_index.make_delta(bytes, max_delta_size)
        if delta is None:
            type = 'fulltext'
            enc_length = encode_base128_int(len(bytes))
            len_mini_header = 1 + len(enc_length)
            length = len(bytes) + len_mini_header
            self._delta_index.add_source(bytes, len_mini_header)
            new_chunks = ['f', enc_length, bytes]
        else:
            type = 'delta'
            enc_length = encode_base128_int(len(delta))
            len_mini_header = 1 + len(enc_length)
            length = len(delta) + len_mini_header
            new_chunks = ['d', enc_length, delta]
            self._delta_index.add_delta_source(delta, len_mini_header)
798
start = self.endpoint
799
chunk_start = len(self.chunks)
800
# Now output these bytes
801
self._output_chunks(new_chunks)
802
self.input_bytes += input_len
803
chunk_end = len(self.chunks)
804
self.labels_deltas[key] = (start, chunk_start,
805
self.endpoint, chunk_end)
806
if not self._delta_index._source_offset == self.endpoint:
807
raise AssertionError('the delta index is out of sync'
808
'with the output lines %s != %s'
809
% (self._delta_index._source_offset, self.endpoint))
810
return sha1, start, self.endpoint, type, length
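    # Hedged note on encode_base128_int (assuming the usual 7-bits-per-byte
    # little-endian varint): lengths under 128 take one byte, and a length of
    # 200000 takes three ('\xc0\x9a\x0c'), so len_mini_header above is
    # 1 + len(enc_length), i.e. 2-6 bytes in practice.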
812
def _output_chunks(self, new_chunks):
813
"""Output some chunks.
815
:param new_chunks: The chunks to output.
817
self._last = (len(self.chunks), self.endpoint)
818
endpoint = self.endpoint
819
self.chunks.extend(new_chunks)
820
endpoint += sum(map(len, new_chunks))
821
self.endpoint = endpoint
824
def make_pack_factory(graph, delta, keylength):
825
"""Create a factory for creating a pack based groupcompress.
827
This is only functional enough to run interface tests, it doesn't try to
828
provide a full pack environment.
830
:param graph: Store a graph.
831
:param delta: Delta compress contents.
832
    :param keylength: How long should keys be.
    """
    def factory(transport):
        parents = graph
        ref_length = 0
        if graph:
            ref_length = 1
        graph_index = BTreeBuilder(reference_lists=ref_length,
            key_elements=keylength)
        stream = transport.open_write_stream('newpack')
        writer = pack.ContainerWriter(stream.write)
        writer.begin()
        index = _GCGraphIndex(graph_index, lambda:True, parents=parents,
            add_callback=graph_index.add_nodes)
        access = _DirectPackAccess({})
        access.set_writer(writer, graph_index, (transport, 'newpack'))
        result = GroupCompressVersionedFiles(index, access, delta)
        result.stream = stream
        result.writer = writer
        return result
    return factory
855
def cleanup_pack_group(versioned_files):
856
versioned_files.writer.end()
857
versioned_files.stream.close()
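# A hedged usage sketch for the test helpers above (the transport is whatever
# the interface tests supply; names here are illustrative):
#
#   vf = make_pack_factory(graph=True, delta=False, keylength=1)(transport)
#   vf.add_lines(('rev-1',), (), ['some\n', 'content\n'])
#   cleanup_pack_group(vf)   # flushes the writer and closes the stream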
860
class GroupCompressVersionedFiles(VersionedFiles):
861
"""A group-compress based VersionedFiles implementation."""
863
def __init__(self, index, access, delta=True):
864
"""Create a GroupCompressVersionedFiles object.
866
:param index: The index object storing access and graph data.
867
:param access: The access object storing raw data.
868
        :param delta: Whether to delta compress or just entropy compress.
        """
        self._index = index
        self._access = access
        self._delta = delta
        self._unadded_refs = {}
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)
        self._fallback_vfs = []
877
def add_lines(self, key, parents, lines, parent_texts=None,
878
left_matching_blocks=None, nostore_sha=None, random_id=False,
880
"""Add a text to the store.
882
:param key: The key tuple of the text to add.
883
:param parents: The parents key tuples of the text to add.
884
:param lines: A list of lines. Each line must be a bytestring. And all
885
of them except the last must be terminated with \n and contain no
886
other \n's. The last line may either contain no \n's or a single
887
terminating \n. If the lines list does meet this constraint the add
888
routine may error or may succeed - but you will be unable to read
889
the data back accurately. (Checking the lines have been split
890
correctly is expensive and extremely unlikely to catch bugs so it
891
is not done at runtime unless check_content is True.)
892
:param parent_texts: An optional dictionary containing the opaque
893
representations of some or all of the parents of version_id to
894
allow delta optimisations. VERY IMPORTANT: the texts must be those
895
returned by add_lines or data corruption can be caused.
896
:param left_matching_blocks: a hint about which areas are common
897
between the text and its left-hand-parent. The format is
898
the SequenceMatcher.get_matching_blocks format.
899
:param nostore_sha: Raise ExistingContent and do not add the lines to
900
the versioned file if the digest of the lines matches this.
901
:param random_id: If True a random id has been selected rather than
902
an id determined by some deterministic process such as a converter
903
from a foreign VCS. When True the backend may choose not to check
904
for uniqueness of the resulting key within the versioned file, so
905
this should only be done when the result is expected to be unique
907
:param check_content: If True, the lines supplied are verified to be
908
bytestrings that are correctly formed lines.
909
:return: The text sha1, the number of bytes in the text, and an opaque
910
representation of the inserted version which can be provided
911
back to future add_lines calls in the parent_texts dictionary.
913
self._index._check_write_ok()
914
self._check_add(key, lines, random_id, check_content)
916
# The caller might pass None if there is no graph data, but kndx
917
# indexes can't directly store that, so we give them
918
# an empty tuple instead.
920
# double handling for now. Make it work until then.
921
length = sum(map(len, lines))
922
record = ChunkedContentFactory(key, parents, None, lines)
923
sha1 = list(self._insert_record_stream([record], random_id=random_id,
924
nostore_sha=nostore_sha))[0]
925
return sha1, length, None
927
def add_fallback_versioned_files(self, a_versioned_files):
928
"""Add a source of texts for texts not present in this knit.
930
:param a_versioned_files: A VersionedFiles object.
932
self._fallback_vfs.append(a_versioned_files)
934
def annotate(self, key):
935
"""See VersionedFiles.annotate."""
937
parent_map = self.get_parent_map([key])
939
raise errors.RevisionNotPresent(key, self)
940
if parent_map[key] is not None:
941
search = graph._make_breadth_first_searcher([key])
945
present, ghosts = search.next_with_ghosts()
946
except StopIteration:
949
parent_map = self.get_parent_map(keys)
952
parent_map = {key:()}
953
head_cache = _mod_graph.FrozenHeadsCache(graph)
955
reannotate = annotate.reannotate
956
for record in self.get_record_stream(keys, 'topological', True):
958
chunks = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
959
parent_lines = [parent_cache[parent] for parent in parent_map[key]]
960
parent_cache[key] = list(
961
reannotate(parent_lines, chunks, key, None, head_cache))
962
return parent_cache[key]
964
def check(self, progress_bar=None):
965
"""See VersionedFiles.check()."""
967
for record in self.get_record_stream(keys, 'unordered', True):
968
record.get_bytes_as('fulltext')
970
def _check_add(self, key, lines, random_id, check_content):
971
"""check that version_id and lines are safe to add."""
973
if version_id is not None:
974
if osutils.contains_whitespace(version_id):
975
raise errors.InvalidRevisionId(version_id, self)
976
self.check_not_reserved_id(version_id)
977
# TODO: If random_id==False and the key is already present, we should
978
# probably check that the existing content is identical to what is
979
# being inserted, and otherwise raise an exception. This would make
980
# the bundle code simpler.
982
        if check_content:
            self._check_lines_not_unicode(lines)
            self._check_lines_are_lines(lines)
985
def get_parent_map(self, keys):
986
"""Get a map of the graph parents of keys.
988
:param keys: The keys to look up parents for.
989
:return: A mapping from keys to parents. Absent keys are absent from
992
return self._get_parent_map_with_sources(keys)[0]
994
def _get_parent_map_with_sources(self, keys):
995
"""Get a map of the parents of keys.
997
:param keys: The keys to look up parents for.
998
:return: A tuple. The first element is a mapping from keys to parents.
999
Absent keys are absent from the mapping. The second element is a
1000
list with the locations each key was found in. The first element
1001
is the in-this-knit parents, the second the first fallback source,
1005
sources = [self._index] + self._fallback_vfs
1008
for source in sources:
1011
new_result = source.get_parent_map(missing)
1012
source_results.append(new_result)
1013
result.update(new_result)
1014
missing.difference_update(set(new_result))
1015
return result, source_results
1017
def _get_block(self, index_memo):
1018
read_memo = index_memo[0:3]
1021
block = self._group_cache[read_memo]
1024
zdata = self._access.get_raw_records([read_memo]).next()
1025
# decompress - whole thing - this is not a bug, as it
1026
# permits caching. We might want to store the partially
1027
# decompresed group and decompress object, so that recent
1028
# texts are not penalised by big groups.
1029
block = GroupCompressBlock.from_bytes(zdata)
1030
self._group_cache[read_memo] = block
1032
# print len(zdata), len(plain)
1033
# parse - requires split_lines, better to have byte offsets
1034
# here (but not by much - we only split the region for the
1035
# recipe, and we often want to end up with lines anyway.
1038
def get_missing_compression_parent_keys(self):
1039
"""Return the keys of missing compression parents.
1041
Missing compression parents occur when a record stream was missing
1042
        basis texts, or an index was scanned that had missing basis texts.
1044
# GroupCompress cannot currently reference texts that are not in the
1045
# group, so this is valid for now
1048
def get_record_stream(self, keys, ordering, include_delta_closure):
1049
"""Get a stream of records for keys.
1051
:param keys: The keys to include.
1052
:param ordering: Either 'unordered' or 'topological'. A topologically
1053
sorted stream has compression parents strictly before their
1055
:param include_delta_closure: If True then the closure across any
1056
compression parents will be included (in the opaque data).
1057
:return: An iterator of ContentFactory objects, each of which is only
1058
valid until the iterator is advanced.
1060
# keys might be a generator
1061
orig_keys = list(keys)
1065
if (not self._index.has_graph
1066
and ordering in ('topological', 'groupcompress')):
1067
# Cannot topological order when no graph has been stored.
1068
# but we allow 'as-requested' or 'unordered'
1069
ordering = 'unordered'
1071
remaining_keys = keys
1074
keys = set(remaining_keys)
1075
for content_factory in self._get_remaining_record_stream(keys,
1076
orig_keys, ordering, include_delta_closure):
1077
remaining_keys.discard(content_factory.key)
1078
yield content_factory
1080
except errors.RetryWithNewPacks, e:
1081
self._access.reload_or_raise(e)
1083
def _find_from_fallback(self, missing):
1084
"""Find whatever keys you can from the fallbacks.
1086
:param missing: A set of missing keys. This set will be mutated as keys
1087
are found from a fallback_vfs
1088
:return: (parent_map, key_to_source_map, source_results)
1089
parent_map the overall key => parent_keys
1090
key_to_source_map a dict from {key: source}
1091
source_results a list of (source: keys)
1094
key_to_source_map = {}
1096
for source in self._fallback_vfs:
1099
source_parents = source.get_parent_map(missing)
1100
parent_map.update(source_parents)
1101
source_parents = list(source_parents)
1102
source_results.append((source, source_parents))
1103
key_to_source_map.update((key, source) for key in source_parents)
1104
missing.difference_update(source_parents)
1105
return parent_map, key_to_source_map, source_results
1107
def _get_ordered_source_keys(self, ordering, parent_map, key_to_source_map):
1108
"""Get the (source, [keys]) list.
1110
The returned objects should be in the order defined by 'ordering',
1111
which can weave between different sources.
1112
:param ordering: Must be one of 'topological' or 'groupcompress'
1113
:return: List of [(source, [keys])] tuples, such that all keys are in
1114
the defined order, regardless of source.
1116
if ordering == 'topological':
1117
present_keys = topo_sort(parent_map)
1119
# ordering == 'groupcompress'
1120
# XXX: This only optimizes for the target ordering. We may need
1121
# to balance that with the time it takes to extract
1122
# ordering, by somehow grouping based on
1123
# locations[key][0:3]
1124
present_keys = sort_gc_optimal(parent_map)
1125
# Now group by source:
1127
current_source = None
1128
for key in present_keys:
1129
source = key_to_source_map.get(key, self)
1130
if source is not current_source:
1131
source_keys.append((source, []))
1132
current_source = source
1133
source_keys[-1][1].append(key)
1136
def _get_as_requested_source_keys(self, orig_keys, locations, unadded_keys,
1139
current_source = None
1140
for key in orig_keys:
1141
if key in locations or key in unadded_keys:
1143
elif key in key_to_source_map:
1144
source = key_to_source_map[key]
1147
if source is not current_source:
1148
source_keys.append((source, []))
1149
current_source = source
1150
source_keys[-1][1].append(key)
1153
def _get_io_ordered_source_keys(self, locations, unadded_keys,
1156
# This is the group the bytes are stored in, followed by the
1157
# location in the group
1158
return locations[key][0]
1159
present_keys = sorted(locations.iterkeys(), key=get_group)
1160
# We don't have an ordering for keys in the in-memory object, but
1161
# lets process the in-memory ones first.
1162
present_keys = list(unadded_keys) + present_keys
1163
# Now grab all of the ones from other sources
1164
source_keys = [(self, present_keys)]
1165
source_keys.extend(source_result)
1168
def _get_remaining_record_stream(self, keys, orig_keys, ordering,
1169
include_delta_closure):
1170
"""Get a stream of records for keys.
1172
:param keys: The keys to include.
1173
:param ordering: one of 'unordered', 'topological', 'groupcompress' or
1175
:param include_delta_closure: If True then the closure across any
1176
compression parents will be included (in the opaque data).
1177
:return: An iterator of ContentFactory objects, each of which is only
1178
valid until the iterator is advanced.
1181
locations = self._index.get_build_details(keys)
1182
unadded_keys = set(self._unadded_refs).intersection(keys)
1183
missing = keys.difference(locations)
1184
missing.difference_update(unadded_keys)
1185
(fallback_parent_map, key_to_source_map,
1186
source_result) = self._find_from_fallback(missing)
1187
if ordering in ('topological', 'groupcompress'):
1188
# would be better to not globally sort initially but instead
1189
# start with one key, recurse to its oldest parent, then grab
1190
# everything in the same group, etc.
1191
parent_map = dict((key, details[2]) for key, details in
1192
locations.iteritems())
1193
for key in unadded_keys:
1194
parent_map[key] = self._unadded_refs[key]
1195
parent_map.update(fallback_parent_map)
1196
source_keys = self._get_ordered_source_keys(ordering, parent_map,
1198
elif ordering == 'as-requested':
1199
source_keys = self._get_as_requested_source_keys(orig_keys,
1200
locations, unadded_keys, key_to_source_map)
1202
# We want to yield the keys in a semi-optimal (read-wise) ordering.
1203
# Otherwise we thrash the _group_cache and destroy performance
1204
source_keys = self._get_io_ordered_source_keys(locations,
1205
unadded_keys, source_result)
1207
yield AbsentContentFactory(key)
1209
last_read_memo = None
1210
# TODO: This works fairly well at batching up existing groups into a
1211
# streamable format, and possibly allowing for taking one big
1212
# group and splitting it when it isn't fully utilized.
1213
# However, it doesn't allow us to find under-utilized groups and
1214
# combine them into a bigger group on the fly.
1215
# (Consider the issue with how chk_map inserts texts
1216
# one-at-a-time.) This could be done at insert_record_stream()
1217
# time, but it probably would decrease the number of
1218
# bytes-on-the-wire for fetch.
1219
for source, keys in source_keys:
1222
if key in self._unadded_refs:
1223
if manager is not None:
1224
for factory in manager.get_record_stream():
1226
last_read_memo = manager = None
1227
bytes, sha1 = self._compressor.extract(key)
1228
parents = self._unadded_refs[key]
1229
yield FulltextContentFactory(key, parents, sha1, bytes)
1231
index_memo, _, parents, (method, _) = locations[key]
1232
read_memo = index_memo[0:3]
1233
if last_read_memo != read_memo:
1234
# We are starting a new block. If we have a
1235
# manager, we have found everything that fits for
1236
# now, so yield records
1237
if manager is not None:
1238
for factory in manager.get_record_stream():
1240
# Now start a new manager
1241
block = self._get_block(index_memo)
1242
manager = _LazyGroupContentManager(block)
1243
last_read_memo = read_memo
1244
start, end = index_memo[3:5]
1245
manager.add_factory(key, parents, start, end)
1247
if manager is not None:
1248
for factory in manager.get_record_stream():
1250
last_read_memo = manager = None
1251
for record in source.get_record_stream(keys, ordering,
1252
include_delta_closure):
1254
if manager is not None:
1255
for factory in manager.get_record_stream():
1258
def get_sha1s(self, keys):
1259
"""See VersionedFiles.get_sha1s()."""
1261
for record in self.get_record_stream(keys, 'unordered', True):
1262
if record.sha1 != None:
1263
result[record.key] = record.sha1
1265
if record.storage_kind != 'absent':
1266
result[record.key] = osutils.sha_string(
1267
record.get_bytes_as('fulltext'))
1270
def insert_record_stream(self, stream):
1271
"""Insert a record stream into this container.
1273
:param stream: A stream of records to insert.
1275
:seealso VersionedFiles.get_record_stream:
1277
for _ in self._insert_record_stream(stream):
1280
def _insert_record_stream(self, stream, random_id=False, nostore_sha=None,
1282
"""Internal core to insert a record stream into this container.
1284
This helper function has a different interface than insert_record_stream
1285
to allow add_lines to be minimal, but still return the needed data.
1287
:param stream: A stream of records to insert.
1288
:param nostore_sha: If the sha1 of a given text matches nostore_sha,
1289
raise ExistingContent, rather than committing the new text.
1290
:param reuse_blocks: If the source is streaming from
1291
groupcompress-blocks, just insert the blocks as-is, rather than
1292
expanding the texts and inserting again.
1293
:return: An iterator over the sha1 of the inserted records.
1294
:seealso insert_record_stream:
1298
def get_adapter(adapter_key):
1300
return adapters[adapter_key]
1302
adapter_factory = adapter_registry.get(adapter_key)
1303
adapter = adapter_factory(self)
1304
adapters[adapter_key] = adapter
1306
# This will go up to fulltexts for gc to gc fetching, which isn't
1308
self._compressor = GroupCompressor()
1309
self._unadded_refs = {}
1312
bytes = self._compressor.flush().to_bytes()
1313
index, start, length = self._access.add_raw_records(
1314
[(None, len(bytes))], bytes)[0]
1316
for key, reads, refs in keys_to_add:
1317
nodes.append((key, "%d %d %s" % (start, length, reads), refs))
1318
self._index.add_records(nodes, random_id=random_id)
1319
self._unadded_refs = {}
1321
self._compressor = GroupCompressor()
1324
last_fulltext_len = None
1325
max_fulltext_len = 0
1326
max_fulltext_prefix = None
1327
insert_manager = None
1330
for record in stream:
1331
# Raise an error when a record is missing.
1332
if record.storage_kind == 'absent':
1333
raise errors.RevisionNotPresent(record.key, self)
1335
# If the reuse_blocks flag is set, check to see if we can just
1336
# copy a groupcompress block as-is.
1337
if record.storage_kind == 'groupcompress-block':
1338
# Insert the raw block into the target repo
1339
insert_manager = record._manager
1340
insert_manager._check_rebuild_block()
1341
bytes = record._manager._block.to_bytes()
1342
_, start, length = self._access.add_raw_records(
1343
[(None, len(bytes))], bytes)[0]
1346
block_length = length
1347
if record.storage_kind in ('groupcompress-block',
1348
'groupcompress-block-ref'):
1349
assert insert_manager is not None
1350
assert record._manager is insert_manager
1351
value = "%d %d %d %d" % (block_start, block_length,
1352
record._start, record._end)
1353
nodes = [(record.key, value, (record.parents,))]
1354
# TODO: Consider buffering up many nodes to be added, not
1355
# sure how much overhead this has, but we're seeing
1356
# ~23s / 120s in add_records calls
1357
self._index.add_records(nodes, random_id=random_id)
1360
bytes = record.get_bytes_as('fulltext')
1361
except errors.UnavailableRepresentation:
1362
adapter_key = record.storage_kind, 'fulltext'
1363
adapter = get_adapter(adapter_key)
1364
bytes = adapter.get_bytes(record)
1365
if len(record.key) > 1:
1366
prefix = record.key[0]
1367
soft = (prefix == last_prefix)
1371
if max_fulltext_len < len(bytes):
1372
max_fulltext_len = len(bytes)
1373
max_fulltext_prefix = prefix
1374
(found_sha1, start_point, end_point, type,
1375
length) = self._compressor.compress(record.key,
1376
bytes, record.sha1, soft=soft,
1377
nostore_sha=nostore_sha)
1378
# delta_ratio = float(len(bytes)) / length
1379
# Check if we want to continue to include that text
1380
if (prefix == max_fulltext_prefix
1381
and end_point < 2 * max_fulltext_len):
1382
# As long as we are on the same file_id, we will fill at least
1383
# 2 * max_fulltext_len
1384
start_new_block = False
1385
elif end_point > 4*1024*1024:
1386
start_new_block = True
1387
elif (prefix is not None and prefix != last_prefix
1388
and end_point > 2*1024*1024):
1389
start_new_block = True
1391
start_new_block = False
1392
last_prefix = prefix
1394
self._compressor.pop_last()
1396
max_fulltext_len = len(bytes)
1397
(found_sha1, start_point, end_point, type,
1398
length) = self._compressor.compress(record.key,
1400
last_fulltext_len = length
1401
if record.key[-1] is None:
1402
key = record.key[:-1] + ('sha1:' + found_sha1,)
1405
self._unadded_refs[key] = record.parents
1407
keys_to_add.append((key, '%d %d' % (start_point, end_point),
1409
if len(keys_to_add):
1411
self._compressor = None
1413
def iter_lines_added_or_present_in_keys(self, keys, pb=None):
1414
"""Iterate over the lines in the versioned files from keys.
1416
This may return lines from other keys. Each item the returned
1417
iterator yields is a tuple of a line and a text version that that line
1418
is present in (not introduced in).
1420
Ordering of results is in whatever order is most suitable for the
1421
underlying storage format.
1423
If a progress bar is supplied, it may be used to indicate progress.
1424
The caller is responsible for cleaning up progress bars (because this
1428
* Lines are normalised by the underlying store: they will all have \n
1430
* Lines are returned in arbitrary order.
1432
:return: An iterator over (line, key).
1435
pb = progress.DummyProgress()
1438
# we don't care about inclusions, the caller cares.
1439
# but we need to setup a list of records to visit.
1440
# we need key, position, length
1441
for key_idx, record in enumerate(self.get_record_stream(keys,
1442
'unordered', True)):
1443
# XXX: todo - optimise to use less than full texts.
1445
pb.update('Walking content', key_idx, total)
1446
if record.storage_kind == 'absent':
1447
raise errors.RevisionNotPresent(key, self)
1448
lines = osutils.split_lines(record.get_bytes_as('fulltext'))
1451
pb.update('Walking content', total, total)
1454
"""See VersionedFiles.keys."""
1455
if 'evil' in debug.debug_flags:
1456
trace.mutter_callsite(2, "keys scales with size of history")
1457
sources = [self._index] + self._fallback_vfs
1459
for source in sources:
1460
result.update(source.keys())
1464
class _GCGraphIndex(object):
1465
"""Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
1467
def __init__(self, graph_index, is_locked, parents=True,
1469
"""Construct a _GCGraphIndex on a graph_index.
1471
:param graph_index: An implementation of bzrlib.index.GraphIndex.
1472
:param is_locked: A callback, returns True if the index is locked and
1474
:param parents: If True, record knits parents, if not do not record
1476
:param add_callback: If not None, allow additions to the index and call
1477
this callback with a list of added GraphIndex nodes:
1478
[(node, value, node_refs), ...]
1480
self._add_callback = add_callback
1481
self._graph_index = graph_index
1482
self._parents = parents
1483
self.has_graph = parents
1484
self._is_locked = is_locked
1486
def add_records(self, records, random_id=False):
1487
"""Add multiple records to the index.
1489
This function does not insert data into the Immutable GraphIndex
1490
backing the KnitGraphIndex, instead it prepares data for insertion by
1491
the caller and checks that it is safe to insert then calls
1492
self._add_callback with the prepared GraphIndex nodes.
1494
:param records: a list of tuples:
1495
(key, options, access_memo, parents).
1496
:param random_id: If True the ids being added were randomly generated
1497
and no check for existence will be performed.
1499
if not self._add_callback:
1500
raise errors.ReadOnlyError(self)
1501
# we hope there are no repositories with inconsistent parentage
1506
for (key, value, refs) in records:
1507
if not self._parents:
1511
                    raise errors.KnitCorrupt(self,
1512
"attempt to add node with parents "
1513
"in parentless index.")
1516
keys[key] = (value, refs)
1519
present_nodes = self._get_entries(keys)
1520
for (index, key, value, node_refs) in present_nodes:
1521
if node_refs != keys[key][1]:
1522
raise errors.KnitCorrupt(self, "inconsistent details in add_records"
1523
": %s %s" % ((value, node_refs), keys[key]))
1529
for key, (value, node_refs) in keys.iteritems():
1530
result.append((key, value, node_refs))
1532
for key, (value, node_refs) in keys.iteritems():
1533
result.append((key, value))
1535
self._add_callback(records)
1537
def _check_read(self):
1538
"""Raise an exception if reads are not permitted."""
1539
if not self._is_locked():
1540
raise errors.ObjectNotLocked(self)
1542
def _check_write_ok(self):
1543
"""Raise an exception if writes are not permitted."""
1544
if not self._is_locked():
1545
raise errors.ObjectNotLocked(self)
1547
def _get_entries(self, keys, check_present=False):
1548
"""Get the entries for keys.
1550
Note: Callers are responsible for checking that the index is locked
1551
before calling this method.
1553
:param keys: An iterable of index key tuples.
1558
for node in self._graph_index.iter_entries(keys):
1560
found_keys.add(node[1])
1562
# adapt parentless index to the rest of the code.
1563
for node in self._graph_index.iter_entries(keys):
1564
yield node[0], node[1], node[2], ()
1565
found_keys.add(node[1])
1567
missing_keys = keys.difference(found_keys)
1569
            raise errors.RevisionNotPresent(missing_keys.pop(), self)
1571
def get_parent_map(self, keys):
1572
"""Get a map of the parents of keys.
1574
:param keys: The keys to look up parents for.
1575
:return: A mapping from keys to parents. Absent keys are absent from
1579
nodes = self._get_entries(keys)
1583
result[node[1]] = node[3][0]
1586
result[node[1]] = None
1589
def get_build_details(self, keys):
1590
"""Get the various build details for keys.
1592
Ghosts are omitted from the result.
1594
:param keys: An iterable of keys.
1595
:return: A dict of key:
1596
(index_memo, compression_parent, parents, record_details).
1598
opaque structure to pass to read_records to extract the raw
1601
Content that this record is built upon, may be None
1603
Logical parents of this node
1605
extra information about the content which needs to be passed to
1606
Factory.parse_record
1610
entries = self._get_entries(keys)
1611
for entry in entries:
1613
if not self._parents:
1616
parents = entry[3][0]
1618
result[key] = (self._node_to_position(entry),
1619
None, parents, (method, None))
1623
"""Get all the keys in the collection.
1625
The keys are not ordered.
1628
return [node[1] for node in self._graph_index.iter_all_entries()]
1630
def _node_to_position(self, node):
1631
"""Convert an index value to position details."""
1632
bits = node[2].split(' ')
1633
# It would be nice not to read the entire gzip.
1634
        start = int(bits[0])
        stop = int(bits[1])
        basis_end = int(bits[2])
1637
delta_end = int(bits[3])
1638
return node[0], start, stop, basis_end, delta_end
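        # Hedged example: a node value of '12 3421 0 473' (as written by
        # GroupCompressVersionedFiles._insert_record_stream) means the group
        # starts at byte 12 of the pack, is 3421 compressed bytes long, and
        # this record covers [0, 473) of the group's uncompressed content.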
1641
from bzrlib._groupcompress_py import (
    apply_delta,
    apply_delta_to_source,
    encode_base128_int,
    decode_base128_int,
    LinesDeltaIndex,
    )
try:
    from bzrlib._groupcompress_pyx import (
        apply_delta,
        apply_delta_to_source,
        DeltaIndex,
        encode_base128_int,
        decode_base128_int,
        )
    GroupCompressor = PyrexGroupCompressor
except ImportError:
    GroupCompressor = PythonGroupCompressor