228
237
self.repo.signatures._index._add_callback = self.signature_index.add_callback
229
238
self.repo.texts._index._add_callback = self.text_index.add_callback
240
def _get_filtered_inv_stream(self, source_vf, keys):
241
"""Filter the texts of inventories, to find the chk pages."""
245
p_id_roots_set = set()
246
def _filter_inv_stream(stream):
247
for idx, record in enumerate(stream):
248
### child_pb.update('fetch inv', idx, len(inv_keys_to_fetch))
249
bytes = record.get_bytes_as('fulltext')
250
chk_inv = inventory.CHKInventory.deserialise(None, bytes, record.key)
251
key = chk_inv.id_to_entry.key()
252
if key not in id_roots_set:
254
id_roots_set.add(key)
255
p_id_map = chk_inv.parent_id_basename_to_file_id
256
if p_id_map is not None:
258
if key not in p_id_roots_set:
259
p_id_roots_set.add(key)
260
p_id_roots.append(key)
262
stream = source_vf.get_record_stream(keys, 'gc-optimal', True)
263
return _filter_inv_stream(stream), id_roots, p_id_roots
265
def _get_chk_stream(self, source_vf, keys, id_roots, p_id_roots, pb=None):
266
# We want to stream the keys from 'id_roots', and things they
267
# reference, and then stream things from p_id_roots and things they
268
# reference, and then any remaining keys that we didn't get to.
270
# We also group referenced texts together, so if one root references a
271
# text with prefix 'a', and another root references a node with prefix
272
# 'a', we want to yield those nodes before we yield the nodes for 'b'
273
# This keeps 'similar' nodes together.
275
# Note: We probably actually want multiple streams here, to help the
276
# client understand that the different levels won't compress well
277
# against each other.
278
# Test the difference between using one Group per level, and
279
# using 1 Group per prefix. (so '' (root) would get a group, then
280
# all the references to search-key 'a' would get a group, etc.)
281
remaining_keys = set(keys)
283
def _get_referenced_stream(root_keys):
286
keys_by_search_prefix = {}
287
remaining_keys.difference_update(cur_keys)
289
stream = source_vf.get_record_stream(cur_keys, 'as-requested',
292
for record in stream:
293
bytes = record.get_bytes_as('fulltext')
294
# We don't care about search_key_func for this code,
295
# because we only care about external references.
296
node = chk_map._deserialise(bytes, record.key,
297
search_key_func=None)
298
common_base = node._search_prefix
299
if isinstance(node, chk_map.InternalNode):
300
for prefix, value in node._items.iteritems():
301
if not isinstance(value, tuple):
302
raise AssertionError("value is %s when"
303
" tuple expected" % (value.__class__))
304
if value not in next_keys:
305
keys_by_search_prefix.setdefault(prefix,
310
pb.update('chk node', counter[0])
313
# Double check that we won't be emitting any keys twice
314
next_keys = next_keys.intersection(remaining_keys)
316
for prefix in sorted(keys_by_search_prefix):
317
cur_keys.extend(keys_by_search_prefix[prefix])
318
for stream in _get_referenced_stream(id_roots):
320
for stream in _get_referenced_stream(p_id_roots):
323
note('There were %d keys in the chk index, which were not'
324
' referenced from inventories', len(remaining_keys))
325
stream = source_vf.get_record_stream(remaining_keys, 'unordered',
329
def _execute_pack_operations(self, pack_operations, _packer_class=Packer,
331
"""Execute a series of pack operations.
333
:param pack_operations: A list of [revision_count, packs_to_combine].
334
:param _packer_class: The class of packer to use (default: Packer).
337
for revision_count, packs in pack_operations:
338
# we may have no-ops from the setup logic
341
# Create a new temp VersionedFile instance based on these packs,
342
# and then just fetch everything into the target
344
to_copy = [('revision_index', 'revisions'),
345
('inventory_index', 'inventories'),
346
('text_index', 'texts'),
347
('signature_index', 'signatures'),
349
# TODO: This is a very non-optimal ordering for chk_bytes. The
350
# issue is that pages that are similar are not transmitted
351
# together. Perhaps get_record_stream('gc-optimal') should be
352
# taught about how to group chk pages?
354
if getattr(self, 'chk_index', None) is not None:
356
to_copy.insert(2, ('chk_index', 'chk_bytes'))
358
# Shouldn't we start_write_group around this?
359
if self._new_pack is not None:
360
raise errors.BzrError('call to %s.pack() while another pack is'
362
% (self.__class__.__name__,))
363
new_pack = self.pack_factory(self, '.autopack',
364
file_mode=self.repo.bzrdir._get_file_mode())
365
new_pack.set_write_cache_size(1024*1024)
366
# TODO: A better alternative is to probably use Packer.open_pack(), and
367
# then create a GroupCompressVersionedFiles() around the
368
# target pack to insert into.
369
pb = ui.ui_factory.nested_progress_bar()
371
for idx, (index_name, vf_name) in enumerate(to_copy):
372
pb.update('repacking %s' % (vf_name,), idx + 1, len(to_copy))
374
new_index = getattr(new_pack, index_name)
375
new_index.set_optimize(for_size=True)
377
source_index = getattr(pack, index_name)
378
keys.update(e[1] for e in source_index.iter_all_entries())
379
source_vf = getattr(self.repo, vf_name)
380
target_access = knit._DirectPackAccess({})
381
target_access.set_writer(new_pack._writer, new_index,
382
new_pack.access_tuple())
383
target_vf = GroupCompressVersionedFiles(
384
_GCGraphIndex(new_index,
385
add_callback=new_index.add_nodes,
386
parents=source_vf._index._parents,
387
is_locked=self.repo.is_locked),
388
access=target_access,
389
delta=source_vf._delta)
391
child_pb = ui.ui_factory.nested_progress_bar()
394
if vf_name == 'inventories':
395
stream, id_roots, p_id_roots = self._get_filtered_inv_stream(
397
elif vf_name == 'chk_bytes':
398
for stream in self._get_chk_stream(source_vf, keys,
399
id_roots, p_id_roots,
401
target_vf.insert_record_stream(stream)
406
substream = source_vf.get_record_stream(keys, 'gc-optimal', True)
407
for idx, record in enumerate(substream):
408
child_pb.update(vf_name, idx + 1, len(keys))
411
target_vf.insert_record_stream(stream)
414
new_pack._check_references() # shouldn't be needed
421
if not new_pack.data_inserted():
422
raise AssertionError('We copied from pack files,'
423
' but had no data copied')
424
# we need to abort somehow, because we don't want to remove
427
self.allocate(new_pack)
429
self._remove_pack_from_memory(pack)
430
# record the newly available packs and stop advertising the old
432
self._save_pack_names(clear_obsolete_packs=True)
433
# Move the old packs out of the way now they are no longer referenced.
434
for revision_count, packs in pack_operations:
435
self._obsolete_packs(packs)
233
439
class GCPackRepository(KnitPackRepository):
234
440
"""GC customisation of KnitPackRepository."""
442
# Note: I think the CHK support can be dropped from this class as it's
443
# implemented via the GCCHKPackRepository class defined next. IGC 20090301
236
445
def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
238
447
"""Overridden to change pack collection class."""
313
531
_commit_builder_class, _serializer)
314
532
# and now replace everything it did :)
315
533
index_transport = self._transport.clone('indices')
317
self._pack_collection = GCRepositoryPackCollection(self,
318
self._transport, index_transport,
319
self._transport.clone('upload'),
320
self._transport.clone('packs'),
321
_format.index_builder_class,
323
use_chk_index=self._format.supports_chks,
326
self._pack_collection = GCRepositoryPackCollection(self,
327
self._transport, index_transport,
328
self._transport.clone('upload'),
329
self._transport.clone('packs'),
330
_format.index_builder_class,
534
self._pack_collection = GCRepositoryPackCollection(self,
535
self._transport, index_transport,
536
self._transport.clone('upload'),
537
self._transport.clone('packs'),
538
_format.index_builder_class,
540
use_chk_index=self._format.supports_chks,
332
542
self.inventories = GroupCompressVersionedFiles(
333
543
_GCGraphIndex(self._pack_collection.inventory_index.combined_index,
334
544
add_callback=self._pack_collection.inventory_index.add_callback,
351
561
add_callback=self._pack_collection.text_index.add_callback,
352
562
parents=True, is_locked=self.is_locked),
353
563
access=self._pack_collection.text_index.data_access)
354
if chk_support and _format.supports_chks:
355
# No graph, no compression:- references from chks are between
356
# different objects not temporal versions of the same; and without
357
# some sort of temporal structure knit compression will just fail.
358
self.chk_bytes = GroupCompressVersionedFiles(
359
_GCGraphIndex(self._pack_collection.chk_index.combined_index,
360
add_callback=self._pack_collection.chk_index.add_callback,
361
parents=False, is_locked=self.is_locked),
362
access=self._pack_collection.chk_index.data_access)
364
self.chk_bytes = None
564
assert _format.supports_chks
565
# No parents, individual CHK pages don't have specific ancestry
566
self.chk_bytes = GroupCompressVersionedFiles(
567
_GCGraphIndex(self._pack_collection.chk_index.combined_index,
568
add_callback=self._pack_collection.chk_index.add_callback,
569
parents=False, is_locked=self.is_locked),
570
access=self._pack_collection.chk_index.data_access)
365
571
# True when the repository object is 'write locked' (as opposed to the
366
# physical lock only taken out around changes to the pack-names list.)
572
# physical lock only taken out around changes to the pack-names list.)
367
573
# Another way to represent this would be a decorator around the control
368
574
# files object that presents logical locks as physical ones - if this
369
575
# gets ugly consider that alternative design. RBC 20071011
423
638
", interoperates with pack-0.92-subtrees\n")
426
'Bazaar development format - 1.9+gc (needs bzr.dev from 1.9)\n',
427
# NOTE(review): this span appears to interleave two revisions of the same
# class: there are two `class RepositoryFormatPackGCPlainCHK` headers (one
# based on RepositoryFormatPackDevelopment4, one on
# RepositoryFormatPackDevelopment5), two `return` statements belonging to
# get_format_string, and two definitions of get_format_description.
# The bare integers between the code lines look like line numbers from a
# diff rendering rather than program text -- TODO confirm and reconcile
# against the real file before relying on either variant.
class RepositoryFormatPackGCPlainCHK(RepositoryFormatPackDevelopment4):
641
class RepositoryFormatPackGCPlainCHK(RepositoryFormatPackDevelopment5):
428
642
"""A CHK+group compress pack repository."""
430
644
repository_class = GCCHKPackRepository
432
646
def get_format_string(self):
433
647
"""See RepositoryFormat.get_format_string()."""
434
# Variant 1 of the format string: note the trailing space after 'chk+gc'
# inside the first literal and the "1.12" version.
return ('Bazaar development format - chk+gc '
435
'(needs bzr.dev from 1.12)\n')
437
# Variant 1 of the description: ends with an "interoperates with pack-0.92"
# clause and a newline.
def get_format_description(self):
438
"""See RepositoryFormat.get_format_description()."""
439
return ("Development repository format - chk+groupcompress "
440
", interoperates with pack-0.92\n")
648
# Variant 2 of the format string: the space moves to the start of the second
# literal and the version becomes "1.13". Presumably this is the other
# revision's body for get_format_string -- verify.
return ('Bazaar development format - chk+gc'
649
' (needs bzr.dev from 1.13)\n')
651
# Variant 2 of the description: no "interoperates" clause, no trailing newline.
def get_format_description(self):
652
"""See RepositoryFormat.get_format_description()."""
653
return ("Development repository format - chk+groupcompress")
656
class RepositoryFormatPackGCPlainCHK16(RepositoryFormatPackDevelopment5Hash16):
    """Pack repository format pairing hash16 CHK with group compression.

    Extends RepositoryFormatPackDevelopment5Hash16 and names
    GCCHKPackRepository as its repository class.
    """

    repository_class = GCCHKPackRepository

    def get_format_string(self):
        """Return the format string.

        See RepositoryFormat.get_format_string().
        """
        format_string = ('Bazaar development format - hash16chk+gc'
                         ' (needs bzr.dev from 1.13)\n')
        return format_string

    def get_format_description(self):
        """Return the format description.

        See RepositoryFormat.get_format_description().
        """
        description = "Development repository format - hash16chk+groupcompress"
        return description
671
## class RepositoryFormatPackGCPlainCHK16b(RepositoryFormatPackDevelopment5Hash16b):
672
## """A hashed CHK+group compress pack repository."""
674
## repository_class = GCCHKPackRepository
676
## def get_format_string(self):
677
## """See RepositoryFormat.get_format_string()."""
678
## return ('Bazaar development format - hash16bchk+gc'
679
## ' (needs bzr.dev from 1.13)\n')
681
## def get_format_description(self):
682
## """See RepositoryFormat.get_format_description()."""
683
## return ("Development repository format - hash16bchk+groupcompress")
686
## class RepositoryFormatPackGCPlainCHK63(RepositoryFormatPackDevelopment5Hash63):
687
## """A hashed CHK+group compress pack repository."""
689
## repository_class = GCCHKPackRepository
691
## def get_format_string(self):
692
## """See RepositoryFormat.get_format_string()."""
693
## return ('Bazaar development format - hash63+gc'
694
## ' (needs bzr.dev from 1.13)\n')
696
## def get_format_description(self):
697
## """See RepositoryFormat.get_format_description()."""
698
## return ("Development repository format - hash63+groupcompress")
701
## class RepositoryFormatPackGCPlainCHK127a(RepositoryFormatPackDevelopment5Hash127a):
702
## """A hashed CHK+group compress pack repository."""
704
## repository_class = GCCHKPackRepository
706
## def get_format_string(self):
707
## """See RepositoryFormat.get_format_string()."""
708
## return ('Bazaar development format - hash127a+gc'
709
## ' (needs bzr.dev from 1.13)\n')
711
## def get_format_description(self):
712
## """See RepositoryFormat.get_format_description()."""
713
## return ("Development repository format - hash127a+groupcompress")
716
## class RepositoryFormatPackGCPlainCHK127b(RepositoryFormatPackDevelopment5Hash127b):
717
## """A hashed CHK+group compress pack repository."""
719
## repository_class = GCCHKPackRepository
721
## def get_format_string(self):
722
## """See RepositoryFormat.get_format_string()."""
723
## return ('Bazaar development format - hash127b+gc'
724
## ' (needs bzr.dev from 1.13)\n')
726
## def get_format_description(self):
727
## """See RepositoryFormat.get_format_description()."""
728
## return ("Development repository format - hash127b+groupcompress")
731
class RepositoryFormatPackGCPlainCHK255(RepositoryFormatPackDevelopment5Hash255):
    """Pack repository format pairing hash255 CHK with group compression.

    Extends RepositoryFormatPackDevelopment5Hash255 and names
    GCCHKPackRepository as its repository class.
    """

    repository_class = GCCHKPackRepository

    def get_format_string(self):
        """Return the format string.

        See RepositoryFormat.get_format_string().
        """
        format_string = ('Bazaar development format - hash255chk+gc'
                         ' (needs bzr.dev from 1.13)\n')
        return format_string

    def get_format_description(self):
        """Return the format description.

        See RepositoryFormat.get_format_description().
        """
        description = "Development repository format - hash255chk+groupcompress"
        return description
446
746
def pack_incompatible(source, target, orig_method=InterPackRepo.is_compatible):