# Copyright (C) 2008-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Repository formats using CHK inventories and groupcompress compression."""

from __future__ import absolute_import

import time

from bzrlib import (
    bzrdir,
    chk_map,
    chk_serializer,
    debug,
    errors,
    index as _mod_index,
    inventory,
    osutils,
    pack,
    revision as _mod_revision,
    trace,
    ui,
    versionedfile,
    )
from bzrlib.btree_index import (
    BTreeBuilder,
    BTreeGraphIndex,
    )
from bzrlib.decorators import needs_write_lock
from bzrlib.groupcompress import (
    GroupCompressVersionedFiles,
    _GCGraphIndex,
    )
from bzrlib.repofmt.pack_repo import (
    NewPack,
    Packer,
    PackRepository,
    PackRootCommitBuilder,
    RepositoryFormatPack,
    RepositoryPackCollection,
    ResumedPack,
    )
from bzrlib.vf_repository import (
    StreamSource,
    )
from bzrlib.static_tuple import StaticTuple


class GCPack(NewPack):

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param pack_collection: A PackCollection into which this is being
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g. '.autopack'
        :param file_mode: An optional file mode to create the new files with.
        """
        # replaced from NewPack to:
        # - change inventory reference list length to 1
        # - change texts reference lists to 1
        # TODO: patch this to be parameterised
        # The relative locations of the packs are constrained, but all are
        # passed in because the caller has them, so as to avoid object churn.
        index_builder_class = pack_collection._index_builder_class
        if pack_collection.chk_index is not None:
            chk_index = index_builder_class(reference_lists=0)
            # Revisions: parents list, no text compression.
            index_builder_class(reference_lists=1),
            # Inventory: We want to map compression only, but currently the
            # knit code hasn't been updated enough to understand that, so we
            # have a regular 2-list index giving parents and compression
            index_builder_class(reference_lists=1),
            # Texts: per file graph, for all fileids - so one reference list
            # and two elements in the key tuple.
            index_builder_class(reference_lists=1, key_elements=2),
            # Signatures: Just blobs to store, no compression, no parents
            index_builder_class(reference_lists=0),
            # CHK based storage - just blobs, no compression or parents.
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
        # where should the new pack be opened
        self.upload_transport = pack_collection._upload_transport
        # where are indices written out to
        self.index_transport = pack_collection._index_transport
        # where is the pack renamed to when it is finished?
        self.pack_transport = pack_collection._pack_transport
        # What file mode to upload the pack and indices with.
        self._file_mode = file_mode
        # tracks the content written to the .pack file.
        self._hash = osutils.md5()
        # a four-tuple with the length in bytes of the indices, once the pack
        # is finalised. (rev, inv, text, sigs)
        self.index_sizes = None
        # How much data to cache when writing packs. Note that this is not
        # synchronised with reads, because it's not in the transport layer, so
        # is not safe unless the client knows it won't be reading from the pack
        self._cache_limit = 0
        # the temporary pack file name.
        self.random_name = osutils.rand_chars(20) + upload_suffix
        # when was this pack started ?
        self.start_time = time.time()
        # open an output stream for the data added to the pack.
        self.write_stream = self.upload_transport.open_write_stream(
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them. Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # create a callable for adding data
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        def _write_data(bytes, flush=False, _buffer=self._buffer,
                _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(bytes)
            _buffer[1] += len(bytes)
            if _buffer[1] > self._cache_limit or flush:
                bytes = ''.join(_buffer[0])
        # expose this on self, for the occasion when clients want to add data.
        self._write_data = _write_data
        # a pack writer object to serialise pack records.
        self._writer = pack.ContainerWriter(self._write_data)
        # what state is the pack in? (open, finished, aborted)
        # no name until we finish writing the content

    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection. It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        # Groupcompress packs don't have any external references, arguably CHK
        # pages have external references, but we cannot 'cheaply' determine
        # them without actually walking all of the chk pages.
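

# Illustrative sketch only (not part of bzrlib): GCPack.__init__ above builds
# `_write_data` as a closure over a shared `[chunks, size]` buffer so that many
# small writes can be batched before touching the transport. A minimal
# standalone version of that buffering pattern, with a hypothetical `write`
# callable standing in for the transport stream, looks like this:
def _example_buffered_writer(write, cache_limit=0):
    buf = [[], 0]
    def _write_data(data, flush=False, _buffer=buf, _write=write):
        _buffer[0].append(data)
        _buffer[1] += len(data)
        if _buffer[1] > cache_limit or flush:
            # Emit everything buffered so far in one call, then reset.
            _write(''.join(_buffer[0]))
            del _buffer[0][:]
            _buffer[1] = 0
    return _write_data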


class ResumedGCPack(ResumedPack):

    def _check_references(self):
        """Make sure our external compression parents are present."""
        # See GCPack._check_references for why this is empty

    def _get_external_refs(self, index):
        # GC repositories don't have compression parents external to a given
        return set()


class GCCHKPacker(Packer):
    """This class understands what it takes to collect a GCCHK repo."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
                 reload_func=None):
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
                                          revision_ids=revision_ids,
                                          reload_func=reload_func)
        self._pack_collection = pack_collection
        # ATM, we only support this for GCCHK repositories
        if pack_collection.chk_index is None:
            raise AssertionError('pack_collection.chk_index should not be None')
        self._gather_text_refs = False
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        self._text_refs = None
        # set by .pack() if self.revision_ids is not None
        self.revision_keys = None

    def _get_progress_stream(self, source_vf, keys, message, pb):
        substream = source_vf.get_record_stream(keys, 'groupcompress', True)
        for idx, record in enumerate(substream):
            pb.update(message, idx + 1, len(keys))

    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
        """Filter the texts of inventories, to find the chk pages."""
        total_keys = len(keys)
        def _filtered_inv_stream():
            p_id_roots_set = set()
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                    record.key)
                pb.update('inv', idx, total_keys)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            p_id_roots_set.clear()
        return _filtered_inv_stream()

    def _get_chk_streams(self, source_vf, keys, pb=None):
        # We want to stream the keys from 'id_roots', and things they
        # reference, and then stream things from p_id_roots and things they
        # reference, and then any remaining keys that we didn't get to.
        # We also group referenced texts together, so if one root references a
        # text with prefix 'a', and another root references a node with prefix
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
        # This keeps 'similar' nodes together.
        # Note: We probably actually want multiple streams here, to help the
        # client understand that the different levels won't compress well
        # against each other.
        # Test the difference between using one Group per level, and
        # using 1 Group per prefix. (so '' (root) would get a group, then
        # all the references to search-key 'a' would get a group, etc.)
        total_keys = len(keys)
        remaining_keys = set(keys)
        if self._gather_text_refs:
            self._text_refs = set()
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
            keys_by_search_prefix = {}
            remaining_keys.difference_update(cur_keys)
            def handle_internal_node(node):
                for prefix, value in node._items.iteritems():
                    # We don't want to request the same key twice, and we
                    # want to order it by the first time it is seen.
                    # Even further, we don't want to request a key which is
                    # not in this group of pack files (it should be in the
                    # repo, but it doesn't have to be in the group being
                    # TODO: consider how to treat externally referenced chk
                    # pages as 'external_references' so that we
                    # always fill them in for stacked branches
                    if value not in next_keys and value in remaining_keys:
                        keys_by_search_prefix.setdefault(prefix,
                            []).append(value)
            def handle_leaf_node(node):
                # Store is None, because we know we have a LeafNode, and we
                # just want its entries
                for file_id, bytes in node.iteritems(None):
                    self._text_refs.add(chk_map._bytes_to_text_key(bytes))
            stream = source_vf.get_record_stream(cur_keys,
                'as-requested', True)
            for record in stream:
                if record.storage_kind == 'absent':
                    # An absent CHK record: we assume that the missing
                    # record is in a different pack - e.g. a page not
                    # altered by the commit we're packing.
                    continue
                bytes = record.get_bytes_as('fulltext')
                # We don't care about search_key_func for this code,
                # because we only care about external references.
                node = chk_map._deserialise(bytes, record.key,
                    search_key_func=None)
                common_base = node._search_prefix
                if isinstance(node, chk_map.InternalNode):
                    handle_internal_node(node)
                elif parse_leaf_nodes:
                    handle_leaf_node(node)
                pb.update('chk node', counter[0], total_keys)
            # Double check that we won't be emitting any keys twice
            # If we get rid of the pre-calculation of all keys, we could
            # turn this around and do
            # next_keys.difference_update(seen_keys)
            # However, we also may have references to chk pages in another
            # pack file during autopack. We filter earlier, so we should no
            # longer need to do this
            # next_keys = next_keys.intersection(remaining_keys)
            for prefix in sorted(keys_by_search_prefix):
                cur_keys.extend(keys_by_search_prefix.pop(prefix))
        for stream in _get_referenced_stream(self._chk_id_roots,
                self._gather_text_refs):
            yield stream
        del self._chk_id_roots
        # while it isn't really possible for chk_id_roots to not be in the
        # local group of packs, it is possible that the tree shape has not
        # changed recently, so we need to filter _chk_p_id_roots by the
        chk_p_id_roots = [key for key in self._chk_p_id_roots
                          if key in remaining_keys]
        del self._chk_p_id_roots
        for stream in _get_referenced_stream(chk_p_id_roots, False):
            yield stream
        trace.mutter('There were %d keys in the chk index, %d of which'
            ' were not referenced', total_keys,
        if self.revision_ids is None:
            stream = source_vf.get_record_stream(remaining_keys,

    def _build_vf(self, index_name, parents, delta, for_write=False):
        """Build a VersionedFiles instance on top of this group of packs."""
        index_name = index_name + '_index'
        access = _DirectPackAccess(index_to_pack,
            reload_func=self._reload_func)
        if self.new_pack is None:
            raise AssertionError('No new pack has been set')
        index = getattr(self.new_pack, index_name)
        index_to_pack[index] = self.new_pack.access_tuple()
        index.set_optimize(for_size=True)
        access.set_writer(self.new_pack._writer, index,
            self.new_pack.access_tuple())
        add_callback = index.add_nodes
        for pack in self.packs:
            sub_index = getattr(pack, index_name)
            index_to_pack[sub_index] = pack.access_tuple()
            indices.append(sub_index)
        index = _mod_index.CombinedGraphIndex(indices)
        vf = GroupCompressVersionedFiles(
            add_callback=add_callback,
            is_locked=self._pack_collection.repo.is_locked),

    def _build_vfs(self, index_name, parents, delta):
        """Build the source and target VersionedFiles."""
        source_vf = self._build_vf(index_name, parents,
                                   delta, for_write=False)
        target_vf = self._build_vf(index_name, parents,
                                   delta, for_write=True)
        return source_vf, target_vf

    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
                     pb_offset):
        trace.mutter('repacking %d %s', len(keys), message)
        self.pb.update('repacking %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
        stream = vf_to_stream(source_vf, keys, message, child_pb)
        for _ in target_vf._insert_record_stream(stream,

    def _copy_revision_texts(self):
        source_vf, target_vf = self._build_vfs('revision', True, False)
        if not self.revision_keys:
            # We are doing a full fetch, aka 'pack'
            self.revision_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'revisions', self._get_progress_stream, 1)

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        # It is not sufficient to just use self.revision_keys, as stacked
        # repositories can have more inventories than they have revisions.
        # One alternative would be to do something with
        # get_parent_map(self.revision_keys), but that shouldn't be any faster
        inventory_keys = source_vf.keys()
        missing_inventories = set(self.revision_keys).difference(inventory_keys)
        if missing_inventories:
            # Go back to the original repo, to see if these are really missing
            # https://bugs.launchpad.net/bzr/+bug/437003
            # If we are packing a subset of the repo, it is fine to just have
            # the data in another Pack file, which is not included in this pack
            inv_index = self._pack_collection.repo.inventories._index
            pmap = inv_index.get_parent_map(missing_inventories)
            really_missing = missing_inventories.difference(pmap)
            missing_inventories = sorted(really_missing)
            raise ValueError('We are missing inventories for revisions: %s'
                % (missing_inventories,))
        self._copy_stream(source_vf, target_vf, inventory_keys,
                          'inventories', self._get_filtered_inv_stream, 2)

    def _get_chk_vfs_for_copy(self):
        return self._build_vfs('chk', False, False)

    def _copy_chk_texts(self):
        source_vf, target_vf = self._get_chk_vfs_for_copy()
        # TODO: This is technically spurious... if it is a performance issue,
        total_keys = source_vf.keys()
        trace.mutter('repacking chk: %d id_to_entry roots,'
            ' %d p_id_map roots, %d total keys',
            len(self._chk_id_roots), len(self._chk_p_id_roots),
        self.pb.update('repacking chk', 3)
        child_pb = ui.ui_factory.nested_progress_bar()
        for stream in self._get_chk_streams(source_vf, total_keys,
            for _ in target_vf._insert_record_stream(stream,

    def _copy_text_texts(self):
        source_vf, target_vf = self._build_vfs('text', True, True)
        # XXX: We don't walk the chk map to determine referenced (file_id,
        # revision_id) keys. We don't do it yet because you really need
        # to filter out the ones that are present in the parents of the
        # rev just before the ones you are copying, otherwise the filter
        # is grabbing too many keys...
        text_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, text_keys,
                          'texts', self._get_progress_stream, 4)

    def _copy_signature_texts(self):
        source_vf, target_vf = self._build_vfs('signature', False, False)
        signature_keys = source_vf.keys()
        signature_keys.intersection(self.revision_keys)
        self._copy_stream(source_vf, target_vf, signature_keys,
                          'signatures', self._get_progress_stream, 5)

    def _create_pack_from_packs(self):
        self.pb.update('repacking', 0, 7)
        self.new_pack = self.open_pack()
        # Is this necessary for GC ?
        self.new_pack.set_write_cache_size(1024*1024)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_chk_texts()
        self._copy_text_texts()
        self._copy_signature_texts()
        self.new_pack._check_references()
        if not self._use_pack(self.new_pack):
            self.new_pack.abort()
        self.new_pack.finish_content()
        if len(self.packs) == 1:
            old_pack = self.packs[0]
            if old_pack.name == self.new_pack._hash.hexdigest():
                # The single old pack was already optimally packed.
                trace.mutter('single pack %s was already optimally packed',
                self.new_pack.abort()
        self.pb.update('finishing repack', 6, 7)
        self.new_pack.finish()
        self._pack_collection.allocate(self.new_pack)
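

# Illustrative sketch only (not part of bzrlib): GCCHKPacker._get_chk_streams
# above walks the chk pages level by level, queueing referenced keys by their
# search-key prefix so that 'similar' pages are emitted next to each other and
# compress well together. The queue/drain pattern, reduced to plain dicts and
# sets (`references` is a hypothetical mapping from a key to a list of
# (prefix, child_key) pairs), looks roughly like this:
def _example_group_by_prefix(root_keys, references):
    emitted = []
    remaining = set(k for refs in references.values() for _, k in refs)
    remaining.update(root_keys)
    cur_keys = list(root_keys)
    while cur_keys:
        remaining.difference_update(cur_keys)
        keys_by_prefix = {}
        next_keys = set()
        for key in cur_keys:
            emitted.append(key)
            for prefix, child in references.get(key, []):
                # Request each key once, bucketed by its search prefix.
                if child not in next_keys and child in remaining:
                    next_keys.add(child)
                    keys_by_prefix.setdefault(prefix, []).append(child)
        cur_keys = []
        # Drain prefix buckets in sorted order, keeping similar pages together.
        for prefix in sorted(keys_by_prefix):
            cur_keys.extend(keys_by_prefix.pop(prefix))
    return emitted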


class GCCHKReconcilePacker(GCCHKPacker):
    """A packer which regenerates indices etc as it copies.

    This is used by ``bzr reconcile`` to cause parent text pointers to be
    """

    def __init__(self, *args, **kwargs):
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
        self._data_changed = False
        self._gather_text_refs = True

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'inventories', self._get_filtered_inv_stream, 2)
        if source_vf.keys() != self.revision_keys:
            self._data_changed = True

    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        source_vf, target_vf = self._build_vfs('text', True, True)
        trace.mutter('repacking %d texts', len(self._text_refs))
        self.pb.update("repacking texts", 4)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        # We want the one we just wrote, so base it on self.new_pack
        revision_vf = self._build_vf('revision', True, False, for_write=True)
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
        # Strip keys back into revision_ids.
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
                         for k, parents in ancestor_keys.iteritems())
        # TODO: _generate_text_key_index should be much cheaper to generate from
        # a chk repository, rather than the current implementation
        ideal_index = repo._generate_text_key_index(None, ancestors)
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
        # 2) generate a keys list that contains all the entries that can
        # be used as-is, with corrected parents.
        new_parent_keys = {} # (key, parent_keys)
        discarded_keys = []
        NULL_REVISION = _mod_revision.NULL_REVISION
        for key in self._text_refs:
            ideal_parents = tuple(ideal_index[key])
            discarded_keys.append(key)
            self._data_changed = True
            if ideal_parents == (NULL_REVISION,):
            source_parents = file_id_parent_map[key]
            if ideal_parents == source_parents:
                pass
            else:
                # We need to change the parent graph, but we don't need to
                # re-insert the text (since we don't pun the compression
                # parent with the parents list)
                self._data_changed = True
                new_parent_keys[key] = ideal_parents
        # we're finished with some data.
        del file_id_parent_map
        # 3) bulk copy the data, updating records that need it
        def _update_parents_for_texts():
            stream = source_vf.get_record_stream(self._text_refs,
                'groupcompress', False)
            for record in stream:
                if record.key in new_parent_keys:
                    record.parents = new_parent_keys[record.key]
                yield record
        target_vf.insert_record_stream(_update_parents_for_texts())

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed
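

# Illustrative sketch only (not part of bzrlib): the reconcile packer above
# fixes parent pointers by rewriting `record.parents` as the records are
# streamed into the new pack, rather than re-inserting the text content.
# The same pass-through pattern with a plain iterable of objects that carry
# `key` and `parents` attributes (both hypothetical here) is:
def _example_update_parents(records, new_parent_keys):
    for record in records:
        if record.key in new_parent_keys:
            # Only the graph pointer changes; the content is forwarded as-is.
            record.parents = new_parent_keys[record.key]
        yield record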


class GCCHKCanonicalizingPacker(GCCHKPacker):
    """A packer that ensures inventories have canonical-form CHK maps.

    Ideally this would be part of reconcile, but it's very slow and rarely
    needed. (It repairs repositories affected by
    https://bugs.launchpad.net/bzr/+bug/522637).
    """

    def __init__(self, *args, **kwargs):
        super(GCCHKCanonicalizingPacker, self).__init__(*args, **kwargs)
        self._data_changed = False

    def _exhaust_stream(self, source_vf, keys, message, vf_to_stream, pb_offset):
        """Create and exhaust a stream, but don't insert it.

        This is useful to get the side-effects of generating a stream.
        """
        self.pb.update('scanning %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
        list(vf_to_stream(source_vf, keys, message, child_pb))

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        source_chk_vf, target_chk_vf = self._get_chk_vfs_for_copy()
        inventory_keys = source_vf.keys()
        # First, copy the existing CHKs on the assumption that most of them
        # will be correct. This will save us from having to reinsert (and
        # recompress) these records later at the cost of perhaps preserving a
        # (Iterate but don't insert _get_filtered_inv_stream to populate the
        # variables needed by GCCHKPacker._copy_chk_texts.)
        self._exhaust_stream(source_vf, inventory_keys, 'inventories',
                             self._get_filtered_inv_stream, 2)
        GCCHKPacker._copy_chk_texts(self)
        # Now copy and fix the inventories, and any regenerated CHKs.
        def chk_canonicalizing_inv_stream(source_vf, keys, message, pb=None):
            return self._get_filtered_canonicalizing_inv_stream(
                source_vf, keys, message, pb, source_chk_vf, target_chk_vf)
        self._copy_stream(source_vf, target_vf, inventory_keys,
                          'inventories', chk_canonicalizing_inv_stream, 4)

    def _copy_chk_texts(self):
        # No-op; in this class this happens during _copy_inventory_texts.
        pass

    def _get_filtered_canonicalizing_inv_stream(self, source_vf, keys, message,
            pb=None, source_chk_vf=None, target_chk_vf=None):
        """Filter the texts of inventories, regenerating CHKs to make sure they
        """
        total_keys = len(keys)
        target_chk_vf = versionedfile.NoDupeAddLinesDecorator(target_chk_vf)
        def _filtered_inv_stream():
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            search_key_name = None
            for idx, record in enumerate(stream):
                # Inventories should always be with revisions; assume success.
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(
                    source_chk_vf, bytes, record.key)
                pb.update('inv', idx, total_keys)
                chk_inv.id_to_entry._ensure_root()
                if search_key_name is None:
                    # Find the name corresponding to the search_key_func
                    search_key_reg = chk_map.search_key_registry
                    for search_key_name, func in search_key_reg.iteritems():
                        if func == chk_inv.id_to_entry._search_key_func:
                canonical_inv = inventory.CHKInventory.from_inventory(
                    target_chk_vf, chk_inv,
                    maximum_size=chk_inv.id_to_entry._root_node._maximum_size,
                    search_key_name=search_key_name)
                if chk_inv.id_to_entry.key() != canonical_inv.id_to_entry.key():
                        'Non-canonical CHK map for id_to_entry of inv: %s '
                        '(root is %s, should be %s)' % (chk_inv.revision_id,
                        chk_inv.id_to_entry.key()[0],
                        canonical_inv.id_to_entry.key()[0]))
                    self._data_changed = True
                p_id_map = chk_inv.parent_id_basename_to_file_id
                p_id_map._ensure_root()
                canon_p_id_map = canonical_inv.parent_id_basename_to_file_id
                if p_id_map.key() != canon_p_id_map.key():
                        'Non-canonical CHK map for parent_id_to_basename of '
                        'inv: %s (root is %s, should be %s)'
                        % (chk_inv.revision_id, p_id_map.key()[0],
                        canon_p_id_map.key()[0]))
                    self._data_changed = True
                yield versionedfile.ChunkedContentFactory(record.key,
                    record.parents, record.sha1,
                    canonical_inv.to_lines())
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
        return _filtered_inv_stream()

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed


class GCRepositoryPackCollection(RepositoryPackCollection):

    pack_factory = GCPack
    resumed_pack_factory = ResumedGCPack
    normal_packer_class = GCCHKPacker
    optimising_packer_class = GCCHKPacker

    def _check_new_inventories(self):
        """Detect missing inventories or chk root entries for the new revisions

        :returns: list of strs, summarising any problems found. If the list is
            empty no problems were found.
        """
        # Ensure that all revisions added in this write group have:
        # - corresponding inventories,
        # - chk root entries for those inventories,
        # - and any present parent inventories have their chk root
        # And all this should be independent of any fallback repository.
        problems = []
        key_deps = self.repo.revisions._index._key_dependencies
        new_revisions_keys = key_deps.get_new_keys()
        no_fallback_inv_index = self.repo.inventories._index
        no_fallback_chk_bytes_index = self.repo.chk_bytes._index
        no_fallback_texts_index = self.repo.texts._index
        inv_parent_map = no_fallback_inv_index.get_parent_map(
            new_revisions_keys)
        # Are any inventories corresponding to the new revisions missing?
        corresponding_invs = set(inv_parent_map)
        missing_corresponding = set(new_revisions_keys)
        missing_corresponding.difference_update(corresponding_invs)
        if missing_corresponding:
            problems.append("inventories missing for revisions %s" %
                (sorted(missing_corresponding),))
        # Are any chk root entries missing for any inventories? This includes
        # any present parent inventories, which may be used when calculating
        # deltas for streaming.
        all_inv_keys = set(corresponding_invs)
        for parent_inv_keys in inv_parent_map.itervalues():
            all_inv_keys.update(parent_inv_keys)
        # Filter out ghost parents.
        all_inv_keys.intersection_update(
            no_fallback_inv_index.get_parent_map(all_inv_keys))
        parent_invs_only_keys = all_inv_keys.symmetric_difference(
            corresponding_invs)
        inv_ids = [key[-1] for key in all_inv_keys]
        parent_invs_only_ids = [key[-1] for key in parent_invs_only_keys]
        root_key_info = _build_interesting_key_sets(
            self.repo, inv_ids, parent_invs_only_ids)
        expected_chk_roots = root_key_info.all_keys()
        present_chk_roots = no_fallback_chk_bytes_index.get_parent_map(
            expected_chk_roots)
        missing_chk_roots = expected_chk_roots.difference(present_chk_roots)
        if missing_chk_roots:
            problems.append("missing referenced chk root keys: %s"
                % (sorted(missing_chk_roots),))
            # Don't bother checking any further.
            return problems
        # Find all interesting chk_bytes records, and make sure they are
        # present, as well as the text keys they reference.
        chk_bytes_no_fallbacks = self.repo.chk_bytes.without_fallbacks()
        chk_bytes_no_fallbacks._search_key_func = \
            self.repo.chk_bytes._search_key_func
        chk_diff = chk_map.iter_interesting_nodes(
            chk_bytes_no_fallbacks, root_key_info.interesting_root_keys,
            root_key_info.uninteresting_root_keys)
        text_keys = set()
        try:
            for record in _filter_text_keys(chk_diff, text_keys,
                    chk_map._bytes_to_text_key):
                pass
        except errors.NoSuchRevision, e:
            # XXX: It would be nice if we could give a more precise error here.
            problems.append("missing chk node(s) for id_to_entry maps")
        chk_diff = chk_map.iter_interesting_nodes(
            chk_bytes_no_fallbacks, root_key_info.interesting_pid_root_keys,
            root_key_info.uninteresting_pid_root_keys)
        try:
            for interesting_rec, interesting_map in chk_diff:
                pass
        except errors.NoSuchRevision, e:
            problems.append(
                "missing chk node(s) for parent_id_basename_to_file_id maps")
        present_text_keys = no_fallback_texts_index.get_parent_map(text_keys)
        missing_text_keys = text_keys.difference(present_text_keys)
        if missing_text_keys:
            problems.append("missing text keys: %r"
                % (sorted(missing_text_keys),))
        return problems
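

# Illustrative sketch only (not part of bzrlib): _check_new_inventories above
# is mostly set arithmetic over key tuples. Reduced to plain sets, the "which
# new revisions have no inventory?" and "which inventories are only reachable
# as parents?" questions look like this (all names hypothetical):
def _example_missing_and_parent_only(new_revision_keys, present_inv_keys,
                                     parent_inv_keys):
    present = set(present_inv_keys)
    # Revisions added in the write group that have no inventory at all.
    missing_corresponding = set(new_revision_keys) - present
    # Inventories reachable only as parents of the new ones.
    all_inv_keys = present | set(parent_inv_keys)
    parent_invs_only = all_inv_keys - present
    return missing_corresponding, parent_invs_only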


class CHKInventoryRepository(PackRepository):
    """Subclass of PackRepository that uses CHK based inventories."""

    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
                 _serializer):
        """Overridden to change pack collection class."""
        super(CHKInventoryRepository, self).__init__(_format, a_bzrdir,
            control_files, _commit_builder_class, _serializer)
        index_transport = self._transport.clone('indices')
        self._pack_collection = GCRepositoryPackCollection(self,
            self._transport, index_transport,
            self._transport.clone('upload'),
            self._transport.clone('packs'),
            _format.index_builder_class,
            use_chk_index=self._format.supports_chks,
            )
        self.inventories = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
                add_callback=self._pack_collection.inventory_index.add_callback,
                parents=True, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.inventory_index.data_access)
        self.revisions = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
                add_callback=self._pack_collection.revision_index.add_callback,
                parents=True, is_locked=self.is_locked,
                track_external_parent_refs=True, track_new_keys=True),
            access=self._pack_collection.revision_index.data_access,
        self.signatures = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
                add_callback=self._pack_collection.signature_index.add_callback,
                parents=False, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.signature_index.data_access,
        self.texts = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
                add_callback=self._pack_collection.text_index.add_callback,
                parents=True, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.text_index.data_access)
        # No parents, individual CHK pages don't have specific ancestry
        self.chk_bytes = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
                add_callback=self._pack_collection.chk_index.add_callback,
                parents=False, is_locked=self.is_locked,
                inconsistency_fatal=False),
            access=self._pack_collection.chk_index.data_access)
        search_key_name = self._format._serializer.search_key_name
        search_key_func = chk_map.search_key_registry.get(search_key_name)
        self.chk_bytes._search_key_func = search_key_func
        # True when the repository object is 'write locked' (as opposed to the
        # physical lock only taken out around changes to the pack-names list.)
        # Another way to represent this would be a decorator around the control
        # files object that presents logical locks as physical ones - if this
        # gets ugly consider that alternative design. RBC 20071011
        self._write_lock_count = 0
        self._transaction = None
        self._reconcile_does_inventory_gc = True
        self._reconcile_fixes_text_parents = True
        self._reconcile_backsup_inventory = False

    def _add_inventory_checked(self, revision_id, inv, parents):
        """Add inv to the repository after checking the inputs.

        This function can be overridden to allow different inventory styles.

        :seealso: add_inventory, for the contract.
        """
        serializer = self._format._serializer
        result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
            maximum_size=serializer.maximum_size,
            search_key_name=serializer.search_key_name)
        inv_lines = result.to_lines()
        return self._inventory_add_lines(revision_id, parents,
            inv_lines, check_content=False)

    def _create_inv_from_null(self, delta, revision_id):
        """This will mutate new_inv directly.

        This is a simplified form of create_by_apply_delta which knows that all
        the old values must be None, so everything is a create.
        """
        serializer = self._format._serializer
        new_inv = inventory.CHKInventory(serializer.search_key_name)
        new_inv.revision_id = revision_id
        entry_to_bytes = new_inv._entry_to_bytes
        id_to_entry_dict = {}
        parent_id_basename_dict = {}
        for old_path, new_path, file_id, entry in delta:
            if old_path is not None:
                raise ValueError('Invalid delta, somebody tried to delete %r'
                    ' from the NULL_REVISION'
                    % ((old_path, file_id),))
            if new_path is None:
                raise ValueError('Invalid delta, delta from NULL_REVISION has'
                    ' no new_path %r' % (file_id,))
            if new_path == '':
                new_inv.root_id = file_id
                parent_id_basename_key = StaticTuple('', '').intern()
            else:
                utf8_entry_name = entry.name.encode('utf-8')
                parent_id_basename_key = StaticTuple(entry.parent_id,
                    utf8_entry_name).intern()
            new_value = entry_to_bytes(entry)
            # new_inv._path_to_fileid_cache[new_path] = file_id
            key = StaticTuple(file_id).intern()
            id_to_entry_dict[key] = new_value
            parent_id_basename_dict[parent_id_basename_key] = file_id
        new_inv._populate_from_dicts(self.chk_bytes, id_to_entry_dict,
            parent_id_basename_dict, maximum_size=serializer.maximum_size)
        return new_inv

    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
                               parents, basis_inv=None, propagate_caches=False):
        """Add a new inventory expressed as a delta against another revision.

        :param basis_revision_id: The inventory id the delta was created
        :param delta: The inventory delta (see Inventory.apply_delta for
        :param new_revision_id: The revision id that the inventory is being
        :param parents: The revision ids of the parents that revision_id is
            known to have and are in the repository already. These are supplied
            for repositories that depend on the inventory graph for revision
            graph access, as well as for those that pun ancestry with delta
        :param basis_inv: The basis inventory if it is already known,
        :param propagate_caches: If True, the caches for this inventory are
            copied to and updated for the result if possible.

        :returns: (validator, new_inv)
            The validator (which is a sha1 digest, though what is sha'd is
            repository format specific) of the serialized inventory, and the
        """
        if not self.is_in_write_group():
            raise AssertionError("%r not in write group" % (self,))
        _mod_revision.check_not_reserved_id(new_revision_id)
        if basis_inv is None:
            if basis_revision_id == _mod_revision.NULL_REVISION:
                new_inv = self._create_inv_from_null(delta, new_revision_id)
                if new_inv.root_id is None:
                    raise errors.RootMissing()
                inv_lines = new_inv.to_lines()
                return self._inventory_add_lines(new_revision_id, parents,
                    inv_lines, check_content=False), new_inv
            basis_tree = self.revision_tree(basis_revision_id)
            basis_tree.lock_read()
            basis_inv = basis_tree.inventory
        result = basis_inv.create_by_apply_delta(delta, new_revision_id,
            propagate_caches=propagate_caches)
        inv_lines = result.to_lines()
        return self._inventory_add_lines(new_revision_id, parents,
            inv_lines, check_content=False), result
        if basis_tree is not None:

    def _deserialise_inventory(self, revision_id, bytes):
        return inventory.CHKInventory.deserialise(self.chk_bytes, bytes,
            (revision_id,))

    def _iter_inventories(self, revision_ids, ordering):
        """Iterate over many inventory objects."""
        ordering = 'unordered'
        keys = [(revision_id,) for revision_id in revision_ids]
        stream = self.inventories.get_record_stream(keys, ordering, True)
        texts = {}
        for record in stream:
            if record.storage_kind != 'absent':
                texts[record.key] = record.get_bytes_as('fulltext')
            else:
                texts[record.key] = None
        yield (None, key[-1])
        yield (inventory.CHKInventory.deserialise(
            self.chk_bytes, bytes, key), key[-1])

    def _get_inventory_xml(self, revision_id):
        """Get serialized inventory as a string."""
        # Without a native 'xml' inventory, this method doesn't make sense.
        # However older working trees, and older bundles want it - so we supply
        # it allowing _get_inventory_xml to work. Bundles currently use the
        # serializer directly; this also isn't ideal, but there isn't an xml
        # iteration interface offered at all for repositories.
        return self._serializer.write_inventory_to_string(
            self.get_inventory(revision_id))

    def _find_present_inventory_keys(self, revision_keys):
        parent_map = self.inventories.get_parent_map(revision_keys)
        present_inventory_keys = set(k for k in parent_map)
        return present_inventory_keys

    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
        """Find the file ids and versions affected by revisions.

        :param revision_ids: an iterable containing revision ids.
        :param _inv_weave: The inventory weave from this repository or None.
            If None, the inventory weave will be opened automatically.
        :return: a dictionary mapping altered file-ids to an iterable of
            revision_ids. Each altered file-id has the exact revision_ids that
            altered it listed explicitly.
        """
        rich_root = self.supports_rich_root()
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
        file_id_revisions = {}
        pb = ui.ui_factory.nested_progress_bar()
        revision_keys = [(r,) for r in revision_ids]
        parent_keys = self._find_parent_keys_of_revisions(revision_keys)
        # TODO: instead of using _find_present_inventory_keys, change the
        # code paths to allow missing inventories to be tolerated.
        # However, we only want to tolerate missing parent
        # inventories, not missing inventories for revision_ids
        present_parent_inv_keys = self._find_present_inventory_keys(
            parent_keys)
        present_parent_inv_ids = set(
            [k[-1] for k in present_parent_inv_keys])
        inventories_to_read = set(revision_ids)
        inventories_to_read.update(present_parent_inv_ids)
        root_key_info = _build_interesting_key_sets(
            self, inventories_to_read, present_parent_inv_ids)
        interesting_root_keys = root_key_info.interesting_root_keys
        uninteresting_root_keys = root_key_info.uninteresting_root_keys
        chk_bytes = self.chk_bytes
        for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                interesting_root_keys, uninteresting_root_keys,
                pb=pb):
            for name, bytes in items:
                (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
                # TODO: consider interning file_id, revision_id here, or
                # pushing that intern() into bytes_to_info()
                # TODO: rich_root should always be True here, for all
                # repositories that support chk_bytes
                if not rich_root and name_utf8 == '':
                    continue
                file_id_revisions[file_id].add(revision_id)
                file_id_revisions[file_id] = set([revision_id])
        return file_id_revisions
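
    # Illustrative sketch only (not part of bzrlib): the method above reduces
    # the walked chk entries to a mapping of file_id -> set of revision_ids.
    # With plain (file_id, revision_id) pairs standing in for the decoded chk
    # entries (a hypothetical `entries` iterable), the accumulation step is:
    #
    #   def _example_collect_altered(entries):
    #       file_id_revisions = {}
    #       for file_id, revision_id in entries:
    #           file_id_revisions.setdefault(file_id, set()).add(revision_id)
    #       return file_id_revisions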

    def find_text_key_references(self):
        """Find the text key references within the repository.

        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
            to whether they were referred to by the inventory of the
            revision_id that they contain. The inventory texts from all present
            revision ids are assessed to generate this report.
        """
        # XXX: Slow version but correct: rewrite as a series of delta
        # examinations/direct tree traversal. Note that that will require care
        # as a common node is reachable both from the inventory that added it,
        # and others afterwards.
        revision_keys = self.revisions.keys()
        result = {}
        rich_roots = self.supports_rich_root()
        pb = ui.ui_factory.nested_progress_bar()
        all_revs = self.all_revision_ids()
        total = len(all_revs)
        for pos, inv in enumerate(self.iter_inventories(all_revs)):
            pb.update("Finding text references", pos, total)
            for _, entry in inv.iter_entries():
                if not rich_roots and entry.file_id == inv.root_id:
                    continue
                key = (entry.file_id, entry.revision)
                result.setdefault(key, False)
                if entry.revision == inv.revision_id:
                    result[key] = True
        return result

    def reconcile_canonicalize_chks(self):
        """Reconcile this repository to make sure all CHKs are in canonical
        """
        from bzrlib.reconcile import PackReconciler
        reconciler = PackReconciler(self, thorough=True, canonicalize_chks=True)
        reconciler.reconcile()

    def _reconcile_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKReconcilePacker(collection, packs, extension)
        return packer.pack(pb)

    def _canonicalize_chks_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKCanonicalizingPacker(collection, packs, extension, revs)
        return packer.pack(pb)

    def _get_source(self, to_format):
        """Return a source for streaming from this repository."""
        if self._format._serializer == to_format._serializer:
            # We must be exactly the same format, otherwise stuff like the chk
            # page layout might be different.
            # Actually, this test is just slightly looser than exact so that
            # CHK2 <-> 2a transfers will work.
            return GroupCHKStreamSource(self, to_format)
        return super(CHKInventoryRepository, self)._get_source(to_format)

    def _find_inconsistent_revision_parents(self, revisions_iterator=None):
        """Find revisions with different parent lists in the revision object
        and in the index graph.

        :param revisions_iterator: None, or an iterator of (revid,
            Revision-or-None). This iterator controls the revisions checked.
        :returns: an iterator yielding tuples of (revision-id, parents-in-index,
            parents-in-revision).
        """
        if not self.is_locked():
            raise AssertionError()
        if revisions_iterator is None:
            revisions_iterator = self._iter_revisions(None)
        for revid, revision in revisions_iterator:
            if revision is None:
                continue
            parent_map = vf.get_parent_map([(revid,)])
            parents_according_to_index = tuple(parent[-1] for parent in
                parent_map[(revid,)])
            parents_according_to_revision = tuple(revision.parent_ids)
            if parents_according_to_index != parents_according_to_revision:
                yield (revid, parents_according_to_index,
                    parents_according_to_revision)

    def _check_for_inconsistent_revision_parents(self):
        inconsistencies = list(self._find_inconsistent_revision_parents())
        if inconsistencies:
            raise errors.BzrCheckError(
                "Revision index has inconsistent parents.")


class GroupCHKStreamSource(StreamSource):
    """Used when both the source and target repo are GroupCHK repos."""

    def __init__(self, from_repository, to_format):
        """Create a StreamSource streaming from from_repository."""
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
        self._revision_keys = None
        self._text_keys = None
        self._text_fetch_order = 'groupcompress'
        self._chk_id_roots = None
        self._chk_p_id_roots = None

    def _get_inventory_stream(self, inventory_keys, allow_absent=False):
        """Get a stream of inventory texts.

        When this function returns, self._chk_id_roots and self._chk_p_id_roots
        should be populated.
        """
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            source_vf = self.from_repository.inventories
            stream = source_vf.get_record_stream(inventory_keys,
                'groupcompress', True)
            for record in stream:
                if record.storage_kind == 'absent':
                    raise errors.NoSuchRevision(self, record.key)
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                    record.key)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            id_roots_set.clear()
            p_id_roots_set.clear()
        return ('inventories', _filtered_inv_stream())

    def _get_filtered_chk_streams(self, excluded_revision_keys):
        self._text_keys = set()
        excluded_revision_keys.discard(_mod_revision.NULL_REVISION)
        if not excluded_revision_keys:
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
        else:
            # filter out any excluded revisions whose inventories are not
            # TODO: Update Repository.iter_inventories() to add
            # ignore_missing=True
            present_keys = self.from_repository._find_present_inventory_keys(
                excluded_revision_keys)
            present_ids = [k[-1] for k in present_keys]
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
            for inv in self.from_repository.iter_inventories(present_ids):
                uninteresting_root_keys.add(inv.id_to_entry.key())
                uninteresting_pid_root_keys.add(
                    inv.parent_id_basename_to_file_id.key())
        chk_bytes = self.from_repository.chk_bytes
        def _filter_id_to_entry():
            interesting_nodes = chk_map.iter_interesting_nodes(chk_bytes,
                self._chk_id_roots, uninteresting_root_keys)
            for record in _filter_text_keys(interesting_nodes, self._text_keys,
                    chk_map._bytes_to_text_key):
                if record is not None:
                    yield record
            self._chk_id_roots = None
        yield 'chk_bytes', _filter_id_to_entry()
        def _get_parent_id_basename_to_file_id_pages():
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                    self._chk_p_id_roots, uninteresting_pid_root_keys):
                if record is not None:
                    yield record
            self._chk_p_id_roots = None
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()

    def _get_text_stream(self):
        # Note: We know we don't have to handle adding root keys, because both
        # the source and target have the identical network name.
        text_stream = self.from_repository.texts.get_record_stream(
            self._text_keys, self._text_fetch_order, False)
        return ('texts', text_stream)

    def get_stream(self, search):
        def wrap_and_count(pb, rc, stream):
            """Yield records from stream while showing progress."""
            count = 0
            for record in stream:
                if count == rc.STEP:
                    pb.update('Estimate', rc.current, rc.max)
        revision_ids = search.get_keys()
        pb = ui.ui_factory.nested_progress_bar()
        rc = self._record_counter
        self._record_counter.setup(len(revision_ids))
        for stream_info in self._fetch_revision_texts(revision_ids):
            yield (stream_info[0],
                wrap_and_count(pb, rc, stream_info[1]))
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
        # TODO: The keys to exclude might be part of the search recipe
        # For now, exclude all parents that are at the edge of ancestry, for
        # which we have inventories
        from_repo = self.from_repository
        parent_keys = from_repo._find_parent_keys_of_revisions(
            self._revision_keys)
        self.from_repository.revisions.clear_cache()
        self.from_repository.signatures.clear_cache()
        # Clear the repo's get_parent_map cache too.
        self.from_repository._unstacked_provider.disable_cache()
        self.from_repository._unstacked_provider.enable_cache()
        s = self._get_inventory_stream(self._revision_keys)
        yield (s[0], wrap_and_count(pb, rc, s[1]))
        self.from_repository.inventories.clear_cache()
        for stream_info in self._get_filtered_chk_streams(parent_keys):
            yield (stream_info[0], wrap_and_count(pb, rc, stream_info[1]))
        self.from_repository.chk_bytes.clear_cache()
        s = self._get_text_stream()
        yield (s[0], wrap_and_count(pb, rc, s[1]))
        self.from_repository.texts.clear_cache()
        pb.update('Done', rc.max, rc.max)

    def get_stream_for_missing_keys(self, missing_keys):
        # missing keys can only occur when we are byte copying and not
        # translating (because translation means we don't send
        # unreconstructable deltas ever).
        missing_inventory_keys = set()
        for key in missing_keys:
            if key[0] != 'inventories':
                raise AssertionError('The only missing keys we should'
                    ' be filling in are inventory keys, not %s'
            missing_inventory_keys.add(key[1:])
        if self._chk_id_roots or self._chk_p_id_roots:
            raise AssertionError('Cannot call get_stream_for_missing_keys'
                ' until all of get_stream() has been consumed.')
        # Yield the inventory stream, so we can find the chk stream
        # Some of the missing_keys will be missing because they are ghosts.
        # As such, we can ignore them. The Sink is required to verify there are
        # no unavailable texts when the ghost inventories are not filled in.
        yield self._get_inventory_stream(missing_inventory_keys,
            allow_absent=True)
        # We use the empty set for excluded_revision_keys, to make it clear
        # that we want to transmit all referenced chk pages.
        for stream_info in self._get_filtered_chk_streams(set()):
            yield stream_info
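

# Illustrative sketch only (not part of bzrlib): get_stream() above wraps each
# substream in a generator that counts records and periodically updates a
# progress bar while passing the records through untouched. A stripped-down
# version of that wrapper, with a hypothetical `report` callable instead of
# the bzrlib progress bar, is:
def _example_wrap_and_count(stream, report, step=100):
    count = 0
    for record in stream:
        count += 1
        if count % step == 0:
            # Progress reporting is a side effect; the records flow unchanged.
            report(count)
        yield record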


class _InterestingKeyInfo(object):

    def __init__(self):
        self.interesting_root_keys = set()
        self.interesting_pid_root_keys = set()
        self.uninteresting_root_keys = set()
        self.uninteresting_pid_root_keys = set()

    def all_interesting(self):
        return self.interesting_root_keys.union(self.interesting_pid_root_keys)

    def all_uninteresting(self):
        return self.uninteresting_root_keys.union(
            self.uninteresting_pid_root_keys)

    def all_keys(self):
        return self.all_interesting().union(self.all_uninteresting())


def _build_interesting_key_sets(repo, inventory_ids, parent_only_inv_ids):
    result = _InterestingKeyInfo()
    for inv in repo.iter_inventories(inventory_ids, 'unordered'):
        root_key = inv.id_to_entry.key()
        pid_root_key = inv.parent_id_basename_to_file_id.key()
        if inv.revision_id in parent_only_inv_ids:
            result.uninteresting_root_keys.add(root_key)
            result.uninteresting_pid_root_keys.add(pid_root_key)
        else:
            result.interesting_root_keys.add(root_key)
            result.interesting_pid_root_keys.add(pid_root_key)
    return result


def _filter_text_keys(interesting_nodes_iterable, text_keys, bytes_to_text_key):
    """Iterate the result of iter_interesting_nodes, yielding the records
    and adding to text_keys.
    """
    text_keys_update = text_keys.update
    for record, items in interesting_nodes_iterable:
        text_keys_update([bytes_to_text_key(b) for n, b in items])
        yield record
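

# Illustrative sketch only (not part of bzrlib): _build_interesting_key_sets
# above partitions each inventory's two CHK root keys into 'interesting'
# (content we want to transmit or check) and 'uninteresting' (already-known,
# parent-only inventories) buckets. With plain tuples instead of inventory
# objects (names hypothetical), the same partitioning is:
def _example_partition_roots(inventories, parent_only_ids):
    interesting, uninteresting = set(), set()
    for revision_id, root_key, pid_root_key in inventories:
        if revision_id in parent_only_ids:
            uninteresting.update([root_key, pid_root_key])
        else:
            interesting.update([root_key, pid_root_key])
    return interesting, uninteresting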


class RepositoryFormat2a(RepositoryFormatPack):
    """A CHK repository that uses the bencode revision serializer."""

    repository_class = CHKInventoryRepository
    supports_external_lookups = True
    supports_chks = True
    _commit_builder_class = PackRootCommitBuilder
    rich_root_data = True
    _serializer = chk_serializer.chk_bencode_serializer
    _commit_inv_deltas = True
    # What index classes to use
    index_builder_class = BTreeBuilder
    index_class = BTreeGraphIndex
    # Note: We cannot unpack a delta that references a text we haven't
    # seen yet. There are 2 options: work in fulltexts, or require
    # topological sorting. Using fulltexts is more optimal for local
    # operations, because the source can be smart about extracting
    # multiple in-a-row (and sharing strings). Topological is better
    # for remote, because we access less data.
    _fetch_order = 'unordered'
    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.
    pack_compresses = True

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir('2a')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    @classmethod
    def get_format_string(cls):
        return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Repository format 2a - rich roots, group compression"
            " and chk inventories")


class RepositoryFormat2aSubtree(RepositoryFormat2a):
    """A 2a repository format that supports nested trees.
    """

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir('development-subtree')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    @classmethod
    def get_format_string(cls):
        return ('Bazaar development format 8\n')

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format 8 - nested trees, "
            "group compression and chk inventories")

    supports_tree_reference = True
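

# Illustrative note only (not part of bzrlib): the two format classes above
# identify themselves on disk by the byte string returned from
# get_format_string(), while get_format_description() is the human-readable
# summary shown to users. A minimal sketch of reading both, assuming the
# format classes can be instantiated directly as they are elsewhere in bzrlib:
#
#   def _example_describe_formats():
#       descriptions = {}
#       for fmt_class in (RepositoryFormat2a, RepositoryFormat2aSubtree):
#           fmt = fmt_class()
#           descriptions[fmt.get_format_string().strip()] = (
#               fmt.get_format_description())
#       return descriptions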