1
# Copyright (C) 2008-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Repository formats using CHK inventories and groupcompress compression."""
31
revision as _mod_revision,
36
from bzrlib.btree_index import (
40
from bzrlib.decorators import needs_write_lock
41
from bzrlib.groupcompress import (
43
GroupCompressVersionedFiles,
45
from bzrlib.repofmt.pack_repo import (
50
PackRootCommitBuilder,
51
RepositoryPackCollection,
56
from bzrlib.vf_repository import (
59
from bzrlib.static_tuple import StaticTuple
62
class GCPack(NewPack):
64
def __init__(self, pack_collection, upload_suffix='', file_mode=None):
65
"""Create a NewPack instance.
67
:param pack_collection: A PackCollection into which this is being
69
:param upload_suffix: An optional suffix to be given to any temporary
70
files created during the pack creation. e.g '.autopack'
71
:param file_mode: An optional file mode to create the new files with.
73
# replaced from NewPack to:
74
# - change inventory reference list length to 1
75
# - change texts reference lists to 1
76
# TODO: patch this to be parameterised
78
# The relative locations of the packs are constrained, but all are
79
# passed in because the caller has them, so as to avoid object churn.
80
index_builder_class = pack_collection._index_builder_class
82
if pack_collection.chk_index is not None:
83
chk_index = index_builder_class(reference_lists=0)
87
# Revisions: parents list, no text compression.
88
index_builder_class(reference_lists=1),
89
# Inventory: We want to map compression only, but currently the
90
# knit code hasn't been updated enough to understand that, so we
91
# have a regular 2-list index giving parents and compression
93
index_builder_class(reference_lists=1),
94
# Texts: per file graph, for all fileids - so one reference list
95
# and two elements in the key tuple.
96
index_builder_class(reference_lists=1, key_elements=2),
97
# Signatures: Just blobs to store, no compression, no parents
99
index_builder_class(reference_lists=0),
100
# CHK based storage - just blobs, no compression or parents.
103
self._pack_collection = pack_collection
104
# When we make readonly indices, we need this.
105
self.index_class = pack_collection._index_class
106
# where should the new pack be opened
107
self.upload_transport = pack_collection._upload_transport
108
# where are indices written out to
109
self.index_transport = pack_collection._index_transport
110
# where is the pack renamed to when it is finished?
111
self.pack_transport = pack_collection._pack_transport
112
# What file mode to upload the pack and indices with.
113
self._file_mode = file_mode
114
# tracks the content written to the .pack file.
115
self._hash = osutils.md5()
116
# a four-tuple with the length in bytes of the indices, once the pack
117
# is finalised. (rev, inv, text, sigs)
118
self.index_sizes = None
119
# How much data to cache when writing packs. Note that this is not
120
# synchronised with reads, because it's not in the transport layer, so
121
# is not safe unless the client knows it won't be reading from the pack
123
self._cache_limit = 0
124
# the temporary pack file name.
125
self.random_name = osutils.rand_chars(20) + upload_suffix
126
# when was this pack started ?
127
self.start_time = time.time()
128
# open an output stream for the data added to the pack.
129
self.write_stream = self.upload_transport.open_write_stream(
130
self.random_name, mode=self._file_mode)
131
if 'pack' in debug.debug_flags:
132
trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
133
time.ctime(), self.upload_transport.base, self.random_name,
134
time.time() - self.start_time)
135
# A list of byte sequences to be written to the new pack, and the
136
# aggregate size of them. Stored as a list rather than separate
137
# variables so that the _write_data closure below can update them.
138
self._buffer = [[], 0]
139
# create a callable for adding data
141
# robertc says- this is a closure rather than a method on the object
142
# so that the variables are locals, and faster than accessing object
144
def _write_data(bytes, flush=False, _buffer=self._buffer,
145
_write=self.write_stream.write, _update=self._hash.update):
146
_buffer[0].append(bytes)
147
_buffer[1] += len(bytes)
149
if _buffer[1] > self._cache_limit or flush:
150
bytes = ''.join(_buffer[0])
154
# expose this on self, for the occasion when clients want to add data.
155
self._write_data = _write_data
156
# a pack writer object to serialise pack records.
157
self._writer = pack.ContainerWriter(self._write_data)
159
# what state is the pack in? (open, finished, aborted)
161
# no name until we finish writing the content
164
def _check_references(self):
165
"""Make sure our external references are present.
167
Packs are allowed to have deltas whose base is not in the pack, but it
168
must be present somewhere in this collection. It is not allowed to
169
have deltas based on a fallback repository.
170
(See <https://bugs.launchpad.net/bzr/+bug/288751>)
172
# Groupcompress packs don't have any external references, arguably CHK
173
# pages have external references, but we cannot 'cheaply' determine
174
# them without actually walking all of the chk pages.
177
class ResumedGCPack(ResumedPack):
179
def _check_references(self):
180
"""Make sure our external compression parents are present."""
181
# See GCPack._check_references for why this is empty
183
def _get_external_refs(self, index):
184
# GC repositories don't have compression parents external to a given
189
class GCCHKPacker(Packer):
190
"""This class understand what it takes to collect a GCCHK repo."""
192
def __init__(self, pack_collection, packs, suffix, revision_ids=None,
194
super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
195
revision_ids=revision_ids,
196
reload_func=reload_func)
197
self._pack_collection = pack_collection
198
# ATM, We only support this for GCCHK repositories
199
if pack_collection.chk_index is None:
200
raise AssertionError('pack_collection.chk_index should not be None')
201
self._gather_text_refs = False
202
self._chk_id_roots = []
203
self._chk_p_id_roots = []
204
self._text_refs = None
205
# set by .pack() if self.revision_ids is not None
206
self.revision_keys = None
208
def _get_progress_stream(self, source_vf, keys, message, pb):
210
substream = source_vf.get_record_stream(keys, 'groupcompress', True)
211
for idx, record in enumerate(substream):
213
pb.update(message, idx + 1, len(keys))
217
def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
218
"""Filter the texts of inventories, to find the chk pages."""
219
total_keys = len(keys)
220
def _filtered_inv_stream():
222
p_id_roots_set = set()
223
stream = source_vf.get_record_stream(keys, 'groupcompress', True)
224
for idx, record in enumerate(stream):
225
# Inventories should always be with revisions; assume success.
226
bytes = record.get_bytes_as('fulltext')
227
chk_inv = inventory.CHKInventory.deserialise(None, bytes,
230
pb.update('inv', idx, total_keys)
231
key = chk_inv.id_to_entry.key()
232
if key not in id_roots_set:
233
self._chk_id_roots.append(key)
234
id_roots_set.add(key)
235
p_id_map = chk_inv.parent_id_basename_to_file_id
237
raise AssertionError('Parent id -> file_id map not set')
239
if key not in p_id_roots_set:
240
p_id_roots_set.add(key)
241
self._chk_p_id_roots.append(key)
243
# We have finished processing all of the inventory records, we
244
# don't need these sets anymore
246
p_id_roots_set.clear()
247
return _filtered_inv_stream()
249
def _get_chk_streams(self, source_vf, keys, pb=None):
250
# We want to stream the keys from 'id_roots', and things they
251
# reference, and then stream things from p_id_roots and things they
252
# reference, and then any remaining keys that we didn't get to.
254
# We also group referenced texts together, so if one root references a
255
# text with prefix 'a', and another root references a node with prefix
256
# 'a', we want to yield those nodes before we yield the nodes for 'b'
257
# This keeps 'similar' nodes together.
259
# Note: We probably actually want multiple streams here, to help the
260
# client understand that the different levels won't compress well
261
# against each other.
262
# Test the difference between using one Group per level, and
263
# using 1 Group per prefix. (so '' (root) would get a group, then
264
# all the references to search-key 'a' would get a group, etc.)
265
total_keys = len(keys)
266
remaining_keys = set(keys)
268
if self._gather_text_refs:
269
self._text_refs = set()
270
def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
273
keys_by_search_prefix = {}
274
remaining_keys.difference_update(cur_keys)
276
def handle_internal_node(node):
277
for prefix, value in node._items.iteritems():
278
# We don't want to request the same key twice, and we
279
# want to order it by the first time it is seen.
280
# Even further, we don't want to request a key which is
281
# not in this group of pack files (it should be in the
282
# repo, but it doesn't have to be in the group being
284
# TODO: consider how to treat externally referenced chk
285
# pages as 'external_references' so that we
286
# always fill them in for stacked branches
287
if value not in next_keys and value in remaining_keys:
288
keys_by_search_prefix.setdefault(prefix,
291
def handle_leaf_node(node):
292
# Store is None, because we know we have a LeafNode, and we
293
# just want its entries
294
for file_id, bytes in node.iteritems(None):
295
self._text_refs.add(chk_map._bytes_to_text_key(bytes))
297
stream = source_vf.get_record_stream(cur_keys,
298
'as-requested', True)
299
for record in stream:
300
if record.storage_kind == 'absent':
301
# An absent CHK record: we assume that the missing
302
# record is in a different pack - e.g. a page not
303
# altered by the commit we're packing.
305
bytes = record.get_bytes_as('fulltext')
306
# We don't care about search_key_func for this code,
307
# because we only care about external references.
308
node = chk_map._deserialise(bytes, record.key,
309
search_key_func=None)
310
common_base = node._search_prefix
311
if isinstance(node, chk_map.InternalNode):
312
handle_internal_node(node)
313
elif parse_leaf_nodes:
314
handle_leaf_node(node)
317
pb.update('chk node', counter[0], total_keys)
320
# Double check that we won't be emitting any keys twice
321
# If we get rid of the pre-calculation of all keys, we could
322
# turn this around and do
323
# next_keys.difference_update(seen_keys)
324
# However, we also may have references to chk pages in another
325
# pack file during autopack. We filter earlier, so we should no
326
# longer need to do this
327
# next_keys = next_keys.intersection(remaining_keys)
329
for prefix in sorted(keys_by_search_prefix):
330
cur_keys.extend(keys_by_search_prefix.pop(prefix))
331
for stream in _get_referenced_stream(self._chk_id_roots,
332
self._gather_text_refs):
334
del self._chk_id_roots
335
# while it isn't really possible for chk_id_roots to not be in the
336
# local group of packs, it is possible that the tree shape has not
337
# changed recently, so we need to filter _chk_p_id_roots by the
339
chk_p_id_roots = [key for key in self._chk_p_id_roots
340
if key in remaining_keys]
341
del self._chk_p_id_roots
342
for stream in _get_referenced_stream(chk_p_id_roots, False):
345
trace.mutter('There were %d keys in the chk index, %d of which'
346
' were not referenced', total_keys,
348
if self.revision_ids is None:
349
stream = source_vf.get_record_stream(remaining_keys,
353
def _build_vf(self, index_name, parents, delta, for_write=False):
354
"""Build a VersionedFiles instance on top of this group of packs."""
355
index_name = index_name + '_index'
357
access = _DirectPackAccess(index_to_pack,
358
reload_func=self._reload_func)
361
if self.new_pack is None:
362
raise AssertionError('No new pack has been set')
363
index = getattr(self.new_pack, index_name)
364
index_to_pack[index] = self.new_pack.access_tuple()
365
index.set_optimize(for_size=True)
366
access.set_writer(self.new_pack._writer, index,
367
self.new_pack.access_tuple())
368
add_callback = index.add_nodes
371
for pack in self.packs:
372
sub_index = getattr(pack, index_name)
373
index_to_pack[sub_index] = pack.access_tuple()
374
indices.append(sub_index)
375
index = _mod_index.CombinedGraphIndex(indices)
377
vf = GroupCompressVersionedFiles(
379
add_callback=add_callback,
381
is_locked=self._pack_collection.repo.is_locked),
386
def _build_vfs(self, index_name, parents, delta):
387
"""Build the source and target VersionedFiles."""
388
source_vf = self._build_vf(index_name, parents,
389
delta, for_write=False)
390
target_vf = self._build_vf(index_name, parents,
391
delta, for_write=True)
392
return source_vf, target_vf
394
def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
396
trace.mutter('repacking %d %s', len(keys), message)
397
self.pb.update('repacking %s' % (message,), pb_offset)
398
child_pb = ui.ui_factory.nested_progress_bar()
400
stream = vf_to_stream(source_vf, keys, message, child_pb)
401
for _ in target_vf._insert_record_stream(stream,
408
def _copy_revision_texts(self):
409
source_vf, target_vf = self._build_vfs('revision', True, False)
410
if not self.revision_keys:
411
# We are doing a full fetch, aka 'pack'
412
self.revision_keys = source_vf.keys()
413
self._copy_stream(source_vf, target_vf, self.revision_keys,
414
'revisions', self._get_progress_stream, 1)
416
def _copy_inventory_texts(self):
417
source_vf, target_vf = self._build_vfs('inventory', True, True)
418
# It is not sufficient to just use self.revision_keys, as stacked
419
# repositories can have more inventories than they have revisions.
420
# One alternative would be to do something with
421
# get_parent_map(self.revision_keys), but that shouldn't be any faster
423
inventory_keys = source_vf.keys()
424
missing_inventories = set(self.revision_keys).difference(inventory_keys)
425
if missing_inventories:
426
# Go back to the original repo, to see if these are really missing
427
# https://bugs.launchpad.net/bzr/+bug/437003
428
# If we are packing a subset of the repo, it is fine to just have
429
# the data in another Pack file, which is not included in this pack
431
inv_index = self._pack_collection.repo.inventories._index
432
pmap = inv_index.get_parent_map(missing_inventories)
433
really_missing = missing_inventories.difference(pmap)
435
missing_inventories = sorted(really_missing)
436
raise ValueError('We are missing inventories for revisions: %s'
437
% (missing_inventories,))
438
self._copy_stream(source_vf, target_vf, inventory_keys,
439
'inventories', self._get_filtered_inv_stream, 2)
441
def _get_chk_vfs_for_copy(self):
442
return self._build_vfs('chk', False, False)
444
def _copy_chk_texts(self):
445
source_vf, target_vf = self._get_chk_vfs_for_copy()
446
# TODO: This is technically spurious... if it is a performance issue,
448
total_keys = source_vf.keys()
449
trace.mutter('repacking chk: %d id_to_entry roots,'
450
' %d p_id_map roots, %d total keys',
451
len(self._chk_id_roots), len(self._chk_p_id_roots),
453
self.pb.update('repacking chk', 3)
454
child_pb = ui.ui_factory.nested_progress_bar()
456
for stream in self._get_chk_streams(source_vf, total_keys,
458
for _ in target_vf._insert_record_stream(stream,
465
def _copy_text_texts(self):
466
source_vf, target_vf = self._build_vfs('text', True, True)
467
# XXX: We don't walk the chk map to determine referenced (file_id,
468
# revision_id) keys. We don't do it yet because you really need
469
# to filter out the ones that are present in the parents of the
470
# rev just before the ones you are copying, otherwise the filter
471
# is grabbing too many keys...
472
text_keys = source_vf.keys()
473
self._copy_stream(source_vf, target_vf, text_keys,
474
'texts', self._get_progress_stream, 4)
476
def _copy_signature_texts(self):
477
source_vf, target_vf = self._build_vfs('signature', False, False)
478
signature_keys = source_vf.keys()
479
signature_keys.intersection(self.revision_keys)
480
self._copy_stream(source_vf, target_vf, signature_keys,
481
'signatures', self._get_progress_stream, 5)
483
def _create_pack_from_packs(self):
484
self.pb.update('repacking', 0, 7)
485
self.new_pack = self.open_pack()
486
# Is this necessary for GC ?
487
self.new_pack.set_write_cache_size(1024*1024)
488
self._copy_revision_texts()
489
self._copy_inventory_texts()
490
self._copy_chk_texts()
491
self._copy_text_texts()
492
self._copy_signature_texts()
493
self.new_pack._check_references()
494
if not self._use_pack(self.new_pack):
495
self.new_pack.abort()
497
self.new_pack.finish_content()
498
if len(self.packs) == 1:
499
old_pack = self.packs[0]
500
if old_pack.name == self.new_pack._hash.hexdigest():
501
# The single old pack was already optimally packed.
502
trace.mutter('single pack %s was already optimally packed',
504
self.new_pack.abort()
506
self.pb.update('finishing repack', 6, 7)
507
self.new_pack.finish()
508
self._pack_collection.allocate(self.new_pack)
512
class GCCHKReconcilePacker(GCCHKPacker):
513
"""A packer which regenerates indices etc as it copies.
515
This is used by ``bzr reconcile`` to cause parent text pointers to be
519
def __init__(self, *args, **kwargs):
    """Initialise the reconciling packer.

    Text references are always gathered so that parent pointers can be
    regenerated; _data_changed records whether anything was altered.
    """
    super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
    self._gather_text_refs = True
    self._data_changed = False
524
def _copy_inventory_texts(self):
525
source_vf, target_vf = self._build_vfs('inventory', True, True)
526
self._copy_stream(source_vf, target_vf, self.revision_keys,
527
'inventories', self._get_filtered_inv_stream, 2)
528
if source_vf.keys() != self.revision_keys:
529
self._data_changed = True
531
def _copy_text_texts(self):
532
"""generate what texts we should have and then copy."""
533
source_vf, target_vf = self._build_vfs('text', True, True)
534
trace.mutter('repacking %d texts', len(self._text_refs))
535
self.pb.update("repacking texts", 4)
536
# we have three major tasks here:
537
# 1) generate the ideal index
538
repo = self._pack_collection.repo
539
# We want the one we just wrote, so base it on self.new_pack
540
revision_vf = self._build_vf('revision', True, False, for_write=True)
541
ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
542
# Strip keys back into revision_ids.
543
ancestors = dict((k[0], tuple([p[0] for p in parents]))
544
for k, parents in ancestor_keys.iteritems())
546
# TODO: _generate_text_key_index should be much cheaper to generate from
547
# a chk repository, rather than the current implementation
548
ideal_index = repo._generate_text_key_index(None, ancestors)
549
file_id_parent_map = source_vf.get_parent_map(self._text_refs)
550
# 2) generate a keys list that contains all the entries that can
551
# be used as-is, with corrected parents.
553
new_parent_keys = {} # (key, parent_keys)
555
NULL_REVISION = _mod_revision.NULL_REVISION
556
for key in self._text_refs:
562
ideal_parents = tuple(ideal_index[key])
564
discarded_keys.append(key)
565
self._data_changed = True
567
if ideal_parents == (NULL_REVISION,):
569
source_parents = file_id_parent_map[key]
570
if ideal_parents == source_parents:
574
# We need to change the parent graph, but we don't need to
575
# re-insert the text (since we don't pun the compression
576
# parent with the parents list)
577
self._data_changed = True
578
new_parent_keys[key] = ideal_parents
579
# we're finished with some data.
581
del file_id_parent_map
582
# 3) bulk copy the data, updating records than need it
583
def _update_parents_for_texts():
584
stream = source_vf.get_record_stream(self._text_refs,
585
'groupcompress', False)
586
for record in stream:
587
if record.key in new_parent_keys:
588
record.parents = new_parent_keys[record.key]
590
target_vf.insert_record_stream(_update_parents_for_texts())
592
def _use_pack(self, new_pack):
593
"""Override _use_pack to check for reconcile having changed content."""
594
return new_pack.data_inserted() and self._data_changed
597
class GCCHKCanonicalizingPacker(GCCHKPacker):
598
"""A packer that ensures inventories have canonical-form CHK maps.
600
Ideally this would be part of reconcile, but it's very slow and rarely
601
needed. (It repairs repositories affected by
602
https://bugs.launchpad.net/bzr/+bug/522637).
605
def __init__(self, *args, **kwargs):
    """Initialise the canonicalizing packer.

    _data_changed tracks whether any CHK map had to be regenerated.
    """
    super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
    self._data_changed = False
609
def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
610
"""Create and exhaust a stream, but don't insert it.
612
This is useful to get the side-effects of generating a stream.
614
self.pb.update('scanning %s' % (message,), pb_offset)
615
child_pb = ui.ui_factory.nested_progress_bar()
617
list(vf_to_stream(source_vf, keys, message, child_pb))
621
def _copy_inventory_texts(self):
622
source_vf, target_vf = self._build_vfs('inventory', True, True)
623
source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
624
inventory_keys = source_vf.keys()
625
# First, copy the existing CHKs on the assumption that most of them
626
# will be correct. This will save us from having to reinsert (and
627
# recompress) these records later at the cost of perhaps preserving a
629
# (Iterate but don't insert _get_filtered_inv_stream to populate the
630
# variables needed by GCCHKPacker._copy_chk_texts.)
631
self._exhaust_stream(source_vf, inventory_keys, 'inventories',
632
self._get_filtered_inv_stream, 2)
633
GCCHKPacker._copy_chk_texts(self)
634
# Now copy and fix the inventories, and any regenerated CHKs.
635
def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
636
return self._get_filtered_canonicalizing_inv_stream(
637
source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
638
self._copy_stream(source_vf, target_vf, inventory_keys,
639
'inventories', chk_canonicalizing_inv_stream, 4)
641
def _copy_chk_texts(self):
642
# No-op; in this class this happens during _copy_inventory_texts.
645
def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
646
pb=None, source_chk_vf=None, target_chk_vf=None):
647
"""Filter the texts of inventories, regenerating CHKs to make sure they
650
total_keys = len(keys)
651
target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)
652
def _filtered_inv_stream():
653
stream = source_vf.get_record_stream(keys, 'groupcompress', True)
654
search_key_name = None
655
for idx, record in enumerate(stream):
656
# Inventories should always be with revisions; assume success.
657
bytes = record.get_bytes_as('fulltext')
658
chk_inv = inventory.CHKInventory.deserialise(
659
source_chk_vf, bytes, record.key)
661
pb.update('inv', idx, total_keys)
662
chk_inv.id_to_entry._ensure_root()
663
if search_key_name is None:
664
# Find the name corresponding to the search_key_func
665
search_key_reg = chk_map.search_key_registry
666
for search_key_name, func in search_key_reg.iteritems():
667
if func == chk_inv.id_to_entry._search_key_func:
669
canonical_inv = inventory.CHKInventory.from_inventory(
670
target_chk_vf, chk_inv,
671
maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
672
search_key_name=search_key_name)
673
if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key():
675
'Non-canonical CHK map for id_to_entry of inv: %s '
676
'(root is %s, should be %s)' % (chk_inv.revision_id,
677
chk_inv.id_to_entry.key()[0],
678
canonical_inv.id_to_entry.key()[0]))
679
self._data_changed = True
680
p_id_map = chk_inv.parent_id_basename_to_file_id
681
p_id_map._ensure_root()
682
canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
683
if p_id_map.key() != canon_p_id_map.key():
685
'Non-canonical CHK map for parent_id_to_basename of '
686
'inv: %s (root is %s, should be %s)'
687
% (chk_inv.revision_id, p_id_map.key()[0],
688
canon_p_id_map.key()[0]))
689
self._data_changed = True
690
yield versionedfile.ChunkedContentFactory(record.key,
691
record.parents, record.sha1,
692
canonical_inv.to_lines())
693
# We have finished processing all of the inventory records, we
694
# don't need these sets anymore
695
return _filtered_inv_stream()
697
def _use_pack(self, new_pack):
698
"""Override _use_pack to check for reconcile having changed content."""
699
return new_pack.data_inserted() and self._data_changed
702
class GCRepositoryPackCollection(RepositoryPackCollection):
704
pack_factory = GCPack
705
resumed_pack_factory = ResumedGCPack
706
normal_packer_class = GCCHKPacker
707
optimising_packer_class = GCCHKPacker
709
def _check_new_inventories(self):
710
"""Detect missing inventories or chk root entries for the new revisions
713
:returns: list of strs, summarising any problems found. If the list is
714
empty no problems were found.
716
# Ensure that all revisions added in this write group have:
717
# - corresponding inventories,
718
# - chk root entries for those inventories,
719
# - and any present parent inventories have their chk root
721
# And all this should be independent of any fallback repository.
723
key_deps = self.repo.revisions._index._key_dependencies
724
new_revisions_keys = key_deps.get_new_keys()
725
no_fallback_inv_index = self.repo.inventories._index
726
no_fallback_chk_bytes_index = self.repo.chk_bytes._index
727
no_fallback_texts_index = self.repo.texts._index
728
inv_parent_map = no_fallback_inv_index.get_parent_map(
730
# Are any inventories for corresponding to the new revisions missing?
731
corresponding_invs = set(inv_parent_map)
732
missing_corresponding = set(new_revisions_keys)
733
missing_corresponding.difference_update(corresponding_invs)
734
if missing_corresponding:
735
problems.append("inventories missing for revisions %s" %
736
(sorted(missing_corresponding),))
738
# Are any chk root entries missing for any inventories? This includes
739
# any present parent inventories, which may be used when calculating
740
# deltas for streaming.
741
all_inv_keys = set(corresponding_invs)
742
for parent_inv_keys in inv_parent_map.itervalues():
743
all_inv_keys.update(parent_inv_keys)
744
# Filter out ghost parents.
745
all_inv_keys.intersection_update(
746
no_fallback_inv_index.get_parent_map(all_inv_keys))
747
parent_invs_only_keys = all_inv_keys.symmetric_difference(
750
inv_ids = [key[-1] for key in all_inv_keys]
751
parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
752
root_key_info = _build_interesting_key_sets(
753
self.repo, inv_ids, parent_invs_only_ids)
754
expected_chk_roots = root_key_info.all_keys()
755
present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
757
missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
758
if missing_chk_roots:
759
problems.append("missing referenced chk root keys: %s"
760
% (sorted(missing_chk_roots),))
761
# Don't bother checking any further.
763
# Find all interesting chk_bytes records, and make sure they are
764
# present, as well as the text keys they reference.
765
chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
766
chk_bytes_no_fallbacks._search_key_func = \
767
self.repo.chk_bytes._search_key_func
768
chk_diff = chk_map.iter_interesting_nodes(
769
chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
770
root_key_info.uninteresting_root_keys)
773
for record in _filter_text_keys(chk_diff, text_keys,
774
chk_map._bytes_to_text_key):
776
except errors.NoSuchRevision, e:
777
# XXX: It would be nice if we could give a more precise error here.
778
problems.append("missing chk node(s) for id_to_entry maps")
779
chk_diff = chk_map.iter_interesting_nodes(
780
chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
781
root_key_info.uninteresting_pid_root_keys)
783
for interesting_rec, interesting_map in chk_diff:
785
except errors.NoSuchRevision, e:
787
"missing chk node(s) for parent_id_basename_to_file_id maps")
788
present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
789
missing_text_keys = text_keys.difference(present_text_keys)
790
if missing_text_keys:
791
problems.append("missing text keys: %r"
792
% (sorted(missing_text_keys),))
796
class CHKInventoryRepository(PackRepository):
797
"""subclass of PackRepository that uses CHK based inventories."""
799
def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
801
"""Overridden to change pack collection class."""
802
super(CHKInventoryRepository, self).__init__(_format, a_bzrdir,
803
control_files, _commit_builder_class, _serializer)
804
index_transport = self._transport.clone('indices')
805
self._pack_collection = GCRepositoryPackCollection(self,
806
self._transport, index_transport,
807
self._transport.clone('upload'),
808
self._transport.clone('packs'),
809
_format.index_builder_class,
811
use_chk_index=self._format.supports_chks,
813
self.inventories = GroupCompressVersionedFiles(
814
_GCGraphIndex(self._pack_collection.inventory_index.combined_index,
815
add_callback=self._pack_collection.inventory_index.add_callback,
816
parents=True, is_locked=self.is_locked,
817
inconsistency_fatal=False),
818
access=self._pack_collection.inventory_index.data_access)
819
self.revisions = GroupCompressVersionedFiles(
820
_GCGraphIndex(self._pack_collection.revision_index.combined_index,
821
add_callback=self._pack_collection.revision_index.add_callback,
822
parents=True, is_locked=self.is_locked,
823
track_external_parent_refs=True, track_new_keys=True),
824
access=self._pack_collection.revision_index.data_access,
826
self.signatures = GroupCompressVersionedFiles(
827
_GCGraphIndex(self._pack_collection.signature_index.combined_index,
828
add_callback=self._pack_collection.signature_index.add_callback,
829
parents=False, is_locked=self.is_locked,
830
inconsistency_fatal=False),
831
access=self._pack_collection.signature_index.data_access,
833
self.texts = GroupCompressVersionedFiles(
834
_GCGraphIndex(self._pack_collection.text_index.combined_index,
835
add_callback=self._pack_collection.text_index.add_callback,
836
parents=True, is_locked=self.is_locked,
837
inconsistency_fatal=False),
838
access=self._pack_collection.text_index.data_access)
839
# No parents, individual CHK pages don't have specific ancestry
840
self.chk_bytes = GroupCompressVersionedFiles(
841
_GCGraphIndex(self._pack_collection.chk_index.combined_index,
842
add_callback=self._pack_collection.chk_index.add_callback,
843
parents=False, is_locked=self.is_locked,
844
inconsistency_fatal=False),
845
access=self._pack_collection.chk_index.data_access)
846
search_key_name = self._format._serializer.search_key_name
847
search_key_func = chk_map.search_key_registry.get(search_key_name)
848
self.chk_bytes._search_key_func = search_key_func
849
# True when the repository object is 'write locked' (as opposed to the
850
# physical lock only taken out around changes to the pack-names list.)
851
# Another way to represent this would be a decorator around the control
852
# files object that presents logical locks as physical ones - if this
853
# gets ugly consider that alternative design. RBC 20071011
854
self._write_lock_count = 0
855
self._transaction = None
857
self._reconcile_does_inventory_gc = True
858
self._reconcile_fixes_text_parents = True
859
self._reconcile_backsup_inventory = False
861
def _add_inventory_checked(self, revision_id, inv, parents):
862
"""Add inv to the repository after checking the inputs.
864
This function can be overridden to allow different inventory styles.
866
:seealso: add_inventory, for the contract.
869
serializer = self._format._serializer
870
result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
871
maximum_size=serializer.maximum_size,
872
search_key_name=serializer.search_key_name)
873
inv_lines = result.to_lines()
874
return self._inventory_add_lines(revision_id, parents,
875
inv_lines, check_content=False)
877
def _create_inv_from_null(self, delta, revision_id):
878
"""This will mutate new_inv directly.
880
This is a simplified form of create_by_apply_delta which knows that all
881
the old values must be None, so everything is a create.
883
serializer = self._format._serializer
884
new_inv = inventory.CHKInventory(serializer.search_key_name)
885
new_inv.revision_id = revision_id
886
entry_to_bytes = new_inv._entry_to_bytes
887
id_to_entry_dict = {}
888
parent_id_basename_dict = {}
889
for old_path, new_path, file_id, entry in delta:
890
if old_path is not None:
891
raise ValueError('Invalid delta, somebody tried to delete %r'
892
' from the NULL_REVISION'
893
% ((old_path, file_id),))
895
raise ValueError('Invalid delta, delta from NULL_REVISION has'
896
' no new_path %r' % (file_id,))
898
new_inv.root_id = file_id
899
parent_id_basename_key = StaticTuple('', '').intern()
901
utf8_entry_name = entry.name.encode('utf-8')
902
parent_id_basename_key = StaticTuple(entry.parent_id,
903
utf8_entry_name).intern()
904
new_value = entry_to_bytes(entry)
906
# new_inv._path_to_fileid_cache[new_path] = file_id
907
key = StaticTuple(file_id).intern()
908
id_to_entry_dict[key] = new_value
909
parent_id_basename_dict[parent_id_basename_key] = file_id
911
new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
912
parent_id_basename_dict, maximum_size=serializer.maximum_size)
915
def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
916
parents, basis_inv=None, propagate_caches=False):
917
"""Add a new inventory expressed as a delta against another revision.
919
:param basis_revision_id: The inventory id the delta was created
921
:param delta: The inventory delta (see Inventory.apply_delta for
923
:param new_revision_id: The revision id that the inventory is being
925
:param parents: The revision ids of the parents that revision_id is
926
known to have and are in the repository already. These are supplied
927
for repositories that depend on the inventory graph for revision
928
graph access, as well as for those that pun ancestry with delta
930
:param basis_inv: The basis inventory if it is already known,
932
:param propagate_caches: If True, the caches for this inventory are
933
copied to and updated for the result if possible.
935
:returns: (validator, new_inv)
936
The validator(which is a sha1 digest, though what is sha'd is
937
repository format specific) of the serialized inventory, and the
940
if not self.is_in_write_group():
941
raise AssertionError("%r not in write group" % (self,))
942
_mod_revision.check_not_reserved_id(new_revision_id)
944
if basis_inv is None:
945
if basis_revision_id == _mod_revision.NULL_REVISION:
946
new_inv = self._create_inv_from_null(delta, new_revision_id)
947
if new_inv.root_id is None:
948
raise errors.RootMissing()
949
inv_lines = new_inv.to_lines()
950
return self._inventory_add_lines(new_revision_id, parents,
951
inv_lines, check_content=False), new_inv
953
basis_tree = self.revision_tree(basis_revision_id)
954
basis_tree.lock_read()
955
basis_inv = basis_tree.inventory
957
result = basis_inv.create_by_apply_delta(delta, new_revision_id,
958
propagate_caches=propagate_caches)
959
inv_lines = result.to_lines()
960
return self._inventory_add_lines(new_revision_id, parents,
961
inv_lines, check_content=False), result
963
if basis_tree is not None:
966
def _deserialise_inventory(self, revision_id, bytes):
967
return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,
970
def _iter_inventories(self, revision_ids, ordering):
971
"""Iterate over many inventory objects."""
973
ordering = 'unordered'
974
keys = [(revision_id,) for revision_id in revision_ids]
975
stream = self.inventories.get_record_stream(keys, ordering, True)
977
for record in stream:
978
if record.storage_kind != 'absent':
979
texts[record.key] = record.get_bytes_as('fulltext')
981
raise errors.NoSuchRevision(self, record.key)
983
yield inventory.CHKInventory.deserialise(self.chk_bytes, texts[key], key)
985
def _iter_inventory_xmls(self, revision_ids, ordering):
986
# Without a native 'xml' inventory, this method doesn't make sense.
987
# However older working trees, and older bundles want it - so we supply
988
# it allowing _get_inventory_xml to work. Bundles currently use the
989
# serializer directly; this also isn't ideal, but there isn't an xml
990
# iteration interface offered at all for repositories. We could make
991
# _iter_inventory_xmls be part of the contract, even if kept private.
992
inv_to_str = self._serializer.write_inventory_to_string
993
for inv in self.iter_inventories(revision_ids, ordering=ordering):
994
yield inv_to_str(inv), inv.revision_id
996
def _find_present_inventory_keys(self, revision_keys):
997
parent_map = self.inventories.get_parent_map(revision_keys)
998
present_inventory_keys = set(k for k in parent_map)
999
return present_inventory_keys
1001
def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
1002
"""Find the file ids and versions affected by revisions.
1004
:param revisions: an iterable containing revision ids.
1005
:param _inv_weave: The inventory weave from this repository or None.
1006
If None, the inventory weave will be opened automatically.
1007
:return: a dictionary mapping altered file-ids to an iterable of
1008
revision_ids. Each altered file-ids has the exact revision_ids that
1009
altered it listed explicitly.
1011
rich_root = self.supports_rich_root()
1012
bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
1013
file_id_revisions = {}
1014
pb = ui.ui_factory.nested_progress_bar()
1016
revision_keys = [(r,) for r in revision_ids]
1017
parent_keys = self._find_parent_keys_of_revisions(revision_keys)
1018
# TODO: instead of using _find_present_inventory_keys, change the
1019
# code paths to allow missing inventories to be tolerated.
1020
# However, we only want to tolerate missing parent
1021
# inventories, not missing inventories for revision_ids
1022
present_parent_inv_keys = self._find_present_inventory_keys(
1024
present_parent_inv_ids = set(
1025
[k[-1] for k in present_parent_inv_keys])
1026
inventories_to_read = set(revision_ids)
1027
inventories_to_read.update(present_parent_inv_ids)
1028
root_key_info = _build_interesting_key_sets(
1029
self, inventories_to_read, present_parent_inv_ids)
1030
interesting_root_keys = root_key_info.interesting_root_keys
1031
uninteresting_root_keys = root_key_info.uninteresting_root_keys
1032
chk_bytes = self.chk_bytes
1033
for record, items in chk_map.iter_interesting_nodes(chk_bytes,
1034
interesting_root_keys, uninteresting_root_keys,
1036
for name, bytes in items:
1037
(name_utf8, file_id, revision_id) = bytes_to_info(bytes)
1038
# TODO: consider interning file_id, revision_id here, or
1039
# pushing that intern() into bytes_to_info()
1040
# TODO: rich_root should always be True here, for all
1041
# repositories that support chk_bytes
1042
if not rich_root and name_utf8 == '':
1045
file_id_revisions[file_id].add(revision_id)
1047
file_id_revisions[file_id] = set([revision_id])
1050
return file_id_revisions
1052
def find_text_key_references(self):
1053
"""Find the text key references within the repository.
1055
:return: A dictionary mapping text keys ((fileid, revision_id) tuples)
1056
to whether they were referred to by the inventory of the
1057
revision_id that they contain. The inventory texts from all present
1058
revision ids are assessed to generate this report.
1060
# XXX: Slow version but correct: rewrite as a series of delta
1061
# examinations/direct tree traversal. Note that that will require care
1062
# as a common node is reachable both from the inventory that added it,
1063
# and others afterwards.
1064
revision_keys = self.revisions.keys()
1066
rich_roots = self.supports_rich_root()
1067
pb = ui.ui_factory.nested_progress_bar()
1069
all_revs = self.all_revision_ids()
1070
total = len(all_revs)
1071
for pos, inv in enumerate(self.iter_inventories(all_revs)):
1072
pb.update("Finding text references", pos, total)
1073
for _, entry in inv.iter_entries():
1074
if not rich_roots and entry.file_id == inv.root_id:
1076
key = (entry.file_id, entry.revision)
1077
result.setdefault(key, False)
1078
if entry.revision == inv.revision_id:
1085
def reconcile_canonicalize_chks(self):
1086
"""Reconcile this repository to make sure all CHKs are in canonical
1089
from bzrlib.reconcile import PackReconciler
1090
reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True)
1091
reconciler.reconcile()
1094
def _reconcile_pack(self, collection, packs, extension, revs, pb):
1095
packer = GCCHKReconcilePacker(collection, packs, extension)
1096
return packer.pack(pb)
1098
def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb):
1099
packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs)
1100
return packer.pack(pb)
1102
def _get_source(self, to_format):
1103
"""Return a source for streaming from this repository."""
1104
if self._format._serializer == to_format._serializer:
1105
# We must be exactly the same format, otherwise stuff like the chk
1106
# page layout might be different.
1107
# Actually, this test is just slightly looser than exact so that
1108
# CHK2 <-> 2a transfers will work.
1109
return GroupCHKStreamSource(self, to_format)
1110
return super(CHKInventoryRepository, self)._get_source(to_format)
1112
def _find_inconsistent_revision_parents(self, revisions_iterator=None):
1113
"""Find revisions with different parent lists in the revision object
1114
and in the index graph.
1116
:param revisions_iterator: None, or an iterator of (revid,
1117
Revision-or-None). This iterator controls the revisions checked.
1118
:returns: an iterator yielding tuples of (revison-id, parents-in-index,
1119
parents-in-revision).
1121
if not self.is_locked():
1122
raise AssertionError()
1124
if revisions_iterator is None:
1125
revisions_iterator = self._iter_revisions(None)
1126
for revid, revision in revisions_iterator:
1127
if revision is None:
1129
parent_map = vf.get_parent_map([(revid,)])
1130
parents_according_to_index = tuple(parent[-1] for parent in
1131
parent_map[(revid,)])
1132
parents_according_to_revision = tuple(revision.parent_ids)
1133
if parents_according_to_index != parents_according_to_revision:
1134
yield (revid, parents_according_to_index,
1135
parents_according_to_revision)
1137
def _check_for_inconsistent_revision_parents(self):
1138
inconsistencies = list(self._find_inconsistent_revision_parents())
1140
raise errors.BzrCheckError(
1141
"Revision index has inconsistent parents.")
1144
class GroupCHKStreamSource(StreamSource):
1145
"""Used when both the source and target repo are GroupCHK repos."""
1147
def __init__(self, from_repository, to_format):
1148
"""Create a StreamSource streaming from from_repository."""
1149
super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
1150
self._revision_keys = None
1151
self._text_keys = None
1152
self._text_fetch_order = 'groupcompress'
1153
self._chk_id_roots = None
1154
self._chk_p_id_roots = None
1156
def _get_inventory_stream(self, inventory_keys, allow_absent=False):
1157
"""Get a stream of inventory texts.
1159
When this function returns, self._chk_id_roots and self._chk_p_id_roots
1160
should be populated.
1162
self._chk_id_roots = []
1163
self._chk_p_id_roots = []
1164
def _filtered_inv_stream():
1165
id_roots_set = set()
1166
p_id_roots_set = set()
1167
source_vf = self.from_repository.inventories
1168
stream = source_vf.get_record_stream(inventory_keys,
1169
'groupcompress', True)
1170
for record in stream:
1171
if record.storage_kind == 'absent':
1175
raise errors.NoSuchRevision(self, record.key)
1176
bytes = record.get_bytes_as('fulltext')
1177
chk_inv = inventory.CHKInventory.deserialise(None, bytes,
1179
key = chk_inv.id_to_entry.key()
1180
if key not in id_roots_set:
1181
self._chk_id_roots.append(key)
1182
id_roots_set.add(key)
1183
p_id_map = chk_inv.parent_id_basename_to_file_id
1184
if p_id_map is None:
1185
raise AssertionError('Parent id -> file_id map not set')
1186
key = p_id_map.key()
1187
if key not in p_id_roots_set:
1188
p_id_roots_set.add(key)
1189
self._chk_p_id_roots.append(key)
1191
# We have finished processing all of the inventory records, we
1192
# don't need these sets anymore
1193
id_roots_set.clear()
1194
p_id_roots_set.clear()
1195
return ('inventories', _filtered_inv_stream())
1197
def _get_filtered_chk_streams(self, excluded_revision_keys):
1198
self._text_keys = set()
1199
excluded_revision_keys.discard(_mod_revision.NULL_REVISION)
1200
if not excluded_revision_keys:
1201
uninteresting_root_keys = set()
1202
uninteresting_pid_root_keys = set()
1204
# filter out any excluded revisions whose inventories are not
1206
# TODO: Update Repository.iter_inventories() to add
1207
# ignore_missing=True
1208
present_keys = self.from_repository._find_present_inventory_keys(
1209
excluded_revision_keys)
1210
present_ids = [k[-1] for k in present_keys]
1211
uninteresting_root_keys = set()
1212
uninteresting_pid_root_keys = set()
1213
for inv in self.from_repository.iter_inventories(present_ids):
1214
uninteresting_root_keys.add(inv.id_to_entry.key())
1215
uninteresting_pid_root_keys.add(
1216
inv.parent_id_basename_to_file_id.key())
1217
chk_bytes = self.from_repository.chk_bytes
1218
def _filter_id_to_entry():
1219
interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
1220
self._chk_id_roots, uninteresting_root_keys)
1221
for record in _filter_text_keys(interesting_nodes, self._text_keys,
1222
chk_map._bytes_to_text_key):
1223
if record is not None:
1226
self._chk_id_roots = None
1227
yield 'chk_bytes', _filter_id_to_entry()
1228
def _get_parent_id_basename_to_file_id_pages():
1229
for record, items in chk_map.iter_interesting_nodes(chk_bytes,
1230
self._chk_p_id_roots, uninteresting_pid_root_keys):
1231
if record is not None:
1234
self._chk_p_id_roots = None
1235
yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
1237
def _get_text_stream(self):
1238
# Note: We know we don't have to handle adding root keys, because both
1239
# the source and target are the identical network name.
1240
text_stream = self.from_repository.texts.get_record_stream(
1241
self._text_keys, self._text_fetch_order, False)
1242
return ('texts', text_stream)
1244
def get_stream(self, search):
1245
def wrap_and_count(pb, rc, stream):
1246
"""Yield records from stream while showing progress."""
1248
for record in stream:
1249
if count == rc.STEP:
1251
pb.update('Estimate', rc.current, rc.max)
1256
revision_ids = search.get_keys()
1257
pb = ui.ui_factory.nested_progress_bar()
1258
rc = self._record_counter
1259
self._record_counter.setup(len(revision_ids))
1260
for stream_info in self._fetch_revision_texts(revision_ids):
1261
yield (stream_info[0],
1262
wrap_and_count(pb, rc, stream_info[1]))
1263
self._revision_keys = [(rev_id,) for rev_id in revision_ids]
1264
# TODO: The keys to exclude might be part of the search recipe
1265
# For now, exclude all parents that are at the edge of ancestry, for
1266
# which we have inventories
1267
from_repo = self.from_repository
1268
parent_keys = from_repo._find_parent_keys_of_revisions(
1269
self._revision_keys)
1270
self.from_repository.revisions.clear_cache()
1271
self.from_repository.signatures.clear_cache()
1272
# Clear the repo's get_parent_map cache too.
1273
self.from_repository._unstacked_provider.disable_cache()
1274
self.from_repository._unstacked_provider.enable_cache()
1275
s = self._get_inventory_stream(self._revision_keys)
1276
yield (s[0], wrap_and_count(pb, rc, s[1]))
1277
self.from_repository.inventories.clear_cache()
1278
for stream_info in self._get_filtered_chk_streams(parent_keys):
1279
yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
1280
self.from_repository.chk_bytes.clear_cache()
1281
s = self._get_text_stream()
1282
yield (s[0], wrap_and_count(pb, rc, s[1]))
1283
self.from_repository.texts.clear_cache()
1284
pb.update('Done', rc.max, rc.max)
1287
def get_stream_for_missing_keys(self, missing_keys):
1288
# missing keys can only occur when we are byte copying and not
1289
# translating (because translation means we don't send
1290
# unreconstructable deltas ever).
1291
missing_inventory_keys = set()
1292
for key in missing_keys:
1293
if key[0] != 'inventories':
1294
raise AssertionError('The only missing keys we should'
1295
' be filling in are inventory keys, not %s'
1297
missing_inventory_keys.add(key[1:])
1298
if self._chk_id_roots or self._chk_p_id_roots:
1299
raise AssertionError('Cannot call get_stream_for_missing_keys'
1300
' until all of get_stream() has been consumed.')
1301
# Yield the inventory stream, so we can find the chk stream
1302
# Some of the missing_keys will be missing because they are ghosts.
1303
# As such, we can ignore them. The Sink is required to verify there are
1304
# no unavailable texts when the ghost inventories are not filled in.
1305
yield self._get_inventory_stream(missing_inventory_keys,
1307
# We use the empty set for excluded_revision_keys, to make it clear
1308
# that we want to transmit all referenced chk pages.
1309
for stream_info in self._get_filtered_chk_streams(set()):
1313
class _InterestingKeyInfo(object):
1315
self.interesting_root_keys = set()
1316
self.interesting_pid_root_keys = set()
1317
self.uninteresting_root_keys = set()
1318
self.uninteresting_pid_root_keys = set()
1320
def all_interesting(self):
1321
return self.interesting_root_keys.union(self.interesting_pid_root_keys)
1323
def all_uninteresting(self):
1324
return self.uninteresting_root_keys.union(
1325
self.uninteresting_pid_root_keys)
1328
return self.all_interesting().union(self.all_uninteresting())
1331
def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids):
1332
result = _InterestingKeyInfo()
1333
for inv in repo.iter_inventories(inventory_ids, 'unordered'):
1334
root_key = inv.id_to_entry.key()
1335
pid_root_key = inv.parent_id_basename_to_file_id.key()
1336
if inv.revision_id in parent_only_inv_ids:
1337
result.uninteresting_root_keys.add(root_key)
1338
result.uninteresting_pid_root_keys.add(pid_root_key)
1340
result.interesting_root_keys.add(root_key)
1341
result.interesting_pid_root_keys.add(pid_root_key)
1345
def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key):
1346
"""Iterate the result of iter_interesting_nodes, yielding the records
1347
and adding to text_keys.
1349
text_keys_update = text_keys.update
1350
for record, items in interesting_nodes_iterable:
1351
text_keys_update([bytes_to_text_key(b) for n,b in items])
1355
class RepositoryFormat2a(RepositoryFormatPack):
1356
"""A CHK repository that uses the bencode revision serializer."""
1358
repository_class = CHKInventoryRepository
1359
supports_external_lookups = True
1360
supports_chks = True
1361
_commit_builder_class = PackRootCommitBuilder
1362
rich_root_data = True
1363
_serializer = chk_serializer.chk_bencode_serializer
1364
_commit_inv_deltas = True
1365
# What index classes to use
1366
index_builder_class = BTreeBuilder
1367
index_class = BTreeGraphIndex
1368
# Note: We cannot unpack a delta that references a text we haven't
1369
# seen yet. There are 2 options, work in fulltexts, or require
1370
# topological sorting. Using fulltexts is more optimal for local
1371
# operations, because the source can be smart about extracting
1372
# multiple in-a-row (and sharing strings). Topological is better
1373
# for remote, because we access less data.
1374
_fetch_order = 'unordered'
1375
_fetch_uses_deltas = False # essentially ignored by the groupcompress code.
1377
pack_compresses = True
1379
def _get_matching_bzrdir(self):
1380
return bzrdir.format_registry.make_bzrdir('2a')
1382
def _ignore_setting_bzrdir(self, format):
1385
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
1387
def get_format_string(self):
1388
return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')
1390
def get_format_description(self):
1391
"""See RepositoryFormat.get_format_description()."""
1392
return ("Repository format 2a - rich roots, group compression"
1393
" and chk inventories")
1396
class RepositoryFormat2aSubtree(RepositoryFormat2a):
1397
"""A 2a repository format that supports nested trees.
1401
def _get_matching_bzrdir(self):
1402
return bzrdir.format_registry.make_bzrdir('development-subtree')
1404
def _ignore_setting_bzrdir(self, format):
1407
_matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)
1409
def get_format_string(self):
1410
return ('Bazaar development format 8\n')
1412
def get_format_description(self):
1413
"""See RepositoryFormat.get_format_description()."""
1414
return ("Development repository format 8 - nested trees, "
1415
"group compression and chk inventories")
1418
supports_tree_reference = True