         lines[-1] = lines[-1][:-1]
+def sort_gc_optimal(parent_map):
+    """Sort and group the keys in parent_map into gc-optimal order.
+    gc-optimal is defined (currently) as reverse-topological order, grouped by
+    the key prefix.
+    :return: A sorted-list of keys
+    # gc-optimal ordering is approximately reverse topological,
+    # properly grouped by file-id.
+    for item in parent_map.iteritems():
+        if isinstance(key, str) or len(key) == 1:
+            prefix = ''
+        else:
+            prefix = key[0]
+        try:
+            per_prefix_map[prefix].append(item)
+        except KeyError:
+            per_prefix_map[prefix] = [item]
+    for prefix in sorted(per_prefix_map):
+        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
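
To make the intended ordering concrete, here is a small standalone sketch (not bzrlib code; the helper name and the toy topological sort are illustrative) of grouping keys by prefix and emitting each group in reverse-topological order:

def toy_gc_optimal(parent_map):
    # Group (prefix, revision) keys by their prefix (the file-id).
    per_prefix = {}
    for key, parents in sorted(parent_map.items()):
        prefix = key[0] if len(key) > 1 else ''
        per_prefix.setdefault(prefix, {})[key] = parents
    result = []
    for prefix in sorted(per_prefix):
        group = per_prefix[prefix]
        # Kahn-style topological sort restricted to parents inside the group.
        pending = dict((k, set(p for p in ps if p in group))
                       for k, ps in group.items())
        order = []
        while pending:
            ready = sorted(k for k, ps in pending.items() if not ps)
            if not ready:
                raise ValueError('cycle in parent_map')
            for k in ready:
                del pending[k]
            for ps in pending.values():
                ps.difference_update(ready)
            order.extend(ready)
        # Newest-first within the group, matching reversed(topo_sort(...)).
        result.extend(reversed(order))
    return result

# toy_gc_optimal({('f1', 'r2'): [('f1', 'r1')],
#                 ('f1', 'r1'): [],
#                 ('f2', 'r1'): []})
# -> [('f1', 'r2'), ('f1', 'r1'), ('f2', 'r1')]
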
 class GroupCompressor(object):
     """Produce a serialised group of compressed texts.
         # insert new lines. To find reusable lines we traverse
         max_pos = len(lines)
         result_append = result.append
+        min_match_bytes = 200
         while pos < max_pos:
             block, pos, locations = _get_longest_match(line_locations, pos,
                                                        max_pos, locations)
             if block is not None:
+                # Check to see if we are matching fewer than 5 characters,
+                # which is turned into a simple 'insert', rather than a copy
+                # If we have more than 5 lines, we definitely have more than 5
+                # characters.
+                if block[-1] < min_match_bytes:
+                    # This block may be a 'short' block, check
+                    old_start, new_start, range_len = block
+                    matched_bytes = sum(map(len,
+                        lines[new_start:new_start + range_len]))
+                    if matched_bytes < min_match_bytes:
+                        block = None
+            if block is not None:
                 result_append(block)
         result_append((len(self.lines), len(lines), 0))
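
As a sketch of the 'short block' rule above (the function name and the 200-byte default are illustrative; the real code compares against the min_match_bytes chosen earlier): a match is only kept as a copy if the lines it covers add up to enough bytes, otherwise the region is cheaper to encode as a plain insert.

def is_worth_copying(lines, block, min_match_bytes=200):
    """Return True if a matching block covers enough bytes to copy."""
    old_start, new_start, range_len = block
    matched_bytes = sum(map(len, lines[new_start:new_start + range_len]))
    return matched_bytes >= min_match_bytes

# is_worth_copying(['a\n'] * 100, (0, 10, 3)) -> False (only 6 matched bytes)
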
-    def compress(self, key, lines, expected_sha):
+    def compress(self, key, lines, expected_sha, soft=False):
         """Compress lines with label key.
         :param key: A key tuple. It is stored in the output
         if key[-1] is None:
             key = key[:-1] + ('sha1:' + sha1,)
         label = '\x00'.join(key)
+        new_lines = ['label: %s\n' % label,
+        index_lines = [False, False]
         # setup good encoding for trailing \n support.
         if not lines or lines[-1].endswith('\n'):
             lines.append('\n')
         else:
             lines[-1] = lines[-1] + '\n'
-        new_lines.append('label: %s\n' % label)
-        new_lines.append('sha1: %s\n' % sha1)
-        index_lines = [False, False]
         flush_range = self.flush_range
-        blocks = self.get_matching_blocks(lines)
+        blocks = self.get_matching_blocks(lines, soft=soft)
-        # We either copy a range (while there are reusable lines) or we
-        # insert new lines. To find reusable lines we traverse
+        #copies_without_insertion = []
+        # We either copy a range (while there are reusable lines) or we
+        # insert new lines. To find reusable lines we traverse
         for old_start, new_start, range_len in blocks:
             if new_start != current_pos:
+                # if copies_without_insertion:
+                #     self.flush_multi(copies_without_insertion,
+                #                      lines, new_lines, index_lines)
+                #     copies_without_insertion = []
                 # non-matching region
                 flush_range(current_pos, None, new_start - current_pos,
                             lines, new_lines, index_lines)
             current_pos = new_start + range_len
             if not range_len:
-            flush_range(new_start, old_start, range_len, lines,
-                        new_lines, index_lines)
+            # copies_without_insertion.append((new_start, old_start, range_len))
+            flush_range(new_start, old_start, range_len,
+                        lines, new_lines, index_lines)
+        # if copies_without_insertion:
+        #     self.flush_multi(copies_without_insertion,
+        #                      lines, new_lines, index_lines)
+        #     copies_without_insertion = []
         delta_start = (self.endpoint, len(self.lines))
         self.output_lines(new_lines, index_lines)
         trim_encoding_newline(lines)
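
The trailing-'\n' trick used above (and undone by the trim_encoding_newline call) can be illustrated with a small round-trip sketch; the helper names are made up and the real trim_encoding_newline may differ in detail:

def encode_trailing_newline(lines):
    # Give every stored text exactly one extra trailing '\n', so texts that
    # do and do not end in a newline remain distinguishable after storage.
    if not lines or lines[-1].endswith('\n'):
        lines.append('\n')
    else:
        lines[-1] = lines[-1] + '\n'

def decode_trailing_newline(lines):
    # Undo the extra '\n'; drop the last chunk entirely if it becomes empty.
    lines[-1] = lines[-1][:-1]
    if not lines[-1]:
        del lines[-1]

text = ['ends without newline']
encode_trailing_newline(text)   # ['ends without newline\n']
decode_trailing_newline(text)   # ['ends without newline']

text = ['ends with newline\n']
encode_trailing_newline(text)   # ['ends with newline\n', '\n']
decode_trailing_newline(text)   # ['ends with newline\n']
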
         delta_details = self.labels_deltas[key]
         delta_lines = self.lines[delta_details[0][1]:delta_details[1][1]]
         label, sha1, delta = parse(delta_lines)
+        ## delta = parse(delta_lines)
         if label != key:
             raise AssertionError("wrong key: %r, wanted %r" % (label, key))
         # Perhaps we want to keep the line offsets too in memory at least?
-        lines = apply_delta(''.join(self.lines), delta)
-        sha1 = sha_strings(lines)
+        chunks = apply_delta(''.join(self.lines), delta)
+        sha1 = sha_strings(chunks)
+    def flush_multi(self, instructions, lines, new_lines, index_lines):
+        """Flush a bunch of different ranges out.
+        This should only be called with data that are "pure" copies.
+        flush_range = self.flush_range
+        if len(instructions) > 2:
+            # This is the number of lines to be copied
+            total_copy_range = sum(i[2] for i in instructions)
+            if len(instructions) > 0.5 * total_copy_range:
+                # We are copying N lines, but taking more than N/2
+                # copy instructions to do so. We will go ahead and expand this
+                # text so that other code is able to match against it
+                flush_range(instructions[0][0], None, total_copy_range,
+                            lines, new_lines, index_lines)
+        for ns, os, rl in instructions:
+            flush_range(ns, os, rl, lines, new_lines, index_lines)
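
A worked example of the heuristic above: three copy instructions that each copy a single line give total_copy_range == 3, and len(instructions) > 0.5 * total_copy_range (3 > 1.5) holds, so the whole range is flushed as one insert rather than three tiny copies. The tuples below are made-up (new_start, old_start, range_len) values.

instructions = [(10, 3, 1), (11, 40, 1), (12, 7, 1)]
total_copy_range = sum(i[2] for i in instructions)   # 3 lines copied in total
assert len(instructions) > 0.5 * total_copy_range    # 3 > 1.5: expand to an insert
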
     def flush_range(self, range_start, copy_start, range_len, lines, new_lines, index_lines):
         insert_instruction = "i,%d\n" % range_len
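
For reference, a minimal sketch of what the insert path of flush_range produces: an "i,<count>\n" header followed by the inserted lines themselves. The helper name is made up, and the copy path (taken when copy_start is not None) is left out because its exact encoding is not shown in this excerpt.

def emit_insert(lines, range_start, range_len, out):
    # Insert instruction: header line, then the raw lines being inserted.
    out.append("i,%d\n" % range_len)
    out.extend(lines[range_start:range_start + range_len])

out = []
emit_insert(['a\n', 'b\n', 'c\n'], 1, 2, out)
# out == ['i,2\n', 'b\n', 'c\n']
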
         # an empty tuple instead.
         # double handling for now. Make it work until then.
-        bytes = ''.join(lines)
-        record = FulltextContentFactory(key, parents, None, bytes)
+        length = sum(map(len, lines))
+        record = ChunkedContentFactory(key, parents, None, lines)
         sha1 = list(self._insert_record_stream([record], random_id=random_id))[0]
-        return sha1, len(bytes), None
+        return sha1, length, None
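
The switch from len(bytes) to a summed length works because the total length of a chunked text can be computed without joining the chunks:

chunks = ['first line\n', 'second line\n']
assert sum(map(len, chunks)) == len(''.join(chunks)) == 23
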
     def annotate(self, key):
         """See VersionedFiles.annotate."""
         reannotate = annotate.reannotate
         for record in self.get_record_stream(keys, 'topological', True):
-            fulltext = split_lines(record.get_bytes_as('fulltext'))
+            chunks = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
             parent_lines = [parent_cache[parent] for parent in parent_map[key]]
             parent_cache[key] = list(
-                reannotate(parent_lines, fulltext, key, None, head_cache))
+                reannotate(parent_lines, chunks, key, None, head_cache))
         return parent_cache[key]
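
A pure-Python sketch of the normalisation chunks_to_lines is used for here (the real osutils.chunks_to_lines is implemented separately; this only shows the intended result of one chunk per line):

def toy_chunks_to_lines(chunks):
    # Re-split arbitrary chunks so each element is one '\n'-terminated line.
    return ''.join(chunks).splitlines(True)

assert toy_chunks_to_lines(['a\nb', '\nc\n']) == ['a\n', 'b\n', 'c\n']
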
     def check(self, progress_bar=None):
             result[key] = self._unadded_refs[key]
+    def _get_delta_lines(self, key):
+        """Helper function for manual debugging.
+        This is a convenience function that shouldn't be used in production
+        build_details = self._index.get_build_details([key])[key]
+        index_memo = build_details[0]
+        group, delta_lines = self._get_group_and_delta_lines(index_memo)
+    def _get_group_and_delta_lines(self, index_memo):
+        read_memo = index_memo[0:3]
+        try:
+            plain = self._group_cache[read_memo]
+        except KeyError:
+            zdata = self._access.get_raw_records([read_memo]).next()
+            # decompress - whole thing - this is not a bug, as it
+            # permits caching. We might want to store the partially
+            # decompressed group and decompress object, so that recent
+            # texts are not penalised by big groups.
+            plain = zlib.decompress(zdata) #, index_memo[4])
+            self._group_cache[read_memo] = plain
+        # print len(zdata), len(plain)
+        # parse - requires split_lines, better to have byte offsets
+        # here (but not by much - we only split the region for the
+        # recipe, and we often want to end up with lines anyway).
+        return plain, split_lines(plain[index_memo[3]:index_memo[4]])
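
A condensed sketch of what this helper does, assuming (as the slicing above implies) that index_memo[0:3] identifies the compressed group on disk and index_memo[3] / index_memo[4] are byte offsets of the delta within the decompressed group; the caching and the raw-record read are elided:

import zlib

def toy_get_delta_bytes(zdata, index_memo):
    # Decompress the whole group once; the delta is a byte slice of it.
    plain = zlib.decompress(zdata)
    return plain[index_memo[3]:index_memo[4]]
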
+    def get_missing_compression_parent_keys(self):
+        """Return the keys of missing compression parents.
+        Missing compression parents occur when a record stream was missing
+        basis texts, or an index was scanned that had missing basis texts.
+        # GroupCompress cannot currently reference texts that are not in the
+        # group, so this is valid for now
     def get_record_stream(self, keys, ordering, include_delta_closure):
         """Get a stream of records for keys.
             valid until the iterator is advanced.
         # keys might be a generator
+        orig_keys = list(keys)
+        keys = set(orig_keys)
-        if not self._index.has_graph:
+        if (not self._index.has_graph
+            and ordering in ('topological', 'gc-optimal')):
             # Cannot topological order when no graph has been stored.
             ordering = 'unordered'
         locations = self._index.get_build_details(keys)
+        local_keys = frozenset(keys).intersection(set(self._unadded_refs))
         if ordering == 'topological':
             # would be better to not globally sort initially but instead
             # start with one key, recurse to its oldest parent, then grab
             # everything in the same group, etc.
             parent_map = dict((key, details[2]) for key, details in
                               locations.iteritems())
-            local = frozenset(keys).intersection(set(self._unadded_refs))
+            for key in local_keys:
                 parent_map[key] = self._unadded_refs[key]
-                locations[key] = None
             present_keys = topo_sort(parent_map)
             # Now group by source:
+        elif ordering == 'gc-optimal':
+            parent_map = dict((key, details[2]) for key, details in
+                              locations.iteritems())
+            for key in local_keys:
+                parent_map[key] = self._unadded_refs[key]
+            # XXX: This only optimizes for the target ordering. We may need to
+            # balance that with the time it takes to extract ordering, by
+            # somehow grouping based on locations[key][0:3]
+            present_keys = sort_gc_optimal(parent_map)
+        elif ordering == 'as-requested':
+            present_keys = [key for key in orig_keys if key in locations
+                            or key in local_keys]
-            present_keys = locations.keys()
-            local = frozenset(keys).intersection(set(self._unadded_refs))
-                present_keys.append(key)
-                locations[key] = None
+            # We want to yield the keys in a semi-optimal (read-wise) ordering.
+            # Otherwise we thrash the _group_cache and destroy performance.
+            def get_group(key):
+                # This is the group the bytes are stored in, followed by the
+                # location in the group
+                return locations[key][0]
+            present_keys = sorted(locations.iterkeys(), key=get_group)
+            # We don't have an ordering for keys in the in-memory object, but
+            # let's process the in-memory ones first.
+            present_keys = list(local_keys) + present_keys
+            locations.update((key, None) for key in local_keys)
         absent_keys = keys.difference(set(locations))
         for key in absent_keys:
             yield AbsentContentFactory(key)
         for key in present_keys:
             if key in self._unadded_refs:
-                lines, sha1 = self._compressor.extract(key)
+                chunks, sha1 = self._compressor.extract(key)
                 parents = self._unadded_refs[key]
             else:
                 index_memo, _, parents, (method, _) = locations[key]
-                read_memo = index_memo[0:3]
-                try:
-                    plain = self._group_cache[read_memo]
-                except KeyError:
-                    zdata = self._access.get_raw_records([read_memo]).next()
-                    # decompress - whole thing - this is not a bug, as it
-                    # permits caching. We might want to store the partially
-                    # decompressed group and decompress object, so that recent
-                    # texts are not penalised by big groups.
-                    decomp = zlib.decompressobj()
-                    plain = decomp.decompress(zdata) #, index_memo[4])
-                    self._group_cache[read_memo] = plain
-                # print len(zdata), len(plain)
-                # parse - requires split_lines, better to have byte offsets
-                # here (but not by much - we only split the region for the
-                # recipe, and we often want to end up with lines anyway).
-                delta_lines = split_lines(plain[index_memo[3]:index_memo[4]])
+                plain, delta_lines = self._get_group_and_delta_lines(index_memo)
                 label, sha1, delta = parse(delta_lines)
                 if label != key:
                     raise AssertionError("wrong key: %r, wanted %r" % (label, key))
-                lines = apply_delta(plain, delta)
-                bytes = ''.join(lines)
-                yield FulltextContentFactory(key, parents, sha1, bytes)
+                chunks = apply_delta(plain, delta)
+                if sha_strings(chunks) != sha1:
+                    raise AssertionError('sha1 sum did not match')
+                yield ChunkedContentFactory(key, parents, sha1, chunks)
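
A usage sketch for the new 'chunked' records (the variable names are illustrative; vf stands for a GroupCompressVersionedFiles-like object and keys for an iterable of key tuples): stream in gc-optimal order and rebuild each text from its chunks.

# 'vf' and 'keys' are assumed to exist already (see the lead-in above).
for factory in vf.get_record_stream(keys, 'gc-optimal', True):
    if factory.storage_kind == 'absent':
        raise KeyError('missing text %r' % (factory.key,))
    text = ''.join(factory.get_bytes_as('chunked'))
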
     def get_sha1s(self, keys):
         """See VersionedFiles.get_sha1s()."""
             for key, reads, refs in keys_to_add:
                 nodes.append((key, "%d %d %s" % (start, length, reads), refs))
             self._index.add_records(nodes, random_id=random_id)
         for record in stream:
             # Raise an error when a record is missing.
             if record.storage_kind == 'absent':
-                raise errors.RevisionNotPresent([record.key], self)
-            elif record.storage_kind == 'fulltext':
-                bytes = record.get_bytes_as('fulltext')
-                adapter_key = record.storage_kind, 'fulltext'
-                adapter = get_adapter(adapter_key)
-                bytes = adapter.get_bytes(record)
+                raise errors.RevisionNotPresent(record.key, self)
+            try:
+                lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
+            except errors.UnavailableRepresentation:
+                try:
+                    bytes = record.get_bytes_as('fulltext')
+                except errors.UnavailableRepresentation:
+                    adapter_key = record.storage_kind, 'fulltext'
+                    adapter = get_adapter(adapter_key)
+                    bytes = adapter.get_bytes(record)
+                lines = osutils.split_lines(bytes)
+            if len(record.key) > 1:
+                prefix = record.key[0]
+                if (last_prefix is not None and prefix != last_prefix):
+            if basis_end > 1024 * 1024 * 4:
+                self._compressor = GroupCompressor(self._delta)
+                self._unadded_refs = {}
             found_sha1, end_point = self._compressor.compress(record.key,
-                split_lines(bytes), record.sha1)
+                lines, record.sha1, soft=soft)
             if record.key[-1] is None:
                 key = record.key[:-1] + ('sha1:' + found_sha1,)
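
The fallback chain used when pulling lines out of an incoming record can be summarised as below. This is a sketch that assumes bzrlib.errors and bzrlib.osutils are already imported (as they are in this module) and that get_adapter is the local adapter lookup used above.

from bzrlib import errors, osutils

def record_to_lines(record, get_adapter):
    # Prefer the cheap 'chunked' form, then 'fulltext', then adapt.
    try:
        return osutils.chunks_to_lines(record.get_bytes_as('chunked'))
    except errors.UnavailableRepresentation:
        try:
            bytes = record.get_bytes_as('fulltext')
        except errors.UnavailableRepresentation:
            adapter = get_adapter((record.storage_kind, 'fulltext'))
            bytes = adapter.get_bytes(record)
        return osutils.split_lines(bytes)
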
         """Construct a _GCGraphIndex on a graph_index.
         :param graph_index: An implementation of bzrlib.index.GraphIndex.
-        :param is_locked: A callback to check whether the object should answer
+        :param is_locked: A callback, returns True if the index is locked and
         :param parents: If True, record knits parents, if not do not record
         :param add_callback: If not None, allow additions to the index and call
             this callback with a list of added GraphIndex nodes:
             [(node, value, node_refs), ...]
-        :param is_locked: A callback, returns True if the index is locked and
         self._add_callback = add_callback
         self._graph_index = graph_index
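
A construction sketch (the surrounding repository objects are hypothetical; only the parameter meanings come from the docstring above): the index only answers queries while is_locked() returns True, and add_callback, when given, receives the raw (node, value, node_refs) entries being added.

index = _GCGraphIndex(graph_index,
                      is_locked=repo.is_locked,
                      parents=True,
                      add_callback=writable_index.add_nodes)
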
         self._add_callback(records)
     def _check_read(self):
-        """raise if reads are not permitted."""
+        """Raise an exception if reads are not permitted."""
         if not self._is_locked():
             raise errors.ObjectNotLocked(self)
     def _check_write_ok(self):
-        """Assert if writes are not permitted."""
+        """Raise an exception if writes are not permitted."""
         if not self._is_locked():
             raise errors.ObjectNotLocked(self)
     def _get_entries(self, keys, check_present=False):
         """Get the entries for keys.
+        Note: Callers are responsible for checking that the index is locked
+        before calling this method.
         :param keys: An iterable of index key tuples.