# groupcompress, a bzr plugin providing new compression logic.
# Copyright (C) 2008 Canonical Limited.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA

"""Core compression logic for compressing streams of related files."""

from itertools import izip
from cStringIO import StringIO
import zlib

try:
    import pylzma
except ImportError:
    pylzma = None

from bzrlib import (
    annotate,
    debug,
    errors,
    graph as _mod_graph,
    osutils,
    pack,
    progress,
    trace,
    )
from bzrlib.graph import Graph
from bzrlib.knit import _DirectPackAccess
from bzrlib.osutils import (
    contains_whitespace,
    sha_string,
    split_lines,
    )
from bzrlib.btree_index import BTreeBuilder
from bzrlib.lru_cache import LRUSizeCache
from bzrlib.tsort import topo_sort
from bzrlib.versionedfile import (
    adapter_registry,
    AbsentContentFactory,
    ChunkedContentFactory,
    FulltextContentFactory,
    VersionedFiles,
    )

_USE_LZMA = False and (pylzma is not None)


def encode_base128_int(val):
    """Convert an integer into a 7-bit lsb encoding."""
    bytes.append(chr((val | 0x80) & 0xFF))
    bytes.append(chr(val))


def decode_base128_int(bytes):
    """Decode an integer from a 7-bit lsb encoding."""
    bval = ord(bytes[offset])
    val |= (bval & 0x7F) << shift
    bval = ord(bytes[offset])
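

# Illustrative sketch (not part of the original plugin): the two helpers
# above implement the usual little-endian base-128 varint, where each byte
# carries seven payload bits and the high bit flags "more bytes follow".
# A self-contained round trip under that assumption:
def _base128_example():
    def encode(val):
        out = []
        while val >= 0x80:
            out.append(chr((val | 0x80) & 0xFF))
            val >>= 7
        out.append(chr(val))
        return ''.join(out)

    def decode(data):
        val = shift = offset = 0
        bval = ord(data[offset])
        while bval >= 0x80:
            val |= (bval & 0x7F) << shift
            shift += 7
            offset += 1
            bval = ord(data[offset])
        val |= bval << shift
        return val, offset + 1

    assert encode(300) == '\xac\x02'
    assert decode('\xac\x02') == (300, 2)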


def sort_gc_optimal(parent_map):
    """Sort and group the keys in parent_map into gc-optimal order.

    gc-optimal is defined (currently) as reverse-topological order, grouped
    by the key prefix.

    :return: A sorted list of keys
    """
    # gc-optimal ordering is approximately reverse topological,
    # properly grouped by file-id.
    per_prefix_map = {}
    for item in parent_map.iteritems():
        key = item[0]
        if isinstance(key, str) or len(key) == 1:
            prefix = ''
        else:
            prefix = key[0]
        try:
            per_prefix_map[prefix].append(item)
        except KeyError:
            per_prefix_map[prefix] = [item]

    present_keys = []
    for prefix in sorted(per_prefix_map):
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
    return present_keys
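
# Illustrative sketch (not part of the original plugin): for keys of the
# form (file_id, revision_id) this groups by file_id and walks each group
# newest-first, so texts that are likely to delta well end up adjacent.
# For example, with ('f1', 'b') a child of ('f1', 'a'):
#   parent_map = {('f1', 'a'): (), ('f1', 'b'): (('f1', 'a'),),
#                 ('f2', 'a'): ()}
#   sort_gc_optimal(parent_map) == [('f1', 'b'), ('f1', 'a'), ('f2', 'a')]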


class GroupCompressBlockEntry(object):
    """Track the information about a single object inside a GC group.

    This is generally just the dumb data structure.
    """

    def __init__(self, key, type, sha1, start, length):
        self.key = key
        self.type = type # delta, fulltext, external?
        self.sha1 = sha1 # Sha1 of content
        self.start = start # Byte offset to start of data
        self.length = length # Length of content

    def __repr__(self):
        return '%s(%s, %s, %s, %s, %s)' % (
            self.__class__.__name__,
            self.key, self.type, self.sha1, self.start, self.length,
            )


class GroupCompressBlock(object):
    """An object which maintains the internal structure of the compressed data.

    This tracks the meta info (start of text, length, type, etc.)
    """

    # Group Compress Block v1 Zlib
    GCB_HEADER = 'gcb1z\n'
    GCB_LZ_HEADER = 'gcb1l\n'

    def __init__(self):
        # map by key? or just order in file?
        self._entries = {}

    def _parse_header(self):
        """Parse the meta-info from the stream."""

    @classmethod
    def from_bytes(cls, bytes):
        out = cls()
        if bytes[:6] not in (cls.GCB_HEADER, cls.GCB_LZ_HEADER):
            raise ValueError('bytes did not start with %r' % (cls.GCB_HEADER,))
        if bytes[4] == 'z':
            decomp = zlib.decompress
        elif bytes[4] == 'l':
            decomp = pylzma.decompress
        else:
            assert False, 'unknown compressor: %r' % (bytes,)
        pos = bytes.index('\n', 6)
        z_header_length = int(bytes[6:pos])
        pos += 1
        pos2 = bytes.index('\n', pos)
        header_length = int(bytes[pos:pos2])
        if z_header_length == 0:
            assert header_length == 0
            zcontent = bytes[pos2+1:]
            out._content = decomp(zcontent)
            out._size = len(out._content)
            return out
        pos = pos2 + 1
        pos2 = pos + z_header_length
        z_header_bytes = bytes[pos:pos2]
        assert len(z_header_bytes) == z_header_length
        header_bytes = decomp(z_header_bytes)
        assert len(header_bytes) == header_length
        lines = header_bytes.split('\n')
        header_len = len(header_bytes)
        info_dict = {}
        for line in lines:
            if not line: # End of record
                if not info_dict:
                    break
                out.add_entry(**info_dict)
                info_dict = {}
                continue
            key, value = line.split(':', 1)
            if key == 'key':
                value = tuple(map(intern, value.split('\x00')))
            elif key in ('start', 'length'):
                value = int(value)
            else:
                value = intern(value)
            info_dict[key] = value
        zcontent = bytes[pos2:]
        out._content = decomp(zcontent)
        out._size = header_len + len(out._content)
        return out

    def extract(self, key, index_memo, sha1=None):
        """Extract the text for a specific key.

        :param key: The label used for this content
        :param sha1: TODO (should we validate only when sha1 is supplied?)
        :return: The bytes for the content
        """
        if _NO_LABELS or not self._entries:
            start, end = index_memo[3:5]
            # The bytes are 'f' or 'd' for the type, then a variable-length
            # base128 integer for the content size, then the actual content.
            # We know that the variable-length integer won't be longer than 10
            # bytes (it only takes 5 bytes to encode 2^32)
            c = self._content[start]
            if c == 'f':
                type = 'fulltext'
            else:
                assert c == 'd'
                type = 'delta'
            entry = GroupCompressBlockEntry(key, type, sha1=None,
                                            start=start, length=end-start)
        else:
            entry = self._entries[key]
            c = self._content[entry.start]
            if entry.type == 'fulltext':
                assert c == 'f'
            elif entry.type == 'delta':
                assert c == 'd'
        content_len, len_len = decode_base128_int(
            self._content[entry.start + 1:entry.start + 11])
        assert entry.length == content_len + 1 + len_len
        content_start = entry.start + 1 + len_len
        end = entry.start + entry.length
        content = self._content[content_start:end]
        if c == 'f':
            bytes = content
        else:
            bytes = _groupcompress_pyx.apply_delta(self._content, content)
        if entry.sha1 is None:
            entry.sha1 = sha_string(bytes)
        return entry, bytes
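
    # Illustrative note (not part of the original plugin): each record inside
    # the group is laid out as '<type byte><base128 length><content>', where
    # the type byte is 'f' for a fulltext or 'd' for a delta.  For example a
    # five byte fulltext 'hello' would be stored as 'f' + '\x05' + 'hello'.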

    def add_entry(self, key, type, sha1, start, length):
        """Add new meta info about an entry.

        :param key: The key for the new content
        :param type: Whether this is a delta or fulltext entry (external?)
        :param sha1: sha1sum of the fulltext of this entry
        :param start: where the encoded bytes start
        :param length: total number of bytes in the encoded form
        """
        entry = GroupCompressBlockEntry(key, type, sha1, start, length)
        assert key not in self._entries
        self._entries[key] = entry

    def to_bytes(self, content=''):
        """Encode the information into a byte stream."""
        compress = zlib.compress
        if _USE_LZMA:
            compress = pylzma.compress
        chunks = []
        for key in sorted(self._entries):
            entry = self._entries[key]
            chunk = ('key:%s\nsha1:%s\ntype:%s\nstart:%s\nlength:%s\n\n'
                     ) % ('\x00'.join(entry.key),
                          entry.sha1, entry.type, entry.start, entry.length)
            chunks.append(chunk)
        bytes = ''.join(chunks)
        info_len = len(bytes)
        z_bytes = []
        z_bytes.append(compress(bytes))
        # TODO: we may want to have the header compressed in the same chain
        #       as the data, or we may not, evaluate it
        #       having them compressed together is probably a win for
        #       revisions and the 'inv' portion of chk inventories. As the
        #       label in the header is duplicated in the text.
        #       For chk pages and real bytes, I would guess this is not
        z_len = sum(map(len, z_bytes))
        z_bytes.append(compress(content))
        if _USE_LZMA:
            header = self.GCB_LZ_HEADER
        else:
            header = self.GCB_HEADER
        chunks = [header,
                  '%d\n' % (z_len,),
                  '%d\n' % (info_len,),
                  ]
        chunks.extend(z_bytes)
        return ''.join(chunks)
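
    # Serialised block layout, as written by to_bytes() and read back by
    # from_bytes() (a reader's aid, not part of the original source):
    #   'gcb1z\n' or 'gcb1l\n'            zlib vs. lzma marker
    #   '<compressed header length>\n'
    #   '<uncompressed header length>\n'
    #   <compressed header>               'key:', 'sha1:', 'type:', 'start:'
    #                                     and 'length:' lines per entry
    #   <compressed content>              the concatenated group records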


class GroupCompressor(object):
    """Produce a serialised group of compressed texts.

    It contains code very similar to SequenceMatcher because it has a similar
    task. However some key differences apply:

    - there is no junk, we want a minimal edit not a human readable diff.
    - we don't filter very common lines (because we don't know where a good
      range will start, and after the first text we want to be emitting
      minimal edits only.
    - we chain the left side, not the right side
    - we incrementally update the adjacency matrix as new lines are provided.
    - we look for matches in all of the left side, so the routine which does
      the analogous task of find_longest_match does not need to filter on the
      left side.
    """

    def __init__(self, delta=True):
        """Create a GroupCompressor.

        :param delta: If False, do not compress records.
        """
        # Consider seeding the lines with some sort of GC Start flag, or
        # putting it as part of the output stream, rather than in the
        # compressed bytes.
        self.lines = []
        self.endpoint = 0
        self.input_bytes = 0
        self.labels_deltas = {}
        self._last = None
        self._delta_index = _groupcompress_pyx.DeltaIndex()
        self._block = GroupCompressBlock()

    def compress(self, key, bytes, expected_sha, soft=False):
        """Compress lines with label key.

        :param key: A key tuple. It is stored in the output
            for identification of the text during decompression. If the last
            element is 'None' it is replaced with the sha1 of the text -
            e.g. sha1:xxxxxxx.
        :param bytes: The bytes to be compressed
        :param expected_sha: If non-None, the sha the lines are believed to
            have. During compression the sha is calculated; a mismatch will
            cause an error.
        :param soft: Do a 'soft' compression. This means that we require larger
            ranges to match to be considered for a copy command.
        :return: The sha1 of lines, and the number of bytes accumulated in
            the group output so far.
        """
        if not _FAST or expected_sha is None:
            sha1 = sha_string(bytes)
        else:
            sha1 = expected_sha
        if key[-1] is None:
            key = key[:-1] + ('sha1:' + sha1,)
        input_len = len(bytes)
        # By having action/label/sha1/len, we can parse the group if the index
        # was ever destroyed, we have the key in 'label', we know the final
        # bytes are valid from sha1, and we know where to find the end of this
        # record because of 'len'. (the delta record itself will store the
        # total length for the expanded record)
        # 'len: %d\n' costs approximately 1% increase in total data
        # Having the labels at all costs us 9-10% increase, 38% increase for
        # inventory pages, and 5.8% increase for text pages
        # new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
        if self._delta_index._source_offset != self.endpoint:
            raise AssertionError('_source_offset != endpoint'
                ' somehow the DeltaIndex got out of sync with'
                ' the output lines')
        max_delta_size = len(bytes) / 2
        delta = self._delta_index.make_delta(bytes, max_delta_size)
        if delta is None:
            type = 'fulltext'
            enc_length = encode_base128_int(len(bytes))
            len_mini_header = 1 + len(enc_length)
            length = len(bytes) + len_mini_header
            self._delta_index.add_source(bytes, len_mini_header)
            new_chunks = ['f', enc_length, bytes]
        else:
            type = 'delta'
            enc_length = encode_base128_int(len(delta))
            len_mini_header = 1 + len(enc_length)
            length = len(delta) + len_mini_header
            new_chunks = ['d', enc_length, delta]
            if _FAST:
                self._delta_index._source_offset += length
            else:
                self._delta_index.add_delta_source(delta, len_mini_header)
        self._block.add_entry(key, type=type, sha1=sha1,
                              start=self.endpoint, length=length)
        delta_start = (self.endpoint, len(self.lines))
        self.output_chunks(new_chunks)
        self.input_bytes += input_len
        delta_end = (self.endpoint, len(self.lines))
        self.labels_deltas[key] = (delta_start, delta_end)
        if self._delta_index._source_offset != self.endpoint:
            raise AssertionError('the delta index is out of sync '
                'with the output lines %s != %s'
                % (self._delta_index._source_offset, self.endpoint))
        return sha1, self.endpoint, type, length

    def extract(self, key):
        """Extract a key previously added to the compressor.

        :param key: The key to extract.
        :return: An iterable over bytes and the sha1.
        """
        delta_details = self.labels_deltas[key]
        delta_chunks = self.lines[delta_details[0][1]:delta_details[1][1]]
        stored_bytes = ''.join(delta_chunks)
        # TODO: Fix this, we shouldn't really be peeking here
        entry = self._block._entries[key]
        if entry.type == 'fulltext':
            assert stored_bytes[0] == 'f'
            fulltext_len, offset = decode_base128_int(stored_bytes[1:10])
            assert fulltext_len + 1 + offset == len(stored_bytes)
            bytes = stored_bytes[offset + 1:]
        else:
            assert entry.type == 'delta'
            # XXX: This is inefficient at best
            source = ''.join(self.lines)
            assert stored_bytes[0] == 'd'
            delta_len, offset = decode_base128_int(stored_bytes[1:10])
            assert delta_len + 1 + offset == len(stored_bytes)
            bytes = _groupcompress_pyx.apply_delta(source,
                                                   stored_bytes[offset + 1:])
        assert entry.sha1 == sha_string(bytes)
        return bytes, entry.sha1

    def output_chunks(self, new_chunks):
        """Output some chunks.

        :param new_chunks: The chunks to output.
        """
        self._last = (len(self.lines), self.endpoint)
        endpoint = self.endpoint
        self.lines.extend(new_chunks)
        endpoint += sum(map(len, new_chunks))
        self.endpoint = endpoint

    def pop_last(self):
        """Call this if you want to 'revoke' the last compression.

        After this, the data structures will be rolled back, but you cannot do
        more compression.
        """
        self._delta_index = None
        del self.lines[self._last[0]:]
        self.endpoint = self._last[1]

    def ratio(self):
        """Return the overall compression ratio."""
        return float(self.input_bytes) / float(self.endpoint)
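

# Illustrative usage sketch (not part of the original plugin): compress two
# similar texts into one group; the second is normally stored as a delta
# against the first and can be read back with extract().
def _group_compressor_example():
    compressor = GroupCompressor(delta=True)
    text_a = 'a common line\n' * 50
    text_b = 'a common line\n' * 50 + 'one new line\n'
    sha_a, end_a, kind_a, _ = compressor.compress(('file-id', 'rev-a'),
                                                  text_a, None)
    sha_b, end_b, kind_b, _ = compressor.compress(('file-id', 'rev-b'),
                                                  text_b, None)
    # kind_a is 'fulltext'; kind_b is normally 'delta', and end_b - end_a is
    # much smaller than len(text_b).
    bytes, sha1 = compressor.extract(('file-id', 'rev-b'))
    assert bytes == text_b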


def make_pack_factory(graph, delta, keylength):
    """Create a factory for creating a pack based groupcompress.

    This is only functional enough to run interface tests; it doesn't try to
    provide a full pack environment.

    :param graph: Store a graph.
    :param delta: Delta compress contents.
    :param keylength: How long should keys be.
    """
    def factory(transport):
        parents = graph or delta
        ref_length = 0
        if graph:
            ref_length = 1
        graph_index = BTreeBuilder(reference_lists=ref_length,
                                   key_elements=keylength)
        stream = transport.open_write_stream('newpack')
        writer = pack.ContainerWriter(stream.write)
        writer.begin()
        index = _GCGraphIndex(graph_index, lambda: True, parents=parents,
                              add_callback=graph_index.add_nodes)
        access = _DirectPackAccess({})
        access.set_writer(writer, graph_index, (transport, 'newpack'))
        result = GroupCompressVersionedFiles(index, access, delta)
        result.stream = stream
        result.writer = writer
        return result
    return factory


def cleanup_pack_group(versioned_files):
    versioned_files.writer.end()
    versioned_files.stream.close()
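
# Illustrative sketch (not part of the original plugin): typical use in an
# interface test, assuming 'transport' is a writable bzrlib transport:
#   factory = make_pack_factory(graph=True, delta=True, keylength=1)
#   vf = factory(transport)
#   ...  # add texts / insert a record stream
#   cleanup_pack_group(vf)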


class GroupCompressVersionedFiles(VersionedFiles):
    """A group-compress based VersionedFiles implementation."""

    def __init__(self, index, access, delta=True):
        """Create a GroupCompressVersionedFiles object.

        :param index: The index object storing access and graph data.
        :param access: The access object storing raw data.
        :param delta: Whether to delta compress or just entropy compress.
        """
        self._index = index
        self._access = access
        self._delta = delta
        self._unadded_refs = {}
        self._group_cache = LRUSizeCache(max_size=50*1024*1024)

    def add_lines(self, key, parents, lines, parent_texts=None,
        left_matching_blocks=None, nostore_sha=None, random_id=False,
        check_content=True):
        """Add a text to the store.

        :param key: The key tuple of the text to add.
        :param parents: The parents key tuples of the text to add.
        :param lines: A list of lines. Each line must be a bytestring. And all
            of them except the last must be terminated with \n and contain no
            other \n's. The last line may either contain no \n's or a single
            terminating \n. If the lines list does not meet this constraint
            the add routine may error or may succeed - but you will be unable
            to read the data back accurately. (Checking the lines have been
            split correctly is expensive and extremely unlikely to catch bugs
            so it is not done at runtime unless check_content is True.)
        :param parent_texts: An optional dictionary containing the opaque
            representations of some or all of the parents of version_id to
            allow delta optimisations. VERY IMPORTANT: the texts must be those
            returned by add_lines or data corruption can be caused.
        :param left_matching_blocks: a hint about which areas are common
            between the text and its left-hand-parent. The format is
            the SequenceMatcher.get_matching_blocks format.
        :param nostore_sha: Raise ExistingContent and do not add the lines to
            the versioned file if the digest of the lines matches this.
        :param random_id: If True a random id has been selected rather than
            an id determined by some deterministic process such as a converter
            from a foreign VCS. When True the backend may choose not to check
            for uniqueness of the resulting key within the versioned file, so
            this should only be done when the result is expected to be unique
            anyway.
        :param check_content: If True, the lines supplied are verified to be
            bytestrings that are correctly formed lines.
        :return: The text sha1, the number of bytes in the text, and an opaque
            representation of the inserted version which can be provided
            back to future add_lines calls in the parent_texts dictionary.
        """
        self._index._check_write_ok()
        self._check_add(key, lines, random_id, check_content)
        if parents is None:
            # The caller might pass None if there is no graph data, but kndx
            # indexes can't directly store that, so we give them
            # an empty tuple instead.
            parents = ()
        # double handling for now. Make it work until then.
        length = sum(map(len, lines))
        record = ChunkedContentFactory(key, parents, None, lines)
        sha1 = list(self._insert_record_stream([record], random_id=random_id))[0]
        return sha1, length, None
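
    # Illustrative sketch (not part of the original plugin), assuming 'vf' is
    # a GroupCompressVersionedFiles instance:
    #   sha1, length, _ = vf.add_lines(('file-id', 'rev-1'), (),
    #                                  ['first line\n', 'second line\n'])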

    def annotate(self, key):
        """See VersionedFiles.annotate."""
        graph = Graph(self)
        parent_map = self.get_parent_map([key])
        if key not in parent_map:
            raise errors.RevisionNotPresent(key, self)
        if parent_map[key] is not None:
            search = graph._make_breadth_first_searcher([key])
            keys = set()
            while True:
                try:
                    present, ghosts = search.next_with_ghosts()
                except StopIteration:
                    break
                keys.update(present)
            parent_map = self.get_parent_map(keys)
        else:
            keys = [key]
            parent_map = {key: ()}
        head_cache = _mod_graph.FrozenHeadsCache(graph)
        parent_cache = {}
        reannotate = annotate.reannotate
        for record in self.get_record_stream(keys, 'topological', True):
            key = record.key
            chunks = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
            parent_lines = [parent_cache[parent] for parent in parent_map[key]]
            parent_cache[key] = list(
                reannotate(parent_lines, chunks, key, None, head_cache))
        return parent_cache[key]

    def check(self, progress_bar=None):
        """See VersionedFiles.check()."""
        keys = self.keys()
        for record in self.get_record_stream(keys, 'unordered', True):
            record.get_bytes_as('fulltext')

    def _check_add(self, key, lines, random_id, check_content):
        """Check that version_id and lines are safe to add."""
        version_id = key[-1]
        if version_id is not None:
            if contains_whitespace(version_id):
                raise errors.InvalidRevisionId(version_id, self)
            self.check_not_reserved_id(version_id)
        # TODO: If random_id==False and the key is already present, we should
        #       probably check that the existing content is identical to what
        #       is being inserted, and otherwise raise an exception. This
        #       would make the bundle code simpler.
        if check_content:
            self._check_lines_not_unicode(lines)
            self._check_lines_are_lines(lines)

    def get_parent_map(self, keys):
        """Get a map of the parents of keys.

        :param keys: The keys to look up parents for.
        :return: A mapping from keys to parents. Absent keys are absent from
            the mapping.
        """
        result = {}
        sources = [self._index]
        source_results = []
        missing = set(keys)
        for source in sources:
            if not missing:
                break
            new_result = source.get_parent_map(missing)
            source_results.append(new_result)
            result.update(new_result)
            missing.difference_update(set(new_result))
        if self._unadded_refs:
            for key in missing:
                if key in self._unadded_refs:
                    result[key] = self._unadded_refs[key]
        return result

    def _get_block(self, index_memo):
        read_memo = index_memo[0:3]
        try:
            block = self._group_cache[read_memo]
        except KeyError:
            zdata = self._access.get_raw_records([read_memo]).next()
            # decompress - whole thing - this is not a bug, as it
            # permits caching. We might want to store the partially
            # decompressed group and decompress object, so that recent
            # texts are not penalised by big groups.
            block = GroupCompressBlock.from_bytes(zdata)
            self._group_cache[read_memo] = block
        # print len(zdata), len(plain)
        # parse - requires split_lines, better to have byte offsets
        # here (but not by much - we only split the region for the
        # recipe, and we often want to end up with lines anyway.
        return block

    def get_missing_compression_parent_keys(self):
        """Return the keys of missing compression parents.

        Missing compression parents occur when a record stream was missing
        basis texts, or an index was scanned that had missing basis texts.
        """
        # GroupCompress cannot currently reference texts that are not in the
        # group, so this is valid for now
        return frozenset()

    def get_record_stream(self, keys, ordering, include_delta_closure):
        """Get a stream of records for keys.

        :param keys: The keys to include.
        :param ordering: Either 'unordered' or 'topological'. A topologically
            sorted stream has compression parents strictly before their
            children.
        :param include_delta_closure: If True then the closure across any
            compression parents will be included (in the opaque data).
        :return: An iterator of ContentFactory objects, each of which is only
            valid until the iterator is advanced.
        """
        # keys might be a generator
        orig_keys = list(keys)
        keys = set(orig_keys)
        if (not self._index.has_graph
            and ordering in ('topological', 'gc-optimal')):
            # Cannot topological order when no graph has been stored.
            ordering = 'unordered'
        locations = self._index.get_build_details(keys)
        local_keys = frozenset(keys).intersection(set(self._unadded_refs))
        if ordering == 'topological':
            # would be better to not globally sort initially but instead
            # start with one key, recurse to its oldest parent, then grab
            # everything in the same group, etc.
            parent_map = dict((key, details[2]) for key, details in
                              locations.iteritems())
            for key in local_keys:
                parent_map[key] = self._unadded_refs[key]
            present_keys = topo_sort(parent_map)
            # Now group by source:
        elif ordering == 'gc-optimal':
            parent_map = dict((key, details[2]) for key, details in
                              locations.iteritems())
            for key in local_keys:
                parent_map[key] = self._unadded_refs[key]
            # XXX: This only optimizes for the target ordering. We may need to
            #      balance that with the time it takes to extract ordering, by
            #      somehow grouping based on locations[key][0:3]
            present_keys = sort_gc_optimal(parent_map)
        elif ordering == 'as-requested':
            present_keys = [key for key in orig_keys if key in locations
                            or key in local_keys]
        else:
            # We want to yield the keys in a semi-optimal (read-wise) ordering.
            # Otherwise we thrash the _group_cache and destroy performance
            def get_group(key):
                # This is the group the bytes are stored in, followed by the
                # location in the group
                return locations[key][0]
            present_keys = sorted(locations.iterkeys(), key=get_group)
            # We don't have an ordering for keys in the in-memory object, but
            # let's process the in-memory ones first.
            present_keys = list(local_keys) + present_keys
        locations.update((key, None) for key in local_keys)
        absent_keys = keys.difference(set(locations))
        for key in absent_keys:
            yield AbsentContentFactory(key)
        for key in present_keys:
            if key in self._unadded_refs:
                bytes, sha1 = self._compressor.extract(key)
                parents = self._unadded_refs[key]
            else:
                index_memo, _, parents, (method, _) = locations[key]
                block = self._get_block(index_memo)
                entry, bytes = block.extract(key, index_memo)
                sha1 = entry.sha1
                # TODO: If we don't have labels, then the sha1 here is computed
                #       from the data, so we don't want to re-sha the string.
                if not _FAST and sha_string(bytes) != sha1:
                    raise AssertionError('sha1 sum did not match')
            yield FulltextContentFactory(key, parents, sha1, bytes)
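
    # Illustrative sketch (not part of the original plugin): pulling texts
    # back out in the order they were requested:
    #   for record in vf.get_record_stream(keys, 'as-requested', True):
    #       if record.storage_kind != 'absent':
    #           text = record.get_bytes_as('fulltext')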

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""
        result = {}
        for record in self.get_record_stream(keys, 'unordered', True):
            if record.sha1 is not None:
                result[record.key] = record.sha1
            else:
                if record.storage_kind != 'absent':
                    result[record.key] = sha_string(record.get_bytes_as(
                        'fulltext'))
        return result

    def insert_record_stream(self, stream):
        """Insert a record stream into this container.

        :param stream: A stream of records to insert.
        :seealso VersionedFiles.get_record_stream:
        """
        for _ in self._insert_record_stream(stream):
            pass

    def _insert_record_stream(self, stream, random_id=False):
        """Internal core to insert a record stream into this container.

        This helper function has a different interface from
        insert_record_stream to allow add_lines to be minimal, but still
        return the needed data.

        :param stream: A stream of records to insert.
        :return: An iterator over the sha1 of the inserted records.
        :seealso insert_record_stream:
        """
        adapters = {}
        def get_adapter(adapter_key):
            try:
                return adapters[adapter_key]
            except KeyError:
                adapter_factory = adapter_registry.get(adapter_key)
                adapter = adapter_factory(self)
                adapters[adapter_key] = adapter
                return adapter
        # This will go up to fulltexts for gc to gc fetching, which isn't
        # ideal for CHK pages.
        self._compressor = GroupCompressor(self._delta)
        self._unadded_refs = {}
        keys_to_add = []
        basis_end = 0

        def flush():
            bytes = self._compressor._block.to_bytes(
                ''.join(self._compressor.lines))
            index, start, length = self._access.add_raw_records(
                [(None, len(bytes))], bytes)[0]
            nodes = []
            for key, reads, refs in keys_to_add:
                nodes.append((key, "%d %d %s" % (start, length, reads), refs))
            self._index.add_records(nodes, random_id=random_id)
            self._unadded_refs = {}
            del keys_to_add[:]
            self._compressor = GroupCompressor(self._delta)

        last_prefix = None
        last_fulltext_len = None
        max_fulltext_len = 0
        max_fulltext_prefix = None
        for record in stream:
            # Raise an error when a record is missing.
            if record.storage_kind == 'absent':
                raise errors.RevisionNotPresent(record.key, self)
            try:
                bytes = record.get_bytes_as('fulltext')
            except errors.UnavailableRepresentation:
                adapter_key = record.storage_kind, 'fulltext'
                adapter = get_adapter(adapter_key)
                bytes = adapter.get_bytes(record)
            if len(record.key) > 1:
                prefix = record.key[0]
                soft = (prefix == last_prefix)
            else:
                prefix = None
                soft = False
            if max_fulltext_len < len(bytes):
                max_fulltext_len = len(bytes)
                max_fulltext_prefix = prefix
            (found_sha1, end_point, type,
             length) = self._compressor.compress(record.key,
                                                 bytes, record.sha1, soft=soft)
            # delta_ratio = float(len(bytes)) / length
            # Check if we want to continue to include that text
            if (prefix == max_fulltext_prefix
                and end_point < 2 * max_fulltext_len):
                # As long as we are on the same file_id, we will fill at least
                # 2 * max_fulltext_len
                start_new_block = False
            elif end_point > 4*1024*1024:
                start_new_block = True
            elif (prefix is not None and prefix != last_prefix
                  and end_point > 2*1024*1024):
                start_new_block = True
            else:
                start_new_block = False
            # if type == 'fulltext':
            #     # If this is the first text, we don't do anything
            #     if self._compressor.num_keys > 1:
            #         if prefix is not None and prefix != last_prefix:
            #             # We just inserted a fulltext for a different prefix
            #             if end_point > 512 * 1024:
            #                 start_new_block = True
            #             # TODO: Consider packing several small texts together
            #             # maybe only flush if end_point > some threshold
            #             # if end_point > 512 * 1024 or len(bytes) <
            #             #     start_new_block = true
            #         # We just added a fulltext, part of the same file-id
            #         if (end_point > 2*1024*1024
            #             and end_point > 5*max_fulltext_len):
            #             start_new_block = True
            #         last_fulltext_len = len(bytes)
            # delta_ratio = float(len(bytes)) / length
            # if delta_ratio < 3: # Not much compression
            #     if end_point > 1*1024*1024:
            #         start_new_block = True
            # elif delta_ratio < 10: # 10:1 compression
            #     if end_point > 4*1024*1024:
            #         start_new_block = True
            if start_new_block:
                self._compressor.pop_last()
                flush()
                basis_end = 0
                max_fulltext_len = len(bytes)
                (found_sha1, end_point, type,
                 length) = self._compressor.compress(record.key,
                                                     bytes, record.sha1,
                                                     soft=soft)
                assert type == 'fulltext'
                last_fulltext_len = length
            if record.key[-1] is None:
                key = record.key[:-1] + ('sha1:' + found_sha1,)
            else:
                key = record.key
            self._unadded_refs[key] = record.parents
            yield found_sha1
            keys_to_add.append((key, '%d %d' % (basis_end, end_point),
                (record.parents,)))
            basis_end = end_point
        if keys_to_add:
            flush()
        self._compressor = None
        assert self._unadded_refs == {}

    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
        """Iterate over the lines in the versioned files from keys.

        This may return lines from other keys. Each item the returned
        iterator yields is a tuple of a line and the text version that line
        is present in (not introduced in).

        Ordering of results is in whatever order is most suitable for the
        underlying storage format.

        If a progress bar is supplied, it may be used to indicate progress.
        The caller is responsible for cleaning up progress bars (because this
        is an iterator).

        * Lines are normalised by the underlying store: they will all have \n
          terminators.
        * Lines are returned in arbitrary order.

        :return: An iterator over (line, key).
        """
        if pb is None:
            pb = progress.DummyProgress()
        keys = set(keys)
        total = len(keys)
        # we don't care about inclusions, the caller cares.
        # but we need to setup a list of records to visit.
        # we need key, position, length
        for key_idx, record in enumerate(self.get_record_stream(keys,
            'unordered', True)):
            # XXX: todo - optimise to use less than full texts.
            key = record.key
            pb.update('Walking content.', key_idx, total)
            if record.storage_kind == 'absent':
                raise errors.RevisionNotPresent(key, self)
            lines = split_lines(record.get_bytes_as('fulltext'))
            for line in lines:
                yield line, key
        pb.update('Walking content.', total, total)

    def keys(self):
        """See VersionedFiles.keys."""
        if 'evil' in debug.debug_flags:
            trace.mutter_callsite(2, "keys scales with size of history")
        sources = [self._index]
        result = set()
        for source in sources:
            result.update(source.keys())
        return result


class _GCGraphIndex(object):
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""

    def __init__(self, graph_index, is_locked, parents=True,
        add_callback=None):
        """Construct a _GCGraphIndex on a graph_index.

        :param graph_index: An implementation of bzrlib.index.GraphIndex.
        :param is_locked: A callback, returns True if the index is locked and
            thus usable.
        :param parents: If True, record knit parents, if not do not record
            parents.
        :param add_callback: If not None, allow additions to the index and call
            this callback with a list of added GraphIndex nodes:
            [(node, value, node_refs), ...]
        """
        self._add_callback = add_callback
        self._graph_index = graph_index
        self._parents = parents
        self.has_graph = parents
        self._is_locked = is_locked

    def add_records(self, records, random_id=False):
        """Add multiple records to the index.

        This function does not insert data into the Immutable GraphIndex
        backing the KnitGraphIndex, instead it prepares data for insertion by
        the caller and checks that it is safe to insert then calls
        self._add_callback with the prepared GraphIndex nodes.

        :param records: a list of tuples:
            (key, options, access_memo, parents).
        :param random_id: If True the ids being added were randomly generated
            and no check for existence will be performed.
        """
        if not self._add_callback:
            raise errors.ReadOnlyError(self)
        # we hope there are no repositories with inconsistent parentage
        keys = {}
        for (key, value, refs) in records:
            if not self._parents:
                if refs:
                    for ref in refs:
                        if ref:
                            raise errors.KnitCorrupt(self,
                                "attempt to add node with parents "
                                "in parentless index.")
            keys[key] = (value, refs)
        if not random_id:
            present_nodes = self._get_entries(keys)
            for (index, key, value, node_refs) in present_nodes:
                if node_refs != keys[key][1]:
                    raise errors.KnitCorrupt(self, "inconsistent details in add_records"
                        ": %s %s" % ((value, node_refs), keys[key]))
                del keys[key]
        result = []
        if self._parents:
            for key, (value, node_refs) in keys.iteritems():
                result.append((key, value, node_refs))
        else:
            for key, (value, node_refs) in keys.iteritems():
                result.append((key, value))
        self._add_callback(result)

    def _check_read(self):
        """Raise an exception if reads are not permitted."""
        if not self._is_locked():
            raise errors.ObjectNotLocked(self)

    def _check_write_ok(self):
        """Raise an exception if writes are not permitted."""
        if not self._is_locked():
            raise errors.ObjectNotLocked(self)

    def _get_entries(self, keys, check_present=False):
        """Get the entries for keys.

        Note: Callers are responsible for checking that the index is locked
        before calling this method.

        :param keys: An iterable of index key tuples.
        """
        keys = set(keys)
        found_keys = set()
        if self._parents:
            for node in self._graph_index.iter_entries(keys):
                yield node
                found_keys.add(node[1])
        else:
            # adapt parentless index to the rest of the code.
            for node in self._graph_index.iter_entries(keys):
                yield node[0], node[1], node[2], ()
                found_keys.add(node[1])
        if check_present:
            missing_keys = keys.difference(found_keys)
            if missing_keys:
                raise errors.RevisionNotPresent(missing_keys.pop(), self)

    def get_parent_map(self, keys):
        """Get a map of the parents of keys.

        :param keys: The keys to look up parents for.
        :return: A mapping from keys to parents. Absent keys are absent from
            the mapping.
        """
        self._check_read()
        result = {}
        nodes = self._get_entries(keys)
        if self._parents:
            for node in nodes:
                result[node[1]] = node[3][0]
        else:
            for node in nodes:
                result[node[1]] = None
        return result

    def get_build_details(self, keys):
        """Get the various build details for keys.

        Ghosts are omitted from the result.

        :param keys: An iterable of keys.
        :return: A dict of key:
            (index_memo, compression_parent, parents, record_details).
            index_memo
                opaque structure to pass to read_records to extract the raw
                data
            compression_parent
                Content that this record is built upon, may be None
            parents
                Logical parents of this node
            record_details
                extra information about the content which needs to be passed to
                Factory.parse_record
        """
        self._check_read()
        result = {}
        entries = self._get_entries(keys)
        for entry in entries:
            key = entry[1]
            if not self._parents:
                parents = None
            else:
                parents = entry[3][0]
            method = 'group'
            result[key] = (self._node_to_position(entry),
                           None, parents, (method, None))
        return result

    def keys(self):
        """Get all the keys in the collection.

        The keys are not ordered.
        """
        self._check_read()
        return [node[1] for node in self._graph_index.iter_all_entries()]

    def _node_to_position(self, node):
        """Convert an index value to position details."""
        bits = node[2].split(' ')
        # It would be nice not to read the entire gzip.
        start = int(bits[0])
        stop = int(bits[1])
        basis_end = int(bits[2])
        delta_end = int(bits[3])
        return node[0], start, stop, basis_end, delta_end
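
    # Illustrative note (not part of the original plugin): the index value is
    # the four space-separated integers written by _insert_record_stream, e.g.
    #   '0 4096 0 1017'
    # where the first pair locates the compressed block inside the pack
    # (offset and length) and the second pair is the record's byte range
    # within the uncompressed group.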


from bzrlib import _groupcompress_pyx