# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Repository formats using CHK inventories and groupcompress compression."""

import time

from bzrlib import (
    bzrdir,
    chk_map,
    chk_serializer,
    debug,
    errors,
    index as _mod_index,
    inventory,
    osutils,
    pack,
    revision as _mod_revision,
    trace,
    ui,
    versionedfile,
    )
from bzrlib.btree_index import (
    BTreeBuilder,
    BTreeGraphIndex,
    )
from bzrlib.decorators import needs_write_lock
from bzrlib.groupcompress import (
    _GCGraphIndex,
    GroupCompressVersionedFiles,
    )
from bzrlib.repofmt.pack_repo import (
    _DirectPackAccess,
    NewPack,
    Packer,
    PackRepository,
    PackRootCommitBuilder,
    RepositoryFormatPack,
    RepositoryPackCollection,
    ResumedPack,
    )
from bzrlib.vf_repository import (
    StreamSource,
    )
from bzrlib.static_tuple import StaticTuple


class GCPack(NewPack):

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param pack_collection: A PackCollection into which this is being
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g. '.autopack'
        :param file_mode: An optional file mode to create the new files with.
        """
        # replaced from NewPack to:
        # - change inventory reference list length to 1
        # - change texts reference lists to 1
        # TODO: patch this to be parameterised

        # The relative locations of the packs are constrained, but all are
        # passed in because the caller has them, so as to avoid object churn.
        index_builder_class = pack_collection._index_builder_class
        if pack_collection.chk_index is not None:
            chk_index = index_builder_class(reference_lists=0)
        # Revisions: parents list, no text compression.
        index_builder_class(reference_lists=1),
        # Inventory: We want to map compression only, but currently the
        # knit code hasn't been updated enough to understand that, so we
        # have a regular 2-list index giving parents and compression
        index_builder_class(reference_lists=1),
        # Texts: per file graph, for all fileids - so one reference list
        # and two elements in the key tuple.
        index_builder_class(reference_lists=1, key_elements=2),
        # Signatures: Just blobs to store, no compression, no parents
        index_builder_class(reference_lists=0),
        # CHK based storage - just blobs, no compression or parents.
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
        # where should the new pack be opened
        self.upload_transport = pack_collection._upload_transport
        # where are indices written out to
        self.index_transport = pack_collection._index_transport
        # where is the pack renamed to when it is finished?
        self.pack_transport = pack_collection._pack_transport
        # What file mode to upload the pack and indices with.
        self._file_mode = file_mode
        # tracks the content written to the .pack file.
        self._hash = osutils.md5()
        # a four-tuple with the length in bytes of the indices, once the pack
        # is finalised. (rev, inv, text, sigs)
        self.index_sizes = None
        # How much data to cache when writing packs. Note that this is not
        # synchronised with reads, because it's not in the transport layer, so
        # is not safe unless the client knows it won't be reading from the pack
        self._cache_limit = 0
        # the temporary pack file name.
        self.random_name = osutils.rand_chars(20) + upload_suffix
        # when was this pack started ?
        self.start_time = time.time()
        # open an output stream for the data added to the pack.
        self.write_stream = self.upload_transport.open_write_stream(
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them. Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # create a callable for adding data
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        def _write_data(bytes, flush=False, _buffer=self._buffer,
                        _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(bytes)
            _buffer[1] += len(bytes)
            if _buffer[1] > self._cache_limit or flush:
                bytes = ''.join(_buffer[0])
        # expose this on self, for the occasion when clients want to add data.
        self._write_data = _write_data
        # a pack writer object to serialise pack records.
        self._writer = pack.ContainerWriter(self._write_data)
        # what state is the pack in? (open, finished, aborted)
        # no name until we finish writing the content
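        # _write_data above buffers incoming chunks in self._buffer (a
        # [chunks, total_length] pair) and, once the buffered length exceeds
        # self._cache_limit or an explicit flush is requested, joins them into
        # a single byte string for the captured _write/_update callables (the
        # transport write stream and the running md5). A minimal sketch of the
        # same buffering pattern, with hypothetical sink()/digest() callables
        # standing in for them:
        #
        #     buf = [[], 0]
        #     def write(data, flush=False):
        #         buf[0].append(data)
        #         buf[1] += len(data)
        #         if buf[1] > limit or flush:
        #             blob = ''.join(buf[0])
        #             sink(blob)
        #             digest(blob)
        #             buf[:] = [[], 0]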

    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection. It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        # Groupcompress packs don't have any external references, arguably CHK
        # pages have external references, but we cannot 'cheaply' determine
        # them without actually walking all of the chk pages.


class ResumedGCPack(ResumedPack):

    def _check_references(self):
        """Make sure our external compression parents are present."""
        # See GCPack._check_references for why this is empty

    def _get_external_refs(self, index):
        # GC repositories don't have compression parents external to a given
        # pack file
        return frozenset()


class GCCHKPacker(Packer):
    """This class understands what it takes to collect a GCCHK repo."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
                 reload_func=None):
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
                                          revision_ids=revision_ids,
                                          reload_func=reload_func)
        self._pack_collection = pack_collection
        # ATM, we only support this for GCCHK repositories
        if pack_collection.chk_index is None:
            raise AssertionError('pack_collection.chk_index should not be None')
        self._gather_text_refs = False
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        self._text_refs = None
        # set by .pack() if self.revision_ids is not None
        self.revision_keys = None

    def _get_progress_stream(self, source_vf, keys, message, pb):
        substream = source_vf.get_record_stream(keys, 'groupcompress', True)
        for idx, record in enumerate(substream):
            pb.update(message, idx + 1, len(keys))
            yield record

    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
        """Filter the texts of inventories, to find the chk pages."""
        total_keys = len(keys)
        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                    record.key)
                pb.update('inv', idx, total_keys)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
                yield record
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            id_roots_set.clear()
            p_id_roots_set.clear()
        return _filtered_inv_stream()
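    # The generator above serves two purposes: it forwards the inventory
    # records, and as a side effect it collects the root key of each
    # inventory's id_to_entry and parent_id_basename_to_file_id CHK map in
    # self._chk_id_roots / self._chk_p_id_roots. Those roots are the starting
    # points that _get_chk_streams() below walks. CHK keys are 1-tuples of
    # the form ('sha1:<hex digest>',), so (illustration only) a collected
    # root might look like ('sha1:1e4a...',).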

    def _get_chk_streams(self, source_vf, keys, pb=None):
        # We want to stream the keys from 'id_roots', and things they
        # reference, and then stream things from p_id_roots and things they
        # reference, and then any remaining keys that we didn't get to.

        # We also group referenced texts together, so if one root references a
        # text with prefix 'a', and another root references a node with prefix
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
        # This keeps 'similar' nodes together.

        # Note: We probably actually want multiple streams here, to help the
        #       client understand that the different levels won't compress well
        #       against each other.
        # Test the difference between using one Group per level, and
        # using 1 Group per prefix. (so '' (root) would get a group, then
        # all the references to search-key 'a' would get a group, etc.)
        total_keys = len(keys)
        remaining_keys = set(keys)
        counter = [0]
        if self._gather_text_refs:
            self._text_refs = set()
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
            keys_by_search_prefix = {}
            remaining_keys.difference_update(cur_keys)
            def handle_internal_node(node):
                for prefix, value in node._items.iteritems():
                    # We don't want to request the same key twice, and we
                    # want to order it by the first time it is seen.
                    # Even further, we don't want to request a key which is
                    # not in this group of pack files (it should be in the
                    # repo, but it doesn't have to be in the group being
                    # TODO: consider how to treat externally referenced chk
                    #       pages as 'external_references' so that we
                    #       always fill them in for stacked branches
                    if value not in next_keys and value in remaining_keys:
                        keys_by_search_prefix.setdefault(prefix,
                            []).append(value)
            def handle_leaf_node(node):
                # Store is None, because we know we have a LeafNode, and we
                # just want its entries
                for file_id, bytes in node.iteritems(None):
                    self._text_refs.add(chk_map._bytes_to_text_key(bytes))
            stream = source_vf.get_record_stream(cur_keys,
                'as-requested', True)
            for record in stream:
                if record.storage_kind == 'absent':
                    # An absent CHK record: we assume that the missing
                    # record is in a different pack - e.g. a page not
                    # altered by the commit we're packing.
                    continue
                bytes = record.get_bytes_as('fulltext')
                # We don't care about search_key_func for this code,
                # because we only care about external references.
                node = chk_map._deserialise(bytes, record.key,
                    search_key_func=None)
                common_base = node._search_prefix
                if isinstance(node, chk_map.InternalNode):
                    handle_internal_node(node)
                elif parse_leaf_nodes:
                    handle_leaf_node(node)
                pb.update('chk node', counter[0], total_keys)
            # Double check that we won't be emitting any keys twice
            # If we get rid of the pre-calculation of all keys, we could
            # turn this around and do
            # next_keys.difference_update(seen_keys)
            # However, we also may have references to chk pages in another
            # pack file during autopack. We filter earlier, so we should no
            # longer need to do this
            # next_keys = next_keys.intersection(remaining_keys)
            for prefix in sorted(keys_by_search_prefix):
                cur_keys.extend(keys_by_search_prefix.pop(prefix))
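        # The nested helper above batches the next wave of CHK page keys in
        # keys_by_search_prefix, keyed by the search-key prefix under which an
        # internal node referenced them, and then walks the prefixes in sorted
        # order. Pages whose keys share a prefix tend to be similar, so
        # requesting them together helps groupcompress place them in the same
        # compression group. Illustration only, not real data:
        #     {'a': [('sha1:01...',), ('sha1:7f...',)], 'b': [('sha1:9c...',)]}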
        for stream in _get_referenced_stream(self._chk_id_roots,
                                             self._gather_text_refs):
            yield stream
        del self._chk_id_roots
        # while it isn't really possible for chk_id_roots to not be in the
        # local group of packs, it is possible that the tree shape has not
        # changed recently, so we need to filter _chk_p_id_roots by the
        chk_p_id_roots = [key for key in self._chk_p_id_roots
                          if key in remaining_keys]
        del self._chk_p_id_roots
        for stream in _get_referenced_stream(chk_p_id_roots, False):
            yield stream
        trace.mutter('There were %d keys in the chk index, %d of which'
                     ' were not referenced', total_keys,
                     len(remaining_keys))
        if self.revision_ids is None:
            stream = source_vf.get_record_stream(remaining_keys,

    def _build_vf(self, index_name, parents, delta, for_write=False):
        """Build a VersionedFiles instance on top of this group of packs."""
        index_name = index_name + '_index'
        index_to_pack = {}
        access = _DirectPackAccess(index_to_pack,
                                   reload_func=self._reload_func)
        if for_write:
            if self.new_pack is None:
                raise AssertionError('No new pack has been set')
            index = getattr(self.new_pack, index_name)
            index_to_pack[index] = self.new_pack.access_tuple()
            index.set_optimize(for_size=True)
            access.set_writer(self.new_pack._writer, index,
                              self.new_pack.access_tuple())
            add_callback = index.add_nodes
        else:
            indices = []
            for pack in self.packs:
                sub_index = getattr(pack, index_name)
                index_to_pack[sub_index] = pack.access_tuple()
                indices.append(sub_index)
            index = _mod_index.CombinedGraphIndex(indices)
            add_callback = None
        vf = GroupCompressVersionedFiles(
            _GCGraphIndex(index,
                add_callback=add_callback,
                parents=parents,
                is_locked=self._pack_collection.repo.is_locked),
            access=access)
        return vf

    def _build_vfs(self, index_name, parents, delta):
        """Build the source and target VersionedFiles."""
        source_vf = self._build_vf(index_name, parents,
                                   delta, for_write=False)
        target_vf = self._build_vf(index_name, parents,
                                   delta, for_write=True)
        return source_vf, target_vf

    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
                     pb_offset):
        trace.mutter('repacking %d %s', len(keys), message)
        self.pb.update('repacking %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
        stream = vf_to_stream(source_vf, keys, message, child_pb)
        for _ in target_vf._insert_record_stream(stream,

    def _copy_revision_texts(self):
        source_vf, target_vf = self._build_vfs('revision', True, False)
        if not self.revision_keys:
            # We are doing a full fetch, aka 'pack'
            self.revision_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'revisions', self._get_progress_stream, 1)

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        # It is not sufficient to just use self.revision_keys, as stacked
        # repositories can have more inventories than they have revisions.
        # One alternative would be to do something with
        # get_parent_map(self.revision_keys), but that shouldn't be any faster
        inventory_keys = source_vf.keys()
        missing_inventories = set(self.revision_keys).difference(inventory_keys)
        if missing_inventories:
            # Go back to the original repo, to see if these are really missing
            # https://bugs.launchpad.net/bzr/+bug/437003
            # If we are packing a subset of the repo, it is fine to just have
            # the data in another Pack file, which is not included in this pack
            inv_index = self._pack_collection.repo.inventories._index
            pmap = inv_index.get_parent_map(missing_inventories)
            really_missing = missing_inventories.difference(pmap)
            if really_missing:
                missing_inventories = sorted(really_missing)
                raise ValueError('We are missing inventories for revisions: %s'
                                 % (missing_inventories,))
        self._copy_stream(source_vf, target_vf, inventory_keys,
                          'inventories', self._get_filtered_inv_stream, 2)
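    # The pb_offset arguments passed by these _copy_* helpers map onto a
    # single seven-step 'repacking' progress bar driven by
    # _create_pack_from_packs below: 1 revisions, 2 inventories, 3 chk pages,
    # 4 texts, 5 signatures, and 6 for finishing the new pack.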

    def _get_chk_vfs_for_copy(self):
        return self._build_vfs('chk', False, False)

    def _copy_chk_texts(self):
        source_vf, target_vf = self._get_chk_vfs_for_copy()
        # TODO: This is technically spurious... if it is a performance issue,
        total_keys = source_vf.keys()
        trace.mutter('repacking chk: %d id_to_entry roots,'
                     ' %d p_id_map roots, %d total keys',
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
                     len(total_keys))
        self.pb.update('repacking chk', 3)
        child_pb = ui.ui_factory.nested_progress_bar()
        for stream in self._get_chk_streams(source_vf, total_keys,
                                            pb=child_pb):
            for _ in target_vf._insert_record_stream(stream,

    def _copy_text_texts(self):
        source_vf, target_vf = self._build_vfs('text', True, True)
        # XXX: We don't walk the chk map to determine referenced (file_id,
        #      revision_id) keys. We don't do it yet because you really need
        #      to filter out the ones that are present in the parents of the
        #      rev just before the ones you are copying, otherwise the filter
        #      is grabbing too many keys...
        text_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, text_keys,
                          'texts', self._get_progress_stream, 4)

    def _copy_signature_texts(self):
        source_vf, target_vf = self._build_vfs('signature', False, False)
        signature_keys = source_vf.keys()
        signature_keys.intersection(self.revision_keys)
        self._copy_stream(source_vf, target_vf, signature_keys,
                          'signatures', self._get_progress_stream, 5)

    def _create_pack_from_packs(self):
        self.pb.update('repacking', 0, 7)
        self.new_pack = self.open_pack()
        # Is this necessary for GC ?
        self.new_pack.set_write_cache_size(1024*1024)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_chk_texts()
        self._copy_text_texts()
        self._copy_signature_texts()
        self.new_pack._check_references()
        if not self._use_pack(self.new_pack):
            self.new_pack.abort()
            return None
        self.new_pack.finish_content()
        if len(self.packs) == 1:
            old_pack = self.packs[0]
            if old_pack.name == self.new_pack._hash.hexdigest():
                # The single old pack was already optimally packed.
                trace.mutter('single pack %s was already optimally packed',
                             old_pack.name)
                self.new_pack.abort()
                return None
        self.pb.update('finishing repack', 6, 7)
        self.new_pack.finish()
        self._pack_collection.allocate(self.new_pack)
        return self.new_pack


class GCCHKReconcilePacker(GCCHKPacker):
    """A packer which regenerates indices etc as it copies.

    This is used by ``bzr reconcile`` to cause parent text pointers to be
    correct.
    """

    def __init__(self, *args, **kwargs):
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
        self._data_changed = False
        self._gather_text_refs = True

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'inventories', self._get_filtered_inv_stream, 2)
        if source_vf.keys() != self.revision_keys:
            self._data_changed = True

    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        source_vf, target_vf = self._build_vfs('text', True, True)
        trace.mutter('repacking %d texts', len(self._text_refs))
        self.pb.update("repacking texts", 4)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        # We want the one we just wrote, so base it on self.new_pack
        revision_vf = self._build_vf('revision', True, False, for_write=True)
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
        # Strip keys back into revision_ids.
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
                         for k, parents in ancestor_keys.iteritems())
        # TODO: _generate_text_key_index should be much cheaper to generate from
        #       a chk repository, rather than the current implementation
        ideal_index = repo._generate_text_key_index(None, ancestors)
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
        # 2) generate a keys list that contains all the entries that can
        #    be used as-is, with corrected parents.
        discarded_keys = []
        new_parent_keys = {} # (key, parent_keys)
        NULL_REVISION = _mod_revision.NULL_REVISION
        for key in self._text_refs:
            try:
                ideal_parents = tuple(ideal_index[key])
            except KeyError:
                discarded_keys.append(key)
                self._data_changed = True
                continue
            if ideal_parents == (NULL_REVISION,):
                ideal_parents = ()
            source_parents = file_id_parent_map[key]
            if ideal_parents == source_parents:
                pass
            else:
                # We need to change the parent graph, but we don't need to
                # re-insert the text (since we don't pun the compression
                # parent with the parents list)
                self._data_changed = True
                new_parent_keys[key] = ideal_parents
        # we're finished with some data.
        del file_id_parent_map
        # 3) bulk copy the data, updating records that need it
        def _update_parents_for_texts():
            stream = source_vf.get_record_stream(self._text_refs,
                                                 'groupcompress', False)
            for record in stream:
                if record.key in new_parent_keys:
                    record.parents = new_parent_keys[record.key]
                yield record
        target_vf.insert_record_stream(_update_parents_for_texts())

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed
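    # Note: in groupcompress storage the compression parent of a text is not
    # punned with its graph parents, which is why _copy_text_texts above can
    # rewrite record.parents for the corrected graph without re-inserting the
    # text bytes themselves.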


class GCCHKCanonicalizingPacker(GCCHKPacker):
    """A packer that ensures inventories have canonical-form CHK maps.

    Ideally this would be part of reconcile, but it's very slow and rarely
    needed. (It repairs repositories affected by
    https://bugs.launchpad.net/bzr/+bug/522637).
    """

    def __init__(self, *args, **kwargs):
        super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
        self._data_changed = False

    def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
        """Create and exhaust a stream, but don't insert it.

        This is useful to get the side-effects of generating a stream.
        """
        self.pb.update('scanning %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
        list(vf_to_stream(source_vf, keys, message, child_pb))

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
        inventory_keys = source_vf.keys()
        # First, copy the existing CHKs on the assumption that most of them
        # will be correct. This will save us from having to reinsert (and
        # recompress) these records later at the cost of perhaps preserving a
        # (Iterate but don't insert _get_filtered_inv_stream to populate the
        # variables needed by GCCHKPacker._copy_chk_texts.)
        self._exhaust_stream(source_vf, inventory_keys, 'inventories',
                             self._get_filtered_inv_stream, 2)
        GCCHKPacker._copy_chk_texts(self)
        # Now copy and fix the inventories, and any regenerated CHKs.
        def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
            return self._get_filtered_canonicalizing_inv_stream(
                source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
        self._copy_stream(source_vf, target_vf, inventory_keys,
                          'inventories', chk_canonicalizing_inv_stream, 4)

    def _copy_chk_texts(self):
        # No-op; in this class this happens during _copy_inventory_texts.
        pass

    def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
            pb=None, source_chk_vf=None, target_chk_vf=None):
        """Filter the texts of inventories, regenerating CHKs to make sure they
        are canonical.
        """
        total_keys = len(keys)
        target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)
        def _filtered_inv_stream():
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            search_key_name = None
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(
                    source_chk_vf, bytes, record.key)
                pb.update('inv', idx, total_keys)
                chk_inv.id_to_entry._ensure_root()
                if search_key_name is None:
                    # Find the name corresponding to the search_key_func
                    search_key_reg = chk_map.search_key_registry
                    for search_key_name, func in search_key_reg.iteritems():
                        if func == chk_inv.id_to_entry._search_key_func:
                            break
                canonical_inv = inventory.CHKInventory.from_inventory(
                    target_chk_vf, chk_inv,
                    maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
                    search_key_name=search_key_name)
                if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key():
                    trace.mutter(
                        'Non-canonical CHK map for id_to_entry of inv: %s '
                        '(root is %s, should be %s)' % (chk_inv.revision_id,
                        chk_inv.id_to_entry.key()[0],
                        canonical_inv.id_to_entry.key()[0]))
                    self._data_changed = True
                p_id_map = chk_inv.parent_id_basename_to_file_id
                p_id_map._ensure_root()
                canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
                if p_id_map.key() != canon_p_id_map.key():
                    trace.mutter(
                        'Non-canonical CHK map for parent_id_to_basename of '
                        'inv: %s (root is %s, should be %s)'
                        % (chk_inv.revision_id, p_id_map.key()[0],
                           canon_p_id_map.key()[0]))
                    self._data_changed = True
                yield versionedfile.ChunkedContentFactory(record.key,
                        record.parents, record.sha1,
                        canonical_inv.to_lines())
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
        return _filtered_inv_stream()

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed


class GCRepositoryPackCollection(RepositoryPackCollection):

    pack_factory = GCPack
    resumed_pack_factory = ResumedGCPack
    normal_packer_class = GCCHKPacker
    optimising_packer_class = GCCHKPacker
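    # These four attributes are all the GC-specific wiring this collection
    # needs: new packs are written as GCPack, interrupted write groups resume
    # as ResumedGCPack, and both routine autopacking and explicit optimising
    # repacks use GCCHKPacker (there is no separate optimising packer for
    # this format).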

    def _check_new_inventories(self):
        """Detect missing inventories or chk root entries for the new revisions

        :returns: list of strs, summarising any problems found. If the list is
            empty no problems were found.
        """
        # Ensure that all revisions added in this write group have:
        #   - corresponding inventories,
        #   - chk root entries for those inventories,
        #   - and any present parent inventories have their chk root
        # And all this should be independent of any fallback repository.
        problems = []
        key_deps = self.repo.revisions._index._key_dependencies
        new_revisions_keys = key_deps.get_new_keys()
        no_fallback_inv_index = self.repo.inventories._index
        no_fallback_chk_bytes_index = self.repo.chk_bytes._index
        no_fallback_texts_index = self.repo.texts._index
        inv_parent_map = no_fallback_inv_index.get_parent_map(
            new_revisions_keys)
        # Are any inventories corresponding to the new revisions missing?
        corresponding_invs = set(inv_parent_map)
        missing_corresponding = set(new_revisions_keys)
        missing_corresponding.difference_update(corresponding_invs)
        if missing_corresponding:
            problems.append("inventories missing for revisions %s" %
                (sorted(missing_corresponding),))
            return problems
        # Are any chk root entries missing for any inventories? This includes
        # any present parent inventories, which may be used when calculating
        # deltas for streaming.
        all_inv_keys = set(corresponding_invs)
        for parent_inv_keys in inv_parent_map.itervalues():
            all_inv_keys.update(parent_inv_keys)
        # Filter out ghost parents.
        all_inv_keys.intersection_update(
            no_fallback_inv_index.get_parent_map(all_inv_keys))
        parent_invs_only_keys = all_inv_keys.symmetric_difference(
            corresponding_invs)
        inv_ids = [key[-1] for key in all_inv_keys]
        parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
        root_key_info = _build_interesting_key_sets(
            self.repo, inv_ids, parent_invs_only_ids)
        expected_chk_roots = root_key_info.all_keys()
        present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
            expected_chk_roots)
        missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
        if missing_chk_roots:
            problems.append("missing referenced chk root keys: %s"
                % (sorted(missing_chk_roots),))
            # Don't bother checking any further.
            return problems
        # Find all interesting chk_bytes records, and make sure they are
        # present, as well as the text keys they reference.
        chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
        chk_bytes_no_fallbacks._search_key_func = \
            self.repo.chk_bytes._search_key_func
        chk_diff = chk_map.iter_interesting_nodes(
            chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
            root_key_info.uninteresting_root_keys)
        text_keys = set()
        try:
            for record in _filter_text_keys(chk_diff, text_keys,
                                            chk_map._bytes_to_text_key):
                pass
        except errors.NoSuchRevision, e:
            # XXX: It would be nice if we could give a more precise error here.
            problems.append("missing chk node(s) for id_to_entry maps")
        chk_diff = chk_map.iter_interesting_nodes(
            chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
            root_key_info.uninteresting_pid_root_keys)
        try:
            for interesting_rec, interesting_map in chk_diff:
                pass
        except errors.NoSuchRevision, e:
            problems.append(
                "missing chk node(s) for parent_id_basename_to_file_id maps")
        present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
        missing_text_keys = text_keys.difference(present_text_keys)
        if missing_text_keys:
            problems.append("missing text keys: %r"
                % (sorted(missing_text_keys),))
        return problems


class CHKInventoryRepository(PackRepository):
    """Subclass of PackRepository that uses CHK based inventories."""

    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
                 _serializer):
        """Overridden to change pack collection class."""
        super(CHKInventoryRepository, self).__init__(_format, a_bzrdir,
            control_files, _commit_builder_class, _serializer)
        index_transport = self._transport.clone('indices')
        self._pack_collection = GCRepositoryPackCollection(self,
            self._transport, index_transport,
            self._transport.clone('upload'),
            self._transport.clone('packs'),
            _format.index_builder_class,
            use_chk_index=self._format.supports_chks,
            )
        self.inventories = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
                add_callback=self._pack_collection.inventory_index.add_callback,
                parents=True, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.inventory_index.data_access)
        self.revisions = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
                add_callback=self._pack_collection.revision_index.add_callback,
                parents=True, is_locked=self.is_locked,
                track_external_parent_refs=True, track_new_keys=True),
            access=self._pack_collection.revision_index.data_access,
            )
        self.signatures = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
                add_callback=self._pack_collection.signature_index.add_callback,
                parents=False, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.signature_index.data_access,
            )
        self.texts = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
                add_callback=self._pack_collection.text_index.add_callback,
                parents=True, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.text_index.data_access)
        # No parents, individual CHK pages don't have specific ancestry
        self.chk_bytes = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
                add_callback=self._pack_collection.chk_index.add_callback,
                parents=False, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.chk_index.data_access)
        search_key_name = self._format._serializer.search_key_name
        search_key_func = chk_map.search_key_registry.get(search_key_name)
        self.chk_bytes._search_key_func = search_key_func
        # True when the repository object is 'write locked' (as opposed to the
        # physical lock only taken out around changes to the pack-names list.)
        # Another way to represent this would be a decorator around the control
        # files object that presents logical locks as physical ones - if this
        # gets ugly consider that alternative design. RBC 20071011
        self._write_lock_count = 0
        self._transaction = None
        self._reconcile_does_inventory_gc = True
        self._reconcile_fixes_text_parents = True
        self._reconcile_backsup_inventory = False
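        # For reference, the versioned files wired up above and their graph
        # settings:
        #   revisions   - parents tracked, external parent refs and new keys
        #                 tracked for write-group checks
        #   inventories - parents tracked
        #   texts       - parents tracked (per-file graph, 2-element keys)
        #   signatures  - no parents
        #   chk_bytes   - no parents; CHK pages carry no per-page ancestry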

    def _add_inventory_checked(self, revision_id, inv, parents):
        """Add inv to the repository after checking the inputs.

        This function can be overridden to allow different inventory styles.

        :seealso: add_inventory, for the contract.
        """
        serializer = self._format._serializer
        result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
            maximum_size=serializer.maximum_size,
            search_key_name=serializer.search_key_name)
        inv_lines = result.to_lines()
        return self._inventory_add_lines(revision_id, parents,
            inv_lines, check_content=False)

    def _create_inv_from_null(self, delta, revision_id):
        """This will mutate new_inv directly.

        This is a simplified form of create_by_apply_delta which knows that all
        the old values must be None, so everything is a create.
        """
        serializer = self._format._serializer
        new_inv = inventory.CHKInventory(serializer.search_key_name)
        new_inv.revision_id = revision_id
        entry_to_bytes = new_inv._entry_to_bytes
        id_to_entry_dict = {}
        parent_id_basename_dict = {}
        for old_path, new_path, file_id, entry in delta:
            if old_path is not None:
                raise ValueError('Invalid delta, somebody tried to delete %r'
                                 ' from the NULL_REVISION'
                                 % ((old_path, file_id),))
            if new_path is None:
                raise ValueError('Invalid delta, delta from NULL_REVISION has'
                                 ' no new_path %r' % (file_id,))
            if new_path == '':
                new_inv.root_id = file_id
                parent_id_basename_key = StaticTuple('', '').intern()
            else:
                utf8_entry_name = entry.name.encode('utf-8')
                parent_id_basename_key = StaticTuple(entry.parent_id,
                                                     utf8_entry_name).intern()
            new_value = entry_to_bytes(entry)
            # new_inv._path_to_fileid_cache[new_path] = file_id
            key = StaticTuple(file_id).intern()
            id_to_entry_dict[key] = new_value
            parent_id_basename_dict[parent_id_basename_key] = file_id
        new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
            parent_id_basename_dict, maximum_size=serializer.maximum_size)
        return new_inv
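    # Illustration only: a delta handed to _create_inv_from_null is a list of
    # (old_path, new_path, file_id, entry) tuples in which every old_path is
    # None, because nothing exists at NULL_REVISION, e.g.
    #     [(None, '', 'root-id', root_entry),
    #      (None, 'hello.txt', 'hello-id', file_entry)]
    # where the entry objects are inventory entries and the ids shown here
    # are made up.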

    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
                               parents, basis_inv=None, propagate_caches=False):
        """Add a new inventory expressed as a delta against another revision.

        :param basis_revision_id: The inventory id the delta was created
        :param delta: The inventory delta (see Inventory.apply_delta for
        :param new_revision_id: The revision id that the inventory is being
        :param parents: The revision ids of the parents that revision_id is
            known to have and are in the repository already. These are supplied
            for repositories that depend on the inventory graph for revision
            graph access, as well as for those that pun ancestry with delta
        :param basis_inv: The basis inventory if it is already known,
        :param propagate_caches: If True, the caches for this inventory are
            copied to and updated for the result if possible.

        :returns: (validator, new_inv)
            The validator(which is a sha1 digest, though what is sha'd is
            repository format specific) of the serialized inventory, and the
            resulting inventory.
        """
        if not self.is_in_write_group():
            raise AssertionError("%r not in write group" % (self,))
        _mod_revision.check_not_reserved_id(new_revision_id)
        basis_tree = None
        if basis_inv is None:
            if basis_revision_id == _mod_revision.NULL_REVISION:
                new_inv = self._create_inv_from_null(delta, new_revision_id)
                if new_inv.root_id is None:
                    raise errors.RootMissing()
                inv_lines = new_inv.to_lines()
                return self._inventory_add_lines(new_revision_id, parents,
                    inv_lines, check_content=False), new_inv
            else:
                basis_tree = self.revision_tree(basis_revision_id)
                basis_tree.lock_read()
                basis_inv = basis_tree.inventory
        try:
            result = basis_inv.create_by_apply_delta(delta, new_revision_id,
                propagate_caches=propagate_caches)
            inv_lines = result.to_lines()
            return self._inventory_add_lines(new_revision_id, parents,
                inv_lines, check_content=False), result
        finally:
            if basis_tree is not None:
                basis_tree.unlock()

    def _deserialise_inventory(self, revision_id, bytes):
        return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,
            (revision_id,))

    def _iter_inventories(self, revision_ids, ordering):
        """Iterate over many inventory objects."""
        if ordering is None:
            ordering = 'unordered'
        keys = [(revision_id,) for revision_id in revision_ids]
        stream = self.inventories.get_record_stream(keys, ordering, True)
        texts = {}
        for record in stream:
            if record.storage_kind != 'absent':
                texts[record.key] = record.get_bytes_as('fulltext')
            else:
                texts[record.key] = None
        for key in keys:
            bytes = texts[key]
            if bytes is None:
                yield (None, key[-1])
            else:
                yield (inventory.CHKInventory.deserialise(
                    self.chk_bytes, bytes, key), key[-1])

    def _get_inventory_xml(self, revision_id):
        """Get serialized inventory as a string."""
        # Without a native 'xml' inventory, this method doesn't make sense.
        # However older working trees, and older bundles want it - so we supply
        # it allowing _get_inventory_xml to work. Bundles currently use the
        # serializer directly; this also isn't ideal, but there isn't an xml
        # iteration interface offered at all for repositories.
        return self._serializer.write_inventory_to_string(
            self.get_inventory(revision_id))

    def _find_present_inventory_keys(self, revision_keys):
        parent_map = self.inventories.get_parent_map(revision_keys)
        present_inventory_keys = set(k for k in parent_map)
        return present_inventory_keys

    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
        """Find the file ids and versions affected by revisions.

        :param revisions: an iterable containing revision ids.
        :param _inv_weave: The inventory weave from this repository or None.
            If None, the inventory weave will be opened automatically.
        :return: a dictionary mapping altered file-ids to an iterable of
            revision_ids. Each altered file-ids has the exact revision_ids that
            altered it listed explicitly.
        """
        rich_root = self.supports_rich_root()
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
        file_id_revisions = {}
        pb = ui.ui_factory.nested_progress_bar()
        revision_keys = [(r,) for r in revision_ids]
        parent_keys = self._find_parent_keys_of_revisions(revision_keys)
        # TODO: instead of using _find_present_inventory_keys, change the
        #       code paths to allow missing inventories to be tolerated.
        #       However, we only want to tolerate missing parent
        #       inventories, not missing inventories for revision_ids
        present_parent_inv_keys = self._find_present_inventory_keys(
            parent_keys)
        present_parent_inv_ids = set(
            [k[-1] for k in present_parent_inv_keys])
        inventories_to_read = set(revision_ids)
        inventories_to_read.update(present_parent_inv_ids)
        root_key_info = _build_interesting_key_sets(
            self, inventories_to_read, present_parent_inv_ids)
        interesting_root_keys = root_key_info.interesting_root_keys
        uninteresting_root_keys = root_key_info.uninteresting_root_keys
        chk_bytes = self.chk_bytes
        for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                interesting_root_keys, uninteresting_root_keys):
            for name, bytes in items:
                (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
                # TODO: consider interning file_id, revision_id here, or
                #       pushing that intern() into bytes_to_info()
                # TODO: rich_root should always be True here, for all
                #       repositories that support chk_bytes
                if not rich_root and name_utf8 == '':
                    continue
                try:
                    file_id_revisions[file_id].add(revision_id)
                except KeyError:
                    file_id_revisions[file_id] = set([revision_id])
        return file_id_revisions

    def find_text_key_references(self):
        """Find the text key references within the repository.

        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
            to whether they were referred to by the inventory of the
            revision_id that they contain. The inventory texts from all present
            revision ids are assessed to generate this report.
        """
        # XXX: Slow version but correct: rewrite as a series of delta
        # examinations/direct tree traversal. Note that that will require care
        # as a common node is reachable both from the inventory that added it,
        # and others afterwards.
        revision_keys = self.revisions.keys()
        result = {}
        rich_roots = self.supports_rich_root()
        pb = ui.ui_factory.nested_progress_bar()
        all_revs = self.all_revision_ids()
        total = len(all_revs)
        for pos, inv in enumerate(self.iter_inventories(all_revs)):
            pb.update("Finding text references", pos, total)
            for _, entry in inv.iter_entries():
                if not rich_roots and entry.file_id == inv.root_id:
                    continue
                key = (entry.file_id, entry.revision)
                result.setdefault(key, False)
                if entry.revision == inv.revision_id:
                    result[key] = True
        return result

    def reconcile_canonicalize_chks(self):
        """Reconcile this repository to make sure all CHKs are in canonical
        form.
        """
        from bzrlib.reconcile import PackReconciler
        reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True)
        reconciler.reconcile()

    def _reconcile_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKReconcilePacker(collection, packs, extension)
        return packer.pack(pb)

    def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs)
        return packer.pack(pb)

    def _get_source(self, to_format):
        """Return a source for streaming from this repository."""
        if self._format._serializer == to_format._serializer:
            # We must be exactly the same format, otherwise stuff like the chk
            # page layout might be different.
            # Actually, this test is just slightly looser than exact so that
            # CHK2 <-> 2a transfers will work.
            return GroupCHKStreamSource(self, to_format)
        return super(CHKInventoryRepository, self)._get_source(to_format)

    def _find_inconsistent_revision_parents(self, revisions_iterator=None):
        """Find revisions with different parent lists in the revision object
        and in the index graph.

        :param revisions_iterator: None, or an iterator of (revid,
            Revision-or-None). This iterator controls the revisions checked.
        :returns: an iterator yielding tuples of (revision-id, parents-in-index,
            parents-in-revision).
        """
        if not self.is_locked():
            raise AssertionError()
        vf = self.revisions
        if revisions_iterator is None:
            revisions_iterator = self._iter_revisions(None)
        for revid, revision in revisions_iterator:
            if revision is None:
                continue
            parent_map = vf.get_parent_map([(revid,)])
            parents_according_to_index = tuple(parent[-1] for parent in
                parent_map[(revid,)])
            parents_according_to_revision = tuple(revision.parent_ids)
            if parents_according_to_index != parents_according_to_revision:
                yield (revid, parents_according_to_index,
                    parents_according_to_revision)

    def _check_for_inconsistent_revision_parents(self):
        inconsistencies = list(self._find_inconsistent_revision_parents())
        if inconsistencies:
            raise errors.BzrCheckError(
                "Revision index has inconsistent parents.")


class GroupCHKStreamSource(StreamSource):
    """Used when both the source and target repo are GroupCHK repos."""

    def __init__(self, from_repository, to_format):
        """Create a StreamSource streaming from from_repository."""
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
        self._revision_keys = None
        self._text_keys = None
        self._text_fetch_order = 'groupcompress'
        self._chk_id_roots = None
        self._chk_p_id_roots = None

    def _get_inventory_stream(self, inventory_keys, allow_absent=False):
        """Get a stream of inventory texts.

        When this function returns, self._chk_id_roots and self._chk_p_id_roots
        should be populated.
        """
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            source_vf = self.from_repository.inventories
            stream = source_vf.get_record_stream(inventory_keys,
                                                 'groupcompress', True)
            for record in stream:
                if record.storage_kind == 'absent':
                    if allow_absent:
                        continue
                    raise errors.NoSuchRevision(self, record.key)
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                    record.key)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
                yield record
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            id_roots_set.clear()
            p_id_roots_set.clear()
        return ('inventories', _filtered_inv_stream())

    def _get_filtered_chk_streams(self, excluded_revision_keys):
        self._text_keys = set()
        excluded_revision_keys.discard(_mod_revision.NULL_REVISION)
        if not excluded_revision_keys:
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
        else:
            # filter out any excluded revisions whose inventories are not
            # TODO: Update Repository.iter_inventories() to add
            #       ignore_missing=True
            present_keys = self.from_repository._find_present_inventory_keys(
                excluded_revision_keys)
            present_ids = [k[-1] for k in present_keys]
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
            for inv in self.from_repository.iter_inventories(present_ids):
                uninteresting_root_keys.add(inv.id_to_entry.key())
                uninteresting_pid_root_keys.add(
                    inv.parent_id_basename_to_file_id.key())
        chk_bytes = self.from_repository.chk_bytes
        def _filter_id_to_entry():
            interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
                        self._chk_id_roots, uninteresting_root_keys)
            for record in _filter_text_keys(interesting_nodes, self._text_keys,
                    chk_map._bytes_to_text_key):
                if record is not None:
                    yield record
            self._chk_id_roots = None
        yield 'chk_bytes', _filter_id_to_entry()
        def _get_parent_id_basename_to_file_id_pages():
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                        self._chk_p_id_roots, uninteresting_pid_root_keys):
                if record is not None:
                    yield record
            self._chk_p_id_roots = None
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()

    def _get_text_stream(self):
        # Note: We know we don't have to handle adding root keys, because both
        # the source and target are the identical network name.
        text_stream = self.from_repository.texts.get_record_stream(
            self._text_keys, self._text_fetch_order, False)
        return ('texts', text_stream)

    def get_stream(self, search):
        def wrap_and_count(pb, rc, stream):
            """Yield records from stream while showing progress."""
            count = 0
            for record in stream:
                if count == rc.STEP:
                    pb.update('Estimate', rc.current, rc.max)
                    count = 0
                count += 1
                yield record

        revision_ids = search.get_keys()
        pb = ui.ui_factory.nested_progress_bar()
        rc = self._record_counter
        self._record_counter.setup(len(revision_ids))
        for stream_info in self._fetch_revision_texts(revision_ids):
            yield (stream_info[0],
                wrap_and_count(pb, rc, stream_info[1]))
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
        # TODO: The keys to exclude might be part of the search recipe
        # For now, exclude all parents that are at the edge of ancestry, for
        # which we have inventories
        from_repo = self.from_repository
        parent_keys = from_repo._find_parent_keys_of_revisions(
            self._revision_keys)
        self.from_repository.revisions.clear_cache()
        self.from_repository.signatures.clear_cache()
        # Clear the repo's get_parent_map cache too.
        self.from_repository._unstacked_provider.disable_cache()
        self.from_repository._unstacked_provider.enable_cache()
        s = self._get_inventory_stream(self._revision_keys)
        yield (s[0], wrap_and_count(pb, rc, s[1]))
        self.from_repository.inventories.clear_cache()
        for stream_info in self._get_filtered_chk_streams(parent_keys):
            yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
        self.from_repository.chk_bytes.clear_cache()
        s = self._get_text_stream()
        yield (s[0], wrap_and_count(pb, rc, s[1]))
        self.from_repository.texts.clear_cache()
        pb.update('Done', rc.max, rc.max)
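    # get_stream() above yields its substreams in a fixed order - revision
    # data first, then inventories, then the two chk_bytes streams, then file
    # texts - and clears each versioned-files cache as soon as the
    # corresponding substream has been handed out, keeping the memory used by
    # a fetch bounded.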

    def get_stream_for_missing_keys(self, missing_keys):
        # missing keys can only occur when we are byte copying and not
        # translating (because translation means we don't send
        # unreconstructable deltas ever).
        missing_inventory_keys = set()
        for key in missing_keys:
            if key[0] != 'inventories':
                raise AssertionError('The only missing keys we should'
                    ' be filling in are inventory keys, not %s'
                    % (key[0],))
            missing_inventory_keys.add(key[1:])
        if self._chk_id_roots or self._chk_p_id_roots:
            raise AssertionError('Cannot call get_stream_for_missing_keys'
                ' until all of get_stream() has been consumed.')
        # Yield the inventory stream, so we can find the chk stream
        # Some of the missing_keys will be missing because they are ghosts.
        # As such, we can ignore them. The Sink is required to verify there are
        # no unavailable texts when the ghost inventories are not filled in.
        yield self._get_inventory_stream(missing_inventory_keys,
                                         allow_absent=True)
        # We use the empty set for excluded_revision_keys, to make it clear
        # that we want to transmit all referenced chk pages.
        for stream_info in self._get_filtered_chk_streams(set()):
            yield stream_info


class _InterestingKeyInfo(object):

    def __init__(self):
        self.interesting_root_keys = set()
        self.interesting_pid_root_keys = set()
        self.uninteresting_root_keys = set()
        self.uninteresting_pid_root_keys = set()

    def all_interesting(self):
        return self.interesting_root_keys.union(self.interesting_pid_root_keys)

    def all_uninteresting(self):
        return self.uninteresting_root_keys.union(
            self.uninteresting_pid_root_keys)

    def all_keys(self):
        return self.all_interesting().union(self.all_uninteresting())


def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids):
    result = _InterestingKeyInfo()
    for inv in repo.iter_inventories(inventory_ids, 'unordered'):
        root_key = inv.id_to_entry.key()
        pid_root_key = inv.parent_id_basename_to_file_id.key()
        if inv.revision_id in parent_only_inv_ids:
            result.uninteresting_root_keys.add(root_key)
            result.uninteresting_pid_root_keys.add(pid_root_key)
        else:
            result.interesting_root_keys.add(root_key)
            result.interesting_pid_root_keys.add(pid_root_key)
    return result
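# Illustration of how the interesting/uninteresting split is used: when
# streaming a set of revisions whose parent inventories are already present
# on the other side, _build_interesting_key_sets() records the id_to_entry
# and parent_id_basename_to_file_id root keys of the streamed inventories as
# 'interesting' and those of the parent-only inventories as 'uninteresting';
# chk_map.iter_interesting_nodes() then only walks CHK pages reachable from
# the interesting roots and not from the uninteresting ones.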


def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key):
    """Iterate the result of iter_interesting_nodes, yielding the records
    and adding to text_keys.
    """
    text_keys_update = text_keys.update
    for record, items in interesting_nodes_iterable:
        text_keys_update([bytes_to_text_key(b) for n, b in items])
        yield record


class RepositoryFormat2a(RepositoryFormatPack):
    """A CHK repository that uses the bencode revision serializer."""

    repository_class = CHKInventoryRepository
    supports_external_lookups = True
    supports_chks = True
    _commit_builder_class = PackRootCommitBuilder
    rich_root_data = True
    _serializer = chk_serializer.chk_bencode_serializer
    _commit_inv_deltas = True
    # What index classes to use
    index_builder_class = BTreeBuilder
    index_class = BTreeGraphIndex
    # Note: We cannot unpack a delta that references a text we haven't
    # seen yet. There are 2 options, work in fulltexts, or require
    # topological sorting. Using fulltexts is more optimal for local
    # operations, because the source can be smart about extracting
    # multiple in-a-row (and sharing strings). Topological is better
    # for remote, because we access less data.
    _fetch_order = 'unordered'
    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.
    pack_compresses = True

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir('2a')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(self):
        return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Repository format 2a - rich roots, group compression"
            " and chk inventories")


class RepositoryFormat2aSubtree(RepositoryFormat2a):
    """A 2a repository format that supports nested trees.
    """

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir('development-subtree')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(self):
        return ('Bazaar development format 8\n')

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format 8 - nested trees, "
            "group compression and chk inventories")

    supports_tree_reference = True