    FulltextContentFactory,
    )
from bzrlib.plugins.groupcompress import errors as gc_errors
def parse(bytes):
    if _NO_LABELS:
        action_byte = bytes[0]
        action = {'f':'fulltext', 'd':'delta'}[action_byte]
        return action, None, None, bytes[1:]
    (action, label_line, sha1_line, len_line,
     delta_bytes) = bytes.split('\n', 4)
    if (action not in ('fulltext', 'delta')
        or not label_line.startswith('label:')
        or not sha1_line.startswith('sha1:')
        or not len_line.startswith('len:')
        ):
        raise AssertionError("bad text record %r" % (bytes,))
    label = tuple(label_line[6:].split('\x00'))
    sha1 = sha1_line[5:]
    length = int(len_line[4:])
    if not len(delta_bytes) == length:
        raise AssertionError("bad length record %r" % (bytes,))
    return action, label, sha1, delta_bytes
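# Example (illustrative): a labelled delta record as consumed by parse();
# 'len:7' must match the 7 bytes of delta payload that follow the header:
#
#   parse('delta\nlabel:fid\x00rid\nsha1:%s\nlen:7\n<delta>' % ('0' * 40,))
#   => ('delta', ('fid', 'rid'), '0' * 40, '<delta>')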
def encode_base128_int(val):
    """Convert an integer into a 7-bit lsb encoding."""
    bytes = []
    while val >= 0x80:
        bytes.append(chr((val | 0x80) & 0xFF))
        val >>= 7
    bytes.append(chr(val))
    return ''.join(bytes)
def decode_base128_int(bytes):
    """Decode an integer from a 7-bit lsb encoding."""
    offset = 0
    val = 0
    shift = 0
    bval = ord(bytes[offset])
    while bval >= 0x80:
        val |= (bval & 0x7F) << shift
        shift += 7
        offset += 1
        bval = ord(bytes[offset])
    val |= bval << shift
    return val, offset + 1
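# Example (illustrative) round-trip for the helpers above. 300 is emitted
# low 7 bits first with the continuation bit set (0xAC), then 0x02:
#
#   >>> encode_base128_int(300)
#   '\xac\x02'
#   >>> decode_base128_int('\xac\x02')
#   (300, 2)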
def sort_gc_optimal(parent_map):
    """Sort and group the keys in parent_map into groupcompress order.

    groupcompress is defined (currently) as reverse-topological order, grouped
    by the key prefix.

    :return: A sorted-list of keys
    """
    return present_keys
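# Example (illustrative, assuming the reverse-topological ordering described
# in the docstring): with one prefix and a linear history a -> b -> c, the
# newest key sorts first, so it becomes the fulltext and the older texts
# delta against content already in the group:
#
#   sort_gc_optimal({('f', 'c'): (('f', 'b'),),
#                    ('f', 'b'): (('f', 'a'),),
#                    ('f', 'a'): ()})
#   => [('f', 'c'), ('f', 'b'), ('f', 'a')]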

class GroupCompressBlockEntry(object):
    """Track the information about a single object inside a GC group.

    This is generally just the dumb data structure.
    """

    def __init__(self, key, type, sha1, start, length):
        self.key = key
        self.type = type # delta, fulltext, external?
        self.sha1 = sha1 # Sha1 of content
        self.start = start # Byte offset to start of data
        self.length = length # Length of content

    def __repr__(self):
        return '%s(%s, %s, %s, %s, %s)' % (
            self.__class__.__name__,
            self.key, self.type, self.sha1, self.start, self.length
            )

class GroupCompressBlock(object):
    """An object which maintains the internal structure of the compressed data.

    This tracks the meta info (start of text, length, type, etc.)
    """

    # Group Compress Block v1 Zlib
    GCB_HEADER = 'gcb1z\n'

    def __init__(self):
        # map by key? or just order in file?
        self._entries = {}

    def _parse_header(self):
        """Parse the meta-info from the stream."""
    @classmethod
    def from_bytes(cls, bytes):
        out = cls()
        if bytes[:6] != cls.GCB_HEADER:
            raise gc_errors.InvalidGroupCompressBlock(
                'bytes did not start with %r' % (cls.GCB_HEADER,))
        pos = bytes.index('\n', 6)
        z_header_length = int(bytes[6:pos])
        pos += 1
        pos2 = bytes.index('\n', pos)
        header_length = int(bytes[pos:pos2])
        if z_header_length == 0:
            assert header_length == 0
            out._content = zlib.decompress(bytes[pos2+1:])
            out._size = len(out._content)
            return out
        pos = pos2 + 1
        pos2 = pos + z_header_length
        z_header_bytes = bytes[pos:pos2]
        assert len(z_header_bytes) == z_header_length
        d = zlib.decompressobj()
        header_bytes = d.decompress(z_header_bytes)
        assert len(header_bytes) == header_length
        del z_header_bytes
        lines = header_bytes.split('\n')
        header_len = len(header_bytes)
        del header_bytes
        info_dict = {}
        for line in lines:
            if not line: #End of record
                if not info_dict:
                    break
                out.add_entry(**info_dict)
                info_dict = {}
                continue
            key, value = line.split(':', 1)
            if key == 'key':
                value = tuple(map(intern, value.split('\x00')))
            elif key in ('start', 'length'):
                value = int(value)
            elif key == 'type':
                value = intern(value)
            info_dict[key] = value
        zcontent = bytes[pos2:]
        if zcontent:
            out._content = d.decompress(zcontent)
            assert d.flush() == ''
            out._size = header_len + len(out._content)
        return out
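    # Serialised layout (as read by from_bytes above and written by
    # to_bytes below):
    #
    #   'gcb1z\n'                  fixed header
    #   '%d\n' % z_header_length   compressed length of the entry header
    #   '%d\n' % header_length     uncompressed length of the entry header
    #   zlib(header)               'key:...\nsha1:...\ntype:...\nstart:...\n'
    #                              'length:...\n\n' per entry
    #   zlib(content)              the concatenated group texts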
    def extract(self, key, index_memo, sha1=None):
        """Extract the text for a specific key.

        :param key: The label used for this content
        :param sha1: TODO (should we validate only when sha1 is supplied?)
        :return: The bytes for the content
        """
        if _NO_LABELS or not self._entries:
            start, end = index_memo[3:5]
            c = self._content[start]
            if c == 'f':
                bytes = self._content[start+1:end]
                type = 'fulltext'
            else:
                assert c == 'd'
                type = 'delta'
                delta = self._content[start+1:end]
                bytes = _groupcompress_pyx.apply_delta(self._content, delta)
            entry = GroupCompressBlockEntry(key, type, sha_string(bytes),
                                            start, end - start)
            return entry, bytes
        entry = self._entries[key]
        if entry.type == 'fulltext':
            assert self._content[entry.start] == 'f'
            bytes = self._content[entry.start+1:entry.start + entry.length]
        elif entry.type == 'delta':
            assert self._content[entry.start] == 'd'
            delta = self._content[entry.start+1:entry.start + entry.length]
            bytes = _groupcompress_pyx.apply_delta(self._content, delta)
        return entry, bytes
    def add_entry(self, key, type, sha1, start, length):
        """Add new meta info about an entry.

        :param key: The key for the new content
        :param type: Whether this is a delta or fulltext entry (external?)
        :param sha1: sha1sum of the fulltext of this entry
        :param start: where the encoded bytes start
        :param length: total number of bytes in the encoded form
        """
        entry = GroupCompressBlockEntry(key, type, sha1, start, length)
        assert key not in self._entries
        self._entries[key] = entry
    def to_bytes(self, content=''):
        """Encode the information into a byte stream."""
        chunks = []
        for key in sorted(self._entries):
            entry = self._entries[key]
            chunk = ('key:%s\n'
                     'sha1:%s\n'
                     'type:%s\n'
                     'start:%s\n'
                     'length:%s\n'
                     '\n'
                     ) % ('\x00'.join(entry.key),
                          entry.sha1,
                          entry.type,
                          entry.start,
                          entry.length,
                          )
            chunks.append(chunk)
        bytes = ''.join(chunks)
        info_len = len(bytes)
        c = zlib.compressobj()
        z_bytes = []
        z_bytes.append(c.compress(bytes))
        del bytes
        # TODO: we may want to have the header compressed in the same chain
        #       as the data, or we may not, evaluate it
        #       having them compressed together is probably a win for
        #       revisions and the 'inv' portion of chk inventories. As the
        #       label in the header is duplicated in the text.
        #       For chk pages and real bytes, I would guess this is not
        #       true.
        z_bytes.append(c.flush(zlib.Z_SYNC_FLUSH))
        z_len = sum(map(len, z_bytes))
        c = zlib.compressobj()
        z_bytes.append(c.compress(content))
        z_bytes.append(c.flush())
        chunks = [self.GCB_HEADER,
                  '%d\n' % (z_len,),
                  '%d\n' % (info_len,),
                  ]
        chunks.extend(z_bytes)
        return ''.join(chunks)
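    # Example (illustrative) round-trip, assuming labels are enabled
    # (_NO_LABELS is False); the content is a 'f' marker plus the fulltext:
    #
    #   block = GroupCompressBlock()
    #   block.add_entry(('key',), type='fulltext',
    #                   sha1=sha_string('hello'), start=0, length=6)
    #   bytes = block.to_bytes('fhello')
    #   assert GroupCompressBlock.from_bytes(bytes)._entries.keys() \
    #       == [('key',)]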

class GroupCompressor(object):
    """Produce a serialised group of compressed texts."""
        max_delta_size = len(bytes) / 2
        delta = self._delta_index.make_delta(bytes, max_delta_size)
        if (delta is None):
            # We can't delta (perhaps source_text is empty)
            # so mark this as an insert
            type = 'fulltext'
            length = len(bytes) + 1
            self._delta_index.add_source(bytes, 1)
            new_chunks = ['f', bytes]
        else:
            type = 'delta'
            length = len(delta) + 1
            new_chunks = ['d', delta]
            if _FAST:
                self._delta_index._source_offset += length
            else:
                self._delta_index.add_delta_source(delta, 1)
        self._block.add_entry(key, type=type, sha1=sha1,
                              start=self.endpoint, length=length)
        delta_start = (self.endpoint, len(self.lines))
        self.output_chunks(new_chunks)
        self.input_bytes += input_len
        delta_end = (self.endpoint, len(self.lines))
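        # Storage sketch (illustrative): each text is stored in the group as
        # a one-byte kind marker plus payload: 'f' + fulltext when no usable
        # delta was found, otherwise 'd' + delta against the bytes already in
        # the group.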
        delta_details = self.labels_deltas[key]
        delta_chunks = self.lines[delta_details[0][1]:delta_details[1][1]]
        stored_bytes = ''.join(delta_chunks)
        # TODO: Fix this, we shouldn't really be peeking here
        entry = self._block._entries[key]
        if entry.type == 'fulltext':
            bytes = stored_bytes[1:]
        else:
            assert entry.type == 'delta'
            # XXX: This is inefficient at best
            source = ''.join(self.lines)
            bytes = _groupcompress_pyx.apply_delta(source, stored_bytes)
        assert entry.sha1 == sha_string(bytes)
        return bytes, entry.sha1
    def output_chunks(self, new_chunks):
        """Output some chunks.

        :param new_chunks: The chunks to output.
        """
        self._last = (len(self.lines), self.endpoint)
        endpoint = self.endpoint
        self.lines.extend(new_chunks)
        endpoint += sum(map(len, new_chunks))
        self.endpoint = endpoint

    def pop_last(self):
        """Call this if you want to 'revoke' the last compression.

        After this, the data structures will be rolled back, but you cannot do
        more compression.
        """
        self._delta_index = None
        del self.lines[self._last[0]:]
        self.endpoint = self._last[1]

    def ratio(self):
        """Return the overall compression ratio."""
        return float(self.input_bytes) / float(self.endpoint)
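    # Example: 10MB of input text folded into a 1MB group gives
    # ratio() == 10.0.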
                parents = self._unadded_refs[key]
            else:
                index_memo, _, parents, (method, _) = locations[key]
                block = self._get_block(index_memo)
                entry, bytes = block.extract(key, index_memo)
                sha1 = entry.sha1
                if not _FAST and sha_string(bytes) != sha1:
                    raise AssertionError('sha1 sum did not match')
            yield FulltextContentFactory(key, parents, sha1, bytes)

    def get_sha1s(self, keys):
        """See VersionedFiles.get_sha1s()."""
                adapter_key = record.storage_kind, 'fulltext'
                adapter = get_adapter(adapter_key)
                bytes = adapter.get_bytes(record)
            if len(record.key) > 1:
                prefix = record.key[0]
                soft = (prefix == last_prefix)
            else:
                prefix = None
                soft = False
            if max_fulltext_len < len(bytes):
                max_fulltext_len = len(bytes)
                max_fulltext_prefix = prefix
            (found_sha1, end_point, type,
             length) = self._compressor.compress(record.key,
                                                 bytes, record.sha1, soft=soft)
            # delta_ratio = float(len(bytes)) / length
            # Check if we want to continue to include that text
            if (prefix == max_fulltext_prefix
                and end_point < 2 * max_fulltext_len):
                # As long as we are on the same file_id, we will fill at least
                # 2 * max_fulltext_len
                start_new_block = False
            elif end_point > 4*1024*1024:
                start_new_block = True
            elif (prefix is not None and prefix != last_prefix
                  and end_point > 2*1024*1024):
                start_new_block = True
            else:
                start_new_block = False
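            # Worked example (illustrative): with max_fulltext_len = 1.5MB
            # and an unchanged prefix, the group keeps filling until
            # end_point reaches 3MB (2 * max_fulltext_len); a prefix change
            # forces a new block once the group passes 2MB, and any group is
            # capped at 4MB.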
            # if type == 'fulltext':
            #     # If this is the first text, we don't do anything
            #     if self._compressor.num_keys > 1:
            #         if prefix is not None and prefix != last_prefix:
            #             # We just inserted a fulltext for a different prefix
            #             if end_point > 512 * 1024:
            #                 start_new_block = True
            #             # TODO: Consider packing several small texts together
            #             #       maybe only flush if end_point > some threshold
            #             #       if end_point > 512 * 1024 or len(bytes) <
            #             #           start_new_block = true
            #         else:
            #             # We just added a fulltext, part of the same file-id
            #             if (end_point > 2*1024*1024
            #                 and end_point > 5*max_fulltext_len):
            #                 start_new_block = True
            #     last_fulltext_len = len(bytes)
            # else:
            #     delta_ratio = float(len(bytes)) / length
            #     if delta_ratio < 3: # Not much compression
            #         if end_point > 1*1024*1024:
            #             start_new_block = True
            #     elif delta_ratio < 10: # 10:1 compression
            #         if end_point > 4*1024*1024:
            #             start_new_block = True
            if start_new_block:
                self._compressor.pop_last()
                max_fulltext_len = len(bytes)
                (found_sha1, end_point, type,
                 length) = self._compressor.compress(record.key,
                                                     bytes, record.sha1)
                assert type == 'fulltext'
                last_fulltext_len = length
            if record.key[-1] is None:
                key = record.key[:-1] + ('sha1:' + found_sha1,)
        return node[0], start, stop, basis_end, delta_end

def _get_longest_match(equivalence_table, pos, max_pos, locations):
    """Get the longest possible match for the current position."""
    range_start = pos
    range_len = 0
    copy_ends = None
    while pos < max_pos:
        if locations is None:
            locations = equivalence_table.get_idx_matches(pos)
        if locations is None:
            # No more matches, just return whatever we have, but we know that
            # this last position is not going to match anything
            pos += 1
            break
        else:
            if copy_ends is None:
                # We are starting a new range
                copy_ends = [loc + 1 for loc in locations]
                range_len = 1
                locations = None # Consumed
            else:
                # We are currently in the middle of a match
                next_locations = set(copy_ends).intersection(locations)
                if len(next_locations):
                    # The match continues for at least one more position
                    copy_ends = [loc + 1 for loc in next_locations]
                    range_len += 1
                    locations = None # Consumed
                else:
                    # But we are done with this match, we should be
                    # starting a new one, though. We will pass back 'locations'
                    # so that we don't have to do another lookup.
                    break
        pos += 1
    if copy_ends is None:
        return None, pos, locations
    return ((min(copy_ends) - range_len, range_start, range_len)), pos, locations
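# Example (illustrative): if output position N matches source locations
# {5, 9}, copy_ends starts as [6, 10]; if position N+1 then matches {6, 12},
# the intersection {6} keeps the run alive and copy_ends becomes [7],
# describing a 2-line copy that started at source location 5.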

try:
    from bzrlib.plugins.groupcompress import _groupcompress_pyx
except ImportError: