        self._writer.begin()
        # what state is the pack in? (open, finished, aborted)
        self._state = 'open'
        # no name until we finish writing the content
        self.name = None

    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection. It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        # Groupcompress packs don't have any external references, arguably CHK
        # pages have external references, but we cannot 'cheaply' determine
        # them without actually walking all of the chk pages.


class ResumedGCPack(ResumedPack):

    def _check_references(self):
        """Make sure our external compression parents are present."""
        # See GCPack._check_references for why this is empty

    def _get_external_refs(self, index):
        # GC repositories don't have compression parents external to a given
        # pack file
        return set()


class GCCHKPacker(Packer):
    """This class understands what it takes to collect a GCCHK repo."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
                 reload_func=None):
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
                                          revision_ids=revision_ids,
                                          reload_func=reload_func)
        self._pack_collection = pack_collection
        # ATM, We only support this for GCCHK repositories
        if pack_collection.chk_index is None:
            raise AssertionError('pack_collection.chk_index should not be None')
        self._gather_text_refs = False
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        self._text_refs = None
        # set by .pack() if self.revision_ids is not None
        self.revision_keys = None

    def _get_progress_stream(self, source_vf, keys, message, pb):
        def pb_stream():
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(substream):
                if pb is not None:
                    pb.update(message, idx + 1, len(keys))
                yield record
        return pb_stream()

    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
        """Filter the texts of inventories, to find the chk pages."""
        total_keys = len(keys)
        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                                                             record.key)
                if pb is not None:
                    pb.update('inv', idx, total_keys)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
                yield record
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            id_roots_set.clear()
            p_id_roots_set.clear()
        return _filtered_inv_stream()
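
    # Illustrative sketch (not part of bzrlib, names are hypothetical):
    # _get_filtered_inv_stream above is a pass-through generator - it yields
    # every inventory record unchanged while recording each distinct CHK root
    # it sees as a side effect.  key_of() stands in for calls such as
    # chk_inv.id_to_entry.key().
    @staticmethod
    def _sketch_collect_roots_while_streaming(records, key_of, seen_roots):
        """Yield records unchanged, adding key_of(record) to seen_roots."""
        for record in records:
            root = key_of(record)
            if root not in seen_roots:
                seen_roots.add(root)
            yield record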

    def _get_chk_streams(self, source_vf, keys, pb=None):
        # We want to stream the keys from 'id_roots', and things they
        # reference, and then stream things from p_id_roots and things they
        # reference, and then any remaining keys that we didn't get to.

        # We also group referenced texts together, so if one root references a
        # text with prefix 'a', and another root references a node with prefix
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
        # This keeps 'similar' nodes together.

        # Note: We probably actually want multiple streams here, to help the
        #       client understand that the different levels won't compress well
        #       against each other.
        #       Test the difference between using one Group per level, and
        #       using 1 Group per prefix. (so '' (root) would get a group, then
        #       all the references to search-key 'a' would get a group, etc.)
        #       (A standalone sketch of this prefix grouping follows the method.)
        total_keys = len(keys)
        remaining_keys = set(keys)
        counter = [0]
        if self._gather_text_refs:
            self._text_refs = set()
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
            cur_keys = root_keys
            while cur_keys:
                keys_by_search_prefix = {}
                remaining_keys.difference_update(cur_keys)
                next_keys = set()
                def handle_internal_node(node):
                    for prefix, value in node._items.iteritems():
                        # We don't want to request the same key twice, and we
                        # want to order it by the first time it is seen.
                        # Even further, we don't want to request a key which is
                        # not in this group of pack files (it should be in the
                        # repo, but it doesn't have to be in the group being
                        # packed.)
                        # TODO: consider how to treat externally referenced chk
                        #       pages as 'external_references' so that we
                        #       always fill them in for stacked branches
                        if value not in next_keys and value in remaining_keys:
                            keys_by_search_prefix.setdefault(prefix,
                                []).append(value)
                            next_keys.add(value)
                def handle_leaf_node(node):
                    # Store is None, because we know we have a LeafNode, and we
                    # just want its entries
                    for file_id, bytes in node.iteritems(None):
                        self._text_refs.add(chk_map._bytes_to_text_key(bytes))
                def next_stream():
                    stream = source_vf.get_record_stream(cur_keys,
                                                         'as-requested', True)
                    for record in stream:
                        if record.storage_kind == 'absent':
                            # An absent CHK record: we assume that the missing
                            # record is in a different pack - e.g. a page not
                            # altered by the commit we're packing.
                            continue
                        bytes = record.get_bytes_as('fulltext')
                        # We don't care about search_key_func for this code,
                        # because we only care about external references.
                        node = chk_map._deserialise(bytes, record.key,
                                                    search_key_func=None)
                        common_base = node._search_prefix
                        if isinstance(node, chk_map.InternalNode):
                            handle_internal_node(node)
                        elif parse_leaf_nodes:
                            handle_leaf_node(node)
                        counter[0] += 1
                        if pb is not None:
                            pb.update('chk node', counter[0], total_keys)
                        yield record
                yield next_stream()
                # Double check that we won't be emitting any keys twice
                # If we get rid of the pre-calculation of all keys, we could
                # turn this around and do
                # next_keys.difference_update(seen_keys)
                # However, we also may have references to chk pages in another
                # pack file during autopack. We filter earlier, so we should no
                # longer need to do this
                # next_keys = next_keys.intersection(remaining_keys)
                cur_keys = []
                for prefix in sorted(keys_by_search_prefix):
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
        for stream in _get_referenced_stream(self._chk_id_roots,
                                             self._gather_text_refs):
            yield stream
        del self._chk_id_roots
        # while it isn't really possible for chk_id_roots to not be in the
        # local group of packs, it is possible that the tree shape has not
        # changed recently, so we need to filter _chk_p_id_roots by the
        # available keys
        chk_p_id_roots = [key for key in self._chk_p_id_roots
                          if key in remaining_keys]
        del self._chk_p_id_roots
        for stream in _get_referenced_stream(chk_p_id_roots, False):
            yield stream
        if remaining_keys:
            trace.mutter('There were %d keys in the chk index, %d of which'
                         ' were not referenced', total_keys,
                         len(remaining_keys))
            if self.revision_ids is None:
                stream = source_vf.get_record_stream(remaining_keys,
                                                     'unordered', True)
                yield stream
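
    # Illustrative sketch (not part of bzrlib, names are hypothetical):
    # the breadth-first walk in _get_chk_streams processes one level of CHK
    # pages at a time and groups the next level's keys by their search
    # prefix, so similar nodes end up adjacent in the output stream.
    # children() stands in for parsing a node and returning (prefix, key)
    # pairs for its references.
    @staticmethod
    def _sketch_group_next_level_by_prefix(current_keys, children, remaining):
        """Return the next level's keys, ordered by sorted search prefix."""
        keys_by_prefix = {}
        seen = set()
        for key in current_keys:
            for prefix, child in children(key):
                if child in remaining and child not in seen:
                    seen.add(child)
                    keys_by_prefix.setdefault(prefix, []).append(child)
        next_level = []
        for prefix in sorted(keys_by_prefix):
            next_level.extend(keys_by_prefix.pop(prefix))
        return next_level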

    def _build_vf(self, index_name, parents, delta, for_write=False):
        """Build a VersionedFiles instance on top of this group of packs."""
        index_name = index_name + '_index'
        index_to_pack = {}
        access = _DirectPackAccess(index_to_pack,
                                   reload_func=self._reload_func)
        if for_write:
            # Use new_pack
            if self.new_pack is None:
                raise AssertionError('No new pack has been set')
            index = getattr(self.new_pack, index_name)
            index_to_pack[index] = self.new_pack.access_tuple()
            index.set_optimize(for_size=True)
            access.set_writer(self.new_pack._writer, index,
                              self.new_pack.access_tuple())
            add_callback = index.add_nodes
        else:
            indices = []
            for pack in self.packs:
                sub_index = getattr(pack, index_name)
                index_to_pack[sub_index] = pack.access_tuple()
                indices.append(sub_index)
            index = _mod_index.CombinedGraphIndex(indices)
            add_callback = None
        vf = GroupCompressVersionedFiles(
            _GCGraphIndex(index,
                          add_callback=add_callback,
                          parents=parents,
                          is_locked=self._pack_collection.repo.is_locked),
            access=access,
            delta=delta)
        return vf

    def _build_vfs(self, index_name, parents, delta):
        """Build the source and target VersionedFiles."""
        source_vf = self._build_vf(index_name, parents,
                                   delta, for_write=False)
        target_vf = self._build_vf(index_name, parents,
                                   delta, for_write=True)
        return source_vf, target_vf

    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
                     pb_offset):
        trace.mutter('repacking %d %s', len(keys), message)
        self.pb.update('repacking %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
        try:
            stream = vf_to_stream(source_vf, keys, message, child_pb)
            for _ in target_vf._insert_record_stream(stream,
                                                     random_id=True,
                                                     reuse_blocks=False):
                pass
        finally:
            child_pb.finished()

    def _copy_revision_texts(self):
        source_vf, target_vf = self._build_vfs('revision', True, False)
        if not self.revision_keys:
            # We are doing a full fetch, aka 'pack'
            self.revision_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'revisions', self._get_progress_stream, 1)

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        # It is not sufficient to just use self.revision_keys, as stacked
        # repositories can have more inventories than they have revisions.
        # One alternative would be to do something with
        # get_parent_map(self.revision_keys), but that shouldn't be any faster
        inventory_keys = source_vf.keys()
        missing_inventories = set(self.revision_keys).difference(inventory_keys)
        if missing_inventories:
            # Go back to the original repo, to see if these are really missing
            # https://bugs.launchpad.net/bzr/+bug/437003
            # If we are packing a subset of the repo, it is fine to just have
            # the data in another Pack file, which is not included in this pack
            # operation.
            inv_index = self._pack_collection.repo.inventories._index
            pmap = inv_index.get_parent_map(missing_inventories)
            really_missing = missing_inventories.difference(pmap)
            if really_missing:
                missing_inventories = sorted(really_missing)
                raise ValueError('We are missing inventories for revisions: %s'
                    % (missing_inventories,))
        self._copy_stream(source_vf, target_vf, inventory_keys,
                          'inventories', self._get_filtered_inv_stream, 2)

    def _get_chk_vfs_for_copy(self):
        return self._build_vfs('chk', False, False)

    def _copy_chk_texts(self):
        source_vf, target_vf = self._get_chk_vfs_for_copy()
        # TODO: This is technically spurious... if it is a performance issue,
        #       remove it
        total_keys = source_vf.keys()
        trace.mutter('repacking chk: %d id_to_entry roots,'
                     ' %d p_id_map roots, %d total keys',
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
                     len(total_keys))
        self.pb.update('repacking chk', 3)
        child_pb = ui.ui_factory.nested_progress_bar()
        try:
            for stream in self._get_chk_streams(source_vf, total_keys,
                                                pb=child_pb):
                for _ in target_vf._insert_record_stream(stream,
                                                         random_id=True,
                                                         reuse_blocks=False):
                    pass
        finally:
            child_pb.finished()

    def _copy_text_texts(self):
        source_vf, target_vf = self._build_vfs('text', True, True)
        # XXX: We don't walk the chk map to determine referenced (file_id,
        #      revision_id) keys. We don't do it yet because you really need
        #      to filter out the ones that are present in the parents of the
        #      rev just before the ones you are copying, otherwise the filter
        #      is grabbing too many keys...
        text_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, text_keys,
                          'texts', self._get_progress_stream, 4)

    def _copy_signature_texts(self):
        source_vf, target_vf = self._build_vfs('signature', False, False)
        signature_keys = source_vf.keys()
        signature_keys.intersection(self.revision_keys)
        self._copy_stream(source_vf, target_vf, signature_keys,
                          'signatures', self._get_progress_stream, 5)
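
    # Illustrative sketch (not part of bzrlib, names are hypothetical):
    # _copy_inventory_texts above decides whether inventories are really
    # missing in two steps - a cheap set difference against the packs being
    # repacked, then a lookup against the whole repository (a plain dict
    # stands in for the repository's get_parent_map here).
    @staticmethod
    def _sketch_find_really_missing(revision_keys, pack_inv_keys, repo_parent_map):
        """Return the revision keys that have no inventory anywhere."""
        missing = set(revision_keys).difference(pack_inv_keys)
        # Keys that are present elsewhere in the repository are fine.
        return sorted(k for k in missing if k not in repo_parent_map)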

    def _create_pack_from_packs(self):
        self.pb.update('repacking', 0, 7)
        self.new_pack = self.open_pack()
        # Is this necessary for GC ?
        self.new_pack.set_write_cache_size(1024*1024)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_chk_texts()
        self._copy_text_texts()
        self._copy_signature_texts()
        self.new_pack._check_references()
        if not self._use_pack(self.new_pack):
            self.new_pack.abort()
            return None
        self.new_pack.finish_content()
        if len(self.packs) == 1:
            old_pack = self.packs[0]
            if old_pack.name == self.new_pack._hash.hexdigest():
                # The single old pack was already optimally packed.
                trace.mutter('single pack %s was already optimally packed',
                             old_pack.name)
                self.new_pack.abort()
                return None
        self.pb.update('finishing repack', 6, 7)
        self.new_pack.finish()
        self._pack_collection.allocate(self.new_pack)
        return self.new_pack
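
    # Illustrative sketch (not part of bzrlib, names are hypothetical):
    # _create_pack_from_packs above runs a fixed sequence of copy steps and
    # only keeps the new pack when _use_pack() says the result is worth
    # keeping.  The helper below shows that control flow with plain callables.
    @staticmethod
    def _sketch_run_repack(steps, keep, finish, abort):
        """Run each (name, func) step; finish() if keep() approves, else abort()."""
        for name, func in steps:
            # A real implementation would report progress per step name here.
            func()
        if not keep():
            abort()
            return None
        return finish()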
514
class GCCHKReconcilePacker(GCCHKPacker):
515
"""A packer which regenerates indices etc as it copies.
517
This is used by ``bzr reconcile`` to cause parent text pointers to be
521
def __init__(self, *args, **kwargs):
522
super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
523
self._data_changed = False
524
self._gather_text_refs = True
526
def _copy_inventory_texts(self):
527
source_vf, target_vf = self._build_vfs('inventory', True, True)
528
self._copy_stream(source_vf, target_vf, self.revision_keys,
529
'inventories', self._get_filtered_inv_stream, 2)
530
if source_vf.keys() != self.revision_keys:
531
self._data_changed = True
533
def _copy_text_texts(self):
534
"""generate what texts we should have and then copy."""
535
source_vf, target_vf = self._build_vfs('text', True, True)
536
trace.mutter('repacking %d texts', len(self._text_refs))
537
self.pb.update("repacking texts", 4)
538
# we have three major tasks here:
539
# 1) generate the ideal index
540
repo = self._pack_collection.repo
541
# We want the one we just wrote, so base it on self.new_pack
542
revision_vf = self._build_vf('revision', True, False, for_write=True)
543
ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
544
# Strip keys back into revision_ids.
545
ancestors = dict((k[0], tuple([p[0] for p in parents]))
546
for k, parents in ancestor_keys.iteritems())
548
# TODO: _generate_text_key_index should be much cheaper to generate from
549
# a chk repository, rather than the current implementation
550
ideal_index = repo._generate_text_key_index(None, ancestors)
551
file_id_parent_map = source_vf.get_parent_map(self._text_refs)
552
# 2) generate a keys list that contains all the entries that can
553
# be used as-is, with corrected parents.
555
new_parent_keys = {} # (key, parent_keys)
557
NULL_REVISION = _mod_revision.NULL_REVISION
558
for key in self._text_refs:
564
ideal_parents = tuple(ideal_index[key])
566
discarded_keys.append(key)
567
self._data_changed = True
569
if ideal_parents == (NULL_REVISION,):
571
source_parents = file_id_parent_map[key]
572
if ideal_parents == source_parents:
576
# We need to change the parent graph, but we don't need to
577
# re-insert the text (since we don't pun the compression
578
# parent with the parents list)
579
self._data_changed = True
580
new_parent_keys[key] = ideal_parents
581
# we're finished with some data.
583
del file_id_parent_map
584
# 3) bulk copy the data, updating records than need it
585
def _update_parents_for_texts():
586
stream = source_vf.get_record_stream(self._text_refs,
587
'groupcompress', False)
588
for record in stream:
589
if record.key in new_parent_keys:
590
record.parents = new_parent_keys[record.key]
592
target_vf.insert_record_stream(_update_parents_for_texts())
594
def _use_pack(self, new_pack):
595
"""Override _use_pack to check for reconcile having changed content."""
596
return new_pack.data_inserted() and self._data_changed
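
# Illustrative sketch (not part of bzrlib, names are hypothetical):
# GCCHKReconcilePacker._copy_text_texts rewrites the parent pointers of
# selected records while copying them.  Records whose key appears in the
# corrections mapping get their .parents replaced; everything else passes
# through untouched.
def _sketch_rewrite_parents(records, corrections):
    """Yield records, overriding record.parents from corrections by key."""
    for record in records:
        if record.key in corrections:
            record.parents = corrections[record.key]
        yield record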
599
class GCCHKCanonicalizingPacker(GCCHKPacker):
600
"""A packer that ensures inventories have canonical-form CHK maps.
602
Ideally this would be part of reconcile, but it's very slow and rarely
603
needed. (It repairs repositories affected by
604
https://bugs.launchpad.net/bzr/+bug/522637).
607
def __init__(self, *args, **kwargs):
608
super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
609
self._data_changed = False
611
def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
612
"""Create and exhaust a stream, but don't insert it.
614
This is useful to get the side-effects of generating a stream.
616
self.pb.update('scanning %s' % (message,), pb_offset)
617
child_pb = ui.ui_factory.nested_progress_bar()
619
list(vf_to_stream(source_vf, keys, message, child_pb))
623
def _copy_inventory_texts(self):
624
source_vf, target_vf = self._build_vfs('inventory', True, True)
625
source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
626
inventory_keys = source_vf.keys()
627
# First, copy the existing CHKs on the assumption that most of them
628
# will be correct. This will save us from having to reinsert (and
629
# recompress) these records later at the cost of perhaps preserving a
631
# (Iterate but don't insert _get_filtered_inv_stream to populate the
632
# variables needed by GCCHKPacker._copy_chk_texts.)
633
self._exhaust_stream(source_vf, inventory_keys, 'inventories',
634
self._get_filtered_inv_stream, 2)
635
GCCHKPacker._copy_chk_texts(self)
636
# Now copy and fix the inventories, and any regenerated CHKs.
637
def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
638
return self._get_filtered_canonicalizing_inv_stream(
639
source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
640
self._copy_stream(source_vf, target_vf, inventory_keys,
641
'inventories', chk_canonicalizing_inv_stream, 4)
643
def _copy_chk_texts(self):
644
# No-op; in this class this happens during _copy_inventory_texts.
647
def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
648
pb=None, source_chk_vf=None, target_chk_vf=None):
649
"""Filter the texts of inventories, regenerating CHKs to make sure they
652
total_keys = len(keys)
653
target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)
654
def _filtered_inv_stream():
655
stream = source_vf.get_record_stream(keys, 'groupcompress', True)
656
search_key_name = None
657
for idx, record in enumerate(stream):
658
# Inventories should always be with revisions; assume success.
659
bytes = record.get_bytes_as('fulltext')
660
chk_inv = inventory.CHKInventory.deserialise(
661
source_chk_vf, bytes, record.key)
663
pb.update('inv', idx, total_keys)
664
chk_inv.id_to_entry._ensure_root()
665
if search_key_name is None:
666
# Find the name corresponding to the search_key_func
667
search_key_reg = chk_map.search_key_registry
668
for search_key_name, func in search_key_reg.iteritems():
669
if func == chk_inv.id_to_entry._search_key_func:
671
canonical_inv = inventory.CHKInventory.from_inventory(
672
target_chk_vf, chk_inv,
673
maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
674
search_key_name=search_key_name)
675
if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key():
677
'Non-canonical CHK map for id_to_entry of inv: %s '
678
'(root is %s, should be %s)' % (chk_inv.revision_id,
679
chk_inv.id_to_entry.key()[0],
680
canonical_inv.id_to_entry.key()[0]))
681
self._data_changed = True
682
p_id_map = chk_inv.parent_id_basename_to_file_id
683
p_id_map._ensure_root()
684
canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
685
if p_id_map.key() != canon_p_id_map.key():
687
'Non-canonical CHK map for parent_id_to_basename of '
688
'inv: %s (root is %s, should be %s)'
689
% (chk_inv.revision_id, p_id_map.key()[0],
690
canon_p_id_map.key()[0]))
691
self._data_changed = True
692
yield versionedfile.ChunkedContentFactory(record.key,
693
record.parents, record.sha1,
694
canonical_inv.to_lines())
695
# We have finished processing all of the inventory records, we
696
# don't need these sets anymore
697
return _filtered_inv_stream()
699
def _use_pack(self, new_pack):
700
"""Override _use_pack to check for reconcile having changed content."""
701
return new_pack.data_inserted() and self._data_changed


class GCRepositoryPackCollection(RepositoryPackCollection):

    pack_factory = GCPack
    resumed_pack_factory = ResumedGCPack
    normal_packer_class = GCCHKPacker
    optimising_packer_class = GCCHKPacker

    def _make_index(self, name, suffix):
        """Overridden to use BTreeGraphIndex objects."""
        size_offset = self._suffix_offsets[suffix]
        index_name = name + suffix
        index_size = self._names[name][size_offset]
        return BTreeGraphIndex(
            self._index_transport, index_name, index_size)

    def _start_write_group(self):
        # Overridden to add 'self.pack_factory()'
        # Do not permit preparation for writing if we're not in a 'write lock'.
        if not self.repo.is_write_locked():
            raise errors.NotWriteLocked(self)
        self._new_pack = self.pack_factory(self, upload_suffix='.pack',
            file_mode=self.repo.bzrdir._get_file_mode())
        # allow writing: queue writes to a new index
        self.revision_index.add_writable_index(self._new_pack.revision_index,
            self._new_pack)
        self.inventory_index.add_writable_index(self._new_pack.inventory_index,
            self._new_pack)
        self.text_index.add_writable_index(self._new_pack.text_index,
            self._new_pack)
        self.signature_index.add_writable_index(self._new_pack.signature_index,
            self._new_pack)
        if chk_support and self.chk_index is not None:
            self.chk_index.add_writable_index(self._new_pack.chk_index,
                self._new_pack)
            self.repo.chk_bytes._index._add_callback = self.chk_index.add_callback
        self.repo.inventories._index._add_callback = self.inventory_index.add_callback
        self.repo.revisions._index._add_callback = self.revision_index.add_callback
        self.repo.signatures._index._add_callback = self.signature_index.add_callback
        self.repo.texts._index._add_callback = self.text_index.add_callback
711
def _check_new_inventories(self):
712
"""Detect missing inventories or chk root entries for the new revisions
715
:returns: list of strs, summarising any problems found. If the list is
716
empty no problems were found.
718
# Ensure that all revisions added in this write group have:
719
# - corresponding inventories,
720
# - chk root entries for those inventories,
721
# - and any present parent inventories have their chk root
723
# And all this should be independent of any fallback repository.
725
key_deps = self.repo.revisions._index._key_dependencies
726
new_revisions_keys = key_deps.get_new_keys()
727
no_fallback_inv_index = self.repo.inventories._index
728
no_fallback_chk_bytes_index = self.repo.chk_bytes._index
729
no_fallback_texts_index = self.repo.texts._index
730
inv_parent_map = no_fallback_inv_index.get_parent_map(
732
# Are any inventories for corresponding to the new revisions missing?
733
corresponding_invs = set(inv_parent_map)
734
missing_corresponding = set(new_revisions_keys)
735
missing_corresponding.difference_update(corresponding_invs)
736
if missing_corresponding:
737
problems.append("inventories missing for revisions %s" %
738
(sorted(missing_corresponding),))
740
# Are any chk root entries missing for any inventories? This includes
741
# any present parent inventories, which may be used when calculating
742
# deltas for streaming.
743
all_inv_keys = set(corresponding_invs)
744
for parent_inv_keys in inv_parent_map.itervalues():
745
all_inv_keys.update(parent_inv_keys)
746
# Filter out ghost parents.
747
all_inv_keys.intersection_update(
748
no_fallback_inv_index.get_parent_map(all_inv_keys))
749
parent_invs_only_keys = all_inv_keys.symmetric_difference(
752
inv_ids = [key[-1] for key in all_inv_keys]
753
parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
754
root_key_info = _build_interesting_key_sets(
755
self.repo, inv_ids, parent_invs_only_ids)
756
expected_chk_roots = root_key_info.all_keys()
757
present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
759
missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
760
if missing_chk_roots:
762
"missing referenced chk root keys: %s."
763
"Run 'bzr reconcile --canonicalize-chks' on the affected "
765
% (sorted(missing_chk_roots),))
766
# Don't bother checking any further.
768
# Find all interesting chk_bytes records, and make sure they are
769
# present, as well as the text keys they reference.
770
chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
771
chk_bytes_no_fallbacks._search_key_func = \
772
self.repo.chk_bytes._search_key_func
773
chk_diff = chk_map.iter_interesting_nodes(
774
chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
775
root_key_info.uninteresting_root_keys)
778
for record in _filter_text_keys(chk_diff, text_keys,
779
chk_map._bytes_to_text_key):
781
except errors.NoSuchRevision, e:
782
# XXX: It would be nice if we could give a more precise error here.
783
problems.append("missing chk node(s) for id_to_entry maps")
784
chk_diff = chk_map.iter_interesting_nodes(
785
chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
786
root_key_info.uninteresting_pid_root_keys)
788
for interesting_rec, interesting_map in chk_diff:
790
except errors.NoSuchRevision, e:
792
"missing chk node(s) for parent_id_basename_to_file_id maps")
793
present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
794
missing_text_keys = text_keys.difference(present_text_keys)
795
if missing_text_keys:
796
problems.append("missing text keys: %r"
797
% (sorted(missing_text_keys),))
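
# Illustrative sketch (not part of bzrlib, names are hypothetical):
# _check_new_inventories above works almost entirely with set arithmetic -
# every revision added in the write group must have an inventory, and every
# CHK root those inventories reference must be present.  The helper below
# shows those two checks with plain sets.
def _sketch_inventory_problems(new_revisions, present_inventories,
                               expected_chk_roots, present_chk_roots):
    """Return a list of problem strings; an empty list means all is well."""
    problems = []
    missing_invs = set(new_revisions) - set(present_inventories)
    if missing_invs:
        problems.append("inventories missing for revisions %s"
                        % (sorted(missing_invs),))
    missing_roots = set(expected_chk_roots) - set(present_chk_roots)
    if missing_roots:
        problems.append("missing referenced chk root keys: %s"
                        % (sorted(missing_roots),))
    return problems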


class CHKInventoryRepository(PackRepository):
    """subclass of PackRepository that uses CHK based inventories."""

    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
                 _serializer):
        """Overridden to change pack collection class."""
        super(CHKInventoryRepository, self).__init__(_format, a_bzrdir,
            control_files, _commit_builder_class, _serializer)
        index_transport = self._transport.clone('indices')
        self._pack_collection = GCRepositoryPackCollection(self,
            self._transport, index_transport,
            self._transport.clone('upload'),
            self._transport.clone('packs'),
            _format.index_builder_class,
            _format.index_class,
            use_chk_index=self._format.supports_chks,
            )
        self.inventories = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
                add_callback=self._pack_collection.inventory_index.add_callback,
                parents=True, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.inventory_index.data_access)
        self.revisions = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
                add_callback=self._pack_collection.revision_index.add_callback,
                parents=True, is_locked=self.is_locked,
                track_external_parent_refs=True, track_new_keys=True),
            access=self._pack_collection.revision_index.data_access,
            delta=False)
        self.signatures = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
                add_callback=self._pack_collection.signature_index.add_callback,
                parents=False, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.signature_index.data_access,
            delta=False)
        self.texts = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
                add_callback=self._pack_collection.text_index.add_callback,
                parents=True, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.text_index.data_access)
        # No parents, individual CHK pages don't have specific ancestry
        self.chk_bytes = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
                add_callback=self._pack_collection.chk_index.add_callback,
                parents=False, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.chk_index.data_access)
        search_key_name = self._format._serializer.search_key_name
        search_key_func = chk_map.search_key_registry.get(search_key_name)
        self.chk_bytes._search_key_func = search_key_func
        # True when the repository object is 'write locked' (as opposed to the
        # physical lock only taken out around changes to the pack-names list.)
        # Another way to represent this would be a decorator around the control
        # files object that presents logical locks as physical ones - if this
        # gets ugly consider that alternative design. RBC 20071011
        self._write_lock_count = 0
        self._transaction = None
        self._reconcile_fixes_text_parents = True
        self._reconcile_backsup_inventory = False
306
class GCCHKPackRepository(CHKInventoryRepository):
307
"""GC customisation of CHKInventoryRepository."""
309
def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
311
"""Overridden to change pack collection class."""
312
KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
313
_commit_builder_class, _serializer)
314
# and now replace everything it did :)
315
index_transport = self._transport.clone('indices')
317
self._pack_collection = GCRepositoryPackCollection(self,
318
self._transport, index_transport,
319
self._transport.clone('upload'),
320
self._transport.clone('packs'),
321
_format.index_builder_class,
323
use_chk_index=self._format.supports_chks,
326
self._pack_collection = GCRepositoryPackCollection(self,
327
self._transport, index_transport,
328
self._transport.clone('upload'),
329
self._transport.clone('packs'),
330
_format.index_builder_class,
332
self.inventories = GroupCompressVersionedFiles(
333
_GCGraphIndex(self._pack_collection.inventory_index.combined_index,
334
add_callback=self._pack_collection.inventory_index.add_callback,
335
parents=True, is_locked=self.is_locked),
336
access=self._pack_collection.inventory_index.data_access)
337
self.revisions = GroupCompressVersionedFiles(
338
_GCGraphIndex(self._pack_collection.revision_index.combined_index,
339
add_callback=self._pack_collection.revision_index.add_callback,
340
parents=True, is_locked=self.is_locked),
341
access=self._pack_collection.revision_index.data_access,
343
self.signatures = GroupCompressVersionedFiles(
344
_GCGraphIndex(self._pack_collection.signature_index.combined_index,
345
add_callback=self._pack_collection.signature_index.add_callback,
346
parents=False, is_locked=self.is_locked),
347
access=self._pack_collection.signature_index.data_access,
349
self.texts = GroupCompressVersionedFiles(
350
_GCGraphIndex(self._pack_collection.text_index.combined_index,
351
add_callback=self._pack_collection.text_index.add_callback,
352
parents=True, is_locked=self.is_locked),
353
access=self._pack_collection.text_index.data_access)
354
if chk_support and _format.supports_chks:
355
# No graph, no compression:- references from chks are between
356
# different objects not temporal versions of the same; and without
357
# some sort of temporal structure knit compression will just fail.
358
self.chk_bytes = GroupCompressVersionedFiles(
359
_GCGraphIndex(self._pack_collection.chk_index.combined_index,
360
add_callback=self._pack_collection.chk_index.add_callback,
361
parents=False, is_locked=self.is_locked),
362
access=self._pack_collection.chk_index.data_access)
364
self.chk_bytes = None
365
# True when the repository object is 'write locked' (as opposed to the
366
# physical lock only taken out around changes to the pack-names list.)
367
# Another way to represent this would be a decorator around the control
368
# files object that presents logical locks as physical ones - if this
369
# gets ugly consider that alternative design. RBC 20071011
370
self._write_lock_count = 0
371
self._transaction = None
373
self._reconcile_does_inventory_gc = True
374
self._reconcile_fixes_text_parents = True
375
self._reconcile_backsup_inventory = False
378
class RepositoryFormatPackGCPlain(RepositoryFormatPackDevelopment2):
379
"""A B+Tree index using pack repository."""
381
repository_class = GCPackRepository
383
def get_format_string(self):
384
"""See RepositoryFormat.get_format_string()."""
385
return ("Bazaar development format - btree+gc "
386
"(needs bzr.dev from 1.6)\n")
388
def get_format_description(self):
389
"""See RepositoryFormat.get_format_description()."""
390
return ("Development repository format - btree+groupcompress "
391
", interoperates with pack-0.92\n")
394
class RepositoryFormatPackGCRichRoot(RepositoryFormatKnitPack4):
395
"""A B+Tree index using pack repository."""
397
repository_class = GCPackRepository
399
def get_format_string(self):
400
"""See RepositoryFormat.get_format_string()."""
401
return ("Bazaar development format - btree+gc-rich-root "
402
"(needs bzr.dev from 1.6)\n")
404
def get_format_description(self):
405
"""See RepositoryFormat.get_format_description()."""
406
return ("Development repository format - btree+groupcompress "
407
", interoperates with rich-root-pack\n")
410
class RepositoryFormatPackGCSubtrees(RepositoryFormatPackDevelopment2Subtree):
411
"""A B+Tree index using pack repository."""
413
repository_class = GCPackRepository
415
def get_format_string(self):
416
"""See RepositoryFormat.get_format_string()."""
417
return ("Bazaar development format - btree+gc-subtrees "
418
"(needs bzr.dev from 1.6)\n")
420
def get_format_description(self):
421
"""See RepositoryFormat.get_format_description()."""
422
return ("Development repository format - btree+groupcompress "
423
", interoperates with pack-0.92-subtrees\n")
426
'Bazaar development format - 1.9+gc (needs bzr.dev from 1.9)\n',
427
class RepositoryFormatPackGCPlainCHK(RepositoryFormatPackDevelopment4):
428
"""A CHK+group compress pack repository."""
430
repository_class = GCCHKPackRepository
432
def get_format_string(self):
433
"""See RepositoryFormat.get_format_string()."""
434
return ('Bazaar development format - chk+gc '
435
'(needs bzr.dev from 1.12)\n')
437
def get_format_description(self):
438
"""See RepositoryFormat.get_format_description()."""
439
return ("Development repository format - chk+groupcompress "
440
", interoperates with pack-0.92\n")
446
def pack_incompatible(source, target, orig_method=InterPackRepo.is_compatible):
447
"""Be incompatible with the regular fetch code."""
448
formats = (RepositoryFormatPackGCPlain, RepositoryFormatPackGCRichRoot,
449
RepositoryFormatPackGCSubtrees)
451
formats = formats = (RepositoryFormatPackGCPlain,)
452
if isinstance(source._format, formats) or isinstance(target._format, formats):
455
return orig_method(source, target)
458
InterPackRepo.is_compatible = staticmethod(pack_incompatible)
866
def _add_inventory_checked(self, revision_id, inv, parents):
867
"""Add inv to the repository after checking the inputs.
869
This function can be overridden to allow different inventory styles.
871
:seealso: add_inventory, for the contract.
874
serializer = self._format._serializer
875
result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
876
maximum_size=serializer.maximum_size,
877
search_key_name=serializer.search_key_name)
878
inv_lines = result.to_lines()
879
return self._inventory_add_lines(revision_id, parents,
880
inv_lines, check_content=False)
882
def _create_inv_from_null(self, delta, revision_id):
883
"""This will mutate new_inv directly.
885
This is a simplified form of create_by_apply_delta which knows that all
886
the old values must be None, so everything is a create.
888
serializer = self._format._serializer
889
new_inv = inventory.CHKInventory(serializer.search_key_name)
890
new_inv.revision_id = revision_id
891
entry_to_bytes = new_inv._entry_to_bytes
892
id_to_entry_dict = {}
893
parent_id_basename_dict = {}
894
for old_path, new_path, file_id, entry in delta:
895
if old_path is not None:
896
raise ValueError('Invalid delta, somebody tried to delete %r'
897
' from the NULL_REVISION'
898
% ((old_path, file_id),))
900
raise ValueError('Invalid delta, delta from NULL_REVISION has'
901
' no new_path %r' % (file_id,))
903
new_inv.root_id = file_id
904
parent_id_basename_key = StaticTuple('', '').intern()
906
utf8_entry_name = entry.name.encode('utf-8')
907
parent_id_basename_key = StaticTuple(entry.parent_id,
908
utf8_entry_name).intern()
909
new_value = entry_to_bytes(entry)
911
# new_inv._path_to_fileid_cache[new_path] = file_id
912
key = StaticTuple(file_id).intern()
913
id_to_entry_dict[key] = new_value
914
parent_id_basename_dict[parent_id_basename_key] = file_id
916
new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
917
parent_id_basename_dict, maximum_size=serializer.maximum_size)
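
    # Illustrative sketch (not part of bzrlib, names are hypothetical):
    # _create_inv_from_null above builds two plain mappings before handing
    # them to the CHK map layer - file_id -> serialised entry, and
    # (parent_id, basename) -> file_id.  entry_to_bytes() stands in for
    # CHKInventory._entry_to_bytes.
    @staticmethod
    def _sketch_maps_from_create_delta(delta, entry_to_bytes):
        """Return (id_to_entry, parent_id_basename) dicts for an all-new delta."""
        id_to_entry = {}
        parent_id_basename = {}
        for old_path, new_path, file_id, entry in delta:
            if new_path == '':
                # The tree root has no parent directory.
                key = ('', '')
            else:
                key = (entry.parent_id, entry.name.encode('utf-8'))
            id_to_entry[(file_id,)] = entry_to_bytes(entry)
            parent_id_basename[key] = file_id
        return id_to_entry, parent_id_basename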
920
def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
921
parents, basis_inv=None, propagate_caches=False):
922
"""Add a new inventory expressed as a delta against another revision.
924
:param basis_revision_id: The inventory id the delta was created
926
:param delta: The inventory delta (see Inventory.apply_delta for
928
:param new_revision_id: The revision id that the inventory is being
930
:param parents: The revision ids of the parents that revision_id is
931
known to have and are in the repository already. These are supplied
932
for repositories that depend on the inventory graph for revision
933
graph access, as well as for those that pun ancestry with delta
935
:param basis_inv: The basis inventory if it is already known,
937
:param propagate_caches: If True, the caches for this inventory are
938
copied to and updated for the result if possible.
940
:returns: (validator, new_inv)
941
The validator(which is a sha1 digest, though what is sha'd is
942
repository format specific) of the serialized inventory, and the
945
if not self.is_in_write_group():
946
raise AssertionError("%r not in write group" % (self,))
947
_mod_revision.check_not_reserved_id(new_revision_id)
949
if basis_inv is None:
950
if basis_revision_id == _mod_revision.NULL_REVISION:
951
new_inv = self._create_inv_from_null(delta, new_revision_id)
952
if new_inv.root_id is None:
953
raise errors.RootMissing()
954
inv_lines = new_inv.to_lines()
955
return self._inventory_add_lines(new_revision_id, parents,
956
inv_lines, check_content=False), new_inv
958
basis_tree = self.revision_tree(basis_revision_id)
959
basis_tree.lock_read()
960
basis_inv = basis_tree.root_inventory
962
result = basis_inv.create_by_apply_delta(delta, new_revision_id,
963
propagate_caches=propagate_caches)
964
inv_lines = result.to_lines()
965
return self._inventory_add_lines(new_revision_id, parents,
966
inv_lines, check_content=False), result
968
if basis_tree is not None:
971
def _deserialise_inventory(self, revision_id, bytes):
972
return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,
975
def _iter_inventories(self, revision_ids, ordering):
976
"""Iterate over many inventory objects."""
978
ordering = 'unordered'
979
keys = [(revision_id,) for revision_id in revision_ids]
980
stream = self.inventories.get_record_stream(keys, ordering, True)
982
for record in stream:
983
if record.storage_kind != 'absent':
984
texts[record.key] = record.get_bytes_as('fulltext')
986
texts[record.key] = None
990
yield (None, key[-1])
992
yield (inventory.CHKInventory.deserialise(
993
self.chk_bytes, bytes, key), key[-1])
995
def _get_inventory_xml(self, revision_id):
996
"""Get serialized inventory as a string."""
997
# Without a native 'xml' inventory, this method doesn't make sense.
998
# However older working trees, and older bundles want it - so we supply
999
# it allowing _get_inventory_xml to work. Bundles currently use the
1000
# serializer directly; this also isn't ideal, but there isn't an xml
1001
# iteration interface offered at all for repositories.
1002
return self._serializer.write_inventory_to_string(
1003
self.get_inventory(revision_id))
1005
def _find_present_inventory_keys(self, revision_keys):
1006
parent_map = self.inventories.get_parent_map(revision_keys)
1007
present_inventory_keys = set(k for k in parent_map)
1008
return present_inventory_keys
1010
def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
1011
"""Find the file ids and versions affected by revisions.
1013
:param revisions: an iterable containing revision ids.
1014
:param _inv_weave: The inventory weave from this repository or None.
1015
If None, the inventory weave will be opened automatically.
1016
:return: a dictionary mapping altered file-ids to an iterable of
1017
revision_ids. Each altered file-ids has the exact revision_ids that
1018
altered it listed explicitly.
1020
rich_root = self.supports_rich_root()
1021
bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
1022
file_id_revisions = {}
1023
pb = ui.ui_factory.nested_progress_bar()
1025
revision_keys = [(r,) for r in revision_ids]
1026
parent_keys = self._find_parent_keys_of_revisions(revision_keys)
1027
# TODO: instead of using _find_present_inventory_keys, change the
1028
# code paths to allow missing inventories to be tolerated.
1029
# However, we only want to tolerate missing parent
1030
# inventories, not missing inventories for revision_ids
1031
present_parent_inv_keys = self._find_present_inventory_keys(
1033
present_parent_inv_ids = set(
1034
[k[-1] for k in present_parent_inv_keys])
1035
inventories_to_read = set(revision_ids)
1036
inventories_to_read.update(present_parent_inv_ids)
1037
root_key_info = _build_interesting_key_sets(
1038
self, inventories_to_read, present_parent_inv_ids)
1039
interesting_root_keys = root_key_info.interesting_root_keys
1040
uninteresting_root_keys = root_key_info.uninteresting_root_keys
1041
chk_bytes = self.chk_bytes
1042
for record, items in chk_map.iter_interesting_nodes(chk_bytes,
1043
interesting_root_keys, uninteresting_root_keys,
1045
for name, bytes in items:
1046
(name_utf8, file_id, revision_id) = bytes_to_info(bytes)
1047
# TODO: consider interning file_id, revision_id here, or
1048
# pushing that intern() into bytes_to_info()
1049
# TODO: rich_root should always be True here, for all
1050
# repositories that support chk_bytes
1051
if not rich_root and name_utf8 == '':
1054
file_id_revisions[file_id].add(revision_id)
1056
file_id_revisions[file_id] = set([revision_id])
1059
return file_id_revisions
1061
def find_text_key_references(self):
1062
"""Find the text key references within the repository.
1064
:return: A dictionary mapping text keys ((fileid, revision_id) tuples)
1065
to whether they were referred to by the inventory of the
1066
revision_id that they contain. The inventory texts from all present
1067
revision ids are assessed to generate this report.
1069
# XXX: Slow version but correct: rewrite as a series of delta
1070
# examinations/direct tree traversal. Note that that will require care
1071
# as a common node is reachable both from the inventory that added it,
1072
# and others afterwards.
1073
revision_keys = self.revisions.keys()
1075
rich_roots = self.supports_rich_root()
1076
pb = ui.ui_factory.nested_progress_bar()
1078
all_revs = self.all_revision_ids()
1079
total = len(all_revs)
1080
for pos, inv in enumerate(self.iter_inventories(all_revs)):
1081
pb.update("Finding text references", pos, total)
1082
for _, entry in inv.iter_entries():
1083
if not rich_roots and entry.file_id == inv.root_id:
1085
key = (entry.file_id, entry.revision)
1086
result.setdefault(key, False)
1087
if entry.revision == inv.revision_id:
1094
def reconcile_canonicalize_chks(self):
1095
"""Reconcile this repository to make sure all CHKs are in canonical
1098
from bzrlib.reconcile import PackReconciler
1099
reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True)
1100
reconciler.reconcile()
1103
def _reconcile_pack(self, collection, packs, extension, revs, pb):
1104
packer = GCCHKReconcilePacker(collection, packs, extension)
1105
return packer.pack(pb)
1107
def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb):
1108
packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs)
1109
return packer.pack(pb)
1111
def _get_source(self, to_format):
1112
"""Return a source for streaming from this repository."""
1113
if self._format._serializer == to_format._serializer:
1114
# We must be exactly the same format, otherwise stuff like the chk
1115
# page layout might be different.
1116
# Actually, this test is just slightly looser than exact so that
1117
# CHK2 <-> 2a transfers will work.
1118
return GroupCHKStreamSource(self, to_format)
1119
return super(CHKInventoryRepository, self)._get_source(to_format)
1121
def _find_inconsistent_revision_parents(self, revisions_iterator=None):
1122
"""Find revisions with different parent lists in the revision object
1123
and in the index graph.
1125
:param revisions_iterator: None, or an iterator of (revid,
1126
Revision-or-None). This iterator controls the revisions checked.
1127
:returns: an iterator yielding tuples of (revison-id, parents-in-index,
1128
parents-in-revision).
1130
if not self.is_locked():
1131
raise AssertionError()
1133
if revisions_iterator is None:
1134
revisions_iterator = self._iter_revisions(None)
1135
for revid, revision in revisions_iterator:
1136
if revision is None:
1138
parent_map = vf.get_parent_map([(revid,)])
1139
parents_according_to_index = tuple(parent[-1] for parent in
1140
parent_map[(revid,)])
1141
parents_according_to_revision = tuple(revision.parent_ids)
1142
if parents_according_to_index != parents_according_to_revision:
1143
yield (revid, parents_according_to_index,
1144
parents_according_to_revision)
1146
def _check_for_inconsistent_revision_parents(self):
1147
inconsistencies = list(self._find_inconsistent_revision_parents())
1149
raise errors.BzrCheckError(
1150
"Revision index has inconsistent parents.")
1153
class GroupCHKStreamSource(StreamSource):
1154
"""Used when both the source and target repo are GroupCHK repos."""
1156
def __init__(self, from_repository, to_format):
1157
"""Create a StreamSource streaming from from_repository."""
1158
super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
1159
self._revision_keys = None
1160
self._text_keys = None
1161
self._text_fetch_order = 'groupcompress'
1162
self._chk_id_roots = None
1163
self._chk_p_id_roots = None
1165
def _get_inventory_stream(self, inventory_keys, allow_absent=False):
1166
"""Get a stream of inventory texts.
1168
When this function returns, self._chk_id_roots and self._chk_p_id_roots
1169
should be populated.
1171
self._chk_id_roots = []
1172
self._chk_p_id_roots = []
1173
def _filtered_inv_stream():
1174
id_roots_set = set()
1175
p_id_roots_set = set()
1176
source_vf = self.from_repository.inventories
1177
stream = source_vf.get_record_stream(inventory_keys,
1178
'groupcompress', True)
1179
for record in stream:
1180
if record.storage_kind == 'absent':
1184
raise errors.NoSuchRevision(self, record.key)
1185
bytes = record.get_bytes_as('fulltext')
1186
chk_inv = inventory.CHKInventory.deserialise(None, bytes,
1188
key = chk_inv.id_to_entry.key()
1189
if key not in id_roots_set:
1190
self._chk_id_roots.append(key)
1191
id_roots_set.add(key)
1192
p_id_map = chk_inv.parent_id_basename_to_file_id
1193
if p_id_map is None:
1194
raise AssertionError('Parent id -> file_id map not set')
1195
key = p_id_map.key()
1196
if key not in p_id_roots_set:
1197
p_id_roots_set.add(key)
1198
self._chk_p_id_roots.append(key)
1200
# We have finished processing all of the inventory records, we
1201
# don't need these sets anymore
1202
id_roots_set.clear()
1203
p_id_roots_set.clear()
1204
return ('inventories', _filtered_inv_stream())
1206
def _get_filtered_chk_streams(self, excluded_revision_keys):
1207
self._text_keys = set()
1208
excluded_revision_keys.discard(_mod_revision.NULL_REVISION)
1209
if not excluded_revision_keys:
1210
uninteresting_root_keys = set()
1211
uninteresting_pid_root_keys = set()
1213
# filter out any excluded revisions whose inventories are not
1215
# TODO: Update Repository.iter_inventories() to add
1216
# ignore_missing=True
1217
present_keys = self.from_repository._find_present_inventory_keys(
1218
excluded_revision_keys)
1219
present_ids = [k[-1] for k in present_keys]
1220
uninteresting_root_keys = set()
1221
uninteresting_pid_root_keys = set()
1222
for inv in self.from_repository.iter_inventories(present_ids):
1223
uninteresting_root_keys.add(inv.id_to_entry.key())
1224
uninteresting_pid_root_keys.add(
1225
inv.parent_id_basename_to_file_id.key())
1226
chk_bytes = self.from_repository.chk_bytes
1227
def _filter_id_to_entry():
1228
interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
1229
self._chk_id_roots, uninteresting_root_keys)
1230
for record in _filter_text_keys(interesting_nodes, self._text_keys,
1231
chk_map._bytes_to_text_key):
1232
if record is not None:
1235
self._chk_id_roots = None
1236
yield 'chk_bytes', _filter_id_to_entry()
1237
def _get_parent_id_basename_to_file_id_pages():
1238
for record, items in chk_map.iter_interesting_nodes(chk_bytes,
1239
self._chk_p_id_roots, uninteresting_pid_root_keys):
1240
if record is not None:
1243
self._chk_p_id_roots = None
1244
yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
1246
def _get_text_stream(self):
1247
# Note: We know we don't have to handle adding root keys, because both
1248
# the source and target are the identical network name.
1249
text_stream = self.from_repository.texts.get_record_stream(
1250
self._text_keys, self._text_fetch_order, False)
1251
return ('texts', text_stream)
1253
def get_stream(self, search):
1254
def wrap_and_count(pb, rc, stream):
1255
"""Yield records from stream while showing progress."""
1257
for record in stream:
1258
if count == rc.STEP:
1260
pb.update('Estimate', rc.current, rc.max)
1265
revision_ids = search.get_keys()
1266
pb = ui.ui_factory.nested_progress_bar()
1267
rc = self._record_counter
1268
self._record_counter.setup(len(revision_ids))
1269
for stream_info in self._fetch_revision_texts(revision_ids):
1270
yield (stream_info[0],
1271
wrap_and_count(pb, rc, stream_info[1]))
1272
self._revision_keys = [(rev_id,) for rev_id in revision_ids]
1273
# TODO: The keys to exclude might be part of the search recipe
1274
# For now, exclude all parents that are at the edge of ancestry, for
1275
# which we have inventories
1276
from_repo = self.from_repository
1277
parent_keys = from_repo._find_parent_keys_of_revisions(
1278
self._revision_keys)
1279
self.from_repository.revisions.clear_cache()
1280
self.from_repository.signatures.clear_cache()
1281
# Clear the repo's get_parent_map cache too.
1282
self.from_repository._unstacked_provider.disable_cache()
1283
self.from_repository._unstacked_provider.enable_cache()
1284
s = self._get_inventory_stream(self._revision_keys)
1285
yield (s[0], wrap_and_count(pb, rc, s[1]))
1286
self.from_repository.inventories.clear_cache()
1287
for stream_info in self._get_filtered_chk_streams(parent_keys):
1288
yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
1289
self.from_repository.chk_bytes.clear_cache()
1290
s = self._get_text_stream()
1291
yield (s[0], wrap_and_count(pb, rc, s[1]))
1292
self.from_repository.texts.clear_cache()
1293
pb.update('Done', rc.max, rc.max)
1296
def get_stream_for_missing_keys(self, missing_keys):
1297
# missing keys can only occur when we are byte copying and not
1298
# translating (because translation means we don't send
1299
# unreconstructable deltas ever).
1300
missing_inventory_keys = set()
1301
for key in missing_keys:
1302
if key[0] != 'inventories':
1303
raise AssertionError('The only missing keys we should'
1304
' be filling in are inventory keys, not %s'
1306
missing_inventory_keys.add(key[1:])
1307
if self._chk_id_roots or self._chk_p_id_roots:
1308
raise AssertionError('Cannot call get_stream_for_missing_keys'
1309
' until all of get_stream() has been consumed.')
1310
# Yield the inventory stream, so we can find the chk stream
1311
# Some of the missing_keys will be missing because they are ghosts.
1312
# As such, we can ignore them. The Sink is required to verify there are
1313
# no unavailable texts when the ghost inventories are not filled in.
1314
yield self._get_inventory_stream(missing_inventory_keys,
1316
# We use the empty set for excluded_revision_keys, to make it clear
1317
# that we want to transmit all referenced chk pages.
1318
for stream_info in self._get_filtered_chk_streams(set()):


class _InterestingKeyInfo(object):
    def __init__(self):
        self.interesting_root_keys = set()
        self.interesting_pid_root_keys = set()
        self.uninteresting_root_keys = set()
        self.uninteresting_pid_root_keys = set()

    def all_interesting(self):
        return self.interesting_root_keys.union(self.interesting_pid_root_keys)

    def all_uninteresting(self):
        return self.uninteresting_root_keys.union(
            self.uninteresting_pid_root_keys)

    def all_keys(self):
        return self.all_interesting().union(self.all_uninteresting())


def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids):
    result = _InterestingKeyInfo()
    for inv in repo.iter_inventories(inventory_ids, 'unordered'):
        root_key = inv.id_to_entry.key()
        pid_root_key = inv.parent_id_basename_to_file_id.key()
        if inv.revision_id in parent_only_inv_ids:
            result.uninteresting_root_keys.add(root_key)
            result.uninteresting_pid_root_keys.add(pid_root_key)
        else:
            result.interesting_root_keys.add(root_key)
            result.interesting_pid_root_keys.add(pid_root_key)
    return result


def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key):
    """Iterate the result of iter_interesting_nodes, yielding the records
    and adding to text_keys.
    """
    text_keys_update = text_keys.update
    for record, items in interesting_nodes_iterable:
        text_keys_update([bytes_to_text_key(b) for n, b in items])
        yield record
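
# Illustrative sketch (not part of bzrlib, names are hypothetical):
# _build_interesting_key_sets above partitions each inventory's two CHK root
# keys into "interesting" (newly added) and "uninteresting" (parent-only)
# buckets.  The helper below shows the same partitioning over plain
# (inv_id, root_key, pid_root_key) tuples.
def _sketch_partition_roots(inventories, parent_only_ids):
    """Return (interesting, uninteresting) sets covering both root key kinds."""
    interesting, uninteresting = set(), set()
    for inv_id, root_key, pid_root_key in inventories:
        target = uninteresting if inv_id in parent_only_ids else interesting
        target.add(root_key)
        target.add(pid_root_key)
    return interesting, uninteresting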
1364
class RepositoryFormat2a(RepositoryFormatPack):
1365
"""A CHK repository that uses the bencode revision serializer."""
1367
repository_class = CHKInventoryRepository
1368
supports_external_lookups = True
1369
supports_chks = True
1370
_commit_builder_class = PackRootCommitBuilder
1371
rich_root_data = True
1372
_serializer = chk_serializer.chk_bencode_serializer
1373
_commit_inv_deltas = True
1374
# What index classes to use
1375
index_builder_class = BTreeBuilder
1376
index_class = BTreeGraphIndex
1377
# Note: We cannot unpack a delta that references a text we haven't
1378
# seen yet. There are 2 options, work in fulltexts, or require
1379
# topological sorting. Using fulltexts is more optimal for local
1380
# operations, because the source can be smart about extracting
1381
# multiple in-a-row (and sharing strings). Topological is better
1382
# for remote, because we access less data.
1383
_fetch_order = 'unordered'
1384
_fetch_uses_deltas = False # essentially ignored by the groupcompress code.
1386
pack_compresses = True
1388
def _get_matching_bzrdir(self):
1389
return controldir.format_registry.make_bzrdir('2a')
1391
def _ignore_setting_bzrdir(self, format):
1394
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
1397
def get_format_string(cls):
1398
return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')
1400
def get_format_description(self):
1401
"""See RepositoryFormat.get_format_description()."""
1402
return ("Repository format 2a - rich roots, group compression"
1403
" and chk inventories")
1406
class RepositoryFormat2aSubtree(RepositoryFormat2a):
1407
"""A 2a repository format that supports nested trees.
1411
def _get_matching_bzrdir(self):
1412
return controldir.format_registry.make_bzrdir('development-subtree')
1414
def _ignore_setting_bzrdir(self, format):
1417
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
1420
def get_format_string(cls):
1421
return ('Bazaar development format 8\n')
1423
def get_format_description(self):
1424
"""See RepositoryFormat.get_format_description()."""
1425
return ("Development repository format 8 - nested trees, "
1426
"group compression and chk inventories")
1429
supports_tree_reference = True