# Copyright (C) 2008, 2009 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Repository formats using CHK inventories and groupcompress compression."""
import time

from bzrlib import (
    bzrdir,
    chk_map,
    chk_serializer,
    debug,
    errors,
    index as _mod_index,
    inventory,
    knit,
    osutils,
    pack,
    remote,
    repository,
    revision as _mod_revision,
    trace,
    ui,
    )
from bzrlib.btree_index import (
    BTreeGraphIndex,
    BTreeBuilder,
    )
from bzrlib.index import GraphIndex, GraphIndexBuilder
from bzrlib.groupcompress import (
    _GCGraphIndex,
    GroupCompressVersionedFiles,
    )
from bzrlib.repofmt.pack_repo import (
    Pack,
    NewPack,
    KnitPackRepository,
    RepositoryFormatPack,
    ResumedPack,
    Packer,
    PackRootCommitBuilder,
    RepositoryPackCollection,
    )
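
# Overview (added for clarity): this module provides, in rough order of
# appearance, GCPack (a NewPack variant with groupcompress-friendly index
# shapes), GCCHKPacker and GCCHKReconcilePacker (repacking logic),
# GCRepositoryPackCollection, CHKInventoryRepository (the repository class
# itself), GroupCHKStreamSource (GC-to-GC fetch), and the concrete
# repository formats at the bottom.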


class GCPack(NewPack):

    def __init__(self, pack_collection, upload_suffix='', file_mode=None):
        """Create a NewPack instance.

        :param pack_collection: A PackCollection into which this is being
            inserted.
        :param upload_suffix: An optional suffix to be given to any temporary
            files created during the pack creation. e.g '.autopack'
        :param file_mode: An optional file mode to create the new files with.
        """
        # replaced from NewPack to:
        # - change inventory reference list length to 1
        # - change texts reference lists to 1
        # TODO: patch this to be parameterised
        # The relative locations of the packs are constrained, but all are
        # passed in because the caller has them, so as to avoid object churn.
        index_builder_class = pack_collection._index_builder_class
        if pack_collection.chk_index is not None:
            chk_index = index_builder_class(reference_lists=0)
        else:
            chk_index = None
        Pack.__init__(self,
            # Revisions: parents list, no text compression.
            index_builder_class(reference_lists=1),
            # Inventory: We want to map compression only, but currently the
            # knit code hasn't been updated enough to understand that, so we
            # have a regular 2-list index giving parents and compression
            # source.
            index_builder_class(reference_lists=1),
            # Texts: per file graph, for all fileids - so one reference list
            # and two elements in the key tuple.
            index_builder_class(reference_lists=1, key_elements=2),
            # Signatures: Just blobs to store, no compression, no parents
            # list.
            index_builder_class(reference_lists=0),
            # CHK based storage - just blobs, no compression or parents.
            chk_index=chk_index,
            )
        self._pack_collection = pack_collection
        # When we make readonly indices, we need this.
        self.index_class = pack_collection._index_class
        # where should the new pack be opened
        self.upload_transport = pack_collection._upload_transport
        # where are indices written out to
        self.index_transport = pack_collection._index_transport
        # where is the pack renamed to when it is finished?
        self.pack_transport = pack_collection._pack_transport
        # What file mode to upload the pack and indices with.
        self._file_mode = file_mode
        # tracks the content written to the .pack file.
        self._hash = osutils.md5()
        # a four-tuple with the length in bytes of the indices, once the pack
        # is finalised. (rev, inv, text, sigs)
        self.index_sizes = None
        # How much data to cache when writing packs. Note that this is not
        # synchronised with reads, because it's not in the transport layer, so
        # is not safe unless the client knows it won't be reading from the pack
        # under creation.
        self._cache_limit = 0
        # the temporary pack file name.
        self.random_name = osutils.rand_chars(20) + upload_suffix
        # when was this pack started ?
        self.start_time = time.time()
        # open an output stream for the data added to the pack.
        self.write_stream = self.upload_transport.open_write_stream(
            self.random_name, mode=self._file_mode)
        if 'pack' in debug.debug_flags:
            trace.mutter('%s: create_pack: pack stream open: %s%s t+%6.3fs',
                time.ctime(), self.upload_transport.base, self.random_name,
                time.time() - self.start_time)
        # A list of byte sequences to be written to the new pack, and the
        # aggregate size of them.  Stored as a list rather than separate
        # variables so that the _write_data closure below can update them.
        self._buffer = [[], 0]
        # create a callable for adding data
        #
        # robertc says- this is a closure rather than a method on the object
        # so that the variables are locals, and faster than accessing object
        # members.
        def _write_data(bytes, flush=False, _buffer=self._buffer,
            _write=self.write_stream.write, _update=self._hash.update):
            _buffer[0].append(bytes)
            _buffer[1] += len(bytes)
            # buffer cap
            if _buffer[1] > self._cache_limit or flush:
                bytes = ''.join(_buffer[0])
                _write(bytes)
                _update(bytes)
                _buffer[:] = [[], 0]
        # expose this on self, for the occasion when clients want to add data.
        self._write_data = _write_data
        # a pack writer object to serialise pack records.
        self._writer = pack.ContainerWriter(self._write_data)
        self._writer.begin()
        # what state is the pack in? (open, finished, aborted)
        self._state = 'open'
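
    # Illustrative note (added; not from the original source): with the
    # default _cache_limit of 0 every _write_data call above flushes
    # immediately; set_write_cache_size (from the NewPack base class) raises
    # the limit so writes are batched:
    #   pack._write_data('record bytes')    # buffered until limit is crossed
    #   pack._write_data('', flush=True)    # join, write, and hash the buffer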

    def _check_references(self):
        """Make sure our external references are present.

        Packs are allowed to have deltas whose base is not in the pack, but it
        must be present somewhere in this collection.  It is not allowed to
        have deltas based on a fallback repository.
        (See <https://bugs.launchpad.net/bzr/+bug/288751>)
        """
        # Groupcompress packs don't have any external references, arguably CHK
        # pages have external references, but we cannot 'cheaply' determine
        # them without actually walking all of the chk pages.


class ResumedGCPack(ResumedPack):

    def _check_references(self):
        """Make sure our external compression parents are present."""
        # See GCPack._check_references for why this is empty

    def _get_external_refs(self, index):
        # GC repositories don't have compression parents external to a given
        # pack file
        return set()


class GCCHKPacker(Packer):
    """This class understands what it takes to collect a GCCHK repo."""

    def __init__(self, pack_collection, packs, suffix, revision_ids=None,
                 reload_func=None):
        super(GCCHKPacker, self).__init__(pack_collection, packs, suffix,
                                          revision_ids=revision_ids,
                                          reload_func=reload_func)
        self._pack_collection = pack_collection
        # ATM, We only support this for GCCHK repositories
        if pack_collection.chk_index is None:
            raise AssertionError('pack_collection.chk_index should not be None')
        self._gather_text_refs = False
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        self._text_refs = None
        # set by .pack() if self.revision_ids is not None
        self.revision_keys = None

    def _get_progress_stream(self, source_vf, keys, message, pb):
        def pb_stream():
            substream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(substream):
                if pb is not None:
                    pb.update(message, idx + 1, len(keys))
                yield record
        return pb_stream()
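
    # Note (added for clarity): the stream helpers here and below share the
    # signature (source_vf, keys, message, pb), so any of them can be passed
    # to _copy_stream as its vf_to_stream callback.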

    def _get_filtered_inv_stream(self, source_vf, keys, message, pb=None):
        """Filter the texts of inventories, to find the chk pages."""
        total_keys = len(keys)
        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            stream = source_vf.get_record_stream(keys, 'groupcompress', True)
            for idx, record in enumerate(stream):
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                                                             record.key)
                if pb is not None:
                    pb.update('inv', idx, total_keys)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
                yield record
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            id_roots_set.clear()
            p_id_roots_set.clear()
        return _filtered_inv_stream()
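
    # Note (added for clarity): every CHK inventory carries two chk_map root
    # keys, id_to_entry and parent_id_basename_to_file_id.  The two root
    # lists collected above seed the page traversal in _get_chk_streams.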

    def _get_chk_streams(self, source_vf, keys, pb=None):
        # We want to stream the keys from 'id_roots', and things they
        # reference, and then stream things from p_id_roots and things they
        # reference, and then any remaining keys that we didn't get to.
        #
        # We also group referenced texts together, so if one root references a
        # text with prefix 'a', and another root references a node with prefix
        # 'a', we want to yield those nodes before we yield the nodes for 'b'
        # This keeps 'similar' nodes together.
        #
        # Note: We probably actually want multiple streams here, to help the
        #       client understand that the different levels won't compress well
        #       against each other.
        #       Test the difference between using one Group per level, and
        #       using 1 Group per prefix. (so '' (root) would get a group, then
        #       all the references to search-key 'a' would get a group, etc.)
        total_keys = len(keys)
        remaining_keys = set(keys)
        counter = [0]
        if self._gather_text_refs:
            bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
            self._text_refs = set()
        def _get_referenced_stream(root_keys, parse_leaf_nodes=False):
            cur_keys = root_keys
            while cur_keys:
                keys_by_search_prefix = {}
                remaining_keys.difference_update(cur_keys)
                next_keys = set()
                def handle_internal_node(node):
                    for prefix, value in node._items.iteritems():
                        # We don't want to request the same key twice, and we
                        # want to order it by the first time it is seen.
                        # Even further, we don't want to request a key which is
                        # not in this group of pack files (it should be in the
                        # repo, but it doesn't have to be in the group being
                        # packed.)
                        # TODO: consider how to treat externally referenced chk
                        #       pages as 'external_references' so that we
                        #       always fill them in for stacked branches
                        if value not in next_keys and value in remaining_keys:
                            keys_by_search_prefix.setdefault(prefix,
                                []).append(value)
                            next_keys.add(value)
                def handle_leaf_node(node):
                    # Store is None, because we know we have a LeafNode, and we
                    # just want its entries
                    for file_id, bytes in node.iteritems(None):
                        name_utf8, file_id, revision_id = bytes_to_info(bytes)
                        self._text_refs.add((file_id, revision_id))
                def next_stream():
                    stream = source_vf.get_record_stream(cur_keys,
                                                         'as-requested', True)
                    for record in stream:
                        bytes = record.get_bytes_as('fulltext')
                        # We don't care about search_key_func for this code,
                        # because we only care about external references.
                        node = chk_map._deserialise(bytes, record.key,
                                                    search_key_func=None)
                        common_base = node._search_prefix
                        if isinstance(node, chk_map.InternalNode):
                            handle_internal_node(node)
                        elif parse_leaf_nodes:
                            handle_leaf_node(node)
                        counter[0] += 1
                        if pb is not None:
                            pb.update('chk node', counter[0], total_keys)
                        yield record
                yield next_stream()
                # Double check that we won't be emitting any keys twice
                # If we get rid of the pre-calculation of all keys, we could
                # turn this around and do
                # next_keys.difference_update(seen_keys)
                # However, we also may have references to chk pages in another
                # pack file during autopack. We filter earlier, so we should no
                # longer need to do this
                # next_keys = next_keys.intersection(remaining_keys)
                cur_keys = []
                for prefix in sorted(keys_by_search_prefix):
                    cur_keys.extend(keys_by_search_prefix.pop(prefix))
        for stream in _get_referenced_stream(self._chk_id_roots,
                                             self._gather_text_refs):
            yield stream
        del self._chk_id_roots
        # while it isn't really possible for chk_id_roots to not be in the
        # local group of packs, it is possible that the tree shape has not
        # changed recently, so we need to filter _chk_p_id_roots by the
        # available keys
        chk_p_id_roots = [key for key in self._chk_p_id_roots
                          if key in remaining_keys]
        del self._chk_p_id_roots
        for stream in _get_referenced_stream(chk_p_id_roots, False):
            yield stream
        if remaining_keys:
            trace.mutter('There were %d keys in the chk index, %d of which'
                         ' were not referenced', total_keys,
                         len(remaining_keys))
            if self.revision_ids is None:
                stream = source_vf.get_record_stream(remaining_keys,
                                                     'unordered', True)
                yield stream
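
    # Illustrative walk (added for clarity, hypothetical prefixes): starting
    # at a root page, each level's internal nodes are expanded and the next
    # level's keys are grouped by search prefix, so every page under prefix
    # 'a' streams out before any page under 'b'.  That adjacency is what lets
    # similar nodes compress well against each other.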

    def _build_vf(self, index_name, parents, delta, for_write=False):
        """Build a VersionedFiles instance on top of this group of packs."""
        index_name = index_name + '_index'
        index_to_pack = {}
        access = knit._DirectPackAccess(index_to_pack)
        if for_write:
            # Use new_pack
            if self.new_pack is None:
                raise AssertionError('No new pack has been set')
            index = getattr(self.new_pack, index_name)
            index_to_pack[index] = self.new_pack.access_tuple()
            index.set_optimize(for_size=True)
            access.set_writer(self.new_pack._writer, index,
                              self.new_pack.access_tuple())
            add_callback = index.add_nodes
        else:
            indices = []
            for pack in self.packs:
                sub_index = getattr(pack, index_name)
                index_to_pack[sub_index] = pack.access_tuple()
                indices.append(sub_index)
            index = _mod_index.CombinedGraphIndex(indices)
            add_callback = None
        vf = GroupCompressVersionedFiles(
            _GCGraphIndex(index,
                          add_callback=add_callback,
                          parents=parents,
                          is_locked=self._pack_collection.repo.is_locked),
            access=access,
            delta=delta)
        return vf
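
    # Note (added for clarity): with for_write=False the returned
    # VersionedFiles reads via a CombinedGraphIndex over all input packs;
    # with for_write=True it is wired to the new pack's index and container
    # writer, so records stream straight from source to target.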

    def _build_vfs(self, index_name, parents, delta):
        """Build the source and target VersionedFiles."""
        source_vf = self._build_vf(index_name, parents,
                                   delta, for_write=False)
        target_vf = self._build_vf(index_name, parents,
                                   delta, for_write=True)
        return source_vf, target_vf

    def _copy_stream(self, source_vf, target_vf, keys, message, vf_to_stream,
                     pb_offset):
        trace.mutter('repacking %d %s', len(keys), message)
        self.pb.update('repacking %s' % (message,), pb_offset)
        child_pb = ui.ui_factory.nested_progress_bar()
        try:
            stream = vf_to_stream(source_vf, keys, message, child_pb)
            for _ in target_vf._insert_record_stream(stream,
                                                     random_id=True,
                                                     reuse_blocks=False):
                pass
        finally:
            child_pb.finished()

    def _copy_revision_texts(self):
        source_vf, target_vf = self._build_vfs('revision', True, False)
        if not self.revision_keys:
            # We are doing a full fetch, aka 'pack'
            self.revision_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'revisions', self._get_progress_stream, 1)

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'inventories', self._get_filtered_inv_stream, 2)

    def _copy_chk_texts(self):
        source_vf, target_vf = self._build_vfs('chk', False, False)
        # TODO: This is technically spurious... if it is a performance issue,
        #       remove it
        total_keys = source_vf.keys()
        trace.mutter('repacking chk: %d id_to_entry roots,'
                     ' %d p_id_map roots, %d total keys',
                     len(self._chk_id_roots), len(self._chk_p_id_roots),
                     len(total_keys))
        self.pb.update('repacking chk', 3)
        child_pb = ui.ui_factory.nested_progress_bar()
        try:
            for stream in self._get_chk_streams(source_vf, total_keys,
                                                pb=child_pb):
                for _ in target_vf._insert_record_stream(stream,
                                                         random_id=True,
                                                         reuse_blocks=False):
                    pass
        finally:
            child_pb.finished()

    def _copy_text_texts(self):
        source_vf, target_vf = self._build_vfs('text', True, True)
        # XXX: We don't walk the chk map to determine referenced (file_id,
        #      revision_id) keys.  We don't do it yet because you really need
        #      to filter out the ones that are present in the parents of the
        #      rev just before the ones you are copying, otherwise the filter
        #      is grabbing too many keys...
        text_keys = source_vf.keys()
        self._copy_stream(source_vf, target_vf, text_keys,
                          'text', self._get_progress_stream, 4)

    def _copy_signature_texts(self):
        source_vf, target_vf = self._build_vfs('signature', False, False)
        signature_keys = source_vf.keys()
        # Only copy signatures for revisions that are being copied.
        signature_keys = signature_keys.intersection(self.revision_keys)
        self._copy_stream(source_vf, target_vf, signature_keys,
                          'signatures', self._get_progress_stream, 5)

    def _create_pack_from_packs(self):
        self.pb.update('repacking', 0, 7)
        self.new_pack = self.open_pack()
        # Is this necessary for GC ?
        self.new_pack.set_write_cache_size(1024*1024)
        self._copy_revision_texts()
        self._copy_inventory_texts()
        self._copy_chk_texts()
        self._copy_text_texts()
        self._copy_signature_texts()
        self.new_pack._check_references()
        if not self._use_pack(self.new_pack):
            self.new_pack.abort()
            return None
        self.pb.update('finishing repack', 6, 7)
        self.new_pack.finish()
        self._pack_collection.allocate(self.new_pack)
        return self.new_pack
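
    # Note (added for clarity): the copy order above matters.  Copying
    # revisions first fills in self.revision_keys, and copying inventories
    # collects the chk root keys that _copy_chk_texts then walks, before
    # texts and signatures are handled.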


class GCCHKReconcilePacker(GCCHKPacker):
    """A packer which regenerates indices etc as it copies.

    This is used by ``bzr reconcile`` to cause parent text pointers to be
    correct.
    """

    def __init__(self, *args, **kwargs):
        super(GCCHKReconcilePacker, self).__init__(*args, **kwargs)
        self._data_changed = False
        self._gather_text_refs = True

    def _copy_inventory_texts(self):
        source_vf, target_vf = self._build_vfs('inventory', True, True)
        self._copy_stream(source_vf, target_vf, self.revision_keys,
                          'inventories', self._get_filtered_inv_stream, 2)
        if source_vf.keys() != self.revision_keys:
            self._data_changed = True

    def _copy_text_texts(self):
        """Generate what texts we should have and then copy."""
        source_vf, target_vf = self._build_vfs('text', True, True)
        trace.mutter('repacking %d texts', len(self._text_refs))
        self.pb.update("repacking texts", 4)
        # we have three major tasks here:
        # 1) generate the ideal index
        repo = self._pack_collection.repo
        # We want the one we just wrote, so base it on self.new_pack
        revision_vf = self._build_vf('revision', True, False, for_write=True)
        ancestor_keys = revision_vf.get_parent_map(revision_vf.keys())
        # Strip keys back into revision_ids.
        ancestors = dict((k[0], tuple([p[0] for p in parents]))
                         for k, parents in ancestor_keys.iteritems())
        del ancestor_keys
        # TODO: _generate_text_key_index should be much cheaper to generate from
        #       a chk repository, rather than the current implementation
        ideal_index = repo._generate_text_key_index(None, ancestors)
        file_id_parent_map = source_vf.get_parent_map(self._text_refs)
        # 2) generate a keys list that contains all the entries that can
        #    be used as-is, with corrected parents.
        ok_keys = []
        new_parent_keys = {} # (key, parent_keys)
        discarded_keys = []
        NULL_REVISION = _mod_revision.NULL_REVISION
        for key in self._text_refs:
            try:
                ideal_parents = tuple(ideal_index[key])
            except KeyError:
                discarded_keys.append(key)
                self._data_changed = True
            else:
                if ideal_parents == (NULL_REVISION,):
                    ideal_parents = ()
                source_parents = file_id_parent_map[key]
                if ideal_parents == source_parents:
                    # no change needed.
                    ok_keys.append(key)
                else:
                    # We need to change the parent graph, but we don't need to
                    # re-insert the text (since we don't pun the compression
                    # parent with the parents list)
                    self._data_changed = True
                    new_parent_keys[key] = ideal_parents
        # we're finished with some data.
        del ideal_index
        del file_id_parent_map
        # 3) bulk copy the data, updating records that need it
        def _update_parents_for_texts():
            stream = source_vf.get_record_stream(self._text_refs,
                                                 'groupcompress', False)
            for record in stream:
                if record.key in new_parent_keys:
                    record.parents = new_parent_keys[record.key]
                yield record
        target_vf.insert_record_stream(_update_parents_for_texts())

    def _use_pack(self, new_pack):
        """Override _use_pack to check for reconcile having changed content."""
        return new_pack.data_inserted() and self._data_changed


class GCRepositoryPackCollection(RepositoryPackCollection):

    pack_factory = GCPack
    resumed_pack_factory = ResumedGCPack

    def _already_packed(self):
        """Is the collection already packed?"""
        # Always repack GC repositories for now
        return False

    def _execute_pack_operations(self, pack_operations,
                                 _packer_class=GCCHKPacker,
                                 reload_func=None):
        """Execute a series of pack operations.

        :param pack_operations: A list of [revision_count, packs_to_combine].
        :param _packer_class: The class of packer to use (default: Packer).
        :return: None.
        """
        # XXX: Copied across from RepositoryPackCollection simply because we
        #      want to override the _packer_class ... :(
        for revision_count, packs in pack_operations:
            # we may have no-ops from the setup logic
            if len(packs) == 0:
                continue
            packer = GCCHKPacker(self, packs, '.autopack',
                                 reload_func=reload_func)
            try:
                packer.pack()
            except errors.RetryWithNewPacks:
                # An exception is propagating out of this context, make sure
                # this packer has cleaned up. Packer() doesn't set its new_pack
                # state into the RepositoryPackCollection object, so we only
                # have access to it directly here.
                if packer.new_pack is not None:
                    packer.new_pack.abort()
                raise
            for pack in packs:
                self._remove_pack_from_memory(pack)
        # record the newly available packs and stop advertising the old
        # packs
        self._save_pack_names(clear_obsolete_packs=True)
        # Move the old packs out of the way now they are no longer referenced.
        for revision_count, packs in pack_operations:
            self._obsolete_packs(packs)


class CHKInventoryRepository(KnitPackRepository):
    """Subclass of KnitPackRepository that uses CHK based inventories."""

    def __init__(self, _format, a_bzrdir, control_files, _commit_builder_class,
                 _serializer):
        """Overridden to change pack collection class."""
        KnitPackRepository.__init__(self, _format, a_bzrdir, control_files,
                                    _commit_builder_class, _serializer)
        # and now replace everything it did :)
        index_transport = self._transport.clone('indices')
        self._pack_collection = GCRepositoryPackCollection(self,
            self._transport, index_transport,
            self._transport.clone('upload'),
            self._transport.clone('packs'),
            _format.index_builder_class,
            _format.index_class,
            use_chk_index=self._format.supports_chks,
            )
        self.inventories = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.inventory_index.combined_index,
                add_callback=self._pack_collection.inventory_index.add_callback,
                parents=True, is_locked=self.is_locked),
            access=self._pack_collection.inventory_index.data_access)
        self.revisions = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.revision_index.combined_index,
                add_callback=self._pack_collection.revision_index.add_callback,
                parents=True, is_locked=self.is_locked,
                track_external_parent_refs=True),
            access=self._pack_collection.revision_index.data_access,
            delta=False)
        self.signatures = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.signature_index.combined_index,
                add_callback=self._pack_collection.signature_index.add_callback,
                parents=False, is_locked=self.is_locked),
            access=self._pack_collection.signature_index.data_access,
            delta=False)
        self.texts = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.text_index.combined_index,
                add_callback=self._pack_collection.text_index.add_callback,
                parents=True, is_locked=self.is_locked),
            access=self._pack_collection.text_index.data_access)
        # No parents, individual CHK pages don't have specific ancestry
        self.chk_bytes = GroupCompressVersionedFiles(
            _GCGraphIndex(self._pack_collection.chk_index.combined_index,
                add_callback=self._pack_collection.chk_index.add_callback,
                parents=False, is_locked=self.is_locked),
            access=self._pack_collection.chk_index.data_access)
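        # Note (added for clarity): all five stores above share the
        # groupcompress container format; they differ only in index shape
        # (parent tracking, key width) and in whether deltas are permitted.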
        # True when the repository object is 'write locked' (as opposed to the
        # physical lock only taken out around changes to the pack-names list.)
        # Another way to represent this would be a decorator around the control
        # files object that presents logical locks as physical ones - if this
        # gets ugly consider that alternative design. RBC 20071011
        self._write_lock_count = 0
        self._transaction = None
        # for tests
        self._reconcile_does_inventory_gc = True
        self._reconcile_fixes_text_parents = True
        self._reconcile_backsup_inventory = False

    def _add_inventory_checked(self, revision_id, inv, parents):
        """Add inv to the repository after checking the inputs.

        This function can be overridden to allow different inventory styles.

        :seealso: add_inventory, for the contract.
        """
        serializer = self._format._serializer
        result = inventory.CHKInventory.from_inventory(self.chk_bytes, inv,
            maximum_size=serializer.maximum_size,
            search_key_name=serializer.search_key_name)
        inv_lines = result.to_lines()
        return self._inventory_add_lines(revision_id, parents,
            inv_lines, check_content=False)

    def add_inventory_by_delta(self, basis_revision_id, delta, new_revision_id,
                               parents, basis_inv=None, propagate_caches=False):
        """Add a new inventory expressed as a delta against another revision.

        :param basis_revision_id: The inventory id the delta was created
            against.
        :param delta: The inventory delta (see Inventory.apply_delta for
            details).
        :param new_revision_id: The revision id that the inventory is being
            added for.
        :param parents: The revision ids of the parents that revision_id is
            known to have and are in the repository already. These are supplied
            for repositories that depend on the inventory graph for revision
            graph access, as well as for those that pun ancestry with delta
            compression.
        :param basis_inv: The basis inventory if it is already known,
            otherwise None.
        :param propagate_caches: If True, the caches for this inventory are
            copied to and updated for the result if possible.

        :returns: (validator, new_inv)
            The validator (which is a sha1 digest, though what is sha'd is
            repository format specific) of the serialized inventory, and the
            resulting inventory.
        """
        if basis_revision_id == _mod_revision.NULL_REVISION:
            return KnitPackRepository.add_inventory_by_delta(self,
                basis_revision_id, delta, new_revision_id, parents)
        if not self.is_in_write_group():
            raise AssertionError("%r not in write group" % (self,))
        _mod_revision.check_not_reserved_id(new_revision_id)
        basis_tree = self.revision_tree(basis_revision_id)
        basis_tree.lock_read()
        try:
            if basis_inv is None:
                basis_inv = basis_tree.inventory
            result = basis_inv.create_by_apply_delta(delta, new_revision_id,
                propagate_caches=propagate_caches)
            inv_lines = result.to_lines()
            return self._inventory_add_lines(new_revision_id, parents,
                inv_lines, check_content=False), result
        finally:
            basis_tree.unlock()
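
    # Illustrative delta shape (added for clarity, hypothetical ids): each
    # delta item is (old_path, new_path, file_id, new_entry), so renaming
    # 'a' to 'b' looks roughly like:
    #   delta = [('a', 'b', 'file-a-id', <InventoryFile entry for 'b'>)]
    #   validator, new_inv = repo.add_inventory_by_delta(
    #       basis_rev_id, delta, new_rev_id, [basis_rev_id])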

    def _iter_inventories(self, revision_ids):
        """Iterate over many inventory objects."""
        keys = [(revision_id,) for revision_id in revision_ids]
        stream = self.inventories.get_record_stream(keys, 'unordered', True)
        texts = {}
        for record in stream:
            if record.storage_kind != 'absent':
                texts[record.key] = record.get_bytes_as('fulltext')
            else:
                raise errors.NoSuchRevision(self, record.key)
        for key in keys:
            yield inventory.CHKInventory.deserialise(self.chk_bytes, texts[key], key)

    def _iter_inventory_xmls(self, revision_ids):
        # Without a native 'xml' inventory, this method doesn't make sense, so
        # make it raise to trap naughty direct users.
        raise NotImplementedError(self._iter_inventory_xmls)

    def _find_parent_ids_of_revisions(self, revision_ids):
        # TODO: we probably want to make this a helper that other code can get
        #       at
        parent_map = self.get_parent_map(revision_ids)
        parents = set()
        map(parents.update, parent_map.itervalues())
        parents.difference_update(revision_ids)
        parents.discard(_mod_revision.NULL_REVISION)
        return parents

    def _find_present_inventory_ids(self, revision_ids):
        keys = [(r,) for r in revision_ids]
        parent_map = self.inventories.get_parent_map(keys)
        present_inventory_ids = set(k[-1] for k in parent_map)
        return present_inventory_ids
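
    # Worked example (added for clarity, hypothetical graph): with ancestry
    # A -> B -> C and revision_ids = {B, C}, _find_parent_ids_of_revisions
    # returns {A}: the union of all parents ({A, B}) minus the requested
    # revisions and the NULL_REVISION sentinel.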

    def fileids_altered_by_revision_ids(self, revision_ids, _inv_weave=None):
        """Find the file ids and versions affected by revisions.

        :param revisions: an iterable containing revision ids.
        :param _inv_weave: The inventory weave from this repository or None.
            If None, the inventory weave will be opened automatically.
        :return: a dictionary mapping altered file-ids to an iterable of
            revision_ids. Each altered file-id has the exact revision_ids
            that altered it listed explicitly.
        """
        rich_root = self.supports_rich_root()
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
        file_id_revisions = {}
        pb = ui.ui_factory.nested_progress_bar()
        try:
            parent_ids = self._find_parent_ids_of_revisions(revision_ids)
            present_parent_inv_ids = self._find_present_inventory_ids(parent_ids)
            uninteresting_root_keys = set()
            interesting_root_keys = set()
            inventories_to_read = set(present_parent_inv_ids)
            inventories_to_read.update(revision_ids)
            for inv in self.iter_inventories(inventories_to_read):
                entry_chk_root_key = inv.id_to_entry.key()
                if inv.revision_id in present_parent_inv_ids:
                    uninteresting_root_keys.add(entry_chk_root_key)
                else:
                    interesting_root_keys.add(entry_chk_root_key)
            chk_bytes = self.chk_bytes
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                        interesting_root_keys, uninteresting_root_keys,
                        pb=pb):
                for name, bytes in items:
                    (name_utf8, file_id, revision_id) = bytes_to_info(bytes)
                    if not rich_root and name_utf8 == '':
                        continue
                    try:
                        file_id_revisions[file_id].add(revision_id)
                    except KeyError:
                        file_id_revisions[file_id] = set([revision_id])
            return file_id_revisions
        finally:
            pb.finished()

    def find_text_key_references(self):
        """Find the text key references within the repository.

        :return: A dictionary mapping text keys ((fileid, revision_id) tuples)
            to whether they were referred to by the inventory of the
            revision_id that they contain. The inventory texts from all present
            revision ids are assessed to generate this report.
        """
        # XXX: Slow version but correct: rewrite as a series of delta
        # examinations/direct tree traversal. Note that that will require care
        # as a common node is reachable both from the inventory that added it,
        # and others afterwards.
        revision_keys = self.revisions.keys()
        result = {}
        rich_roots = self.supports_rich_root()
        pb = ui.ui_factory.nested_progress_bar()
        try:
            all_revs = self.all_revision_ids()
            total = len(all_revs)
            for pos, inv in enumerate(self.iter_inventories(all_revs)):
                pb.update("Finding text references", pos, total)
                for _, entry in inv.iter_entries():
                    if not rich_roots and entry.file_id == inv.root_id:
                        continue
                    key = (entry.file_id, entry.revision)
                    result.setdefault(key, False)
                    if entry.revision == inv.revision_id:
                        result[key] = True
            return result
        finally:
            pb.finished()

    def _reconcile_pack(self, collection, packs, extension, revs, pb):
        packer = GCCHKReconcilePacker(collection, packs, extension)
        return packer.pack(pb)

    def _get_source(self, to_format):
        """Return a source for streaming from this repository."""
        if isinstance(to_format, remote.RemoteRepositoryFormat):
            # Can't just check attributes on to_format with the current code,
            # work around this:
            to_format._ensure_real()
            to_format = to_format._custom_format
        if to_format.__class__ is self._format.__class__:
            # We must be exactly the same format, otherwise stuff like the chk
            # page layout might be different.
            return GroupCHKStreamSource(self, to_format)
        return super(CHKInventoryRepository, self)._get_source(to_format)


class GroupCHKStreamSource(repository.StreamSource):
    """Used when both the source and target repo are GroupCHK repos."""

    def __init__(self, from_repository, to_format):
        """Create a StreamSource streaming from from_repository."""
        super(GroupCHKStreamSource, self).__init__(from_repository, to_format)
        self._revision_keys = None
        self._text_keys = None
        self._chk_id_roots = None
        self._chk_p_id_roots = None

    def _get_inventory_stream(self, inventory_keys, allow_absent=False):
        """Get a stream of inventory texts.

        When this function returns, self._chk_id_roots and self._chk_p_id_roots
        will be populated.
        """
        self._chk_id_roots = []
        self._chk_p_id_roots = []
        def _filtered_inv_stream():
            id_roots_set = set()
            p_id_roots_set = set()
            source_vf = self.from_repository.inventories
            stream = source_vf.get_record_stream(inventory_keys,
                                                 'groupcompress', True)
            for record in stream:
                if record.storage_kind == 'absent':
                    if allow_absent:
                        continue
                    else:
                        raise errors.NoSuchRevision(self, record.key)
                bytes = record.get_bytes_as('fulltext')
                chk_inv = inventory.CHKInventory.deserialise(None, bytes,
                                                             record.key)
                key = chk_inv.id_to_entry.key()
                if key not in id_roots_set:
                    self._chk_id_roots.append(key)
                    id_roots_set.add(key)
                p_id_map = chk_inv.parent_id_basename_to_file_id
                if p_id_map is None:
                    raise AssertionError('Parent id -> file_id map not set')
                key = p_id_map.key()
                if key not in p_id_roots_set:
                    p_id_roots_set.add(key)
                    self._chk_p_id_roots.append(key)
                yield record
            # We have finished processing all of the inventory records, we
            # don't need these sets anymore
            id_roots_set.clear()
            p_id_roots_set.clear()
        return ('inventories', _filtered_inv_stream())

    def _find_present_inventories(self, revision_ids):
        revision_keys = [(r,) for r in revision_ids]
        inventories = self.from_repository.inventories
        present_inventories = inventories.get_parent_map(revision_keys)
        return [p[-1] for p in present_inventories]

    def _get_filtered_chk_streams(self, excluded_revision_ids):
        self._text_keys = set()
        excluded_revision_ids.discard(_mod_revision.NULL_REVISION)
        if not excluded_revision_ids:
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
        else:
            # filter out any excluded revisions whose inventories are not
            # actually present
            # TODO: Update Repository.iter_inventories() to add
            #       ignore_missing=True
            present_ids = self._find_present_inventories(excluded_revision_ids)
            uninteresting_root_keys = set()
            uninteresting_pid_root_keys = set()
            for inv in self.from_repository.iter_inventories(present_ids):
                uninteresting_root_keys.add(inv.id_to_entry.key())
                uninteresting_pid_root_keys.add(
                    inv.parent_id_basename_to_file_id.key())
        bytes_to_info = inventory.CHKInventory._bytes_to_utf8name_key
        chk_bytes = self.from_repository.chk_bytes
        def _filter_id_to_entry():
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                        self._chk_id_roots, uninteresting_root_keys):
                for name, bytes in items:
                    # Note: we don't care about name_utf8, because we are
                    # always rich-root = True
                    _, file_id, revision_id = bytes_to_info(bytes)
                    self._text_keys.add((file_id, revision_id))
                if record is not None:
                    yield record
            # Consumed
            self._chk_id_roots = None
        yield 'chk_bytes', _filter_id_to_entry()
        def _get_parent_id_basename_to_file_id_pages():
            for record, items in chk_map.iter_interesting_nodes(chk_bytes,
                        self._chk_p_id_roots, uninteresting_pid_root_keys):
                if record is not None:
                    yield record
            # Consumed
            self._chk_p_id_roots = None
        yield 'chk_bytes', _get_parent_id_basename_to_file_id_pages()
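
    # Note (added for clarity): chk_map.iter_interesting_nodes walks only
    # pages reachable from the 'interesting' roots and not from the
    # 'uninteresting' ones, so the two streams above carry just the chk
    # pages the target side should be missing.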

    def _get_text_stream(self):
        # Note: We know we don't have to handle adding root keys, because both
        # the source and target are GCCHK, and those always support rich-roots
        # We may want to request as 'unordered', in case the source has done a
        # good job of packing the texts already.
        return ('texts', self.from_repository.texts.get_record_stream(
            self._text_keys, 'groupcompress', False))

    def get_stream(self, search):
        revision_ids = search.get_keys()
        for stream_info in self._fetch_revision_texts(revision_ids):
            yield stream_info
        self._revision_keys = [(rev_id,) for rev_id in revision_ids]
        yield self._get_inventory_stream(self._revision_keys)
        # TODO: The keys to exclude might be part of the search recipe
        # For now, exclude all parents that are at the edge of ancestry, for
        # which we have inventories
        from_repo = self.from_repository
        parent_ids = from_repo._find_parent_ids_of_revisions(revision_ids)
        for stream_info in self._get_filtered_chk_streams(parent_ids):
            yield stream_info
        yield self._get_text_stream()
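
    # Note (added for clarity): the substream order above is deliberate:
    # revisions, inventories, chk pages, then texts.  The text stream must
    # come last because self._text_keys is only populated while the chk
    # id_to_entry pages are being streamed.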

    def get_stream_for_missing_keys(self, missing_keys):
        # missing keys can only occur when we are byte copying and not
        # translating (because translation means we don't send
        # unreconstructable deltas ever).
        missing_inventory_keys = set()
        for key in missing_keys:
            if key[0] != 'inventories':
                raise AssertionError('The only missing keys we should'
                    ' be filling in are inventory keys, not %s'
                    % (key[0],))
            missing_inventory_keys.add(key[1:])
        if self._chk_id_roots or self._chk_p_id_roots:
            raise AssertionError('Cannot call get_stream_for_missing_keys'
                ' until all of get_stream() has been consumed.')
        # Yield the inventory stream, so we can find the chk stream
        # Some of the missing_keys will be missing because they are ghosts.
        # As such, we can ignore them. The Sink is required to verify there are
        # no unavailable texts when the ghost inventories are not filled in.
        yield self._get_inventory_stream(missing_inventory_keys,
                                         allow_absent=True)
        # We use the empty set for excluded_revision_ids, to make it clear that
        # we want to transmit all referenced chk pages.
        for stream_info in self._get_filtered_chk_streams(set()):
            yield stream_info


class RepositoryFormatCHK1(RepositoryFormatPack):
    """A hashed CHK+group compress pack repository."""

    repository_class = CHKInventoryRepository
    supports_external_lookups = True
    supports_chks = True
    # For right now, setting this to True gives us InterModel1And2 rather
    # than InterDifferingSerializer
    _commit_builder_class = PackRootCommitBuilder
    rich_root_data = True
    _serializer = chk_serializer.chk_serializer_255_bigpage
    _commit_inv_deltas = True
    # What index classes to use
    index_builder_class = BTreeBuilder
    index_class = BTreeGraphIndex
    # Note: We cannot unpack a delta that references a text we haven't
    # seen yet. There are 2 options, work in fulltexts, or require
    # topological sorting. Using fulltexts is more optimal for local
    # operations, because the source can be smart about extracting
    # multiple in-a-row (and sharing strings). Topological is better
    # for remote, because we access less data.
    _fetch_order = 'unordered'
    _fetch_uses_deltas = False # essentially ignored by the groupcompress code.

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir('development6-rich-root')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ('Bazaar development format - group compression and chk inventory'
                ' (needs bzr.dev from 1.14)\n')

    def get_format_description(self):
        """See RepositoryFormat.get_format_description()."""
        return ("Development repository format - rich roots, group compression"
                " and chk inventories")

    def check_conversion_target(self, target_format):
        if not target_format.rich_root_data:
            raise errors.BadConversionTarget(
                'Does not support rich root data.', target_format)
        if not getattr(target_format, 'supports_tree_reference', False):
            raise errors.BadConversionTarget(
                'Does not support nested trees', target_format)
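
# Illustrative use (added for clarity; formats are normally reached through
# the bzrdir format registry rather than constructed directly):
#   from bzrlib import bzrdir
#   fmt = bzrdir.format_registry.make_bzrdir('development6-rich-root')
# which is exactly what _get_matching_bzrdir above returns.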


class RepositoryFormatCHK2(RepositoryFormatCHK1):
    """A CHK repository that uses the bencode revision serializer."""

    _serializer = chk_serializer.chk_bencode_serializer

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir('development7-rich-root')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(self):
        """See RepositoryFormat.get_format_string()."""
        return ('Bazaar development format - chk repository with bencode '
                'revision serialization (needs bzr.dev from 1.16)\n')


class RepositoryFormat2a(RepositoryFormatCHK2):
    """A CHK repository that uses the bencode revision serializer.

    This is the same as RepositoryFormatCHK2 but with a public name.
    """

    _serializer = chk_serializer.chk_bencode_serializer

    def _get_matching_bzrdir(self):
        return bzrdir.format_registry.make_bzrdir('2a')

    def _ignore_setting_bzrdir(self, format):
        pass

    _matchingbzrdir = property(_get_matching_bzrdir, _ignore_setting_bzrdir)

    def get_format_string(self):
        return ('Bazaar repository format 2a (needs bzr 1.16 or later)\n')
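
# Note (added for clarity): each get_format_string() above is the literal
# byte string stored in a repository's format file on disk; it is how an
# existing repository is recognised as one of these formats when opened.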